Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file.
- Filter devices by type-specific capacity during auto-selection so clients are not provisioned onto devices that have reached their unicast, multicast publisher, or multicast subscriber limits
- Collector
- fallback to any probe if anchor probes aren't available
- Device Controller
- store config agent version info in clickhouse
- Smartcontract
- Fix multicast group allowlist add/remove for AccessPasses created with `allow_multiple_ip=true`; the processors were rejecting requests with a real client IP because the stored IP is always `0.0.0.0` for these passes ([#3551](https://github.com/malbeclabs/doublezero/issues/3551))
- SDK now auto-detects the correct AccessPass PDA (static or dynamic) for allowlist operations based on whether an `allow_multiple_ip` pass exists
Expand Down
23 changes: 21 additions & 2 deletions controlplane/controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ CREATE USER controller_devnet IDENTIFIED BY 'your_password';

-- Grant permissions
GRANT SELECT, INSERT, CREATE TABLE, SHOW COLUMNS ON devnet.controller_grpc_getconfig_success TO controller_devnet;
GRANT SELECT, INSERT, CREATE TABLE, SHOW COLUMNS ON devnet.controller_agent_versions TO controller_devnet;
```

The controller will automatically create the `controller_grpc_getconfig_success` table on startup and batch-insert GetConfig events every 10 seconds.
The controller automatically creates both tables on startup and batch-inserts every 10 seconds.

#### Table Schema
#### Table Schemas

**GetConfig events** — one row per successful GetConfig poll:

```sql
CREATE TABLE devnet.controller_grpc_getconfig_success (
Expand All @@ -40,3 +43,19 @@ CREATE TABLE devnet.controller_grpc_getconfig_success (
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, device_pubkey)
```

**Agent versions** — latest agent and controller version per device. Uses `ReplacingMergeTree` so ClickHouse merges rows down to one per device; query with `FINAL` for deduplicated results:

```sql
CREATE TABLE devnet.controller_agent_versions (
device_pubkey LowCardinality(String),
updated_at DateTime64(3),
agent_version LowCardinality(String) DEFAULT '',
agent_commit LowCardinality(String) DEFAULT '',
agent_date LowCardinality(String) DEFAULT '',
controller_version LowCardinality(String) DEFAULT '',
controller_commit LowCardinality(String) DEFAULT '',
controller_date LowCardinality(String) DEFAULT ''
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY device_pubkey
```
2 changes: 1 addition & 1 deletion controlplane/controller/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (c *ControllerCommand) Run() error {
}
chPass := os.Getenv("CLICKHOUSE_PASS")
chTLSDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true"
cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled)
cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled, version, commit, date)
if err != nil {
log.Warn("clickhouse connection failed, continuing without clickhouse", "addr", chAddr, "error", err)
} else {
Expand Down
99 changes: 89 additions & 10 deletions controlplane/controller/internal/controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,30 @@ type getConfigEvent struct {
DevicePubkey string
}

type versionEvent struct {
DevicePubkey string
UpdatedAt time.Time
AgentVersion string
AgentCommit string
AgentDate string
ControllerVersion string
ControllerCommit string
ControllerDate string
}

type ClickhouseWriter struct {
conn clickhouse.Conn
log *slog.Logger
db string
mu sync.Mutex
events []getConfigEvent
versions []versionEvent
consecutiveErrors int

// Controller build info, stamped on every version event.
controllerVersion string
controllerCommit string
controllerDate string
}

// consecutiveErrorThreshold is the number of consecutive flush failures
Expand Down Expand Up @@ -53,7 +70,7 @@ func buildClickhouseOptions(addr, db, user, pass string, disableTLS bool) *click
return opts
}

func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool) (*ClickhouseWriter, error) {
func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool, controllerVersion, controllerCommit, controllerDate string) (*ClickhouseWriter, error) {
chOpts := buildClickhouseOptions(addr, db, user, pass, disableTLS)
conn, err := clickhouse.Open(chOpts)
if err != nil {
Expand All @@ -63,13 +80,16 @@ func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableT
return nil, fmt.Errorf("error pinging clickhouse: %w", err)
}
return &ClickhouseWriter{
conn: conn,
log: log,
db: db,
conn: conn,
log: log,
db: db,
controllerVersion: controllerVersion,
controllerCommit: controllerCommit,
controllerDate: controllerDate,
}, nil
}

func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error {
func (cw *ClickhouseWriter) CreateTables(ctx context.Context) error {
err := cw.conn.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "%s".controller_grpc_getconfig_success (
timestamp DateTime64(3),
Expand All @@ -79,8 +99,26 @@ func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error {
ORDER BY (timestamp, device_pubkey)
`, cw.db))
if err != nil {
return fmt.Errorf("error creating table: %w", err)
return fmt.Errorf("error creating getconfig table: %w", err)
}

err = cw.conn.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "%s".controller_agent_versions (
device_pubkey LowCardinality(String),
updated_at DateTime64(3),
agent_version LowCardinality(String) DEFAULT '',
agent_commit LowCardinality(String) DEFAULT '',
agent_date LowCardinality(String) DEFAULT '',
controller_version LowCardinality(String) DEFAULT '',
controller_commit LowCardinality(String) DEFAULT '',
controller_date LowCardinality(String) DEFAULT ''
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY device_pubkey
`, cw.db))
if err != nil {
return fmt.Errorf("error creating agent_versions table: %w", err)
}

return nil
}

Expand All @@ -90,6 +128,15 @@ func (cw *ClickhouseWriter) Record(event getConfigEvent) {
cw.mu.Unlock()
}

func (cw *ClickhouseWriter) RecordVersion(event versionEvent) {
event.ControllerVersion = cw.controllerVersion
event.ControllerCommit = cw.controllerCommit
event.ControllerDate = cw.controllerDate
cw.mu.Lock()
cw.versions = append(cw.versions, event)
cw.mu.Unlock()
}

func (cw *ClickhouseWriter) Run(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -106,14 +153,21 @@ func (cw *ClickhouseWriter) Run(ctx context.Context) {

func (cw *ClickhouseWriter) flush(ctx context.Context) {
cw.mu.Lock()
if len(cw.events) == 0 {
cw.mu.Unlock()
return
}
events := cw.events
cw.events = nil
versions := cw.versions
cw.versions = nil
cw.mu.Unlock()

if len(events) > 0 {
cw.flushEvents(ctx, events)
}
if len(versions) > 0 {
cw.flushVersions(ctx, versions)
}
}

func (cw *ClickhouseWriter) flushEvents(ctx context.Context, events []getConfigEvent) {
batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf(
`INSERT INTO "%s".controller_grpc_getconfig_success (timestamp, device_pubkey)`, cw.db,
))
Expand All @@ -139,6 +193,31 @@ func (cw *ClickhouseWriter) flush(ctx context.Context) {
cw.log.Debug("flushed getconfig events to clickhouse", "count", len(events))
}

func (cw *ClickhouseWriter) flushVersions(ctx context.Context, versions []versionEvent) {
batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf(
`INSERT INTO "%s".controller_agent_versions (device_pubkey, updated_at, agent_version, agent_commit, agent_date, controller_version, controller_commit, controller_date)`, cw.db,
))
if err != nil {
cw.recordFlushError("error preparing clickhouse versions batch", err)
return
}
for _, v := range versions {
if err := batch.Append(v.DevicePubkey, v.UpdatedAt, v.AgentVersion, v.AgentCommit, v.AgentDate, v.ControllerVersion, v.ControllerCommit, v.ControllerDate); err != nil {
cw.logFlushError("error appending to clickhouse versions batch", err)
}
}
if err := batch.Send(); err != nil {
_ = batch.Close()
cw.recordFlushError("error sending clickhouse versions batch", err)
return
}
if err := batch.Close(); err != nil {
cw.recordFlushError("error closing clickhouse versions batch", err)
return
}
cw.log.Debug("flushed version events to clickhouse", "count", len(versions))
}

// recordFlushError increments the consecutive error counter and logs at the
// appropriate level. Transient errors are logged as WARN; persistent errors
// (exceeding consecutiveErrorThreshold) are logged as ERROR.
Expand Down
11 changes: 9 additions & 2 deletions controlplane/controller/internal/controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ func (c *Controller) Run(ctx context.Context) error {
}()

if c.clickhouse != nil {
if err := c.clickhouse.CreateTable(ctx); err != nil {
c.log.Warn("error creating clickhouse table, continuing without clickhouse", "error", err)
if err := c.clickhouse.CreateTables(ctx); err != nil {
c.log.Warn("error creating clickhouse tables, continuing without clickhouse", "error", err)
c.clickhouse = nil
} else {
go c.clickhouse.Run(ctx)
Expand Down Expand Up @@ -824,6 +824,13 @@ func (c *Controller) GetConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.
Timestamp: reqStart,
DevicePubkey: req.GetPubkey(),
})
c.clickhouse.RecordVersion(versionEvent{
DevicePubkey: req.GetPubkey(),
UpdatedAt: reqStart,
AgentVersion: agentVersion,
AgentCommit: agentCommit,
AgentDate: agentDate,
})
}
return resp, nil
}
Expand Down
Loading