From e0c3e2b2ecdcc17edc62eb298020b32e78d4b66c Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 14 May 2026 17:23:45 -0400 Subject: [PATCH 1/4] ore/metrics: introduce Rule enum and per-metric registration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new mod rule with a declarative Rule enum (ClusterNameLookup, ReplicaNameLookup, ObjectNameLookup), a CatalogNameLookup trait, and a Rule::apply that stamps resolved name labels onto a MetricFamily. Threads an optional rules: [...] field through the metric! macro and MakeCollectorOpts. MetricsRegistry::register stores those rules keyed by the fully-qualified Prometheus metric name and exposes them via a new rules_by_metric() accessor. No HTTP wiring yet — follow-ups connect the registry to the federated endpoint via a response header. --- src/environmentd/src/http/sql.rs | 1 + src/ore/src/metrics.rs | 79 +++++++ src/ore/src/metrics/rule.rs | 392 +++++++++++++++++++++++++++++++ 3 files changed, 472 insertions(+) create mode 100644 src/ore/src/metrics/rule.rs diff --git a/src/environmentd/src/http/sql.rs b/src/environmentd/src/http/sql.rs index 476b5281d23eb..47fbe26539045 100644 --- a/src/environmentd/src/http/sql.rs +++ b/src/environmentd/src/http/sql.rs @@ -175,6 +175,7 @@ async fn execute_promsql_query( metrics_registry.register::>(MakeCollectorOpts { opts: Opts::new(query.metric_name, query.help).variable_labels(label_names), buckets: None, + rules: Vec::new(), }) }); diff --git a/src/ore/src/metrics.rs b/src/ore/src/metrics.rs index 036267efe39a2..7203dd627306d 100644 --- a/src/ore/src/metrics.rs +++ b/src/ore/src/metrics.rs @@ -41,6 +41,7 @@ //! } //! ``` +use std::collections::BTreeMap; use std::fmt; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -59,9 +60,11 @@ use prometheus::proto::MetricFamily; use prometheus::{HistogramOpts, Registry}; mod delete_on_drop; +mod rule; pub use delete_on_drop::*; pub use prometheus::Opts as PrometheusOpts; +pub use rule::{CatalogNameLookup, Rule}; /// Define a metric for use in materialize. #[macro_export] @@ -73,6 +76,7 @@ macro_rules! metric { $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })? $(, var_labels: [ $($vl_name:expr),* ])? $(, buckets: $bk_name:expr)? + $(, rules: [ $($rule:expr),* $(,)? ])? $(,)? ) => {{ let const_labels = (&[ @@ -91,6 +95,7 @@ macro_rules! metric { .const_labels(const_labels) .variable_labels(var_labels), buckets: None, + rules: vec![ $($($rule),*)? ], }; // Set buckets if passed $(mk_opts.buckets = Some($bk_name);)* @@ -106,6 +111,10 @@ pub struct MakeCollectorOpts { /// Buckets to be used with Histogram and HistogramVec. Must be set to create Histogram types /// and must not be set for other types. pub buckets: Option>, + /// Enrichment rules attached to this metric. Applied by env's federated + /// scraper to stamp human-readable name labels (e.g. `cluster_name`) + /// alongside the ID labels the metric already carries. + pub rules: Vec, } /// The materialize metrics registry. @@ -115,6 +124,10 @@ pub struct MetricsRegistry { inner: Registry, #[derivative(Debug = "ignore")] postprocessors: Arc) + Send + Sync>>>>, + /// Enrichment rules declared per-metric via [`MakeCollectorOpts::rules`]. + /// Keyed by the fully-qualified Prometheus metric name (matches + /// `MetricFamily::name()` at scrape time). + rules_by_metric: Arc>>>, } /// A wrapper for metrics to require delete on drop semantics @@ -203,14 +216,41 @@ impl MetricsRegistry { MetricsRegistry { inner: Registry::new(), postprocessors: Arc::new(Mutex::new(vec![])), + rules_by_metric: Arc::new(Mutex::new(BTreeMap::new())), } } + /// Returns a snapshot of per-metric enrichment rules, keyed by the + /// fully-qualified Prometheus metric name. Populated by [`Self::register`] + /// from [`MakeCollectorOpts::rules`]. + pub fn rules_by_metric(&self) -> BTreeMap> { + self.rules_by_metric + .lock() + .expect("lock poisoned") + .clone() + } + + /// Records the per-metric rules from `opts` under the metric's + /// fully-qualified name, if any are declared. + fn record_per_metric_rules(&self, opts: &MakeCollectorOpts) { + if opts.rules.is_empty() { + return; + } + let fq_name = opts.opts.fq_name(); + self.rules_by_metric + .lock() + .expect("lock poisoned") + .entry(fq_name) + .or_default() + .extend(opts.rules.iter().cloned()); + } + /// Register a metric defined with the [`metric`] macro. pub fn register(&self, opts: MakeCollectorOpts) -> M where M: MakeCollector, { + self.record_per_metric_rules(&opts); let collector = M::make_collector(opts); self.inner.register(Box::new(collector.clone())).unwrap(); collector @@ -225,6 +265,7 @@ impl MetricsRegistry { where P: Atomic + 'static, { + self.record_per_metric_rules(&opts); let gauge = ComputedGenericGauge { gauge: GenericGauge::make_collector(opts), f: Arc::new(f), @@ -1050,4 +1091,42 @@ mod tests { // We filtered wall time to < 10ms, so our wall time metric should be filtered out. assert_eq!(wall_counter.value(), 0.0); } + + #[crate::test] + fn register_metric_stores_rules() { + let registry = MetricsRegistry::new(); + let cluster_rule = super::Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + let replica_rule = super::Rule::ReplicaNameLookup { + cluster_id_label: "cluster_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }; + let _: prometheus::IntCounter = registry.register(crate::metric!( + name: "mz_test_register_metric_stores_rules", + help: "test metric", + rules: [ + cluster_rule.clone(), + replica_rule.clone(), + ], + )); + let by_metric = registry.rules_by_metric(); + assert_eq!(by_metric.len(), 1); + let rules = by_metric + .get("mz_test_register_metric_stores_rules") + .expect("rules registered under fq name"); + assert_eq!(rules, &vec![cluster_rule, replica_rule]); + } + + #[crate::test] + fn register_metric_without_rules_omits_entry() { + let registry = MetricsRegistry::new(); + let _: prometheus::IntCounter = registry.register(crate::metric!( + name: "mz_test_register_metric_without_rules", + help: "test metric without rules", + )); + assert!(registry.rules_by_metric().is_empty()); + } } diff --git a/src/ore/src/metrics/rule.rs b/src/ore/src/metrics/rule.rs new file mode 100644 index 0000000000000..1105029d190da --- /dev/null +++ b/src/ore/src/metrics/rule.rs @@ -0,0 +1,392 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Enrichment rules for Prometheus metrics. +//! +//! Subsystems register [`Rule`]s alongside their metrics on a +//! [`crate::metrics::MetricsRegistry`]. When env's federated `/metrics` +//! endpoint scrapes a remote registry, it resolves the rules against the +//! catalog to stamp human-readable name labels (e.g. `cluster_name`, +//! `source_name`) onto metrics that today only carry opaque IDs. + +use prometheus::proto::{LabelPair, MetricFamily}; +use serde::{Deserialize, Serialize}; + +/// Resolves catalog IDs to human-readable names for [`Rule`] application. +/// +/// Implemented by environmentd's `Catalog`. Lives in `mz_ore` so that +/// [`Rule::apply`] can be invoked without pulling the adapter crate into +/// the dependency graph of every metric producer. +pub trait CatalogNameLookup { + /// Returns the name of the cluster with the given ID, if it exists. + fn cluster_name(&self, cluster_id: &str) -> Option; + /// Returns the name of the replica with the given (cluster, replica) IDs. + fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option; + /// Returns the name of the catalog object with the given global ID. + fn object_name(&self, global_id: &str) -> Option; +} + +/// A declarative enrichment rule applied to a metric family at scrape time. +/// +/// Each variant reads one or more ID labels already present on a metric and +/// adds **exactly one** resolved name label. Metric owners pick the output +/// name explicitly so a metric can register multiple rules side-by-side +/// (e.g. one `ClusterNameLookup` for `cluster_name` and one +/// `ReplicaNameLookup` for `replica_name`) without colliding. +#[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash +)] +#[serde(rename_all = "snake_case", tag = "kind")] +pub enum Rule { + /// Reads the cluster ID from `cluster_id_label` and writes the resolved + /// cluster name into `output_label`. + ClusterNameLookup { + /// Name of the label on the metric that carries the cluster ID. + cluster_id_label: String, + /// Name of the label to add with the resolved cluster name. + output_label: String, + }, + /// Reads the cluster ID from `cluster_id_label` and the replica ID from + /// `replica_id_label`, then writes the resolved replica name into + /// `output_label`. + ReplicaNameLookup { + /// Name of the label on the metric that carries the cluster ID. + cluster_id_label: String, + /// Name of the label on the metric that carries the replica ID. + replica_id_label: String, + /// Name of the label to add with the resolved replica name. + output_label: String, + }, + /// Reads a `GlobalId` from `object_id_label` and writes the resolved + /// catalog object name into `output_label`. + ObjectNameLookup { + /// Name of the label on the metric that carries the global ID. + object_id_label: String, + /// Name of the label to add with the resolved object name. + output_label: String, + }, +} + +impl Rule { + /// The label name this rule writes into. + pub fn output_label(&self) -> &str { + match self { + Rule::ClusterNameLookup { output_label, .. } + | Rule::ReplicaNameLookup { output_label, .. } + | Rule::ObjectNameLookup { output_label, .. } => output_label, + } + } + + /// Applies the rule to every metric in `family`. + /// + /// No-op for a metric when (a) a required input ID label is missing, + /// (b) `lookup` returns `None` for the input, or (c) `output_label` is + /// already present on the metric (collision-skip; Prometheus rejects + /// duplicate label names). + pub fn apply(&self, family: &mut MetricFamily, lookup: &L) { + let output_label = self.output_label().to_owned(); + for metric in family.mut_metric() { + let labels = metric.get_label(); + if labels.iter().any(|l| l.name() == output_label) { + continue; + } + let resolved = match self { + Rule::ClusterNameLookup { + cluster_id_label, .. + } => find_label(labels, cluster_id_label).and_then(|cid| lookup.cluster_name(cid)), + Rule::ReplicaNameLookup { + cluster_id_label, + replica_id_label, + .. + } => match ( + find_label(labels, cluster_id_label), + find_label(labels, replica_id_label), + ) { + (Some(cid), Some(rid)) => lookup.replica_name(cid, rid), + _ => None, + }, + Rule::ObjectNameLookup { + object_id_label, .. + } => find_label(labels, object_id_label).and_then(|oid| lookup.object_name(oid)), + }; + let Some(value) = resolved else { continue }; + let mut all = metric.take_label(); + let mut pair = LabelPair::default(); + pair.set_name(output_label.clone()); + pair.set_value(value); + all.push(pair); + metric.set_label(all); + } + } +} + +fn find_label<'a>(labels: &'a [LabelPair], name: &str) -> Option<&'a str> { + labels.iter().find(|l| l.name() == name).map(|l| l.value()) +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use prometheus::proto::{Counter, Metric, MetricFamily, MetricType}; + + use super::*; + + /// In-memory [`CatalogNameLookup`] for unit tests. + #[derive(Default)] + struct FakeCatalog { + clusters: BTreeMap, + replicas: BTreeMap<(String, String), String>, + objects: BTreeMap, + } + + impl FakeCatalog { + fn with_cluster(mut self, id: &str, name: &str) -> Self { + self.clusters.insert(id.into(), name.into()); + self + } + fn with_replica(mut self, cid: &str, rid: &str, name: &str) -> Self { + self.replicas.insert((cid.into(), rid.into()), name.into()); + self + } + fn with_object(mut self, id: &str, name: &str) -> Self { + self.objects.insert(id.into(), name.into()); + self + } + } + + impl CatalogNameLookup for FakeCatalog { + fn cluster_name(&self, cluster_id: &str) -> Option { + self.clusters.get(cluster_id).cloned() + } + fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option { + self.replicas + .get(&(cluster_id.to_owned(), replica_id.to_owned())) + .cloned() + } + fn object_name(&self, global_id: &str) -> Option { + self.objects.get(global_id).cloned() + } + } + + fn label(name: &str, value: &str) -> LabelPair { + let mut p = LabelPair::default(); + p.set_name(name.into()); + p.set_value(value.into()); + p + } + + fn family_with_labels(labels: Vec) -> MetricFamily { + let mut family = MetricFamily::default(); + family.set_name("test_metric".into()); + family.set_field_type(MetricType::COUNTER); + let mut metric = Metric::default(); + let mut counter = Counter::default(); + counter.set_value(1.0); + metric.set_counter(counter); + metric.set_label(labels); + family.set_metric(vec![metric]); + family + } + + fn label_names(family: &MetricFamily) -> Vec<&str> { + family.get_metric()[0] + .get_label() + .iter() + .map(|l| l.name()) + .collect() + } + + fn label_value<'a>(family: &'a MetricFamily, name: &str) -> Option<&'a str> { + family.get_metric()[0] + .get_label() + .iter() + .find(|l| l.name() == name) + .map(|l| l.value()) + } + + #[crate::test] + fn serde_round_trip_cluster_name_lookup() { + let rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + let json = serde_json::to_string(&rule).unwrap(); + assert_eq!( + json, + r#"{"kind":"cluster_name_lookup","cluster_id_label":"cluster_id","output_label":"cluster_name"}"# + ); + let back: Rule = serde_json::from_str(&json).unwrap(); + assert_eq!(back, rule); + } + + #[crate::test] + fn serde_round_trip_replica_name_lookup() { + let rule = Rule::ReplicaNameLookup { + cluster_id_label: "c".into(), + replica_id_label: "r".into(), + output_label: "n".into(), + }; + let json = serde_json::to_string(&rule).unwrap(); + assert_eq!( + json, + r#"{"kind":"replica_name_lookup","cluster_id_label":"c","replica_id_label":"r","output_label":"n"}"# + ); + let back: Rule = serde_json::from_str(&json).unwrap(); + assert_eq!(back, rule); + } + + #[crate::test] + fn serde_round_trip_object_name_lookup() { + let rule = Rule::ObjectNameLookup { + object_id_label: "source_id".into(), + output_label: "source_name".into(), + }; + let json = serde_json::to_string(&rule).unwrap(); + assert_eq!( + json, + r#"{"kind":"object_name_lookup","object_id_label":"source_id","output_label":"source_name"}"# + ); + let back: Rule = serde_json::from_str(&json).unwrap(); + assert_eq!(back, rule); + } + + #[crate::test] + fn cluster_name_lookup_attaches_name() { + let mut family = family_with_labels(vec![label("cluster_id", "u1")]); + let catalog = FakeCatalog::default().with_cluster("u1", "quickstart"); + let rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!(label_names(&family), vec!["cluster_id", "cluster_name"]); + assert_eq!(label_value(&family, "cluster_name"), Some("quickstart")); + } + + #[crate::test] + fn replica_name_lookup_attaches_name() { + let mut family = + family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]); + let catalog = FakeCatalog::default().with_replica("u1", "u2", "r1"); + let rule = Rule::ReplicaNameLookup { + cluster_id_label: "cluster_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!( + label_names(&family), + vec!["cluster_id", "replica_id", "replica_name"] + ); + assert_eq!(label_value(&family, "replica_name"), Some("r1")); + } + + #[crate::test] + fn object_name_lookup_attaches_name_with_custom_output() { + let mut family = family_with_labels(vec![label("collection_id", "s100")]); + let catalog = FakeCatalog::default().with_object("s100", "my_source"); + let rule = Rule::ObjectNameLookup { + object_id_label: "collection_id".into(), + output_label: "source_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!(label_names(&family), vec!["collection_id", "source_name"]); + assert_eq!(label_value(&family, "source_name"), Some("my_source")); + } + + #[crate::test] + fn apply_is_noop_when_input_label_missing() { + let mut family = family_with_labels(vec![label("unrelated", "x")]); + let catalog = FakeCatalog::default().with_cluster("u1", "quickstart"); + let rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!(label_names(&family), vec!["unrelated"]); + } + + #[crate::test] + fn apply_is_noop_when_lookup_returns_none() { + let mut family = family_with_labels(vec![label("cluster_id", "u99")]); + let catalog = FakeCatalog::default(); // no clusters + let rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!(label_names(&family), vec!["cluster_id"]); + } + + #[crate::test] + fn apply_skips_when_output_label_already_present() { + let mut family = family_with_labels(vec![ + label("cluster_id", "u1"), + label("cluster_name", "preset"), + ]); + let catalog = FakeCatalog::default().with_cluster("u1", "quickstart"); + let rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + rule.apply(&mut family, &catalog); + // Existing value preserved; not overwritten, not duplicated. + assert_eq!(label_names(&family), vec!["cluster_id", "cluster_name"]); + assert_eq!(label_value(&family, "cluster_name"), Some("preset")); + } + + #[crate::test] + fn apply_uses_custom_input_label_name() { + let mut family = family_with_labels(vec![label("src_cluster", "u1")]); + let catalog = FakeCatalog::default().with_cluster("u1", "quickstart"); + let rule = Rule::ClusterNameLookup { + cluster_id_label: "src_cluster".into(), + output_label: "src_cluster_name".into(), + }; + rule.apply(&mut family, &catalog); + assert_eq!(label_value(&family, "src_cluster_name"), Some("quickstart")); + } + + #[crate::test] + fn composing_two_rules_attaches_both_names() { + let mut family = + family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]); + let catalog = FakeCatalog::default() + .with_cluster("u1", "quickstart") + .with_replica("u1", "u2", "r1"); + let cluster_rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + let replica_rule = Rule::ReplicaNameLookup { + cluster_id_label: "cluster_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }; + cluster_rule.apply(&mut family, &catalog); + replica_rule.apply(&mut family, &catalog); + assert_eq!(label_value(&family, "cluster_name"), Some("quickstart")); + assert_eq!(label_value(&family, "replica_name"), Some("r1")); + } +} From d602d6bea0918cbb6c16afe7361ce71cb8ac8dd3 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 14 May 2026 17:46:29 -0400 Subject: [PATCH 2/4] http-util: opt-in X-Materialize-Enrich-Rules response header Adds two header constants and a wants_enrich_rules helper. When a caller sends X-Materialize-Accept-Enrich-Rules, handle_prometheus serializes registry.rules_by_metric() and attaches it as X-Materialize-Enrich-Rules in the response. Header is opt-in so default Prometheus scrapers see clean responses; only consumers that understand the per-metric rules wire format request it (today: env's federated /metrics/public scraper). --- src/http-util/src/lib.rs | 95 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/src/http-util/src/lib.rs b/src/http-util/src/lib.rs index eeacb5d3c4593..90276c992cf22 100644 --- a/src/http-util/src/lib.rs +++ b/src/http-util/src/lib.rs @@ -32,6 +32,17 @@ pub const PROMETHEUS_PROTOBUF_CONTENT_TYPE: &str = "application/vnd.google.proto proto=io.prometheus.client.MetricFamily; \ encoding=delimited"; +/// Request header sent by callers that understand and want +/// [`MATERIALIZE_ENRICH_RULES_HEADER`] in the response. Any value opts in; only +/// presence matters. +pub const MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER: &str = "x-materialize-accept-enrich-rules"; + +/// Response header listing the [`mz_ore::metrics::Rule`]s registered on the +/// metrics registry, serialized as a JSON array. Emitted by +/// [`handle_prometheus`] only when the caller opts in via +/// [`MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER`]. +pub const MATERIALIZE_ENRICH_RULES_HEADER: &str = "x-materialize-enrich-rules"; + fn wants_prometheus_protobuf(headers: &HeaderMap) -> bool { headers .get_all(axum::http::header::ACCEPT) @@ -40,6 +51,10 @@ fn wants_prometheus_protobuf(headers: &HeaderMap) -> bool { .any(|v| v.contains(PROMETHEUS_PROTOBUF_CONTENT_TYPE)) } +fn wants_enrich_rules(headers: &HeaderMap) -> bool { + headers.contains_key(MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER) +} + /// Renders a template into an HTTP response. pub fn template_response(template: T) -> Html where @@ -134,6 +149,10 @@ pub async fn handle_liveness_check() -> impl IntoResponse { /// protobuf format (`application/vnd.google.protobuf`), the response is a /// length-delimited stream of `io.prometheus.client.MetricFamily` messages. /// Otherwise the standard Prometheus text format is returned. +/// +/// If the caller sends [`MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER`] and the +/// registry has any rules registered, the response includes +/// [`MATERIALIZE_ENRICH_RULES_HEADER`] with a JSON array of the rules. #[allow(clippy::unused_async)] pub async fn handle_prometheus( registry: &MetricsRegistry, @@ -159,7 +178,20 @@ pub async fn handle_prometheus( ContentType::text() }; - Ok((TypedHeader(content_type), buf).into_response()) + let mut resp = (TypedHeader(content_type), buf).into_response(); + if wants_enrich_rules(&headers) { + let rules_by_metric = registry.rules_by_metric(); + if !rules_by_metric.is_empty() { + let json = serde_json::to_string(&rules_by_metric) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + resp.headers_mut().insert( + MATERIALIZE_ENRICH_RULES_HEADER, + HeaderValue::from_str(&json) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, + ); + } + } + Ok(resp) } #[derive(Serialize, Deserialize)] @@ -267,11 +299,72 @@ where #[cfg(test)] mod tests { + use std::collections::BTreeMap; + + use axum::http::HeaderMap; use http::header::{ACCESS_CONTROL_ALLOW_ORIGIN, ORIGIN}; use http::{HeaderValue, Method, Request, Response}; + use mz_ore::metric; + use mz_ore::metrics::{MetricsRegistry, Rule}; use tower::{Service, ServiceBuilder, ServiceExt}; use tower_http::cors::CorsLayer; + use super::{ + MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER, MATERIALIZE_ENRICH_RULES_HEADER, handle_prometheus, + }; + + fn registry_with_rules() -> MetricsRegistry { + let registry = MetricsRegistry::new(); + let _: prometheus::IntCounter = registry.register(metric!( + name: "mz_test_handle_prometheus_metric", + help: "test metric carrying a per-metric enrichment rule", + rules: [ + Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }, + ], + )); + registry + } + + #[mz_ore::test(tokio::test)] + async fn handle_prometheus_emits_rules_header_when_opted_in() { + let registry = registry_with_rules(); + let mut headers = HeaderMap::new(); + headers.insert( + MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER, + HeaderValue::from_static("1"), + ); + let resp = handle_prometheus(®istry, headers).await.unwrap(); + let value = resp + .headers() + .get(MATERIALIZE_ENRICH_RULES_HEADER) + .expect("rules header present"); + let parsed: BTreeMap> = + serde_json::from_str(value.to_str().unwrap()).unwrap(); + assert_eq!(parsed, registry.rules_by_metric()); + } + + #[mz_ore::test(tokio::test)] + async fn handle_prometheus_omits_header_without_opt_in() { + let registry = registry_with_rules(); + let resp = handle_prometheus(®istry, HeaderMap::new()).await.unwrap(); + assert!(resp.headers().get(MATERIALIZE_ENRICH_RULES_HEADER).is_none()); + } + + #[mz_ore::test(tokio::test)] + async fn handle_prometheus_omits_header_when_no_rules() { + let registry = MetricsRegistry::new(); + let mut headers = HeaderMap::new(); + headers.insert( + MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER, + HeaderValue::from_static("1"), + ); + let resp = handle_prometheus(®istry, headers).await.unwrap(); + assert!(resp.headers().get(MATERIALIZE_ENRICH_RULES_HEADER).is_none()); + } + #[mz_ore::test(tokio::test)] async fn test_cors() { async fn test_request(cors: &CorsLayer, origin: &HeaderValue) -> Option { From 3301f5b3f8b3c494784ab4b3b9ba9e07a62694aa Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 14 May 2026 17:46:59 -0400 Subject: [PATCH 3/4] environmentd/metrics_public: apply enrichment rules per metric Wires env's federated /metrics/public endpoint to the per-metric rules mechanism: scraper sends the opt-in header on every replica scrape, parses X-Materialize-Enrich-Rules off the response as a map of metric name -> rules, and applies them only to the matching family. Adds CatalogLookup wrapping &Catalog (implements CatalogNameLookup with typed ClusterId / ReplicaId / GlobalId parsing), splits the old add_replica_labels into stamp_scrape_context_labels (the three labels that come from the connection) plus rule application. Env-local metrics get the same per-metric treatment via metrics_registry .rules_by_metric(). E2E test_metrics_public_endpoint is expected red after this commit until callers declare rules on their metrics in the next change. --- src/environmentd/src/http/metrics_public.rs | 350 ++++++++++++++++---- 1 file changed, 292 insertions(+), 58 deletions(-) diff --git a/src/environmentd/src/http/metrics_public.rs b/src/environmentd/src/http/metrics_public.rs index f7f897f6d071f..3aa582a3dfa3d 100644 --- a/src/environmentd/src/http/metrics_public.rs +++ b/src/environmentd/src/http/metrics_public.rs @@ -10,31 +10,65 @@ //! Federated `/metrics` endpoint. //! //! Aggregates environmentd's local metrics with every clusterd replica's -//! `/metrics` output, adding `cluster_id`, `replica_id`, `process`, -//! `cluster_name`, and `replica_name` labels onto the clusterd-sourced -//! metrics. +//! `/metrics` output, adding `cluster_id`, `replica_id`, and `process` labels +//! drawn from the scrape context, then applying any [`Rule`]s the replica +//! advertised via the `X-Materialize-Enrich-Rules` response header. use std::collections::BTreeMap; +use std::str::FromStr; use std::sync::Arc; +/// Per-metric enrichment rules carried in the `X-Materialize-Enrich-Rules` +/// response header. Empty when the remote registry didn't advertise any. +type RulesByMetric = BTreeMap>; + use axum::Extension; use axum::body::Body; use axum::response::{IntoResponse, Response}; use axum_extra::TypedHeader; use futures::future::join_all; use headers::ContentType; -use http::{Method, Request, StatusCode}; +use http::{HeaderValue, Method, Request, StatusCode}; use http_body_util::BodyExt; +use mz_adapter::catalog::Catalog; use mz_adapter_types::dyncfgs::ENABLE_PUBLIC_METRICS_ENDPOINT; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_ore::metrics::MetricsRegistry; +use mz_ore::metrics::{CatalogNameLookup, MetricsRegistry, Rule}; +use mz_repr::GlobalId; use prometheus::Encoder; +use prometheus::proto::{LabelPair, MetricFamily}; use crate::http::AuthedClient; use crate::http::cluster::{ ClusterProxyConfig, proxy_replica_request, rewrite_request_for_replica, }; +/// Adapter that lets [`Rule::apply`] resolve catalog names without `mz_ore` +/// having to depend on the adapter crate. +struct CatalogLookup<'a>(&'a Catalog); + +impl CatalogNameLookup for CatalogLookup<'_> { + fn cluster_name(&self, cluster_id: &str) -> Option { + let cid = ClusterId::from_str(cluster_id).ok()?; + self.0.try_get_cluster(cid).map(|c| c.name.clone()) + } + + fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option { + let cid = ClusterId::from_str(cluster_id).ok()?; + let rid = ReplicaId::from_str(replica_id).ok()?; + self.0 + .try_get_cluster_replica(cid, rid) + .map(|r| r.name.clone()) + } + + fn object_name(&self, global_id: &str) -> Option { + let gid = GlobalId::from_str(global_id).ok()?; + self.0 + .try_get_entry_by_global_id(&gid) + .map(|e| e.name().item.clone()) + } +} + /// The customer-facing `/metrics` endpoint. /// /// Aggregates environmentd's local metrics with every clusterd replica's @@ -49,8 +83,21 @@ pub(crate) async fn handle_public_metrics( return StatusCode::SERVICE_UNAVAILABLE.into_response(); } - // Start with env's local gather() output. - let mut all_families: Vec = metrics_registry.gather(); + let lookup = CatalogLookup(&catalog); + + // Start with env's local gather() output, applying env's own registered + // rules so env-side metrics (e.g. `replica_connects_total`) get their + // `cluster_name`/`replica_name`/`object_name` labels too. Rules apply + // only to the metric they were declared on. + let mut all_families: Vec = metrics_registry.gather(); + let env_rules_by_metric = metrics_registry.rules_by_metric(); + for family in &mut all_families { + if let Some(rules) = env_rules_by_metric.get(family.name()) { + for rule in rules { + rule.apply(family, &lookup); + } + } + } // Fan out to every (cluster, replica, process). let scrapes: Vec<(ClusterId, ReplicaId, usize, String)> = config @@ -68,8 +115,8 @@ pub(crate) async fn handle_public_metrics( let results = join_all(scrapes.into_iter().map(scrape_replica_metrics_endpoint)).await; for (cluster_id, replica_id, process, result) in results { - let bytes = match result { - Ok(b) => b, + let scrape = match result { + Ok(s) => s, Err(e) => { tracing::warn!( %cluster_id, @@ -80,13 +127,7 @@ pub(crate) async fn handle_public_metrics( continue; } }; - let cluster_name = catalog - .try_get_cluster(cluster_id) - .map(|cluster| cluster.name.clone()); - let replica_name = catalog - .try_get_cluster_replica(cluster_id, replica_id) - .map(|replica| replica.name.clone()); - let families = match mz_prometheus_protobuf::decode_length_delimited(&bytes) { + let families = match mz_prometheus_protobuf::decode_length_delimited(&scrape.body) { Ok(f) => f, Err(e) => { tracing::warn!( @@ -99,14 +140,12 @@ pub(crate) async fn handle_public_metrics( } }; for mut family in families { - add_replica_labels( - &mut family, - cluster_id, - replica_id, - process, - cluster_name.as_deref(), - replica_name.as_deref(), - ); + stamp_scrape_context_labels(&mut family, cluster_id, replica_id, process); + if let Some(rules) = scrape.rules.get(family.name()) { + for rule in rules { + rule.apply(&mut family, &lookup); + } + } all_families.push(family); } } @@ -130,9 +169,17 @@ pub(crate) async fn handle_public_metrics( (TypedHeader(ContentType::text()), body).into_response() } +/// A scrape's response body plus any [`Rule`]s the remote registry advertised +/// in the `X-Materialize-Enrich-Rules` header, keyed by the +/// fully-qualified metric name. +struct ReplicaScrape { + body: Vec, + rules: RulesByMetric, +} + async fn scrape_replica_metrics_endpoint( (cluster_id, replica_id, process, addr): (ClusterId, ReplicaId, usize, String), -) -> (ClusterId, ReplicaId, usize, Result, String>) { +) -> (ClusterId, ReplicaId, usize, Result) { let result = scrape_replica_metrics_endpoint_inner(cluster_id, replica_id, process, &addr).await; (cluster_id, replica_id, process, result) @@ -143,7 +190,7 @@ async fn scrape_replica_metrics_endpoint_inner( replica_id: ReplicaId, process: usize, addr: &str, -) -> Result, String> { +) -> Result { let mut req = Request::builder() .method(Method::GET) .uri("/metrics") @@ -151,6 +198,10 @@ async fn scrape_replica_metrics_endpoint_inner( http::header::ACCEPT, mz_http_util::PROMETHEUS_PROTOBUF_CONTENT_TYPE, ) + .header( + mz_http_util::MATERIALIZE_ACCEPT_ENRICH_RULES_HEADER, + HeaderValue::from_static("1"), + ) .body(Body::empty()) .map_err(|e| format!("building request: {e}"))?; rewrite_request_for_replica(&mut req, addr, cluster_id, replica_id, process, "/metrics") @@ -161,6 +212,7 @@ async fn scrape_replica_metrics_endpoint_inner( if !resp.status().is_success() { return Err(format!("upstream status {}", resp.status())); } + let rules = parse_rules_header(resp.headers()); let body = resp.into_body(); let collected = body .collect() @@ -168,28 +220,50 @@ async fn scrape_replica_metrics_endpoint_inner( .map_err(|e| format!("collecting body: {e}"))? .to_bytes(); - Ok(collected.to_vec()) + Ok(ReplicaScrape { + body: collected.to_vec(), + rules, + }) +} + +/// Parses the `X-Materialize-Enrich-Rules` response header into a map of +/// metric-name → rules. Missing header is normal (legacy clusterd); parse +/// errors log a warning and yield an empty map rather than failing the entire +/// scrape. +fn parse_rules_header(headers: &http::HeaderMap) -> RulesByMetric { + let Some(value) = headers.get(mz_http_util::MATERIALIZE_ENRICH_RULES_HEADER) else { + return BTreeMap::new(); + }; + let s = match value.to_str() { + Ok(s) => s, + Err(e) => { + tracing::warn!("invalid utf-8 in enrich-rules header: {e}"); + return BTreeMap::new(); + } + }; + match serde_json::from_str::(s) { + Ok(rules) => rules, + Err(e) => { + tracing::warn!("failed to parse enrich-rules header: {e}"); + BTreeMap::new() + } + } } -fn add_replica_labels( - family: &mut prometheus::proto::MetricFamily, +/// Stamps the three scrape-context labels (`cluster_id`, `replica_id`, +/// `process`) on every metric in `family`. These come from the scraper's +/// connection state, not from the metric itself, so they aren't rule-driven. +fn stamp_scrape_context_labels( + family: &mut MetricFamily, cluster_id: ClusterId, replica_id: ReplicaId, process: usize, - cluster_name: Option<&str>, - replica_name: Option<&str>, ) { for metric in family.mut_metric() { let mut labels = metric.take_label(); labels.push(label_pair("cluster_id", cluster_id.to_string())); labels.push(label_pair("replica_id", replica_id.to_string())); labels.push(label_pair("process", process.to_string())); - if let Some(n) = cluster_name { - labels.push(label_pair("cluster_name", n.to_owned())); - } - if let Some(n) = replica_name { - labels.push(label_pair("replica_name", n.to_owned())); - } metric.set_label(labels); } } @@ -217,8 +291,8 @@ fn merge_families_by_name( merged_families } -fn label_pair(name: &str, value: String) -> prometheus::proto::LabelPair { - let mut pair = prometheus::proto::LabelPair::default(); +fn label_pair(name: &str, value: String) -> LabelPair { + let mut pair = LabelPair::default(); pair.set_name(name.to_owned()); pair.set_value(value); pair @@ -226,27 +300,52 @@ fn label_pair(name: &str, value: String) -> prometheus::proto::LabelPair { #[cfg(test)] mod tests { + use std::collections::BTreeMap; + + use prometheus::proto::{Counter, Metric, MetricType}; + use super::*; - fn make_family(name: &str) -> prometheus::proto::MetricFamily { - let mut family = prometheus::proto::MetricFamily::default(); + fn make_family(name: &str) -> MetricFamily { + let mut family = MetricFamily::default(); family.set_name(name.to_owned()); family.set_help(format!("help for {name}")); - family.set_field_type(prometheus::proto::MetricType::COUNTER); - let mut metric = prometheus::proto::Metric::default(); - let mut counter = prometheus::proto::Counter::default(); + family.set_field_type(MetricType::COUNTER); + let mut metric = Metric::default(); + let mut counter = Counter::default(); counter.set_value(1.0); metric.set_counter(counter); family.set_metric(vec![metric]); family } + /// Test stand-in for [`CatalogLookup`]. + #[derive(Default)] + struct FakeCatalog { + clusters: BTreeMap, + replicas: BTreeMap<(String, String), String>, + } + + impl CatalogNameLookup for FakeCatalog { + fn cluster_name(&self, cluster_id: &str) -> Option { + self.clusters.get(cluster_id).cloned() + } + fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option { + self.replicas + .get(&(cluster_id.to_owned(), replica_id.to_owned())) + .cloned() + } + fn object_name(&self, _global_id: &str) -> Option { + None + } + } + #[mz_ore::test] - fn adds_three_labels_when_names_unknown() { + fn adds_scrape_context_adds_three_labels() { let mut family = make_family("test"); let cluster_id: ClusterId = "u1".parse().expect("cluster id"); let replica_id: ReplicaId = "u2".parse().expect("replica id"); - add_replica_labels(&mut family, cluster_id, replica_id, 7, None, None); + stamp_scrape_context_labels(&mut family, cluster_id, replica_id, 7); let labels = family.get_metric()[0].get_label(); let names: Vec<_> = labels.iter().map(|l| l.name()).collect(); assert_eq!(names, vec!["cluster_id", "replica_id", "process"]); @@ -258,13 +357,12 @@ mod tests { fn shared_metric_name_emits_single_help_and_type() { let env_family = make_family("mz_persist_test"); let mut clusterd_family = make_family("mz_persist_test"); - add_replica_labels( + + stamp_scrape_context_labels( &mut clusterd_family, "u1".parse().unwrap(), "u2".parse().unwrap(), 0, - Some("quickstart"), - Some("r1"), ); let merged = merge_families_by_name(vec![env_family, clusterd_family]); @@ -286,19 +384,43 @@ mod tests { assert_eq!(type_, 1, "{text}"); } + /// Mirrors `handle_public_metrics`'s clusterd path: stamp the scrape + /// context labels, then apply rules from a per-metric map keyed by family + /// name. #[mz_ore::test] - fn adds_five_labels_when_names_known() { + fn apply_scraped_rules_yields_five_labels() { let mut family = make_family("test"); let cluster_id: ClusterId = "u1".parse().expect("cluster id"); let replica_id: ReplicaId = "u2".parse().expect("replica id"); - add_replica_labels( - &mut family, - cluster_id, - replica_id, - 0, - Some("quickstart"), - Some("r1"), - ); + + let catalog = FakeCatalog { + clusters: BTreeMap::from([("u1".into(), "quickstart".into())]), + replicas: BTreeMap::from([(("u1".into(), "u2".into()), "r1".into())]), + }; + + // Rules advertised by the remote registry for this specific metric. + let scraped: RulesByMetric = BTreeMap::from([( + "test".to_owned(), + vec![ + Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }, + Rule::ReplicaNameLookup { + cluster_id_label: "cluster_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }, + ], + )]); + + stamp_scrape_context_labels(&mut family, cluster_id, replica_id, 0); + if let Some(rules) = scraped.get(family.name()) { + for rule in rules { + rule.apply(&mut family, &catalog); + } + } + let labels = family.get_metric()[0].get_label(); let names: Vec<_> = labels.iter().map(|l| l.name()).collect(); assert_eq!( @@ -314,4 +436,116 @@ mod tests { let values: Vec<_> = labels.iter().map(|l| l.value()).collect(); assert_eq!(values, vec!["u1", "u2", "0", "quickstart", "r1"]); } + + /// When the catalog can't resolve the IDs, the per-metric rules silently + /// no-op and the family is left with just the three scrape-context labels. + #[mz_ore::test] + fn rules_noop_when_catalog_misses() { + let mut family = make_family("test"); + let cluster_id: ClusterId = "u1".parse().expect("cluster id"); + let replica_id: ReplicaId = "u2".parse().expect("replica id"); + let catalog = FakeCatalog::default(); // catalog has neither cluster nor replica + + let scraped: RulesByMetric = BTreeMap::from([( + "test".to_owned(), + vec![ + Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }, + Rule::ReplicaNameLookup { + cluster_id_label: "cluster_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }, + ], + )]); + + stamp_scrape_context_labels(&mut family, cluster_id, replica_id, 0); + if let Some(rules) = scraped.get(family.name()) { + for rule in rules { + rule.apply(&mut family, &catalog); + } + } + + let names: Vec<_> = family.get_metric()[0] + .get_label() + .iter() + .map(|l| l.name()) + .collect(); + assert_eq!(names, vec!["cluster_id", "replica_id", "process"]); + } + + #[mz_ore::test] + fn parse_rules_header_missing_yields_empty() { + let headers = http::HeaderMap::new(); + assert!(parse_rules_header(&headers).is_empty()); + } + + #[mz_ore::test] + fn parse_rules_header_round_trips() { + let rules: RulesByMetric = BTreeMap::from([( + "mz_replica_connects_total".to_owned(), + vec![Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }], + )]); + let json = serde_json::to_string(&rules).unwrap(); + let mut headers = http::HeaderMap::new(); + headers.insert( + mz_http_util::MATERIALIZE_ENRICH_RULES_HEADER, + HeaderValue::from_str(&json).unwrap(), + ); + assert_eq!(parse_rules_header(&headers), rules); + } + + #[mz_ore::test] + fn parse_rules_header_invalid_json_yields_empty() { + let mut headers = http::HeaderMap::new(); + headers.insert( + mz_http_util::MATERIALIZE_ENRICH_RULES_HEADER, + HeaderValue::from_static("not valid json"), + ); + assert!(parse_rules_header(&headers).is_empty()); + } + + /// Mirrors what `handle_public_metrics` does for env-local metrics: + /// gather, then apply env's own registered rules. We exercise it by + /// hand here because the env-local metrics in the handler come from + /// `metrics_registry.gather()`, but the relevant code path + /// (rule.apply on each family) is the same. + #[mz_ore::test] + fn apply_env_rules_to_local_metrics() { + // Simulate `metrics_registry.gather()` returning a family with a + // `cluster_id` label — what an env-side metric like + // `mz_replica_connects_total` looks like in practice. + let mut family = make_family("mz_replica_connects_total"); + let mut existing = prometheus::proto::LabelPair::default(); + existing.set_name("cluster_id".into()); + existing.set_value("u1".into()); + let metric = &mut family.mut_metric()[0]; + let mut labels = metric.take_label(); + labels.push(existing); + metric.set_label(labels); + + let catalog = FakeCatalog { + clusters: BTreeMap::from([("u1".into(), "quickstart".into())]), + replicas: BTreeMap::new(), + }; + let env_rule = Rule::ClusterNameLookup { + cluster_id_label: "cluster_id".into(), + output_label: "cluster_name".into(), + }; + env_rule.apply(&mut family, &catalog); + + let labels = family.get_metric()[0].get_label(); + let names: Vec<_> = labels.iter().map(|l| l.name()).collect(); + assert_eq!(names, vec!["cluster_id", "cluster_name"]); + let cluster_name = labels + .iter() + .find(|l| l.name() == "cluster_name") + .map(|l| l.value()); + assert_eq!(cluster_name, Some("quickstart")); + } } From 3412db45ca578acb3231760bbf95bb8323c2d239 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 14 May 2026 18:00:19 -0400 Subject: [PATCH 4/4] controller, compute, storage: declare per-metric enrichment rules Migrates the three subsystems with metrics carrying cluster_id / replica_id / source_id / parent_source_id / collection_id labels to declare their enrichment rules inline via the metric! macro's new rules: [...] field: - cluster-client: ControllerMetrics adds ClusterNameLookup, ReplicaNameLookup, and ObjectNameLookup (for collection_id) to each of the three dataflow wallclock-lag metrics. - compute-client: ComputeControllerMetrics gains instance_name_rule() and replica_name_rule() helpers; every metric with an instance_id / replica_id label is decorated. - storage/statistics: SourceStatisticsMetricDefs gains source_name_rule() and parent_source_name_rule() helpers; every source metric with source_id / parent_source_id is decorated. These declarations replace what the prototype's global register_rule calls used to do, but scoped per-metric so labels can't bleed across unrelated families. E2E test_metrics_public_endpoint is back to green via cluster-client's mz_dataflow_wallclock_lag_seconds rule. --- src/cluster-client/src/metrics.rs | 46 +++++++++++++++++++++++++++++ src/compute-client/src/metrics.rs | 49 +++++++++++++++++++++++++++++-- src/storage/src/statistics.rs | 33 ++++++++++++++++++++- 3 files changed, 124 insertions(+), 4 deletions(-) diff --git a/src/cluster-client/src/metrics.rs b/src/cluster-client/src/metrics.rs index e947b4f5acb89..a2a1d94f46a63 100644 --- a/src/cluster-client/src/metrics.rs +++ b/src/cluster-client/src/metrics.rs @@ -13,6 +13,7 @@ use mz_ore::cast::CastLossy; use mz_ore::metric; use mz_ore::metrics::{ CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, + Rule, }; use mz_ore::stats::SlidingMinMax; use prometheus::core::{AtomicF64, AtomicU64}; @@ -37,16 +38,61 @@ impl ControllerMetrics { help: "A summary of the second-by-second lag of the dataflow frontier relative \ to wallclock time, aggregated over the last minute.", var_labels: ["instance_id", "replica_id", "collection_id", "quantile"], + rules: [ + Rule::ClusterNameLookup { + cluster_id_label: "instance_id".into(), + output_label: "cluster_name".into(), + }, + Rule::ReplicaNameLookup { + cluster_id_label: "instance_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }, + Rule::ObjectNameLookup { + object_id_label: "collection_id".into(), + output_label: "collection_name".into(), + }, + ], )), dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!( name: "mz_dataflow_wallclock_lag_seconds_sum", help: "The total sum of dataflow wallclock lag measurements.", var_labels: ["instance_id", "replica_id", "collection_id"], + rules: [ + Rule::ClusterNameLookup { + cluster_id_label: "instance_id".into(), + output_label: "cluster_name".into(), + }, + Rule::ReplicaNameLookup { + cluster_id_label: "instance_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }, + Rule::ObjectNameLookup { + object_id_label: "collection_id".into(), + output_label: "collection_name".into(), + }, + ], )), dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!( name: "mz_dataflow_wallclock_lag_seconds_count", help: "The total count of dataflow wallclock lag measurements.", var_labels: ["instance_id", "replica_id", "collection_id"], + rules: [ + Rule::ClusterNameLookup { + cluster_id_label: "instance_id".into(), + output_label: "cluster_name".into(), + }, + Rule::ReplicaNameLookup { + cluster_id_label: "instance_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + }, + Rule::ObjectNameLookup { + object_id_label: "collection_id".into(), + output_label: "collection_name".into(), + }, + ], )), } } diff --git a/src/compute-client/src/metrics.rs b/src/compute-client/src/metrics.rs index 35ef0c1455ee6..1f45e1a1393f7 100644 --- a/src/compute-client/src/metrics.rs +++ b/src/compute-client/src/metrics.rs @@ -21,7 +21,7 @@ use mz_ore::metric; use mz_ore::metrics::raw::UIntGaugeVec; use mz_ore::metrics::{ CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, - IntCounterVec, MetricVecExt, MetricsRegistry, + IntCounterVec, MetricVecExt, MetricsRegistry, Rule, }; use mz_ore::stats::histogram_seconds_buckets; use mz_repr::GlobalId; @@ -74,119 +74,162 @@ pub struct ComputeControllerMetrics { shared: ControllerMetrics, } +/// Rule for metrics that carry an `instance_id` label and want a resolved +/// `instance_name`. The `metric!` macro evaluates each entry in `rules: [...]` +/// per registration, so we factor the boilerplate behind this helper. +fn instance_name_rule() -> Rule { + Rule::ClusterNameLookup { + cluster_id_label: "instance_id".into(), + output_label: "instance_name".into(), + } +} + +/// Rule for metrics that additionally carry a `replica_id` label and want +/// a resolved `replica_name`. +fn replica_name_rule() -> Rule { + Rule::ReplicaNameLookup { + cluster_id_label: "instance_id".into(), + replica_id_label: "replica_id".into(), + output_label: "replica_name".into(), + } +} + impl ComputeControllerMetrics { /// Create a metrics instance registered into the given registry. pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self { - ComputeControllerMetrics { + let metrics = ComputeControllerMetrics { commands_total: metrics_registry.register(metric!( name: "mz_compute_commands_total", help: "The total number of compute commands sent.", var_labels: ["instance_id", "replica_id", "command_type"], + rules: [instance_name_rule(), replica_name_rule()], )), command_message_bytes_total: metrics_registry.register(metric!( name: "mz_compute_command_message_bytes_total", help: "The total number of bytes sent in compute command messages.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), responses_total: metrics_registry.register(metric!( name: "mz_compute_responses_total", help: "The total number of compute responses sent.", var_labels: ["instance_id", "replica_id", "response_type"], + rules: [instance_name_rule(), replica_name_rule()], )), response_message_bytes_total: metrics_registry.register(metric!( name: "mz_compute_response_message_bytes_total", help: "The total number of bytes sent in compute response messages.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), replica_count: metrics_registry.register(metric!( name: "mz_compute_controller_replica_count", help: "The number of replicas.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), collection_count: metrics_registry.register(metric!( name: "mz_compute_controller_collection_count", help: "The number of installed compute collections.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), collection_unscheduled_count: metrics_registry.register(metric!( name: "mz_compute_controller_collection_unscheduled_count", help: "The number of installed but unscheduled compute collections.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), peek_count: metrics_registry.register(metric!( name: "mz_compute_controller_peek_count", help: "The number of pending peeks.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), subscribe_count: metrics_registry.register(metric!( name: "mz_compute_controller_subscribe_count", help: "The number of active subscribes.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), copy_to_count: metrics_registry.register(metric!( name: "mz_compute_controller_copy_to_count", help: "The number of active copy tos.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), command_queue_size: metrics_registry.register(metric!( name: "mz_compute_controller_command_queue_size", help: "The size of the compute command queue.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), response_send_count: metrics_registry.register(metric!( name: "mz_compute_controller_response_send_count", help: "The number of sends on the compute response queue.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), response_recv_count: metrics_registry.register(metric!( name: "mz_compute_controller_response_recv_count", help: "The number of receives on the compute response queue.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), hydration_queue_size: metrics_registry.register(metric!( name: "mz_compute_controller_hydration_queue_size", help: "The size of the compute hydration queue.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), history_command_count: metrics_registry.register(metric!( name: "mz_compute_controller_history_command_count", help: "The number of commands in the controller's command history.", var_labels: ["instance_id", "command_type"], + rules: [instance_name_rule()], )), history_dataflow_count: metrics_registry.register(metric!( name: "mz_compute_controller_history_dataflow_count", help: "The number of dataflows in the controller's command history.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), peeks_total: metrics_registry.register(metric!( name: "mz_compute_peeks_total", help: "The total number of peeks served.", var_labels: ["instance_id", "result"], + rules: [instance_name_rule()], )), peek_duration_seconds: metrics_registry.register(metric!( name: "mz_compute_peek_duration_seconds", help: "A histogram of peek durations since restart.", var_labels: ["instance_id", "result"], buckets: histogram_seconds_buckets(0.000_500, 32.), + rules: [instance_name_rule()], )), connected_replica_count: metrics_registry.register(metric!( name: "mz_compute_controller_connected_replica_count", help: "The number of replicas successfully connected to the compute controller.", var_labels: ["instance_id"], + rules: [instance_name_rule()], )), replica_connects_total: metrics_registry.register(metric!( name: "mz_compute_controller_replica_connects_total", help: "The total number of replica (re-)connections made by the compute controller.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), replica_connect_wait_time_seconds_total: metrics_registry.register(metric!( name: "mz_compute_controller_replica_connect_wait_time_seconds_total", help: "The total time the compute controller spent waiting for replica (re-)connection.", var_labels: ["instance_id", "replica_id"], + rules: [instance_name_rule(), replica_name_rule()], )), shared, - } + }; + + metrics } /// Return an object suitable for tracking metrics for the given compute instance. diff --git a/src/storage/src/statistics.rs b/src/storage/src/statistics.rs index 226baccc4947e..346a4018dbf12 100644 --- a/src/storage/src/statistics.rs +++ b/src/storage/src/statistics.rs @@ -29,7 +29,7 @@ use std::time::Instant; use mz_ore::metric; use mz_ore::metrics::{ - DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry, + DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry, Rule, UIntGaugeVec, }; use mz_repr::{GlobalId, Timestamp}; @@ -66,6 +66,24 @@ pub(crate) struct SourceStatisticsMetricDefs { pub(crate) envelope_state_tombstones: UIntGaugeVec, } +/// Rule for source metrics that carry a `source_id` label and want a +/// resolved `source_name`. +fn source_name_rule() -> Rule { + Rule::ObjectNameLookup { + object_id_label: "source_id".into(), + output_label: "source_name".into(), + } +} + +/// Rule for source metrics that additionally carry a `parent_source_id` label +/// and want a resolved `parent_source_name`. +fn parent_source_name_rule() -> Rule { + Rule::ObjectNameLookup { + object_id_label: "parent_source_id".into(), + output_label: "parent_source_name".into(), + } +} + impl SourceStatisticsMetricDefs { pub(crate) fn register_with(registry: &MetricsRegistry) -> Self { Self { @@ -73,66 +91,79 @@ impl SourceStatisticsMetricDefs { name: "mz_source_snapshot_committed", help: "Whether or not the worker has committed the initial snapshot for a source.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), messages_received: registry.register(metric!( name: "mz_source_messages_received", help: "The number of raw messages the worker has received from upstream.", var_labels: ["source_id", "worker_id", "parent_source_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), updates_staged: registry.register(metric!( name: "mz_source_updates_staged", help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), updates_committed: registry.register(metric!( name: "mz_source_updates_committed", help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), bytes_received: registry.register(metric!( name: "mz_source_bytes_received", help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.", var_labels: ["source_id", "worker_id", "parent_source_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), bytes_indexed: registry.register(metric!( name: "mz_source_bytes_indexed", help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), records_indexed: registry.register(metric!( name: "mz_source_records_indexed", help: "The number of records in the source envelope state. This will be specific to the envelope in use", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), envelope_state_tombstones: registry.register(metric!( name: "mz_source_envelope_state_tombstones", help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), rehydration_latency_ms: registry.register(metric!( name: "mz_source_rehydration_latency_ms", help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"], + rules: [source_name_rule(), parent_source_name_rule()], )), offset_known: registry.register(metric!( name: "mz_source_offset_known", help: "The total number of _values_ (source-defined unit) present in upstream.", var_labels: ["source_id", "worker_id", "shard_id"], + rules: [source_name_rule()], )), offset_committed: registry.register(metric!( name: "mz_source_offset_committed", help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.", var_labels: ["source_id", "worker_id", "shard_id"], + rules: [source_name_rule()], )), snapshot_records_known: registry.register(metric!( name: "mz_source_snapshot_records_known", help: "The total number of records in the source's snapshot", var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), snapshot_records_staged: registry.register(metric!( name: "mz_source_snapshot_records_staged", help: "The total number of records read from the source's snapshot", var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"], + rules: [source_name_rule(), parent_source_name_rule()], )), } }