From ca4f2fc21a2f5d960e41afdf68b0c29d7ad0b18c Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Thu, 23 Apr 2026 23:24:27 +0000 Subject: [PATCH 1/2] controller: add controller_agent_versions ClickHouse table Write agent and controller version info to a separate ReplacingMergeTree table keyed by device_pubkey. ClickHouse merges rows down to one per device; Lake queries with FINAL. The existing controller_grpc_getconfig_success table stays lean (timestamp + device_pubkey only) for event tracking. --- controlplane/controller/README.md | 23 ++++- .../controller/cmd/controller/main.go | 2 +- .../internal/controller/clickhouse.go | 99 +++++++++++++++++-- .../controller/internal/controller/server.go | 11 ++- 4 files changed, 120 insertions(+), 15 deletions(-) diff --git a/controlplane/controller/README.md b/controlplane/controller/README.md index 27dbc7b87a..8698281f58 100644 --- a/controlplane/controller/README.md +++ b/controlplane/controller/README.md @@ -26,11 +26,14 @@ CREATE USER controller_devnet IDENTIFIED BY 'your_password'; -- Grant permissions GRANT SELECT, INSERT, CREATE TABLE, SHOW COLUMNS ON devnet.controller_grpc_getconfig_success TO controller_devnet; +GRANT SELECT, INSERT, CREATE TABLE, SHOW COLUMNS ON devnet.controller_agent_versions TO controller_devnet; ``` -The controller will automatically create the `controller_grpc_getconfig_success` table on startup and batch-insert GetConfig events every 10 seconds. +The controller automatically creates both tables on startup and batch-inserts every 10 seconds. -#### Table Schema +#### Table Schemas + +**GetConfig events** — one row per successful GetConfig poll: ```sql CREATE TABLE devnet.controller_grpc_getconfig_success ( @@ -40,3 +43,19 @@ CREATE TABLE devnet.controller_grpc_getconfig_success ( PARTITION BY toYYYYMM(timestamp) ORDER BY (timestamp, device_pubkey) ``` + +**Agent versions** — latest agent and controller version per device. 
Uses `ReplacingMergeTree(updated_at)`, so background merges eventually keep only the row with the newest `updated_at` per device; merges are asynchronous, so query with `FINAL` for deduplicated results: + +```sql +CREATE TABLE devnet.controller_agent_versions ( + device_pubkey LowCardinality(String), + updated_at DateTime64(3), + agent_version LowCardinality(String) DEFAULT '', + agent_commit LowCardinality(String) DEFAULT '', + agent_date LowCardinality(String) DEFAULT '', + controller_version LowCardinality(String) DEFAULT '', + controller_commit LowCardinality(String) DEFAULT '', + controller_date LowCardinality(String) DEFAULT '' +) ENGINE = ReplacingMergeTree(updated_at) +ORDER BY device_pubkey +``` diff --git a/controlplane/controller/cmd/controller/main.go b/controlplane/controller/cmd/controller/main.go index 059db3e56e..e8ae888112 100644 --- a/controlplane/controller/cmd/controller/main.go +++ b/controlplane/controller/cmd/controller/main.go @@ -238,7 +238,7 @@ func (c *ControllerCommand) Run() error { } chPass := os.Getenv("CLICKHOUSE_PASS") chTLSDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" - cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled) + cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled, version, commit, date) if err != nil { log.Warn("clickhouse connection failed, continuing without clickhouse", "addr", chAddr, "error", err) } else { diff --git a/controlplane/controller/internal/controller/clickhouse.go b/controlplane/controller/internal/controller/clickhouse.go index 3762c3cb83..15c55b6e54 100644 --- a/controlplane/controller/internal/controller/clickhouse.go +++ b/controlplane/controller/internal/controller/clickhouse.go @@ -17,13 +17,30 @@ type getConfigEvent struct { DevicePubkey string } +type versionEvent struct { + DevicePubkey string + UpdatedAt time.Time + AgentVersion string + AgentCommit string + AgentDate string + ControllerVersion string + ControllerCommit string + ControllerDate string +} + type ClickhouseWriter struct 
{ conn clickhouse.Conn log *slog.Logger db string mu sync.Mutex events []getConfigEvent + versions []versionEvent consecutiveErrors int + + // Controller build info, stamped on every version event. + controllerVersion string + controllerCommit string + controllerDate string } // consecutiveErrorThreshold is the number of consecutive flush failures @@ -53,7 +70,7 @@ func buildClickhouseOptions(addr, db, user, pass string, disableTLS bool) *click return opts } -func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool) (*ClickhouseWriter, error) { +func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool, controllerVersion, controllerCommit, controllerDate string) (*ClickhouseWriter, error) { chOpts := buildClickhouseOptions(addr, db, user, pass, disableTLS) conn, err := clickhouse.Open(chOpts) if err != nil { @@ -63,13 +80,16 @@ func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableT return nil, fmt.Errorf("error pinging clickhouse: %w", err) } return &ClickhouseWriter{ - conn: conn, - log: log, - db: db, + conn: conn, + log: log, + db: db, + controllerVersion: controllerVersion, + controllerCommit: controllerCommit, + controllerDate: controllerDate, }, nil } -func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error { +func (cw *ClickhouseWriter) CreateTables(ctx context.Context) error { err := cw.conn.Exec(ctx, fmt.Sprintf(` CREATE TABLE IF NOT EXISTS "%s".controller_grpc_getconfig_success ( timestamp DateTime64(3), @@ -79,8 +99,26 @@ func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error { ORDER BY (timestamp, device_pubkey) `, cw.db)) if err != nil { - return fmt.Errorf("error creating table: %w", err) + return fmt.Errorf("error creating getconfig table: %w", err) } + + err = cw.conn.Exec(ctx, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s".controller_agent_versions ( + device_pubkey LowCardinality(String), + updated_at DateTime64(3), + agent_version 
LowCardinality(String) DEFAULT '', + agent_commit LowCardinality(String) DEFAULT '', + agent_date LowCardinality(String) DEFAULT '', + controller_version LowCardinality(String) DEFAULT '', + controller_commit LowCardinality(String) DEFAULT '', + controller_date LowCardinality(String) DEFAULT '' + ) ENGINE = ReplacingMergeTree(updated_at) + ORDER BY device_pubkey + `, cw.db)) + if err != nil { + return fmt.Errorf("error creating agent_versions table: %w", err) + } + return nil } @@ -90,6 +128,15 @@ func (cw *ClickhouseWriter) Record(event getConfigEvent) { cw.mu.Unlock() } +func (cw *ClickhouseWriter) RecordVersion(event versionEvent) { + event.ControllerVersion = cw.controllerVersion + event.ControllerCommit = cw.controllerCommit + event.ControllerDate = cw.controllerDate + cw.mu.Lock() + cw.versions = append(cw.versions, event) + cw.mu.Unlock() +} + func (cw *ClickhouseWriter) Run(ctx context.Context) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -106,14 +153,21 @@ func (cw *ClickhouseWriter) Run(ctx context.Context) { func (cw *ClickhouseWriter) flush(ctx context.Context) { cw.mu.Lock() - if len(cw.events) == 0 { - cw.mu.Unlock() - return - } events := cw.events cw.events = nil + versions := cw.versions + cw.versions = nil cw.mu.Unlock() + if len(events) > 0 { + cw.flushEvents(ctx, events) + } + if len(versions) > 0 { + cw.flushVersions(ctx, versions) + } +} + +func (cw *ClickhouseWriter) flushEvents(ctx context.Context, events []getConfigEvent) { batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf( `INSERT INTO "%s".controller_grpc_getconfig_success (timestamp, device_pubkey)`, cw.db, )) @@ -139,6 +193,31 @@ func (cw *ClickhouseWriter) flush(ctx context.Context) { cw.log.Debug("flushed getconfig events to clickhouse", "count", len(events)) } +func (cw *ClickhouseWriter) flushVersions(ctx context.Context, versions []versionEvent) { + batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf( + `INSERT INTO "%s".controller_agent_versions 
(device_pubkey, updated_at, agent_version, agent_commit, agent_date, controller_version, controller_commit, controller_date)`, cw.db, + )) + if err != nil { + cw.recordFlushError("error preparing clickhouse versions batch", err) + return + } + for _, v := range versions { + if err := batch.Append(v.DevicePubkey, v.UpdatedAt, v.AgentVersion, v.AgentCommit, v.AgentDate, v.ControllerVersion, v.ControllerCommit, v.ControllerDate); err != nil { + cw.logFlushError("error appending to clickhouse versions batch", err) + } + } + if err := batch.Send(); err != nil { + _ = batch.Close() + cw.recordFlushError("error sending clickhouse versions batch", err) + return + } + if err := batch.Close(); err != nil { + cw.recordFlushError("error closing clickhouse versions batch", err) + return + } + cw.log.Debug("flushed version events to clickhouse", "count", len(versions)) +} + // recordFlushError increments the consecutive error counter and logs at the // appropriate level. Transient errors are logged as WARN; persistent errors // (exceeding consecutiveErrorThreshold) are logged as ERROR. diff --git a/controlplane/controller/internal/controller/server.go b/controlplane/controller/internal/controller/server.go index 07298c7877..2eb6e584eb 100644 --- a/controlplane/controller/internal/controller/server.go +++ b/controlplane/controller/internal/controller/server.go @@ -584,8 +584,8 @@ func (c *Controller) Run(ctx context.Context) error { }() if c.clickhouse != nil { - if err := c.clickhouse.CreateTable(ctx); err != nil { - c.log.Warn("error creating clickhouse table, continuing without clickhouse", "error", err) + if err := c.clickhouse.CreateTables(ctx); err != nil { + c.log.Warn("error creating clickhouse tables, continuing without clickhouse", "error", err) c.clickhouse = nil } else { go c.clickhouse.Run(ctx) @@ -824,6 +824,13 @@ func (c *Controller) GetConfig(ctx context.Context, req *pb.ConfigRequest) (*pb. 
Timestamp: reqStart, DevicePubkey: req.GetPubkey(), }) + c.clickhouse.RecordVersion(versionEvent{ + DevicePubkey: req.GetPubkey(), + UpdatedAt: reqStart, + AgentVersion: agentVersion, + AgentCommit: agentCommit, + AgentDate: agentDate, + }) } return resp, nil } From afbedc0eef9ff6d72fd6319f99f6fc12cff60d7c Mon Sep 17 00:00:00 2001 From: Nik Weidenbacher Date: Fri, 24 Apr 2026 00:45:30 +0000 Subject: [PATCH 2/2] fixup --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6882136ba3..510bdb904e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file. - Filter devices by type-specific capacity during auto-selection so clients are not provisioned onto devices that have reached their unicast, multicast publisher, or multicast subscriber limits - Collector - fallback to any probe if anchor probes aren't available +- Device Controller + - Store config agent version info in ClickHouse - Smartcontract - Fix multicast group allowlist add/remove for AccessPasses created with `allow_multiple_ip=true`; the processors were rejecting requests with a real client IP because the stored IP is always `0.0.0.0` for these passes ([#3551](https://github.com/malbeclabs/doublezero/issues/3551)) - SDK now auto-detects the correct AccessPass PDA (static or dynamic) for allowlist operations based on whether an `allow_multiple_ip` pass exists