Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/api.zig
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,27 @@ fn handleHealth(ctx: *Context) HttpResponse {

fn handleMetrics(ctx: *Context) HttpResponse {
const m = ctx.metrics orelse return plainResponse(200, "nullboiler_metrics_disabled 1\n");
const body = m.renderPrometheus(ctx.allocator) catch return plainResponse(500, "nullboiler_metrics_render_error 1\n");
const sample = computeGaugeSample(ctx);
const body = m.renderPrometheusWithSample(ctx.allocator, sample) catch return plainResponse(500, "nullboiler_metrics_render_error 1\n");
return plainResponse(200, body);
}

/// Sample live "right now" values for the gauge metrics. Each helper falls
/// back to 0 on error so a flaky DB query never poisons the entire /metrics
/// response — the counter half stays valid.
fn computeGaugeSample(ctx: *Context) metrics_mod.Metrics.Sample {
const drain_value: i64 = if (ctx.drain_mode) |d|
(if (d.load(.acquire)) @as(i64, 1) else @as(i64, 0))
else
0;
return .{
.runs_in_flight = ctx.store.countRunsInFlight() catch 0,
.steps_in_flight = ctx.store.countStepsRunning() catch 0,
.workers_healthy = ctx.store.countWorkersByStatus("active") catch 0,
.drain_mode = drain_value,
};
}

fn handleEnableDrain(ctx: *Context) HttpResponse {
const drain = ctx.drain_mode orelse {
return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"drain mode is not configured\"}}");
Expand Down
27 changes: 27 additions & 0 deletions src/metrics.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,22 @@ pub const Metrics = struct {
_ = counter.fetchAdd(1, .monotonic);
}

/// Snapshot of live "right now" values, sampled from the source-of-truth
/// state at Prometheus scrape time (DB and the drain atomic). Sampled —
/// not maintained via inc/dec wiring across engine/store hot-paths —
/// because the gauge is then accurate by construction.
pub const Sample = struct {
runs_in_flight: i64 = 0,
steps_in_flight: i64 = 0,
workers_healthy: i64 = 0,
drain_mode: i64 = 0,
};

pub fn renderPrometheus(self: *const Metrics, allocator: std.mem.Allocator) ![]const u8 {
return self.renderPrometheusWithSample(allocator, .{});
}

pub fn renderPrometheusWithSample(self: *const Metrics, allocator: std.mem.Allocator, sample: Sample) ![]const u8 {
return std.fmt.allocPrint(
allocator,
\\# TYPE nullboiler_http_requests_total counter
Expand All @@ -42,6 +57,14 @@ pub const Metrics = struct {
\\nullboiler_callback_sent_total {d}
\\# TYPE nullboiler_callback_failed_total counter
\\nullboiler_callback_failed_total {d}
\\# TYPE nullboiler_runs_in_flight gauge
\\nullboiler_runs_in_flight {d}
\\# TYPE nullboiler_steps_in_flight gauge
\\nullboiler_steps_in_flight {d}
\\# TYPE nullboiler_workers_healthy gauge
\\nullboiler_workers_healthy {d}
\\# TYPE nullboiler_drain_mode gauge
\\nullboiler_drain_mode {d}
\\
,
.{
Expand All @@ -56,6 +79,10 @@ pub const Metrics = struct {
self.worker_health_failures_total.load(.monotonic),
self.callback_sent_total.load(.monotonic),
self.callback_failed_total.load(.monotonic),
sample.runs_in_flight,
sample.steps_in_flight,
sample.workers_healthy,
sample.drain_mode,
},
);
}
Expand Down
34 changes: 34 additions & 0 deletions src/store.zig
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,40 @@ pub const Store = struct {
return colInt(stmt, 0);
}

pub fn countRunsInFlight(self: *Self) !i64 {
const sql = "SELECT COUNT(*) FROM runs WHERE status IN ('running', 'pending')";
var stmt: ?*c.sqlite3_stmt = null;
if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) {
return error.SqlitePrepareFailed;
}
defer _ = c.sqlite3_finalize(stmt);
if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0;
return colInt(stmt, 0);
}

pub fn countStepsRunning(self: *Self) !i64 {
const sql = "SELECT COUNT(*) FROM steps WHERE status = 'running'";
var stmt: ?*c.sqlite3_stmt = null;
if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) {
return error.SqlitePrepareFailed;
}
defer _ = c.sqlite3_finalize(stmt);
if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0;
return colInt(stmt, 0);
}

pub fn countWorkersByStatus(self: *Self, status: []const u8) !i64 {
const sql = "SELECT COUNT(*) FROM workers WHERE status = ?";
var stmt: ?*c.sqlite3_stmt = null;
if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) {
return error.SqlitePrepareFailed;
}
defer _ = c.sqlite3_finalize(stmt);
_ = c.sqlite3_bind_text(stmt, 1, status.ptr, @intCast(status.len), SQLITE_STATIC);
if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return 0;
return colInt(stmt, 0);
}

pub fn getChildSteps(self: *Self, allocator: std.mem.Allocator, parent_step_id: []const u8) ![]types.StepRow {
const sql = "SELECT id, run_id, def_step_id, type, status, worker_id, input_json, output_json, error_text, attempt, max_attempts, timeout_ms, next_attempt_at_ms, parent_step_id, item_index, created_at_ms, updated_at_ms, started_at_ms, ended_at_ms, child_run_id, iteration_index FROM steps WHERE parent_step_id = ? ORDER BY item_index ASC";
var stmt: ?*c.sqlite3_stmt = null;
Expand Down