2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.

### Changes

- DeviceHealthOracle
- Detect impaired links from the `link_rollup_5m` ClickHouse table (`isis_down=true` or `a_loss_pct`/`z_loss_pct` above a configurable threshold) and write `LinkHealth = Impaired` onchain. Recover to `ReadyForService` only after every bucket in the recovery window (reuses `--drained-slot-count`) is clean — this asymmetry surfaces real impairment quickly while preventing borderline links from flapping; see the sketch after this file's diff. New flag `--link-loss-threshold` (default `5.0`). `LinkHealth` is reported as a signal only — the serviceability program does not gate `link.status` on it ([#2652](https://github.com/malbeclabs/doublezero/issues/2652))
- Smartcontract
- Migrate read callers in the CLI, sentinel, client, controlplane admin, and Rust SDK topology helper to read interfaces from `Device::new_interfaces` instead of the legacy `interfaces` enum vec, and adopt the `Device::find_interface` signature that returns `&NewInterface`. The legacy `interfaces` slot is still written on-disk via the per-write V2 projection from #3667; this PR only migrates reads. The temporary `Device::find_interface_legacy` helper is retained for the smartcontract program processors, which migrate in a later issue. Activator is intentionally excluded — it is deprecated ([#3659](https://github.com/malbeclabs/doublezero/issues/3659))
- Activator
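The demote/recover asymmetry above, as a minimal Go sketch. The `bucket` type and helper names are illustrative, not the oracle's actual internals; the `threshold` parameter stands in for `--link-loss-threshold`.

```go
package linkhealth

// bucket mirrors the health fields of a link_rollup_5m row. This type and the
// helpers below are an illustrative sketch, not the oracle's actual internals.
type bucket struct {
	IsisDown           bool
	ALossPct, ZLossPct float64
}

// clean reports whether one bucket shows no impairment at the given
// per-direction loss threshold (the --link-loss-threshold flag).
func clean(b bucket, threshold float64) bool {
	return !b.IsisDown && b.ALossPct <= threshold && b.ZLossPct <= threshold
}

// shouldDemote looks only at the latest bucket: one bad bucket is enough to
// write LinkHealth = Impaired, so real impairment surfaces quickly.
func shouldDemote(latest bucket, threshold float64) bool {
	return !clean(latest, threshold)
}

// shouldRecover requires every bucket in the recovery window to be clean, so
// a borderline link cannot flap back to ReadyForService on one good bucket.
func shouldRecover(window []bucket, threshold float64) bool {
	if len(window) == 0 {
		return false // no data is not evidence of health
	}
	for _, b := range window {
		if !clean(b, threshold) {
			return false
		}
	}
	return true
}
```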
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"log/slog"
"math"
"net"
"net/http"
"os"
@@ -44,6 +45,7 @@
slackWebhookURL = flag.String("slack-webhook-url", "", "The Slack webhook URL to send alerts")
provisioningSlotCount = flag.Uint64("provisioning-slot-count", defaultProvisioningSlotCount, "Burn-in slot count for new devices/links (~20 hours at 200000)")
drainedSlotCount = flag.Uint64("drained-slot-count", defaultDrainedSlotCount, "Burn-in slot count for reactivated devices/links (~30 min at 5000)")
linkLossThreshold = flag.Float64("link-loss-threshold", 5.0, "Per-direction packet loss percentage above which a link is considered impaired (link_rollup_5m a_loss_pct/z_loss_pct)")
version = "dev"
commit = "none"
date = "unknown"
@@ -57,6 +59,14 @@ func main() {
os.Exit(0)
}

// Reject obviously broken thresholds early. A NaN, infinite, negative, or
// >100 threshold would silently flag every link as impaired (or none at
// all), triggering an onchain write storm or hiding real failures — better
// to fail fast at startup.
if math.IsNaN(*linkLossThreshold) || math.IsInf(*linkLossThreshold, 0) || *linkLossThreshold < 0 || *linkLossThreshold > 100 {
fmt.Fprintf(os.Stderr, "invalid --link-loss-threshold %v: must be in [0, 100]\n", *linkLossThreshold)
os.Exit(1)
}

logLevel := slog.LevelInfo
if *verbose {
logLevel = slog.LevelDebug
@@ -131,6 +141,8 @@ func main() {

// Initialize ClickHouse-dependent criteria.
var deviceCriteria []worker.DeviceCriterion
var linkImpairmentCriteria []worker.LinkCriterion
var linkRecoveryCriteria []worker.LinkCriterion
if chAddr := os.Getenv("CLICKHOUSE_ADDR"); chAddr != "" {
chDB := os.Getenv("CLICKHOUSE_DB")
if chDB == "" {
@@ -148,10 +160,14 @@
log.Warn("ClickHouse connection failed, continuing without ClickHouse-based criteria", "addr", chAddr, "error", err)
} else {
defer chClient.Close()
log.Info("ClickHouse enabled", "addr", chAddr, "db", chDB, "user", chUser, "tls", !chTLSDisabled)
log.Info("ClickHouse enabled", "addr", chAddr, "db", chDB, "user", chUser, "tls", !chTLSDisabled, "linkLossThreshold", *linkLossThreshold)
controllerSuccess := worker.NewControllerSuccessCriterion(chClient, log)
interfaceCounters := worker.NewInterfaceCountersCriterion(chClient, log)
deviceCriteria = append(deviceCriteria, controllerSuccess, interfaceCounters)
linkImpairmentCriteria = append(linkImpairmentCriteria,
worker.NewLinkHealthCriterion(worker.LinkHealthModeImpairment, chClient, *linkLossThreshold, log))
linkRecoveryCriteria = append(linkRecoveryCriteria,
worker.NewLinkHealthCriterion(worker.LinkHealthModeRecovery, chClient, *linkLossThreshold, log))
}
} else {
log.Error("ClickHouse disabled (CLICKHOUSE_ADDR not set), no ClickHouse-based criteria")
@@ -164,6 +180,8 @@
}
linkEvaluator := &worker.LinkHealthEvaluator{
ReadyForServiceCriteria: nil,
ImpairmentCriteria: linkImpairmentCriteria,
RecoveryCriteria: linkRecoveryCriteria,
Log: log,
}

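The `LinkHealthEvaluator` internals are outside this diff; the sketch below shows one plausible way the two criterion lists drive the state transition. The `LinkCriterion` method name and signature here are assumptions (the real interface is not shown), but the asymmetry matches the wiring above: any satisfied impairment criterion demotes, while recovery needs every recovery criterion to pass.

```go
package linkhealth

import "context"

// LinkCriterion is a guess at the worker interface; the method name and
// signature are assumptions, not taken from this diff.
type LinkCriterion interface {
	Satisfied(ctx context.Context, linkPK string) (bool, error)
}

// nextImpaired sketches the evaluator's transition: a healthy link demotes as
// soon as any impairment criterion fires; an impaired link recovers only once
// every recovery criterion passes. On error the current state is held.
func nextImpaired(ctx context.Context, linkPK string, impaired bool,
	impairment, recovery []LinkCriterion) (bool, error) {
	if !impaired {
		for _, c := range impairment {
			hit, err := c.Satisfied(ctx, linkPK)
			if err != nil {
				return impaired, err // hold state on query failure
			}
			if hit {
				return true, nil // one detection is enough to demote
			}
		}
		return false, nil
	}
	for _, c := range recovery {
		ok, err := c.Satisfied(ctx, linkPK)
		if err != nil {
			return true, err // hold Impaired on query failure
		}
		if !ok {
			return true, nil // stay Impaired until every check passes
		}
	}
	return false, nil
}
```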
101 changes: 101 additions & 0 deletions controlplane/device-health-oracle/internal/worker/clickhouse.go
@@ -3,6 +3,8 @@ package worker
import (
"context"
"crypto/tls"
"database/sql"
"errors"
"fmt"
"regexp"
"strings"
@@ -106,6 +108,105 @@ func (c *ClickHouseClient) InterfaceCountersCoverage(ctx context.Context, device
return int64(minutesWithRecords), nil
}

// LinkHealthRecentResult is the most recent rollup bucket's health fields plus
// the bucket timestamp, returned by LinkHealthChecker.LinkHealthRecent. The
// bucket timestamp lets callers enforce a recency floor (a stale bucket means
// the telemetry pipeline is broken — neither demote nor recover on stale data).
type LinkHealthRecentResult struct {
BucketTs time.Time
IsisDown bool
ALossPct float64
ZLossPct float64
}

// LinkHealthWindowResult summarises a recovery-window check. AllClean is true
// when every distinct bucket (deduplicated by latest ingested_at) is clean.
// Bad and Total are exposed for operator diagnostics.
type LinkHealthWindowResult struct {
Bad uint64
Total uint64
AllClean bool
}

// LinkHealthChecker queries ClickHouse for link health rollup records.
type LinkHealthChecker interface {
// LinkHealthRecent returns the most recent non-provisioning link_rollup_5m
// bucket for the given link. Multiple rows for the same bucket are
// disambiguated by ingested_at (most recently ingested wins). Returns
// found=false when there is no data for the link.
LinkHealthRecent(ctx context.Context, linkPubkey string) (result LinkHealthRecentResult, found bool, err error)

// LinkHealthWindowAllClean returns true if every distinct non-provisioning
// bucket in [start, end] is clean (isis_down=false AND a_loss_pct <=
// threshold AND z_loss_pct <= threshold). Late-arriving rows for the same
// bucket are deduplicated by ingested_at so a corrected re-write doesn't
// keep a link Impaired even after every distinct bucket reads as clean.
// Returns found=false when there are no buckets in the window for this link.
LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (result LinkHealthWindowResult, found bool, err error)
}

// LinkHealthRecent returns the latest bucket's health fields for the given
// link, ignoring buckets where provisioning=true. Multiple rows for the same
// bucket are disambiguated by selecting the most recently ingested row.
func (c *ClickHouseClient) LinkHealthRecent(ctx context.Context, linkPubkey string) (LinkHealthRecentResult, bool, error) {
query := fmt.Sprintf(
`SELECT bucket_ts, isis_down, a_loss_pct, z_loss_pct
FROM "%s".link_rollup_5m
WHERE link_pk = ?
AND provisioning = false
ORDER BY bucket_ts DESC, ingested_at DESC
LIMIT 1`,
c.db,
)

var r LinkHealthRecentResult
err := c.conn.QueryRow(ctx, query, linkPubkey).Scan(&r.BucketTs, &r.IsisDown, &r.ALossPct, &r.ZLossPct)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return LinkHealthRecentResult{}, false, nil
}
return LinkHealthRecentResult{}, false, fmt.Errorf("clickhouse query: %w", err)
}
return r, true, nil
}

// LinkHealthWindowAllClean returns whether every distinct bucket for the given
// link in [start, end] is clean. The inner query deduplicates late-arriving
// rows by selecting the latest ingested_at per bucket; the outer aggregate
// counts distinct dirty buckets without streaming rows.
func (c *ClickHouseClient) LinkHealthWindowAllClean(ctx context.Context, linkPubkey string, start, end time.Time, lossThreshold float64) (LinkHealthWindowResult, bool, error) {
query := fmt.Sprintf(
`SELECT
countIf(isis_down = true OR a_loss_pct > ? OR z_loss_pct > ?) AS bad_buckets,
count() AS total_buckets
FROM (
SELECT
bucket_ts,
argMax(isis_down, ingested_at) AS isis_down,
argMax(a_loss_pct, ingested_at) AS a_loss_pct,
argMax(z_loss_pct, ingested_at) AS z_loss_pct
FROM "%s".link_rollup_5m
WHERE link_pk = ?
AND bucket_ts >= ?
AND bucket_ts <= ?
AND provisioning = false
GROUP BY bucket_ts
)`,
c.db,
)

var r LinkHealthWindowResult
err := c.conn.QueryRow(ctx, query, lossThreshold, lossThreshold, linkPubkey, start, end).Scan(&r.Bad, &r.Total)
if err != nil {
return LinkHealthWindowResult{}, false, fmt.Errorf("clickhouse query: %w", err)
}
if r.Total == 0 {
return LinkHealthWindowResult{}, false, nil
}
r.AllClean = r.Bad == 0
return r, true, nil
}

func (c *ClickHouseClient) Close() error {
return c.conn.Close()
}
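A hedged usage sketch of the two queries above, wired together the way their doc comments suggest. The 15-minute staleness floor and the one-hour window are illustrative values (the oracle derives its real recovery window from `--drained-slot-count`), and the helper names are not from this PR.

```go
package worker

import (
	"context"
	"time"
)

// isLinkImpaired sketches an impairment check on top of LinkHealthChecker.
// Helper names and the concrete floor value are assumptions, not part of
// this PR.
func isLinkImpaired(ctx context.Context, ch LinkHealthChecker, linkPK string, lossThreshold float64) (bool, error) {
	r, found, err := ch.LinkHealthRecent(ctx, linkPK)
	if err != nil || !found {
		return false, err // no data: report nothing rather than guess
	}
	// A stale bucket means the telemetry pipeline is broken, not the link;
	// neither demote nor recover on it.
	if time.Since(r.BucketTs) > 15*time.Minute {
		return false, nil
	}
	return r.IsisDown || r.ALossPct > lossThreshold || r.ZLossPct > lossThreshold, nil
}

// hasLinkRecovered sketches the matching recovery check: every distinct
// bucket in the window must be clean before the link may leave Impaired.
func hasLinkRecovered(ctx context.Context, ch LinkHealthChecker, linkPK string, lossThreshold float64) (bool, error) {
	end := time.Now().UTC()
	w, found, err := ch.LinkHealthWindowAllClean(ctx, linkPK, end.Add(-time.Hour), end, lossThreshold)
	if err != nil || !found {
		return false, err // an empty window is not evidence of recovery
	}
	return w.AllClean, nil
}
```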
181 changes: 181 additions & 0 deletions controlplane/device-health-oracle/internal/worker/clickhouse_test.go
@@ -2,6 +2,7 @@ package worker

import (
"context"
"database/sql"
"errors"
"testing"
"time"
@@ -167,6 +168,186 @@ func TestInterfaceCountersCoverage_QuotesDatabaseName(t *testing.T) {
assert.Equal(t, int64(0), minutes)
}

func TestLinkHealthRecent_ReturnsLatestBucket(t *testing.T) {
bucketTs := time.Date(2026, 5, 6, 16, 10, 0, 0, time.UTC)
conn := &mockConn{
queryRowFunc: func(_ context.Context, query string, args ...any) driver.Row {
assert.Contains(t, query, `"testdb".link_rollup_5m`)
assert.Contains(t, query, "provisioning = false")
assert.Contains(t, query, "bucket_ts")
assert.Contains(t, query, "ORDER BY bucket_ts DESC, ingested_at DESC")
assert.Contains(t, query, "LIMIT 1")
assert.Len(t, args, 1)
assert.Equal(t, "linkABC", args[0])
return &mockRow{
scanFunc: func(dest ...any) error {
*(dest[0].(*time.Time)) = bucketTs
*(dest[1].(*bool)) = true
*(dest[2].(*float64)) = 12.5
*(dest[3].(*float64)) = 0.0
return nil
},
}
},
}

client := &ClickHouseClient{conn: conn, db: "testdb"}
r, found, err := client.LinkHealthRecent(context.Background(), "linkABC")
require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, bucketTs, r.BucketTs)
assert.True(t, r.IsisDown)
assert.Equal(t, 12.5, r.ALossPct)
assert.Equal(t, 0.0, r.ZLossPct)
}

func TestLinkHealthRecent_NoData_ReturnsFoundFalse(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row {
return &mockRow{
scanFunc: func(_ ...any) error {
return sql.ErrNoRows
},
}
},
}

client := &ClickHouseClient{conn: conn, db: "testdb"}
_, found, err := client.LinkHealthRecent(context.Background(), "linkABC")
require.NoError(t, err)
assert.False(t, found)
}

func TestLinkHealthRecent_QueryError(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row {
return &mockRow{
scanFunc: func(_ ...any) error {
return errors.New("connection reset")
},
}
},
}

client := &ClickHouseClient{conn: conn, db: "testdb"}
_, _, err := client.LinkHealthRecent(context.Background(), "linkABC")
assert.ErrorContains(t, err, "connection reset")
}

func TestLinkHealthRecent_QuotesDatabaseName(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, query string, _ ...any) driver.Row {
assert.Contains(t, query, `"mainnet-beta".link_rollup_5m`)
return &mockRow{
scanFunc: func(dest ...any) error {
*(dest[0].(*time.Time)) = time.Now()
*(dest[1].(*bool)) = false
*(dest[2].(*float64)) = 0
*(dest[3].(*float64)) = 0
return nil
},
}
},
}
client := &ClickHouseClient{conn: conn, db: "mainnet-beta"}
_, found, err := client.LinkHealthRecent(context.Background(), "linkABC")
require.NoError(t, err)
assert.True(t, found)
}

func TestLinkHealthWindowAllClean_AllClean(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, query string, args ...any) driver.Row {
assert.Contains(t, query, `"testdb".link_rollup_5m`)
assert.Contains(t, query, "provisioning = false")
assert.Contains(t, query, "countIf")
// Inner subquery dedupes ingested_at duplicates per bucket via argMax.
assert.Contains(t, query, "argMax")
assert.Contains(t, query, "GROUP BY bucket_ts")
// Args order: lossThreshold, lossThreshold, linkPubkey, start, end
assert.Len(t, args, 5)
assert.Equal(t, 5.0, args[0])
assert.Equal(t, 5.0, args[1])
assert.Equal(t, "linkABC", args[2])
return &mockRow{
scanFunc: func(dest ...any) error {
*(dest[0].(*uint64)) = 0
*(dest[1].(*uint64)) = 6
return nil
},
}
},
}

client := &ClickHouseClient{conn: conn, db: "testdb"}
r, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC",
time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2026, 1, 1, 1, 0, 0, 0, time.UTC),
5.0)
require.NoError(t, err)
assert.True(t, found)
assert.True(t, r.AllClean)
assert.Equal(t, uint64(0), r.Bad)
assert.Equal(t, uint64(6), r.Total)
}

func TestLinkHealthWindowAllClean_HasBadBuckets(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row {
return &mockRow{
scanFunc: func(dest ...any) error {
*(dest[0].(*uint64)) = 2
*(dest[1].(*uint64)) = 6
return nil
},
}
},
}
client := &ClickHouseClient{conn: conn, db: "testdb"}
r, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC",
time.Now().Add(-1*time.Hour), time.Now(), 5.0)
require.NoError(t, err)
assert.True(t, found)
assert.False(t, r.AllClean)
assert.Equal(t, uint64(2), r.Bad)
assert.Equal(t, uint64(6), r.Total)
}

func TestLinkHealthWindowAllClean_NoBucketsInWindow(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row {
return &mockRow{
scanFunc: func(dest ...any) error {
*(dest[0].(*uint64)) = 0
*(dest[1].(*uint64)) = 0
return nil
},
}
},
}
client := &ClickHouseClient{conn: conn, db: "testdb"}
_, found, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC",
time.Now().Add(-1*time.Hour), time.Now(), 5.0)
require.NoError(t, err)
assert.False(t, found)
}

func TestLinkHealthWindowAllClean_QueryError(t *testing.T) {
conn := &mockConn{
queryRowFunc: func(_ context.Context, _ string, _ ...any) driver.Row {
return &mockRow{
scanFunc: func(_ ...any) error {
return errors.New("connection reset")
},
}
},
}
client := &ClickHouseClient{conn: conn, db: "testdb"}
_, _, err := client.LinkHealthWindowAllClean(context.Background(), "linkABC",
time.Now().Add(-1*time.Hour), time.Now(), 5.0)
assert.ErrorContains(t, err, "connection reset")
}

func TestNewClickHouseClient_StripsScheme(t *testing.T) {
tests := []struct {
name string
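The tests above depend on `mockConn` and `mockRow` helpers that are defined elsewhere in the package and not shown in this diff. Below is a minimal sketch of what such test doubles might look like against clickhouse-go's `driver` interfaces; this is an assumption, and the real helpers may stub more methods.

```go
package worker

import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// mockConn is a sketch of the connection double used above (an assumption;
// the real helper is not in this diff). Embedding driver.Conn satisfies the
// full interface; only QueryRow and Close are overridden, so calling any
// other method would panic, which is fine for these tests.
type mockConn struct {
	driver.Conn
	queryRowFunc func(ctx context.Context, query string, args ...any) driver.Row
}

func (m *mockConn) QueryRow(ctx context.Context, query string, args ...any) driver.Row {
	return m.queryRowFunc(ctx, query, args...)
}

func (m *mockConn) Close() error { return nil }

// mockRow is a sketch of the row double: Scan delegates to a per-test func
// that writes canned values into the destinations.
type mockRow struct {
	driver.Row
	scanFunc func(dest ...any) error
}

func (m *mockRow) Scan(dest ...any) error { return m.scanFunc(dest...) }
```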