From aa85ecb8df2d37c9f6899ce949a95becf43f8928 Mon Sep 17 00:00:00 2001 From: Kures <14836932+Kures@users.noreply.github.com> Date: Fri, 8 May 2026 18:58:09 +0300 Subject: [PATCH] feat(observability): add live-state gauges to /metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds four gauge metrics derived from the source-of-truth state at Prometheus scrape time, complementing the existing 11 cumulative counters: nullboiler_runs_in_flight runs in 'running' or 'pending' state nullboiler_steps_in_flight steps currently 'running' nullboiler_workers_healthy workers in 'active' state nullboiler_drain_mode 1 when drain is active, 0 otherwise Implementation samples the SoT — DB COUNT(*) queries and the existing drain atomic — at exposition time rather than maintaining inc/dec wiring across the engine and store hot-paths. Gauge values are accurate by construction and the patch makes zero changes to business-logic call sites. Files touched: src/metrics.zig - new Metrics.Sample struct with the four gauge values - new renderPrometheusWithSample(allocator, sample) that emits the 11 counters followed by 4 # TYPE ... gauge lines - existing renderPrometheus(allocator) preserved as a thin wrapper so all 340 existing tests continue to pass unchanged src/store.zig - countRunsInFlight(): 'SELECT COUNT(*) FROM runs WHERE status IN (running, pending)' - countStepsRunning(): 'SELECT COUNT(*) FROM steps WHERE status = running' - countWorkersByStatus(status): generic, used here with 'active' matches the existing prepare/finalize idiom used by countStepsByStatus and countRunningStepsByWorker in the same file src/api.zig - handleMetrics now calls computeGaugeSample(ctx) and passes the result into renderPrometheusWithSample - computeGaugeSample falls back to 0 on any DB error so a flaky query never poisons the entire /metrics response — the counter half stays valid Validation: zig build test --summary all Build Summary: 9/9 steps succeeded; 340/340 tests passed Live smoke test against a running binary built from this branch is blocked by an unrelated regression on main (filed separately): every HTTP response is dropped before reaching the client because writer flush succeeds but bytes never leave userspace. The published ghcr.io/nullclaw/nullboiler:2026.3.2 image (built before that regression) does not yet expose these gauges, so end-to-end smoke will be possible after the upstream HTTP fix lands. Closes the live-state half of P1-03 alongside the dashboard updates on feat/grafana-dashboards. --- src/api.zig | 19 ++++++++++++++++++- src/metrics.zig | 27 +++++++++++++++++++++++++++ src/store.zig | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/src/api.zig b/src/api.zig index 221e023..0657bac 100644 --- a/src/api.zig +++ b/src/api.zig @@ -282,10 +282,27 @@ fn handleHealth(ctx: *Context) HttpResponse { fn handleMetrics(ctx: *Context) HttpResponse { const m = ctx.metrics orelse return plainResponse(200, "nullboiler_metrics_disabled 1\n"); - const body = m.renderPrometheus(ctx.allocator) catch return plainResponse(500, "nullboiler_metrics_render_error 1\n"); + const sample = computeGaugeSample(ctx); + const body = m.renderPrometheusWithSample(ctx.allocator, sample) catch return plainResponse(500, "nullboiler_metrics_render_error 1\n"); return plainResponse(200, body); } +/// Sample live "right now" values for the gauge metrics. Each helper falls +/// back to 0 on error so a flaky DB query never poisons the entire /metrics +/// response — the counter half stays valid. +fn computeGaugeSample(ctx: *Context) metrics_mod.Metrics.Sample { + const drain_value: i64 = if (ctx.drain_mode) |d| + (if (d.load(.acquire)) @as(i64, 1) else @as(i64, 0)) + else + 0; + return .{ + .runs_in_flight = ctx.store.countRunsInFlight() catch 0, + .steps_in_flight = ctx.store.countStepsRunning() catch 0, + .workers_healthy = ctx.store.countWorkersByStatus("active") catch 0, + .drain_mode = drain_value, + }; +} + fn handleEnableDrain(ctx: *Context) HttpResponse { const drain = ctx.drain_mode orelse { return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"drain mode is not configured\"}}"); diff --git a/src/metrics.zig b/src/metrics.zig index 25aa02b..55ab7d0 100644 --- a/src/metrics.zig +++ b/src/metrics.zig @@ -17,7 +17,22 @@ pub const Metrics = struct { _ = counter.fetchAdd(1, .monotonic); } + /// Snapshot of live "right now" values, sampled from the source-of-truth + /// state at Prometheus scrape time (DB and the drain atomic). Sampled — + /// not maintained via inc/dec wiring across engine/store hot-paths — + /// because the gauge is then accurate by construction. + pub const Sample = struct { + runs_in_flight: i64 = 0, + steps_in_flight: i64 = 0, + workers_healthy: i64 = 0, + drain_mode: i64 = 0, + }; + pub fn renderPrometheus(self: *const Metrics, allocator: std.mem.Allocator) ![]const u8 { + return self.renderPrometheusWithSample(allocator, .{}); + } + + pub fn renderPrometheusWithSample(self: *const Metrics, allocator: std.mem.Allocator, sample: Sample) ![]const u8 { return std.fmt.allocPrint( allocator, \\# TYPE nullboiler_http_requests_total counter @@ -42,6 +57,14 @@ pub const Metrics = struct { \\nullboiler_callback_sent_total {d} \\# TYPE nullboiler_callback_failed_total counter \\nullboiler_callback_failed_total {d} + \\# TYPE nullboiler_runs_in_flight gauge + \\nullboiler_runs_in_flight {d} + \\# TYPE nullboiler_steps_in_flight gauge + \\nullboiler_steps_in_flight {d} + \\# TYPE nullboiler_workers_healthy gauge + \\nullboiler_workers_healthy {d} + \\# TYPE nullboiler_drain_mode gauge + \\nullboiler_drain_mode {d} \\ , .{ @@ -56,6 +79,10 @@ pub const Metrics = struct { self.worker_health_failures_total.load(.monotonic), self.callback_sent_total.load(.monotonic), self.callback_failed_total.load(.monotonic), + sample.runs_in_flight, + sample.steps_in_flight, + sample.workers_healthy, + sample.drain_mode, }, ); } diff --git a/src/store.zig b/src/store.zig index d9236e0..f5d3fb1 100644 --- a/src/store.zig +++ b/src/store.zig @@ -859,6 +859,40 @@ pub const Store = struct { return colInt(stmt, 0); } + pub fn countRunsInFlight(self: *Self) !i64 { + const sql = "SELECT COUNT(*) FROM runs WHERE status IN ('running', 'pending')"; + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { + return error.SqlitePrepareFailed; + } + defer _ = c.sqlite3_finalize(stmt); + if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0; + return colInt(stmt, 0); + } + + pub fn countStepsRunning(self: *Self) !i64 { + const sql = "SELECT COUNT(*) FROM steps WHERE status = 'running'"; + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { + return error.SqlitePrepareFailed; + } + defer _ = c.sqlite3_finalize(stmt); + if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0; + return colInt(stmt, 0); + } + + pub fn countWorkersByStatus(self: *Self, status: []const u8) !i64 { + const sql = "SELECT COUNT(*) FROM workers WHERE status = ?"; + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { + return error.SqlitePrepareFailed; + } + defer _ = c.sqlite3_finalize(stmt); + _ = c.sqlite3_bind_text(stmt, 1, status.ptr, @intCast(status.len), SQLITE_STATIC); + if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0; + return colInt(stmt, 0); + } + pub fn getChildSteps(self: *Self, allocator: std.mem.Allocator, parent_step_id: []const u8) ![]types.StepRow { const sql = "SELECT id, run_id, def_step_id, type, status, worker_id, input_json, output_json, error_text, attempt, max_attempts, timeout_ms, next_attempt_at_ms, parent_step_id, item_index, created_at_ms, updated_at_ms, started_at_ms, ended_at_ms, child_run_id, iteration_index FROM steps WHERE parent_step_id = ? ORDER BY item_index ASC"; var stmt: ?*c.sqlite3_stmt = null;