From e1f58975b4c1b60b1b12156e8aa0c7b53e978779 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Mon, 2 Mar 2026 10:29:01 -0500 Subject: [PATCH] chore: Add support for metrics-exporter-dogstatsd This will enable usage of native distributions and will lower the number number of custom metrics that taskbroker creates as venuer aggregates won't be used anymore. --- Cargo.lock | 132 +++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + src/config.rs | 13 ++++- src/metrics.rs | 50 ++++++++++++++----- 4 files changed, 178 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec1f3551..1d17802f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,6 +698,12 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -793,6 +799,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1005,7 +1017,16 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", ] [[package]] @@ -1014,7 +1035,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1339,7 +1360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1541,6 +1562,21 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "961f3712d8a7cfe14caaf74c3af503fe701cee6439ff49a7a3ebd04bf49c0502" +dependencies = [ + "bytes", + "itoa", + "metrics", + "metrics-util", + "ryu", + "thiserror 2.0.16", + "tracing", +] + [[package]] name = "metrics-exporter-statsd" version = "0.9.0" @@ -1552,6 +1588,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1595,6 +1651,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1751,6 +1816,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" +dependencies = [ + "num-traits", +] + [[package]] name = "os_info" version = "3.12.0" @@ -2029,6 +2103,21 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.1+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.38.3" @@ -2053,6 +2142,16 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2112,6 +2211,24 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.11.0" @@ -2673,6 +2790,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.11" @@ -2757,7 +2880,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", - "hashbrown", + "hashbrown 0.15.5", "hashlink", "indexmap", "log", @@ -3000,6 +3123,7 @@ dependencies = [ "http-body-util", "libsqlite3-sys", "metrics", + "metrics-exporter-dogstatsd", "metrics-exporter-statsd", "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 67119a08..95049f37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ http = "1.1.0" http-body-util = "0.1.2" libsqlite3-sys = "0.30.1" metrics = "0.24.0" +metrics-exporter-dogstatsd = "0.9.6" metrics-exporter-statsd = "0.9.0" prost = "0.13" prost-types = "0.13.3" diff --git a/src/config.rs b/src/config.rs index 046de4f7..5150f09a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,9 @@ pub struct Config { /// The statsd address to report metrics to. pub statsd_addr: String, + /// The dogstatsd metrics host. + pub metrics_host: String, + /// Default tags to add to all metrics. pub default_metrics_tags: BTreeMap, @@ -246,12 +249,15 @@ impl Default for Config { sentry_dsn: None, sentry_env: None, traces_sample_rate: Some(0.0), - log_filter: "info,librdkafka=warn,h2=off".to_owned(), + log_filter: + "info,librdkafka=warn,h2=off,metrics_exporter_dogstatsd::forwarder::sync=off" + .to_owned(), log_format: LogFormat::Text, grpc_addr: "0.0.0.0".to_owned(), grpc_port: 50051, grpc_shared_secret: vec![], statsd_addr: "127.0.0.1:8126".parse().unwrap(), + metrics_host: "".to_owned(), default_metrics_tags: Default::default(), kafka_cluster: "127.0.0.1:9092".to_owned(), kafka_consumer_group: "taskworker".to_owned(), @@ -432,7 +438,10 @@ mod tests { }; assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); - assert_eq!(config.log_filter, "info,librdkafka=warn,h2=off"); + assert_eq!( + config.log_filter, + "info,librdkafka=warn,h2=off,metrics_exporter_dogstatsd::forwarder::sync=off" + ); assert_eq!(config.log_format, LogFormat::Text); assert_eq!(config.grpc_port, 50051); assert_eq!(config.kafka_topic, "taskworker"); diff --git a/src/metrics.rs b/src/metrics.rs index 3a69e5df..685bef66 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,4 +1,5 @@ use crate::config::Config; +use metrics_exporter_dogstatsd::DogStatsDBuilder; use metrics_exporter_statsd::StatsdBuilder; use std::{ collections::BTreeMap, @@ -6,6 +7,7 @@ use std::{ }; pub struct MetricsConfig { + pub metrics_host: String, pub statsd_addr: SocketAddr, pub default_tags: BTreeMap, } @@ -21,25 +23,49 @@ impl MetricsConfig { }; MetricsConfig { statsd_addr: *statsd_addr, + metrics_host: config.metrics_host.clone(), default_tags: config.default_metrics_tags.clone(), } } } pub fn init(metrics_config: MetricsConfig) { - let address = metrics_config.statsd_addr; + if metrics_config.metrics_host.is_empty() { + // Fallback to statsd forwarder + let address = metrics_config.statsd_addr; - let builder = StatsdBuilder::from(address.ip().to_string(), address.port()); + let builder = StatsdBuilder::from(address.ip().to_string(), address.port()); - let recorder = metrics_config - .default_tags - .into_iter() - .fold( - builder.with_queue_size(5000).with_buffer_size(1024), - |builder, (key, value)| builder.with_default_tag(key, value), - ) - .build(Some("taskbroker")) - .expect("Could not create StatsdRecorder"); + let recorder = metrics_config + .default_tags + .into_iter() + .fold( + builder.with_queue_size(5000).with_buffer_size(1024), + |builder, (key, value)| builder.with_default_tag(key, value), + ) + .build(Some("taskbroker")) + .expect("Could not create StatsdRecorder"); - metrics::set_global_recorder(recorder).expect("Could not set global metrics recorder") + metrics::set_global_recorder(recorder).expect("Could not set global metrics recorder") + } else { + let default_tags = metrics_config + .default_tags + .into_iter() + .map(|(key, value)| metrics::Label::new(key, value)) + .collect(); + + // Use dogstatsd exporter if enabled. + let builder = DogStatsDBuilder::default() + .with_remote_address(metrics_config.metrics_host) + .expect("Could not set metrics host address") + .with_telemetry(true) + .send_histograms_as_distributions(true) + .with_histogram_sampling(true) + .set_global_prefix("taskbroker") + .with_global_labels(default_tags); + + builder + .install() + .expect("Could not create DogStatsDBuilder"); + } }