From 90172dd9c3bdb5c162bfbc1c730c8f7c3ec33f39 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 6 May 2026 16:29:54 +0000 Subject: [PATCH 1/2] device-health-oracle: detect link impairment from monitoring data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add bidirectional LinkHealth transitions backed by the existing link_rollup_5m ClickHouse table: - ReadyForService -> Impaired when the most recent rollup bucket has ISIS down or per-direction packet loss above --link-loss-threshold (default 5%). - Impaired -> ReadyForService only after every bucket in the recovery window (reuses --drained-slot-count) is clean. The asymmetry — fast demote, slow recover — keeps borderline links from flapping while still surfacing real impairment quickly. LinkHealth is a signal only; the serviceability program does not gate link.status on it. Refs #2652 --- CHANGELOG.md | 2 + .../cmd/device-health-oracle/main.go | 11 +- .../internal/worker/clickhouse.go | 73 ++++++ .../internal/worker/clickhouse_test.go | 169 ++++++++++++++ .../internal/worker/criteria.go | 65 +++++- .../internal/worker/criteria_test.go | 113 ++++++++++ .../internal/worker/link_health.go | 134 +++++++++++ .../internal/worker/link_health_test.go | 210 ++++++++++++++++++ 8 files changed, 769 insertions(+), 8 deletions(-) create mode 100644 controlplane/device-health-oracle/internal/worker/link_health.go create mode 100644 controlplane/device-health-oracle/internal/worker/link_health_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ec9c8afc1c..c48e83e07d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. ### Changes +- DeviceHealthOracle + - Detect impaired links from the `link_rollup_5m` ClickHouse table (`isis_down=true` or `a_loss_pct`/`z_loss_pct` above a configurable threshold) and write `LinkHealth = Impaired` onchain. 
Recover to `ReadyForService` only after every bucket in the recovery window (reuses `--drained-slot-count`) is clean — this asymmetry surfaces real impairment quickly while preventing borderline links from flapping. New flag `--link-loss-threshold` (default `5.0`). `LinkHealth` is reported as a signal only — the serviceability program does not gate `link.status` on it ([#2652](https://github.com/malbeclabs/doublezero/issues/2652)) - Smartcontract - Migrate read callers in the CLI, sentinel, client, controlplane admin, and Rust SDK topology helper to read interfaces from `Device::new_interfaces` instead of the legacy `interfaces` enum vec, and adopt the `Device::find_interface` signature that returns `&NewInterface`. The legacy `interfaces` slot is still written on-disk via the per-write V2 projection from #3667; this PR only migrates reads. The temporary `Device::find_interface_legacy` helper is retained for the smartcontract program processors, which migrate in a later issue. Activator is intentionally excluded — it is deprecated ([#3659](https://github.com/malbeclabs/doublezero/issues/3659)) - Activator diff --git a/controlplane/device-health-oracle/cmd/device-health-oracle/main.go b/controlplane/device-health-oracle/cmd/device-health-oracle/main.go index b08ed22cd6..9c3cf834f3 100644 --- a/controlplane/device-health-oracle/cmd/device-health-oracle/main.go +++ b/controlplane/device-health-oracle/cmd/device-health-oracle/main.go @@ -44,6 +44,7 @@ var ( slackWebhookURL = flag.String("slack-webhook-url", "", "The Slack webhook URL to send alerts") provisioningSlotCount = flag.Uint64("provisioning-slot-count", defaultProvisioningSlotCount, "Burn-in slot count for new devices/links (~20 hours at 200000)") drainedSlotCount = flag.Uint64("drained-slot-count", defaultDrainedSlotCount, "Burn-in slot count for reactivated devices/links (~30 min at 5000)") + linkLossThreshold = flag.Float64("link-loss-threshold", 5.0, "Per-direction packet loss percentage above which a link 
is considered impaired (link_rollup_5m a_loss_pct/z_loss_pct)") version = "dev" commit = "none" date = "unknown" @@ -131,6 +132,8 @@ func main() { // Initialize ClickHouse-dependent criteria. var deviceCriteria []worker.DeviceCriterion + var linkImpairmentCriteria []worker.LinkCriterion + var linkRecoveryCriteria []worker.LinkCriterion if chAddr := os.Getenv("CLICKHOUSE_ADDR"); chAddr != "" { chDB := os.Getenv("CLICKHOUSE_DB") if chDB == "" { @@ -148,10 +151,14 @@ func main() { log.Warn("ClickHouse connection failed, continuing without ClickHouse-based criteria", "addr", chAddr, "error", err) } else { defer chClient.Close() - log.Info("ClickHouse enabled", "addr", chAddr, "db", chDB, "user", chUser, "tls", !chTLSDisabled) + log.Info("ClickHouse enabled", "addr", chAddr, "db", chDB, "user", chUser, "tls", !chTLSDisabled, "linkLossThreshold", *linkLossThreshold) controllerSuccess := worker.NewControllerSuccessCriterion(chClient, log) interfaceCounters := worker.NewInterfaceCountersCriterion(chClient, log) deviceCriteria = append(deviceCriteria, controllerSuccess, interfaceCounters) + linkImpairmentCriteria = append(linkImpairmentCriteria, + worker.NewLinkHealthCriterion(worker.LinkHealthModeImpairment, chClient, *linkLossThreshold, log)) + linkRecoveryCriteria = append(linkRecoveryCriteria, + worker.NewLinkHealthCriterion(worker.LinkHealthModeRecovery, chClient, *linkLossThreshold, log)) } } else { log.Error("ClickHouse disabled (CLICKHOUSE_ADDR not set), no ClickHouse-based criteria") @@ -164,6 +171,8 @@ func main() { } linkEvaluator := &worker.LinkHealthEvaluator{ ReadyForServiceCriteria: nil, + ImpairmentCriteria: linkImpairmentCriteria, + RecoveryCriteria: linkRecoveryCriteria, Log: log, } diff --git a/controlplane/device-health-oracle/internal/worker/clickhouse.go b/controlplane/device-health-oracle/internal/worker/clickhouse.go index 05b93dee34..8335023f12 100644 --- a/controlplane/device-health-oracle/internal/worker/clickhouse.go +++ 
b/controlplane/device-health-oracle/internal/worker/clickhouse.go @@ -3,6 +3,8 @@ package worker import ( "context" "crypto/tls" + "database/sql" + "errors" "fmt" "regexp" "strings" @@ -106,6 +108,77 @@ func (c *ClickHouseClient) InterfaceCountersCoverage(ctx context.Context, device return int64(minutesWithRecords), nil } +// LinkHealthChecker queries ClickHouse for link health rollup records. +type LinkHealthChecker interface { + // LinkHealthRecent returns the most recent non-provisioning link_rollup_5m + // bucket for the given link. Multiple rows for the same bucket are + // disambiguated by ingested_at (most recently ingested wins). Returns + // found=false when there is no data for the link. + LinkHealthRecent(ctx context.Context, linkPubkey string) (isisDown bool, aLossPct, zLossPct float64, found bool, err error) + + // LinkHealthWindowAllClean returns true if every non-provisioning bucket in + // [start, end] is clean (isis_down=false AND a_loss_pct <= threshold AND + // z_loss_pct <= threshold). The threshold filter is pushed into the query + // so the result is a single bool and we do not stream rows. Returns + // found=false when there are no buckets in the window for this link. + LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (allClean bool, found bool, err error) +} + +// LinkHealthRecent returns the latest closed bucket's health fields for the +// given link, ignoring buckets where provisioning=true. +func (c *ClickHouseClient) LinkHealthRecent(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) { + query := fmt.Sprintf( + `SELECT isis_down, a_loss_pct, z_loss_pct + FROM "%s".link_rollup_5m + WHERE link_pk = ? 
+ AND provisioning = false + ORDER BY bucket_ts DESC, ingested_at DESC + LIMIT 1`, + c.db, + ) + + var ( + isisDown bool + aLossPct float64 + zLossPct float64 + ) + err := c.conn.QueryRow(ctx, query, linkPubkey).Scan(&isisDown, &aLossPct, &zLossPct) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, 0, 0, false, nil + } + return false, 0, 0, false, fmt.Errorf("clickhouse query: %w", err) + } + return isisDown, aLossPct, zLossPct, true, nil +} + +// LinkHealthWindowAllClean returns true when every non-provisioning bucket for +// the given link in [start, end] is clean. The "any bad row" sentinel pattern +// keeps the query result a single integer regardless of window length. +func (c *ClickHouseClient) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) { + query := fmt.Sprintf( + `SELECT + countIf(isis_down = true OR a_loss_pct > ? OR z_loss_pct > ?) AS bad_buckets, + count() AS total_buckets + FROM "%s".link_rollup_5m + WHERE link_pk = ? + AND bucket_ts >= ? + AND bucket_ts <= ? 
+ AND provisioning = false`, + c.db, + ) + + var bad, total uint64 + err := c.conn.QueryRow(ctx, query, lossThreshold, lossThreshold, linkPubkey, start, end).Scan(&bad, &total) + if err != nil { + return false, false, fmt.Errorf("clickhouse query: %w", err) + } + if total == 0 { + return false, false, nil + } + return bad == 0, true, nil +} + func (c *ClickHouseClient) Close() error { return c.conn.Close() } diff --git a/controlplane/device-health-oracle/internal/worker/clickhouse_test.go b/controlplane/device-health-oracle/internal/worker/clickhouse_test.go index 9643617a86..cb684a31ff 100644 --- a/controlplane/device-health-oracle/internal/worker/clickhouse_test.go +++ b/controlplane/device-health-oracle/internal/worker/clickhouse_test.go @@ -2,6 +2,7 @@ package worker import ( "context" + "database/sql" "errors" "testing" "time" @@ -167,6 +168,174 @@ func TestInterfaceCountersCoverage_QuotesDatabaseName(t *testing.T) { assert.Equal(t, int64(0), minutes) } +func TestLinkHealthRecent_ReturnsLatestBucket(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, query string, args ...any) driver.Row { + assert.Contains(t, query, `"testdb".link_rollup_5m`) + assert.Contains(t, query, "provisioning = false") + assert.Contains(t, query, "ORDER BY bucket_ts DESC, ingested_at DESC") + assert.Contains(t, query, "LIMIT 1") + assert.Len(t, args, 1) + assert.Equal(t, "linkABC", args[0]) + return &mockRow{ + scanFunc: func(dest ...any) error { + *(dest[0].(*bool)) = true + *(dest[1].(*float64)) = 12.5 + *(dest[2].(*float64)) = 0.0 + return nil + }, + } + }, + } + + client := &ClickHouseClient{conn: conn, db: "testdb"} + isisDown, aLoss, zLoss, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + require.NoError(t, err) + assert.True(t, found) + assert.True(t, isisDown) + assert.Equal(t, 12.5, aLoss) + assert.Equal(t, 0.0, zLoss) +} + +func TestLinkHealthRecent_NoData_ReturnsFoundFalse(t *testing.T) { + conn := &mockConn{ + queryRowFunc: 
func(_ context.Context, _ string, _ ...any) driver.Row { + return &mockRow{ + scanFunc: func(_ ...any) error { + return sql.ErrNoRows + }, + } + }, + } + + client := &ClickHouseClient{conn: conn, db: "testdb"} + _, _, _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + require.NoError(t, err) + assert.False(t, found) +} + +func TestLinkHealthRecent_QueryError(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row { + return &mockRow{ + scanFunc: func(_ ...any) error { + return errors.New("connection reset") + }, + } + }, + } + + client := &ClickHouseClient{conn: conn, db: "testdb"} + _, _, _, _, err := client.LinkHealthRecent(context.Background(), "linkABC") + assert.ErrorContains(t, err, "connection reset") +} + +func TestLinkHealthRecent_QuotesDatabaseName(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, query string, _ ...any) driver.Row { + assert.Contains(t, query, `"mainnet-beta".link_rollup_5m`) + return &mockRow{ + scanFunc: func(dest ...any) error { + *(dest[0].(*bool)) = false + *(dest[1].(*float64)) = 0 + *(dest[2].(*float64)) = 0 + return nil + }, + } + }, + } + client := &ClickHouseClient{conn: conn, db: "mainnet-beta"} + _, _, _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + require.NoError(t, err) + assert.True(t, found) +} + +func TestLinkHealthWindowAllClean_AllClean(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, query string, args ...any) driver.Row { + assert.Contains(t, query, `"testdb".link_rollup_5m`) + assert.Contains(t, query, "provisioning = false") + assert.Contains(t, query, "countIf") + // Args order: lossThreshold, lossThreshold, linkPubkey, start, end + assert.Len(t, args, 5) + assert.Equal(t, 5.0, args[0]) + assert.Equal(t, 5.0, args[1]) + assert.Equal(t, "linkABC", args[2]) + return &mockRow{ + scanFunc: func(dest ...any) error { + *(dest[0].(*uint64)) = 0 + 
*(dest[1].(*uint64)) = 6 + return nil + }, + } + }, + } + + client := &ClickHouseClient{conn: conn, db: "testdb"} + allClean, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2026, 1, 1, 1, 0, 0, 0, time.UTC), + 5.0) + require.NoError(t, err) + assert.True(t, found) + assert.True(t, allClean) +} + +func TestLinkHealthWindowAllClean_HasBadBuckets(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row { + return &mockRow{ + scanFunc: func(dest ...any) error { + *(dest[0].(*uint64)) = 2 + *(dest[1].(*uint64)) = 6 + return nil + }, + } + }, + } + client := &ClickHouseClient{conn: conn, db: "testdb"} + allClean, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + time.Now().Add(-1*time.Hour), time.Now(), 5.0) + require.NoError(t, err) + assert.True(t, found) + assert.False(t, allClean) +} + +func TestLinkHealthWindowAllClean_NoBucketsInWindow(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row { + return &mockRow{ + scanFunc: func(dest ...any) error { + *(dest[0].(*uint64)) = 0 + *(dest[1].(*uint64)) = 0 + return nil + }, + } + }, + } + client := &ClickHouseClient{conn: conn, db: "testdb"} + _, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + time.Now().Add(-1*time.Hour), time.Now(), 5.0) + require.NoError(t, err) + assert.False(t, found) +} + +func TestLinkHealthWindowAllClean_QueryError(t *testing.T) { + conn := &mockConn{ + queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row { + return &mockRow{ + scanFunc: func(_ ...any) error { + return errors.New("connection reset") + }, + } + }, + } + client := &ClickHouseClient{conn: conn, db: "testdb"} + _, _, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + time.Now().Add(-1*time.Hour), time.Now(), 5.0) + assert.ErrorContains(t, 
err, "connection reset") +} + func TestNewClickHouseClient_StripsScheme(t *testing.T) { tests := []struct { name string diff --git a/controlplane/device-health-oracle/internal/worker/criteria.go b/controlplane/device-health-oracle/internal/worker/criteria.go index a0e9d67e3c..25074c5aae 100644 --- a/controlplane/device-health-oracle/internal/worker/criteria.go +++ b/controlplane/device-health-oracle/internal/worker/criteria.go @@ -41,6 +41,21 @@ func DeviceBurnIn(ctx context.Context, status serviceability.DeviceStatus) (star return start, burnIn.Now, expectedMinutes, true } +// LinkBurnIn extracts BurnInTimes from the context and returns the link recovery +// window — the period over which link health must be continuously clean for a +// link to recover from Impaired back to ReadyForService. The window is derived +// from DrainedStart (already resolved from DrainedSlotCount once per tick). +// Returns ok=false if the context has no BurnInTimes, and expectedMinutes=0 +// when the window has zero length (e.g. a newly created environment). +func LinkBurnIn(ctx context.Context) (start time.Time, now time.Time, expectedMinutes int64, ok bool) { + burnIn, ok := ctx.Value(burnInTimesKey{}).(BurnInTimes) + if !ok { + return time.Time{}, time.Time{}, 0, false + } + expectedMinutes = max(int64(burnIn.Now.Sub(burnIn.DrainedStart).Minutes()), 0) + return burnIn.DrainedStart, burnIn.Now, expectedMinutes, true +} + // DeviceCriterion evaluates whether a device meets a specific readiness requirement. // Check returns (passed, reason). Reason is a human-readable explanation when passed is false. type DeviceCriterion interface { @@ -111,10 +126,21 @@ func (e *DeviceHealthEvaluator) checkAll(ctx context.Context, device serviceabil return true } -// LinkHealthEvaluator evaluates a link's health based on criteria. -// Links have a single stage: Pending → ReadyForService. 
+// LinkHealthEvaluator evaluates a link's health based on criteria, supporting +// bidirectional transitions between ReadyForService and Impaired: +// - Pending/Unknown → ReadyForService when ReadyForServiceCriteria pass. +// - ReadyForService → Impaired when any ImpairmentCriteria fail (point-in-time +// check on the most recent telemetry bucket — fast demotion). +// - Impaired → ReadyForService when RecoveryCriteria pass over the full +// recovery window (slow recovery to prevent flapping). +// +// The asymmetry — fast demotion via the latest bucket, slow recovery requiring +// every bucket in the window to be clean — is intentional: it keeps borderline +// links from flapping while still surfacing real impairment quickly. type LinkHealthEvaluator struct { ReadyForServiceCriteria []LinkCriterion + ImpairmentCriteria []LinkCriterion + RecoveryCriteria []LinkCriterion Log *slog.Logger } @@ -122,12 +148,38 @@ type LinkHealthEvaluator struct { func (e *LinkHealthEvaluator) Evaluate(ctx context.Context, link serviceability.Link) serviceability.LinkHealth { current := link.LinkHealth - if current == serviceability.LinkHealthReadyForService { + switch current { + case serviceability.LinkHealthReadyForService: + // No impairment criteria configured ⇒ no demotion path (preserves + // behavior of deployments without ClickHouse wired up). 
+ if len(e.ImpairmentCriteria) == 0 { + return current + } + if !e.checkAllLink(ctx, link, e.ImpairmentCriteria) { + return serviceability.LinkHealthImpaired + } return current + + case serviceability.LinkHealthImpaired: + if len(e.RecoveryCriteria) == 0 { + return current + } + if !e.checkAllLink(ctx, link, e.RecoveryCriteria) { + return current + } + return serviceability.LinkHealthReadyForService + + default: + if !e.checkAllLink(ctx, link, e.ReadyForServiceCriteria) { + return current + } + return serviceability.LinkHealthReadyForService } +} +func (e *LinkHealthEvaluator) checkAllLink(ctx context.Context, link serviceability.Link, criteria []LinkCriterion) bool { linkPubkey := solana.PublicKeyFromBytes(link.PubKey[:]).String() - for _, c := range e.ReadyForServiceCriteria { + for _, c := range criteria { passed, reason := c.Check(ctx, link) if !passed { e.Log.Info("Link criterion not met", @@ -135,9 +187,8 @@ func (e *LinkHealthEvaluator) Evaluate(ctx context.Context, link serviceability. 
"code", link.Code, "criterion", c.Name(), "reason", reason) - return current + return false } } - - return serviceability.LinkHealthReadyForService + return true } diff --git a/controlplane/device-health-oracle/internal/worker/criteria_test.go b/controlplane/device-health-oracle/internal/worker/criteria_test.go index 4c743b276a..ca1c68b2cf 100644 --- a/controlplane/device-health-oracle/internal/worker/criteria_test.go +++ b/controlplane/device-health-oracle/internal/worker/criteria_test.go @@ -5,6 +5,7 @@ import ( "log/slog" "os" "testing" + "time" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" "github.com/stretchr/testify/assert" @@ -169,3 +170,115 @@ func TestLinkHealthEvaluator_CriterionFails_BlocksAdvancement(t *testing.T) { result := eval.Evaluate(context.Background(), link) assert.Equal(t, serviceability.LinkHealthPending, result, "should not advance when criterion fails") } + +func TestLinkHealthEvaluator_ReadyForService_ImpairmentFails_DemotesToImpaired(t *testing.T) { + failing := &mockLinkCriterion{name: "impair", result: false, reason: "isis down"} + eval := &LinkHealthEvaluator{ + ImpairmentCriteria: []LinkCriterion{failing}, + Log: testLogger(), + } + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthReadyForService} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthImpaired, result, "RFS link with failing impairment criterion should demote") +} + +func TestLinkHealthEvaluator_ReadyForService_ImpairmentPasses_Stays(t *testing.T) { + passing := &mockLinkCriterion{name: "impair", result: true} + eval := &LinkHealthEvaluator{ + ImpairmentCriteria: []LinkCriterion{passing}, + Log: testLogger(), + } + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthReadyForService} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthReadyForService, result) +} + +func 
TestLinkHealthEvaluator_ReadyForService_NoImpairmentCriteria_Stays(t *testing.T) { + // Backwards compat: deployments without ClickHouse have no impairment criteria + // and must not see any RFS demotion path. + eval := &LinkHealthEvaluator{Log: testLogger()} + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthReadyForService} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthReadyForService, result) +} + +func TestLinkHealthEvaluator_Impaired_RecoveryPasses_PromotesToRFS(t *testing.T) { + passing := &mockLinkCriterion{name: "recovery", result: true} + eval := &LinkHealthEvaluator{ + RecoveryCriteria: []LinkCriterion{passing}, + Log: testLogger(), + } + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthImpaired} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthReadyForService, result) +} + +func TestLinkHealthEvaluator_Impaired_RecoveryFails_Stays(t *testing.T) { + failing := &mockLinkCriterion{name: "recovery", result: false, reason: "still bad"} + eval := &LinkHealthEvaluator{ + RecoveryCriteria: []LinkCriterion{failing}, + Log: testLogger(), + } + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthImpaired} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthImpaired, result) +} + +func TestLinkHealthEvaluator_Impaired_NoRecoveryCriteria_Stays(t *testing.T) { + eval := &LinkHealthEvaluator{Log: testLogger()} + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthImpaired} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthImpaired, result) +} + +func TestLinkHealthEvaluator_Impaired_MixedRecovery_AnyFailKeepsImpaired(t *testing.T) { + passing := &mockLinkCriterion{name: "recovery_pass", result: true} + failing := &mockLinkCriterion{name: "recovery_fail", result: false, reason: "nope"} + eval := &LinkHealthEvaluator{ 
+ RecoveryCriteria: []LinkCriterion{passing, failing}, + Log: testLogger(), + } + + link := serviceability.Link{LinkHealth: serviceability.LinkHealthImpaired} + result := eval.Evaluate(context.Background(), link) + assert.Equal(t, serviceability.LinkHealthImpaired, result) +} + +func TestLinkBurnIn_ExtractsDrainedWindow(t *testing.T) { + now := time.Now() + drainedStart := now.Add(-30 * time.Minute) + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + ProvisioningStart: now.Add(-20 * time.Hour), + DrainedStart: drainedStart, + Now: now, + }) + + start, end, expectedMinutes, ok := LinkBurnIn(ctx) + assert.True(t, ok) + assert.Equal(t, drainedStart, start) + assert.Equal(t, now, end) + assert.Equal(t, int64(30), expectedMinutes) +} + +func TestLinkBurnIn_NoContextValues(t *testing.T) { + _, _, _, ok := LinkBurnIn(context.Background()) + assert.False(t, ok) +} + +func TestLinkBurnIn_ZeroLengthWindow(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now, + Now: now, + }) + + _, _, expectedMinutes, ok := LinkBurnIn(ctx) + assert.True(t, ok) + assert.Equal(t, int64(0), expectedMinutes) +} diff --git a/controlplane/device-health-oracle/internal/worker/link_health.go b/controlplane/device-health-oracle/internal/worker/link_health.go new file mode 100644 index 0000000000..cade1fbf6e --- /dev/null +++ b/controlplane/device-health-oracle/internal/worker/link_health.go @@ -0,0 +1,134 @@ +package worker + +import ( + "context" + "fmt" + "log/slog" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +// LinkHealthMode controls which time window LinkHealthCriterion evaluates. +type LinkHealthMode int + +const ( + // LinkHealthModeImpairment checks the most recent link_rollup_5m bucket. + // Used to detect impairment fast (RFS → Impaired transition). 
+ LinkHealthModeImpairment LinkHealthMode = iota + // LinkHealthModeRecovery checks every bucket in the recovery window. + // Used to gate recovery (Impaired → RFS) — every bucket must be clean. + LinkHealthModeRecovery +) + +// LinkHealthCriterion evaluates link impairment from the link_rollup_5m table. +// In ImpairmentMode it inspects the latest bucket; in RecoveryMode it requires +// every bucket in the recovery window (resolved via LinkBurnIn) to be clean. +// +// "Clean" means: isis_down=false AND a_loss_pct <= LossThreshold AND +// z_loss_pct <= LossThreshold. Buckets with provisioning=true are excluded +// to avoid flagging links that are still being brought up. +// +// "No data" handling differs by mode: in ImpairmentMode, missing data is +// treated as a pass (we cannot conclude a link is impaired without telemetry); +// in RecoveryMode, missing data is treated as a fail (we cannot conclude a +// link has been continuously clean without telemetry). The net effect is that +// a link without telemetry stays at its current health. 
+type LinkHealthCriterion struct { + mode LinkHealthMode + checker LinkHealthChecker + lossThreshold float64 + log *slog.Logger +} + +func NewLinkHealthCriterion(mode LinkHealthMode, checker LinkHealthChecker, lossThreshold float64, log *slog.Logger) *LinkHealthCriterion { + return &LinkHealthCriterion{ + mode: mode, + checker: checker, + lossThreshold: lossThreshold, + log: log, + } +} + +func (c *LinkHealthCriterion) Name() string { + if c.mode == LinkHealthModeRecovery { + return "link_health_recovery" + } + return "link_health_impairment" +} + +func (c *LinkHealthCriterion) Check(ctx context.Context, link serviceability.Link) (bool, string) { + pubkey := solana.PublicKeyFromBytes(link.PubKey[:]).String() + + if c.mode == LinkHealthModeRecovery { + return c.checkRecovery(ctx, link, pubkey) + } + return c.checkImpairment(ctx, link, pubkey) +} + +func (c *LinkHealthCriterion) checkImpairment(ctx context.Context, link serviceability.Link, pubkey string) (bool, string) { + isisDown, aLossPct, zLossPct, found, err := c.checker.LinkHealthRecent(ctx, pubkey) + if err != nil { + c.log.Error("Failed to query link health recent", + "link", pubkey, "code", link.Code, "error", err) + return false, fmt.Sprintf("clickhouse query failed: %v", err) + } + if !found { + // No telemetry → cannot conclude impairment. Hold current health. 
+ return true, "" + } + + c.log.Debug("Link health recent", + "link", pubkey, "code", link.Code, + "isisDown", isisDown, + "aLossPct", aLossPct, + "zLossPct", zLossPct, + "lossThreshold", c.lossThreshold) + + if isisDown { + return false, "isis adjacency down" + } + if aLossPct > c.lossThreshold { + return false, fmt.Sprintf("a-side loss %.2f%% > %.2f%%", aLossPct, c.lossThreshold) + } + if zLossPct > c.lossThreshold { + return false, fmt.Sprintf("z-side loss %.2f%% > %.2f%%", zLossPct, c.lossThreshold) + } + return true, "" +} + +func (c *LinkHealthCriterion) checkRecovery(ctx context.Context, link serviceability.Link, pubkey string) (bool, string) { + start, now, expectedMinutes, ok := LinkBurnIn(ctx) + if !ok { + return false, "burn-in times not available in context" + } + // Zero-length window means we can't recover yet — keep at Impaired. (This + // is the inverse of the device-side "expectedMinutes==0 ⇒ pass" rule: + // here the window is a recovery dwell, not a burn-in.) + if expectedMinutes == 0 { + return false, "recovery window not yet established" + } + + allClean, found, err := c.checker.LinkHealthWindowAllClean(ctx, pubkey, start, now, c.lossThreshold) + if err != nil { + c.log.Error("Failed to query link health recovery window", + "link", pubkey, "code", link.Code, "error", err) + return false, fmt.Sprintf("clickhouse query failed: %v", err) + } + if !found { + return false, "no rollup data in recovery window" + } + + c.log.Debug("Link health recovery window", + "link", pubkey, "code", link.Code, + "allClean", allClean, + "start", start, + "end", now, + "expectedMinutes", expectedMinutes, + "lossThreshold", c.lossThreshold) + + if !allClean { + return false, "recovery window contains impaired buckets" + } + return true, "" +} diff --git a/controlplane/device-health-oracle/internal/worker/link_health_test.go b/controlplane/device-health-oracle/internal/worker/link_health_test.go new file mode 100644 index 0000000000..e67e7b62e9 --- /dev/null +++ 
b/controlplane/device-health-oracle/internal/worker/link_health_test.go @@ -0,0 +1,210 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/stretchr/testify/assert" +) + +type mockLinkHealthChecker struct { + recentFunc func(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) + windowFunc func(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) +} + +func (m *mockLinkHealthChecker) LinkHealthRecent(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) { + return m.recentFunc(ctx, linkPubkey) +} + +func (m *mockLinkHealthChecker) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) { + return m.windowFunc(ctx, linkPubkey, start, end, lossThreshold) +} + +func TestLinkHealthCriterion_Name(t *testing.T) { + imp := NewLinkHealthCriterion(LinkHealthModeImpairment, &mockLinkHealthChecker{}, 5.0, testLogger()) + rec := NewLinkHealthCriterion(LinkHealthModeRecovery, &mockLinkHealthChecker{}, 5.0, testLogger()) + assert.Equal(t, "link_health_impairment", imp.Name()) + assert.Equal(t, "link_health_recovery", rec.Name()) +} + +func TestLinkHealthCriterion_Impairment_NoData_Passes(t *testing.T) { + checker := &mockLinkHealthChecker{ + recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { + return false, 0, 0, false, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) + link := serviceability.Link{LinkHealth: serviceability.LinkHealthReadyForService} + + passed, _ := c.Check(context.Background(), link) + assert.True(t, passed, "no data must not flag a link as impaired") +} + +func TestLinkHealthCriterion_Impairment_IsisDown_Fails(t *testing.T) { + checker := &mockLinkHealthChecker{ + recentFunc: func(_ 
context.Context, _ string) (bool, float64, float64, bool, error) { + return true, 0, 0, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) + + passed, reason := c.Check(context.Background(), serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "isis") +} + +func TestLinkHealthCriterion_Impairment_LossExceedsThreshold(t *testing.T) { + tests := []struct { + name string + aLoss float64 + zLoss float64 + expected bool + }{ + {"both clean", 1.0, 1.0, true}, + {"a above threshold", 6.0, 1.0, false}, + {"z above threshold", 1.0, 6.0, false}, + {"a exactly at threshold", 5.0, 0, true}, + {"z exactly at threshold", 0, 5.0, true}, + {"both far above", 80.0, 90.0, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + checker := &mockLinkHealthChecker{ + recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { + return false, tt.aLoss, tt.zLoss, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) + passed, _ := c.Check(context.Background(), serviceability.Link{}) + assert.Equal(t, tt.expected, passed) + }) + } +} + +func TestLinkHealthCriterion_Impairment_QueryError_Fails(t *testing.T) { + checker := &mockLinkHealthChecker{ + recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { + return false, 0, 0, false, errors.New("connection reset") + }, + } + c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) + + passed, reason := c.Check(context.Background(), serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "clickhouse query failed") +} + +func TestLinkHealthCriterion_Recovery_NoBurnInContext_Fails(t *testing.T) { + c := NewLinkHealthCriterion(LinkHealthModeRecovery, &mockLinkHealthChecker{}, 5.0, testLogger()) + passed, reason := c.Check(context.Background(), serviceability.Link{}) + assert.False(t, passed) + 
assert.Contains(t, reason, "burn-in times not available") +} + +func TestLinkHealthCriterion_Recovery_ZeroWindow_Fails(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now, + Now: now, + }) + c := NewLinkHealthCriterion(LinkHealthModeRecovery, &mockLinkHealthChecker{}, 5.0, testLogger()) + + passed, reason := c.Check(ctx, serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "recovery window not yet established") +} + +func TestLinkHealthCriterion_Recovery_AllClean_Passes(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now.Add(-30 * time.Minute), + Now: now, + }) + checker := &mockLinkHealthChecker{ + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { + return true, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) + + passed, _ := c.Check(ctx, serviceability.Link{}) + assert.True(t, passed) +} + +func TestLinkHealthCriterion_Recovery_NotAllClean_Fails(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now.Add(-30 * time.Minute), + Now: now, + }) + checker := &mockLinkHealthChecker{ + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { + return false, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) + + passed, reason := c.Check(ctx, serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "impaired buckets") +} + +func TestLinkHealthCriterion_Recovery_NoData_Fails(t *testing.T) { + // No telemetry in window means we can't conclude the link has been clean. 
+ now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now.Add(-30 * time.Minute), + Now: now, + }) + checker := &mockLinkHealthChecker{ + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { + return false, false, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) + + passed, reason := c.Check(ctx, serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "no rollup data") +} + +func TestLinkHealthCriterion_Recovery_QueryError_Fails(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now.Add(-30 * time.Minute), + Now: now, + }) + checker := &mockLinkHealthChecker{ + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { + return false, false, errors.New("boom") + }, + } + c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) + + passed, reason := c.Check(ctx, serviceability.Link{}) + assert.False(t, passed) + assert.Contains(t, reason, "clickhouse query failed") +} + +func TestLinkHealthCriterion_Recovery_PassesThresholdToChecker(t *testing.T) { + now := time.Now() + ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ + DrainedStart: now.Add(-30 * time.Minute), + Now: now, + }) + const threshold = 7.5 + var observed float64 + checker := &mockLinkHealthChecker{ + windowFunc: func(_ context.Context, _ string, _, _ time.Time, lossThreshold float64) (bool, bool, error) { + observed = lossThreshold + return true, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, threshold, testLogger()) + + _, _ = c.Check(ctx, serviceability.Link{}) + assert.Equal(t, threshold, observed) +} From 9f90cb87e7d5d6418b3dac1e4613282b71155fff Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 6 May 2026 16:41:24 +0000 Subject: [PATCH 2/2] device-health-oracle: 
address review findings on link health MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Architecture review (HIGH): - Recovery SQL counts duplicate ingested_at rows rather than distinct buckets. Wrap in argMax-per-bucket subquery so a stale pre-correction row cannot keep an Impaired link stuck once a corrected late row leaves every distinct bucket reading as clean. - LinkHealthRecent had no recency floor — a stale latest bucket (telemetry pipeline broken) could indefinitely demote/keep a link impaired. Return bucket_ts and treat anything older than 15 minutes (3x rollup cadence) as no data. Architecture review (MEDIUM): - Increment MetricCriterionResults symmetrically in checkAllLink so link impairment/recovery rates are graphable. Security review (LOW): - Validate --link-loss-threshold is finite and within [0, 100] at startup; fail fast on misconfiguration. Plus debuggability: include bucket timestamps in impairment fail reasons; surface bad/total bucket counts in recovery debug logs and the failure reason. Refs #2652 --- .../cmd/device-health-oracle/main.go | 9 ++ .../internal/worker/clickhouse.go | 100 +++++++++++------- .../internal/worker/clickhouse_test.go | 44 +++++--- .../internal/worker/criteria.go | 2 + .../internal/worker/link_health.go | 50 ++++++--- .../internal/worker/link_health_test.go | 78 +++++++++----- 6 files changed, 193 insertions(+), 90 deletions(-) diff --git a/controlplane/device-health-oracle/cmd/device-health-oracle/main.go b/controlplane/device-health-oracle/cmd/device-health-oracle/main.go index 9c3cf834f3..c4ae905c2d 100644 --- a/controlplane/device-health-oracle/cmd/device-health-oracle/main.go +++ b/controlplane/device-health-oracle/cmd/device-health-oracle/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "log/slog" + "math" "net" "net/http" "os" @@ -58,6 +59,14 @@ func main() { os.Exit(0) } + // Reject obviously broken thresholds early.
A negative or NaN threshold would + // silently flag every link as impaired (or never), triggering an onchain + // write storm or hiding real failures — better to fail fast at startup. + if math.IsNaN(*linkLossThreshold) || math.IsInf(*linkLossThreshold, 0) || *linkLossThreshold < 0 || *linkLossThreshold > 100 { + fmt.Fprintf(os.Stderr, "invalid --link-loss-threshold %v: must be in [0, 100]\n", *linkLossThreshold) + os.Exit(1) + } + logLevel := slog.LevelInfo if *verbose { logLevel = slog.LevelDebug diff --git a/controlplane/device-health-oracle/internal/worker/clickhouse.go b/controlplane/device-health-oracle/internal/worker/clickhouse.go index 8335023f12..1c26ba9a0e 100644 --- a/controlplane/device-health-oracle/internal/worker/clickhouse.go +++ b/controlplane/device-health-oracle/internal/worker/clickhouse.go @@ -108,27 +108,49 @@ func (c *ClickHouseClient) InterfaceCountersCoverage(ctx context.Context, device return int64(minutesWithRecords), nil } +// LinkHealthRecentResult is the most recent rollup bucket's health fields plus +// the bucket timestamp, returned by LinkHealthChecker.LinkHealthRecent. The +// bucket timestamp lets callers enforce a recency floor (a stale bucket means +// the telemetry pipeline is broken — neither demote nor recover on stale data). +type LinkHealthRecentResult struct { + BucketTs time.Time + IsisDown bool + ALossPct float64 + ZLossPct float64 +} + +// LinkHealthWindowResult summarises a recovery-window check. AllClean is true +// when every distinct bucket (deduplicated by latest ingested_at) is clean. +// Bad and Total are exposed for operator diagnostics. +type LinkHealthWindowResult struct { + Bad uint64 + Total uint64 + AllClean bool +} + // LinkHealthChecker queries ClickHouse for link health rollup records. type LinkHealthChecker interface { // LinkHealthRecent returns the most recent non-provisioning link_rollup_5m // bucket for the given link. 
Multiple rows for the same bucket are // disambiguated by ingested_at (most recently ingested wins). Returns // found=false when there is no data for the link. - LinkHealthRecent(ctx context.Context, linkPubkey string) (isisDown bool, aLossPct, zLossPct float64, found bool, err error) - - // LinkHealthWindowAllClean returns true if every non-provisioning bucket in - // [start, end] is clean (isis_down=false AND a_loss_pct <= threshold AND - // z_loss_pct <= threshold). The threshold filter is pushed into the query - // so the result is a single bool and we do not stream rows. Returns - // found=false when there are no buckets in the window for this link. - LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (allClean bool, found bool, err error) + LinkHealthRecent(ctx context.Context, linkPubkey string) (result LinkHealthRecentResult, found bool, err error) + + // LinkHealthWindowAllClean returns true if every distinct non-provisioning + // bucket in [start, end] is clean (isis_down=false AND a_loss_pct <= + // threshold AND z_loss_pct <= threshold). Late-arriving rows for the same + // bucket are deduplicated by ingested_at so a stale pre-correction row + // cannot keep a link Impaired once a corrected re-write leaves every + // distinct bucket reading as clean. + // Returns found=false when there are no buckets in the window for this link. + LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (result LinkHealthWindowResult, found bool, err error) } -// LinkHealthRecent returns the latest closed bucket's health fields for the -// given link, ignoring buckets where provisioning=true. -func (c *ClickHouseClient) LinkHealthRecent(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) { +// LinkHealthRecent returns the latest bucket's health fields for the given +// link, ignoring buckets where provisioning=true.
Multiple rows for the same +// bucket are disambiguated by selecting the most recently ingested row. +func (c *ClickHouseClient) LinkHealthRecent(ctx context.Context, linkPubkey string) (LinkHealthRecentResult, bool, error) { query := fmt.Sprintf( - `SELECT isis_down, a_loss_pct, z_loss_pct + `SELECT bucket_ts, isis_down, a_loss_pct, z_loss_pct FROM "%s".link_rollup_5m WHERE link_pk = ? AND provisioning = false @@ -137,46 +159,52 @@ func (c *ClickHouseClient) LinkHealthRecent(ctx context.Context, linkPubkey stri c.db, ) - var ( - isisDown bool - aLossPct float64 - zLossPct float64 - ) - err := c.conn.QueryRow(ctx, query, linkPubkey).Scan(&isisDown, &aLossPct, &zLossPct) + var r LinkHealthRecentResult + err := c.conn.QueryRow(ctx, query, linkPubkey).Scan(&r.BucketTs, &r.IsisDown, &r.ALossPct, &r.ZLossPct) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return false, 0, 0, false, nil + return LinkHealthRecentResult{}, false, nil } - return false, 0, 0, false, fmt.Errorf("clickhouse query: %w", err) + return LinkHealthRecentResult{}, false, fmt.Errorf("clickhouse query: %w", err) } - return isisDown, aLossPct, zLossPct, true, nil + return r, true, nil } -// LinkHealthWindowAllClean returns true when every non-provisioning bucket for -// the given link in [start, end] is clean. The "any bad row" sentinel pattern -// keeps the query result a single integer regardless of window length. -func (c *ClickHouseClient) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) { +// LinkHealthWindowAllClean returns whether every distinct bucket for the given +// link in [start, end] is clean. The inner query deduplicates late-arriving +// rows by selecting the latest ingested_at per bucket; the outer aggregate +// counts distinct dirty buckets without streaming rows. 
+func (c *ClickHouseClient) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (LinkHealthWindowResult, bool, error) { query := fmt.Sprintf( `SELECT countIf(isis_down = true OR a_loss_pct > ? OR z_loss_pct > ?) AS bad_buckets, count() AS total_buckets - FROM "%s".link_rollup_5m - WHERE link_pk = ? - AND bucket_ts >= ? - AND bucket_ts <= ? - AND provisioning = false`, + FROM ( + SELECT + bucket_ts, + argMax(isis_down, ingested_at) AS isis_down, + argMax(a_loss_pct, ingested_at) AS a_loss_pct, + argMax(z_loss_pct, ingested_at) AS z_loss_pct + FROM "%s".link_rollup_5m + WHERE link_pk = ? + AND bucket_ts >= ? + AND bucket_ts <= ? + AND provisioning = false + GROUP BY bucket_ts + )`, c.db, ) - var bad, total uint64 - err := c.conn.QueryRow(ctx, query, lossThreshold, lossThreshold, linkPubkey, start, end).Scan(&bad, &total) + var r LinkHealthWindowResult + err := c.conn.QueryRow(ctx, query, lossThreshold, lossThreshold, linkPubkey, start, end).Scan(&r.Bad, &r.Total) if err != nil { - return false, false, fmt.Errorf("clickhouse query: %w", err) + return LinkHealthWindowResult{}, false, fmt.Errorf("clickhouse query: %w", err) } - if total == 0 { - return false, false, nil + if r.Total == 0 { + return LinkHealthWindowResult{}, false, nil } - return bad == 0, true, nil + r.AllClean = r.Bad == 0 + return r, true, nil } func (c *ClickHouseClient) Close() error { diff --git a/controlplane/device-health-oracle/internal/worker/clickhouse_test.go b/controlplane/device-health-oracle/internal/worker/clickhouse_test.go index cb684a31ff..5cf425763c 100644 --- a/controlplane/device-health-oracle/internal/worker/clickhouse_test.go +++ b/controlplane/device-health-oracle/internal/worker/clickhouse_test.go @@ -169,19 +169,22 @@ func TestInterfaceCountersCoverage_QuotesDatabaseName(t *testing.T) { } func TestLinkHealthRecent_ReturnsLatestBucket(t *testing.T) { + bucketTs := time.Date(2026, 5, 6, 16, 10, 0, 0, time.UTC) conn := 
&mockConn{ queryRowFunc: func(_ context.Context, query string, args ...any) driver.Row { assert.Contains(t, query, `"testdb".link_rollup_5m`) assert.Contains(t, query, "provisioning = false") + assert.Contains(t, query, "bucket_ts") assert.Contains(t, query, "ORDER BY bucket_ts DESC, ingested_at DESC") assert.Contains(t, query, "LIMIT 1") assert.Len(t, args, 1) assert.Equal(t, "linkABC", args[0]) return &mockRow{ scanFunc: func(dest ...any) error { - *(dest[0].(*bool)) = true - *(dest[1].(*float64)) = 12.5 - *(dest[2].(*float64)) = 0.0 + *(dest[0].(*time.Time)) = bucketTs + *(dest[1].(*bool)) = true + *(dest[2].(*float64)) = 12.5 + *(dest[3].(*float64)) = 0.0 return nil }, } @@ -189,12 +192,13 @@ func TestLinkHealthRecent_ReturnsLatestBucket(t *testing.T) { } client := &ClickHouseClient{conn: conn, db: "testdb"} - isisDown, aLoss, zLoss, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + r, found, err := client.LinkHealthRecent(context.Background(), "linkABC") require.NoError(t, err) assert.True(t, found) - assert.True(t, isisDown) - assert.Equal(t, 12.5, aLoss) - assert.Equal(t, 0.0, zLoss) + assert.Equal(t, bucketTs, r.BucketTs) + assert.True(t, r.IsisDown) + assert.Equal(t, 12.5, r.ALossPct) + assert.Equal(t, 0.0, r.ZLossPct) } func TestLinkHealthRecent_NoData_ReturnsFoundFalse(t *testing.T) { @@ -209,7 +213,7 @@ func TestLinkHealthRecent_NoData_ReturnsFoundFalse(t *testing.T) { } client := &ClickHouseClient{conn: conn, db: "testdb"} - _, _, _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") require.NoError(t, err) assert.False(t, found) } @@ -226,7 +230,7 @@ func TestLinkHealthRecent_QueryError(t *testing.T) { } client := &ClickHouseClient{conn: conn, db: "testdb"} - _, _, _, _, err := client.LinkHealthRecent(context.Background(), "linkABC") + _, _, err := client.LinkHealthRecent(context.Background(), "linkABC") assert.ErrorContains(t, err, 
"connection reset") } @@ -236,16 +240,17 @@ func TestLinkHealthRecent_QuotesDatabaseName(t *testing.T) { assert.Contains(t, query, `"mainnet-beta".link_rollup_5m`) return &mockRow{ scanFunc: func(dest ...any) error { - *(dest[0].(*bool)) = false - *(dest[1].(*float64)) = 0 + *(dest[0].(*time.Time)) = time.Now() + *(dest[1].(*bool)) = false *(dest[2].(*float64)) = 0 + *(dest[3].(*float64)) = 0 return nil }, } }, } client := &ClickHouseClient{conn: conn, db: "mainnet-beta"} - _, _, _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") + _, found, err := client.LinkHealthRecent(context.Background(), "linkABC") require.NoError(t, err) assert.True(t, found) } @@ -256,6 +261,9 @@ func TestLinkHealthWindowAllClean_AllClean(t *testing.T) { assert.Contains(t, query, `"testdb".link_rollup_5m`) assert.Contains(t, query, "provisioning = false") assert.Contains(t, query, "countIf") + // Inner subquery dedupes ingested_at duplicates per bucket via argMax. + assert.Contains(t, query, "argMax") + assert.Contains(t, query, "GROUP BY bucket_ts") // Args order: lossThreshold, lossThreshold, linkPubkey, start, end assert.Len(t, args, 5) assert.Equal(t, 5.0, args[0]) @@ -272,13 +280,15 @@ func TestLinkHealthWindowAllClean_AllClean(t *testing.T) { } client := &ClickHouseClient{conn: conn, db: "testdb"} - allClean, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + r, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2026, 1, 1, 1, 0, 0, 0, time.UTC), 5.0) require.NoError(t, err) assert.True(t, found) - assert.True(t, allClean) + assert.True(t, r.AllClean) + assert.Equal(t, uint64(0), r.Bad) + assert.Equal(t, uint64(6), r.Total) } func TestLinkHealthWindowAllClean_HasBadBuckets(t *testing.T) { @@ -294,11 +304,13 @@ func TestLinkHealthWindowAllClean_HasBadBuckets(t *testing.T) { }, } client := &ClickHouseClient{conn: conn, db: "testdb"} - allClean, found, 
err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", + r, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC", time.Now().Add(-1*time.Hour), time.Now(), 5.0) require.NoError(t, err) assert.True(t, found) - assert.False(t, allClean) + assert.False(t, r.AllClean) + assert.Equal(t, uint64(2), r.Bad) + assert.Equal(t, uint64(6), r.Total) } func TestLinkHealthWindowAllClean_NoBucketsInWindow(t *testing.T) { diff --git a/controlplane/device-health-oracle/internal/worker/criteria.go b/controlplane/device-health-oracle/internal/worker/criteria.go index 25074c5aae..5444d09dd6 100644 --- a/controlplane/device-health-oracle/internal/worker/criteria.go +++ b/controlplane/device-health-oracle/internal/worker/criteria.go @@ -187,8 +187,10 @@ func (e *LinkHealthEvaluator) checkAllLink(ctx context.Context, link serviceabil "code", link.Code, "criterion", c.Name(), "reason", reason) + MetricCriterionResults.WithLabelValues(c.Name(), "fail").Inc() return false } + MetricCriterionResults.WithLabelValues(c.Name(), "pass").Inc() } return true } diff --git a/controlplane/device-health-oracle/internal/worker/link_health.go b/controlplane/device-health-oracle/internal/worker/link_health.go index cade1fbf6e..6d773a3b5d 100644 --- a/controlplane/device-health-oracle/internal/worker/link_health.go +++ b/controlplane/device-health-oracle/internal/worker/link_health.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/gagliardetto/solana-go" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" @@ -21,6 +22,12 @@ const ( LinkHealthModeRecovery ) +// linkHealthRecentMaxAge is how recent the latest rollup bucket must be for +// the impairment check to act on it. Anything older is treated as "no data" — +// neither demoting nor recovering on stale telemetry. Sized at 3× the 5-minute +// rollup cadence to absorb a single missed bucket plus an ingest delay. 
+const linkHealthRecentMaxAge = 15 * time.Minute + // LinkHealthCriterion evaluates link impairment from the link_rollup_5m table. // In ImpairmentMode it inspects the latest bucket; in RecoveryMode it requires // every bucket in the recovery window (resolved via LinkBurnIn) to be clean. @@ -67,7 +74,7 @@ func (c *LinkHealthCriterion) Check(ctx context.Context, link serviceability.Lin } func (c *LinkHealthCriterion) checkImpairment(ctx context.Context, link serviceability.Link, pubkey string) (bool, string) { - isisDown, aLossPct, zLossPct, found, err := c.checker.LinkHealthRecent(ctx, pubkey) + r, found, err := c.checker.LinkHealthRecent(ctx, pubkey) if err != nil { c.log.Error("Failed to query link health recent", "link", pubkey, "code", link.Code, "error", err) @@ -78,21 +85,34 @@ func (c *LinkHealthCriterion) checkImpairment(ctx context.Context, link servicea return true, "" } + // A stale latest bucket means the rollup pipeline is broken for this link; + // treat it like "no data" rather than acting on a frozen snapshot. Without + // this floor, a stuck pipeline at the moment of an ISIS flap would keep the + // link Impaired indefinitely even after the link recovered. 
+ age := time.Since(r.BucketTs) + if age > linkHealthRecentMaxAge { + c.log.Debug("Link health latest bucket is stale; treating as no data", + "link", pubkey, "code", link.Code, + "bucketTs", r.BucketTs, "age", age, "maxAge", linkHealthRecentMaxAge) + return true, "" + } + c.log.Debug("Link health recent", "link", pubkey, "code", link.Code, - "isisDown", isisDown, - "aLossPct", aLossPct, - "zLossPct", zLossPct, + "bucketTs", r.BucketTs, + "isisDown", r.IsisDown, + "aLossPct", r.ALossPct, + "zLossPct", r.ZLossPct, "lossThreshold", c.lossThreshold) - if isisDown { - return false, "isis adjacency down" + if r.IsisDown { + return false, fmt.Sprintf("isis adjacency down (bucket=%s)", r.BucketTs.UTC().Format(time.RFC3339)) } - if aLossPct > c.lossThreshold { - return false, fmt.Sprintf("a-side loss %.2f%% > %.2f%%", aLossPct, c.lossThreshold) + if r.ALossPct > c.lossThreshold { + return false, fmt.Sprintf("a-side loss %.2f%% > %.2f%% (bucket=%s)", r.ALossPct, c.lossThreshold, r.BucketTs.UTC().Format(time.RFC3339)) } - if zLossPct > c.lossThreshold { - return false, fmt.Sprintf("z-side loss %.2f%% > %.2f%%", zLossPct, c.lossThreshold) + if r.ZLossPct > c.lossThreshold { + return false, fmt.Sprintf("z-side loss %.2f%% > %.2f%% (bucket=%s)", r.ZLossPct, c.lossThreshold, r.BucketTs.UTC().Format(time.RFC3339)) } return true, "" } @@ -109,7 +129,7 @@ func (c *LinkHealthCriterion) checkRecovery(ctx context.Context, link serviceabi return false, "recovery window not yet established" } - allClean, found, err := c.checker.LinkHealthWindowAllClean(ctx, pubkey, start, now, c.lossThreshold) + r, found, err := c.checker.LinkHealthWindowAllClean(ctx, pubkey, start, now, c.lossThreshold) if err != nil { c.log.Error("Failed to query link health recovery window", "link", pubkey, "code", link.Code, "error", err) @@ -121,14 +141,16 @@ func (c *LinkHealthCriterion) checkRecovery(ctx context.Context, link serviceabi c.log.Debug("Link health recovery window", "link", pubkey, "code", link.Code, 
- "allClean", allClean, + "allClean", r.AllClean, + "badBuckets", r.Bad, + "totalBuckets", r.Total, "start", start, "end", now, "expectedMinutes", expectedMinutes, "lossThreshold", c.lossThreshold) - if !allClean { - return false, "recovery window contains impaired buckets" + if !r.AllClean { + return false, fmt.Sprintf("recovery window has %d/%d impaired buckets", r.Bad, r.Total) } return true, "" } diff --git a/controlplane/device-health-oracle/internal/worker/link_health_test.go b/controlplane/device-health-oracle/internal/worker/link_health_test.go index e67e7b62e9..c109fa091c 100644 --- a/controlplane/device-health-oracle/internal/worker/link_health_test.go +++ b/controlplane/device-health-oracle/internal/worker/link_health_test.go @@ -11,18 +11,24 @@ import ( ) type mockLinkHealthChecker struct { - recentFunc func(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) - windowFunc func(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) + recentFunc func(ctx context.Context, linkPubkey string) (LinkHealthRecentResult, bool, error) + windowFunc func(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (LinkHealthWindowResult, bool, error) } -func (m *mockLinkHealthChecker) LinkHealthRecent(ctx context.Context, linkPubkey string) (bool, float64, float64, bool, error) { +func (m *mockLinkHealthChecker) LinkHealthRecent(ctx context.Context, linkPubkey string) (LinkHealthRecentResult, bool, error) { return m.recentFunc(ctx, linkPubkey) } -func (m *mockLinkHealthChecker) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (bool, bool, error) { +func (m *mockLinkHealthChecker) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (LinkHealthWindowResult, bool, error) { return m.windowFunc(ctx, linkPubkey, start, end, lossThreshold) } +// 
freshBucket returns a bucket timestamp that's recent enough to pass the +// stale-data floor in checkImpairment. +func freshBucket() time.Time { + return time.Now().Add(-1 * time.Minute) +} + func TestLinkHealthCriterion_Name(t *testing.T) { imp := NewLinkHealthCriterion(LinkHealthModeImpairment, &mockLinkHealthChecker{}, 5.0, testLogger()) rec := NewLinkHealthCriterion(LinkHealthModeRecovery, &mockLinkHealthChecker{}, 5.0, testLogger()) @@ -32,8 +38,8 @@ func TestLinkHealthCriterion_Name(t *testing.T) { func TestLinkHealthCriterion_Impairment_NoData_Passes(t *testing.T) { checker := &mockLinkHealthChecker{ - recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { - return false, 0, 0, false, nil + recentFunc: func(_ context.Context, _ string) (LinkHealthRecentResult, bool, error) { + return LinkHealthRecentResult{}, false, nil }, } c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) @@ -43,10 +49,30 @@ func TestLinkHealthCriterion_Impairment_NoData_Passes(t *testing.T) { assert.True(t, passed, "no data must not flag a link as impaired") } +func TestLinkHealthCriterion_Impairment_StaleBucket_Passes(t *testing.T) { + // A latest bucket older than the recency floor signals a broken telemetry + // pipeline. Don't act on it — neither demote nor recover. 
+ checker := &mockLinkHealthChecker{ + recentFunc: func(_ context.Context, _ string) (LinkHealthRecentResult, bool, error) { + return LinkHealthRecentResult{ + BucketTs: time.Now().Add(-1 * time.Hour), + IsisDown: true, + ALossPct: 100, + ZLossPct: 100, + }, true, nil + }, + } + c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) + + passed, _ := c.Check(context.Background(), serviceability.Link{}) + assert.True(t, passed, "stale bucket should be treated as no data even when it indicates impairment") +} + func TestLinkHealthCriterion_Impairment_IsisDown_Fails(t *testing.T) { + bucket := freshBucket() checker := &mockLinkHealthChecker{ - recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { - return true, 0, 0, true, nil + recentFunc: func(_ context.Context, _ string) (LinkHealthRecentResult, bool, error) { + return LinkHealthRecentResult{BucketTs: bucket, IsisDown: true}, true, nil }, } c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) @@ -54,6 +80,7 @@ func TestLinkHealthCriterion_Impairment_IsisDown_Fails(t *testing.T) { passed, reason := c.Check(context.Background(), serviceability.Link{}) assert.False(t, passed) assert.Contains(t, reason, "isis") + assert.Contains(t, reason, "bucket=") } func TestLinkHealthCriterion_Impairment_LossExceedsThreshold(t *testing.T) { @@ -73,8 +100,12 @@ func TestLinkHealthCriterion_Impairment_LossExceedsThreshold(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { checker := &mockLinkHealthChecker{ - recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { - return false, tt.aLoss, tt.zLoss, true, nil + recentFunc: func(_ context.Context, _ string) (LinkHealthRecentResult, bool, error) { + return LinkHealthRecentResult{ + BucketTs: freshBucket(), + ALossPct: tt.aLoss, + ZLossPct: tt.zLoss, + }, true, nil }, } c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, 
testLogger()) @@ -86,8 +117,8 @@ func TestLinkHealthCriterion_Impairment_LossExceedsThreshold(t *testing.T) { func TestLinkHealthCriterion_Impairment_QueryError_Fails(t *testing.T) { checker := &mockLinkHealthChecker{ - recentFunc: func(_ context.Context, _ string) (bool, float64, float64, bool, error) { - return false, 0, 0, false, errors.New("connection reset") + recentFunc: func(_ context.Context, _ string) (LinkHealthRecentResult, bool, error) { + return LinkHealthRecentResult{}, false, errors.New("connection reset") }, } c := NewLinkHealthCriterion(LinkHealthModeImpairment, checker, 5.0, testLogger()) @@ -124,8 +155,8 @@ func TestLinkHealthCriterion_Recovery_AllClean_Passes(t *testing.T) { Now: now, }) checker := &mockLinkHealthChecker{ - windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { - return true, true, nil + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (LinkHealthWindowResult, bool, error) { + return LinkHealthWindowResult{Bad: 0, Total: 6, AllClean: true}, true, nil }, } c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) @@ -141,27 +172,26 @@ func TestLinkHealthCriterion_Recovery_NotAllClean_Fails(t *testing.T) { Now: now, }) checker := &mockLinkHealthChecker{ - windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { - return false, true, nil + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (LinkHealthWindowResult, bool, error) { + return LinkHealthWindowResult{Bad: 2, Total: 6, AllClean: false}, true, nil }, } c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) passed, reason := c.Check(ctx, serviceability.Link{}) assert.False(t, passed) - assert.Contains(t, reason, "impaired buckets") + assert.Contains(t, reason, "2/6") } func TestLinkHealthCriterion_Recovery_NoData_Fails(t *testing.T) { - // No telemetry in window means we can't conclude the link has been 
clean. now := time.Now() ctx := ContextWithBurnInTimes(context.Background(), BurnInTimes{ DrainedStart: now.Add(-30 * time.Minute), Now: now, }) checker := &mockLinkHealthChecker{ - windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { - return false, false, nil + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (LinkHealthWindowResult, bool, error) { + return LinkHealthWindowResult{}, false, nil }, } c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) @@ -178,8 +208,8 @@ func TestLinkHealthCriterion_Recovery_QueryError_Fails(t *testing.T) { Now: now, }) checker := &mockLinkHealthChecker{ - windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (bool, bool, error) { - return false, false, errors.New("boom") + windowFunc: func(_ context.Context, _ string, _, _ time.Time, _ float64) (LinkHealthWindowResult, bool, error) { + return LinkHealthWindowResult{}, false, errors.New("boom") }, } c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, 5.0, testLogger()) @@ -198,9 +228,9 @@ func TestLinkHealthCriterion_Recovery_PassesThresholdToChecker(t *testing.T) { const threshold = 7.5 var observed float64 checker := &mockLinkHealthChecker{ - windowFunc: func(_ context.Context, _ string, _, _ time.Time, lossThreshold float64) (bool, bool, error) { + windowFunc: func(_ context.Context, _ string, _, _ time.Time, lossThreshold float64) (LinkHealthWindowResult, bool, error) { observed = lossThreshold - return true, true, nil + return LinkHealthWindowResult{AllClean: true, Total: 6}, true, nil }, } c := NewLinkHealthCriterion(LinkHealthModeRecovery, checker, threshold, testLogger())