Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/cluster-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use mz_ore::cast::CastLossy;
use mz_ore::metric;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
Rule,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};
Expand All @@ -37,16 +38,61 @@ impl ControllerMetrics {
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
rules: [
Rule::ClusterNameLookup {
cluster_id_label: "instance_id".into(),
output_label: "cluster_name".into(),
},
Rule::ReplicaNameLookup {
cluster_id_label: "instance_id".into(),
replica_id_label: "replica_id".into(),
output_label: "replica_name".into(),
},
Rule::ObjectNameLookup {
object_id_label: "collection_id".into(),
output_label: "collection_name".into(),
},
],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
rules: [
Rule::ClusterNameLookup {
cluster_id_label: "instance_id".into(),
output_label: "cluster_name".into(),
},
Rule::ReplicaNameLookup {
cluster_id_label: "instance_id".into(),
replica_id_label: "replica_id".into(),
output_label: "replica_name".into(),
},
Rule::ObjectNameLookup {
object_id_label: "collection_id".into(),
output_label: "collection_name".into(),
},
],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
rules: [
Rule::ClusterNameLookup {
cluster_id_label: "instance_id".into(),
output_label: "cluster_name".into(),
},
Rule::ReplicaNameLookup {
cluster_id_label: "instance_id".into(),
replica_id_label: "replica_id".into(),
output_label: "replica_name".into(),
},
Rule::ObjectNameLookup {
object_id_label: "collection_id".into(),
output_label: "collection_name".into(),
},
],
)),
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/compute-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use mz_ore::metric;
use mz_ore::metrics::raw::UIntGaugeVec;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
IntCounterVec, MetricVecExt, MetricsRegistry,
IntCounterVec, MetricVecExt, MetricsRegistry, Rule,
};
use mz_ore::stats::histogram_seconds_buckets;
use mz_repr::GlobalId;
Expand Down Expand Up @@ -74,119 +74,162 @@ pub struct ComputeControllerMetrics {
shared: ControllerMetrics,
}

/// Builds the `Rule::ClusterNameLookup` shared by metrics labeled with `instance_id`.
///
/// The rule names `instance_id` as its cluster-id label and `instance_name` as its
/// output label. Registrations list it in their `rules: [...]` array, so the
/// construction lives here once instead of being spelled out for every metric.
fn instance_name_rule() -> Rule {
    let cluster_id_label = "instance_id".into();
    let output_label = "instance_name".into();
    Rule::ClusterNameLookup {
        cluster_id_label,
        output_label,
    }
}

/// Builds the `Rule::ReplicaNameLookup` shared by metrics labeled with both
/// `instance_id` and `replica_id`.
///
/// The rule names `instance_id` as its cluster-id label, `replica_id` as its
/// replica-id label, and `replica_name` as its output label.
fn replica_name_rule() -> Rule {
    let cluster_id_label = "instance_id".into();
    let replica_id_label = "replica_id".into();
    let output_label = "replica_name".into();
    Rule::ReplicaNameLookup {
        cluster_id_label,
        replica_id_label,
        output_label,
    }
}

impl ComputeControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
ComputeControllerMetrics {
let metrics = ComputeControllerMetrics {
commands_total: metrics_registry.register(metric!(
name: "mz_compute_commands_total",
help: "The total number of compute commands sent.",
var_labels: ["instance_id", "replica_id", "command_type"],
rules: [instance_name_rule(), replica_name_rule()],
)),
command_message_bytes_total: metrics_registry.register(metric!(
name: "mz_compute_command_message_bytes_total",
help: "The total number of bytes sent in compute command messages.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),
responses_total: metrics_registry.register(metric!(
name: "mz_compute_responses_total",
help: "The total number of compute responses sent.",
var_labels: ["instance_id", "replica_id", "response_type"],
rules: [instance_name_rule(), replica_name_rule()],
)),
response_message_bytes_total: metrics_registry.register(metric!(
name: "mz_compute_response_message_bytes_total",
help: "The total number of bytes sent in compute response messages.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),
replica_count: metrics_registry.register(metric!(
name: "mz_compute_controller_replica_count",
help: "The number of replicas.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
collection_count: metrics_registry.register(metric!(
name: "mz_compute_controller_collection_count",
help: "The number of installed compute collections.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
collection_unscheduled_count: metrics_registry.register(metric!(
name: "mz_compute_controller_collection_unscheduled_count",
help: "The number of installed but unscheduled compute collections.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
peek_count: metrics_registry.register(metric!(
name: "mz_compute_controller_peek_count",
help: "The number of pending peeks.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
subscribe_count: metrics_registry.register(metric!(
name: "mz_compute_controller_subscribe_count",
help: "The number of active subscribes.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
copy_to_count: metrics_registry.register(metric!(
name: "mz_compute_controller_copy_to_count",
help: "The number of active copy tos.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
command_queue_size: metrics_registry.register(metric!(
name: "mz_compute_controller_command_queue_size",
help: "The size of the compute command queue.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),
response_send_count: metrics_registry.register(metric!(
name: "mz_compute_controller_response_send_count",
help: "The number of sends on the compute response queue.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
response_recv_count: metrics_registry.register(metric!(
name: "mz_compute_controller_response_recv_count",
help: "The number of receives on the compute response queue.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
hydration_queue_size: metrics_registry.register(metric!(
name: "mz_compute_controller_hydration_queue_size",
help: "The size of the compute hydration queue.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),
history_command_count: metrics_registry.register(metric!(
name: "mz_compute_controller_history_command_count",
help: "The number of commands in the controller's command history.",
var_labels: ["instance_id", "command_type"],
rules: [instance_name_rule()],
)),
history_dataflow_count: metrics_registry.register(metric!(
name: "mz_compute_controller_history_dataflow_count",
help: "The number of dataflows in the controller's command history.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
peeks_total: metrics_registry.register(metric!(
name: "mz_compute_peeks_total",
help: "The total number of peeks served.",
var_labels: ["instance_id", "result"],
rules: [instance_name_rule()],
)),
peek_duration_seconds: metrics_registry.register(metric!(
name: "mz_compute_peek_duration_seconds",
help: "A histogram of peek durations since restart.",
var_labels: ["instance_id", "result"],
buckets: histogram_seconds_buckets(0.000_500, 32.),
rules: [instance_name_rule()],
)),
connected_replica_count: metrics_registry.register(metric!(
name: "mz_compute_controller_connected_replica_count",
help: "The number of replicas successfully connected to the compute controller.",
var_labels: ["instance_id"],
rules: [instance_name_rule()],
)),
replica_connects_total: metrics_registry.register(metric!(
name: "mz_compute_controller_replica_connects_total",
help: "The total number of replica (re-)connections made by the compute controller.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),
replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
help: "The total time the compute controller spent waiting for replica (re-)connection.",
var_labels: ["instance_id", "replica_id"],
rules: [instance_name_rule(), replica_name_rule()],
)),

shared,
}
};

metrics
}

/// Return an object suitable for tracking metrics for the given compute instance.
Expand Down
Loading
Loading