From f245661b2666af5bbd455f953da6d96568381daa Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Thu, 26 Mar 2026 17:11:19 -0400 Subject: [PATCH 01/13] implement trace stats for serverless compat --- Cargo.lock | 29 ++- crates/datadog-serverless-compat/src/main.rs | 38 +++- crates/datadog-trace-agent/Cargo.toml | 7 +- crates/datadog-trace-agent/src/config.rs | 9 + crates/datadog-trace-agent/src/lib.rs | 2 + .../src/stats_concentrator_service.rs | 194 ++++++++++++++++++ .../datadog-trace-agent/src/stats_flusher.rs | 30 ++- .../src/stats_generator.rs | 49 +++++ .../src/trace_processor.rs | 30 ++- .../tests/integration_test.rs | 16 +- 10 files changed, 380 insertions(+), 24 deletions(-) create mode 100644 crates/datadog-trace-agent/src/stats_concentrator_service.rs create mode 100644 crates/datadog-trace-agent/src/stats_generator.rs diff --git a/Cargo.lock b/Cargo.lock index 8970e45a..c8c582fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,6 +568,7 @@ dependencies = [ "libdd-common 3.0.1", "libdd-trace-obfuscation", "libdd-trace-protobuf 3.0.0", + "libdd-trace-stats 1.0.4", "libdd-trace-utils 3.0.0", "reqwest", "rmp-serde", @@ -576,6 +577,7 @@ dependencies = [ "serial_test", "temp-env", "tempfile", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -1509,12 +1511,12 @@ dependencies = [ "http", "http-body-util", "libdd-common 2.0.1", - "libdd-ddsketch", + "libdd-ddsketch 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libdd-dogstatsd-client", "libdd-telemetry", "libdd-tinybytes 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "libdd-trace-protobuf 2.0.0", - "libdd-trace-stats", + "libdd-trace-stats 1.0.3", "libdd-trace-utils 2.0.2", "rmp-serde", "serde", @@ -1535,6 +1537,14 @@ dependencies = [ "prost 0.14.3", ] +[[package]] +name = "libdd-ddsketch" +version = "1.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +dependencies = [ + "prost 0.14.3", +] + [[package]] name = "libdd-dogstatsd-client" version = "1.0.1" @@ -1563,7 +1573,7 @@ dependencies = [ "http-body-util", "libc", "libdd-common 2.0.1", - "libdd-ddsketch", + "libdd-ddsketch 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde", "serde_json", "sys-info", @@ -1655,11 +1665,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea447dc8a5d84c6b5eb6ea877c4fea4149fd29f6b45fcfc5cfd7edf82a18e056" dependencies = [ "hashbrown 0.15.5", - "libdd-ddsketch", + "libdd-ddsketch 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libdd-trace-protobuf 2.0.0", "libdd-trace-utils 2.0.2", ] +[[package]] +name = "libdd-trace-stats" +version = "1.0.4" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +dependencies = [ + "hashbrown 0.15.5", + "libdd-ddsketch 1.0.1 (git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad)", + "libdd-trace-protobuf 3.0.0", + "libdd-trace-utils 3.0.0", +] + [[package]] name = "libdd-trace-utils" version = "2.0.2" diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 8c20c41a..6fa432b6 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, proxy_flusher, stats_flusher, 
stats_processor, + config, env_verifier, mini_agent, proxy_flusher, stats_concentrator_service, stats_flusher, + stats_generator, stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, }; @@ -119,6 +120,12 @@ pub async fn main() { .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LOG_INTAKE_PORT); + + let dd_serverless_stats_computation_enabled = + env::var("DD_SERVERLESS_STATS_COMPUTATION_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + debug!("Starting serverless trace mini agent"); let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level); @@ -144,11 +151,6 @@ pub async fn main() { let env_verifier = Arc::new(env_verifier::ServerlessEnvVerifier::default()); - let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {}); - - let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {}); - let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); - let config = match config::Config::new() { Ok(c) => Arc::new(c), Err(e) => { @@ -157,6 +159,30 @@ pub async fn main() { } }; + // Initialize stats concentrator service and generator conditionally + let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled { + info!("Serverless stats computation enabled"); + let (stats_concentrator_service, stats_concentrator_handle) = + stats_concentrator_service::StatsConcentratorService::new(config.clone()); + tokio::spawn(stats_concentrator_service.run()); + let stats_generator = Arc::new(stats_generator::StatsGenerator::new( + stats_concentrator_handle.clone(), + )); + (Some(stats_concentrator_handle), Some(stats_generator)) + } else { + info!("Serverless stats computation disabled"); + (None, None) + }; + + let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { + stats_generator: stats_generator.clone(), + }); + + let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher { + stats_concentrator: stats_concentrator_handle.clone(), + }); + let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); + let trace_aggregator = Arc::new(TokioMutex::new(TraceAggregator::default())); let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( trace_aggregator, diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 1a69733a..88221059 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -24,14 +24,19 @@ async-trait = "0.1.64" tracing = { version = "0.1", default-features = false } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0" +thiserror = { version = "1.0.58", default-features = false } libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [ "mini_agent", ] } libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } datadog-fips = { path = "../datadog-fips" } -reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false } +reqwest = { version = "0.12.23", features = [ + "json", + 
"http2", +], default-features = false } bytes = "1.10.1" [dev-dependencies] diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 5a7b8a8c..bf1d8445 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -109,6 +109,9 @@ pub struct Config { /// timeout for environment verification, in milliseconds pub verify_env_timeout_ms: u64, pub proxy_url: Option, + pub service: Option, + pub env: Option, + pub version: Option, } impl Config { @@ -251,6 +254,9 @@ impl Config { .or_else(|_| env::var("HTTPS_PROXY")) .ok(), tags, + service: env::var("DD_SERVICE").ok(), + env: env::var("DD_ENV").ok(), + version: env::var("DD_VERSION").ok(), }) } } @@ -721,6 +727,9 @@ pub mod test_helpers { proxy_request_retry_backoff_base_ms: 100, verify_env_timeout_ms: 1000, proxy_url: None, + service: None, + env: None, + version: None, } } } diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index a87bf56b..daeed742 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -13,7 +13,9 @@ pub mod env_verifier; pub mod http_utils; pub mod mini_agent; pub mod proxy_flusher; +pub mod stats_concentrator_service; pub mod stats_flusher; +pub mod stats_generator; pub mod stats_processor; pub mod trace_flusher; pub mod trace_processor; diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs new file mode 100644 index 00000000..6523c856 --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -0,0 +1,194 @@ +use tokio::sync::{mpsc, oneshot}; + +use crate::config::Config; +use libdd_trace_protobuf::pb; +use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; +use libdd_trace_stats::span_concentrator::SpanConcentrator; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{Duration, SystemTime}; +use tracing::error; + +const S_TO_NS: u64 = 1_000_000_000; +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds + +#[derive(Debug, thiserror::Error)] +pub enum StatsError { + #[error("Failed to send command to concentrator: {0}")] + SendError(mpsc::error::SendError), + #[error("Failed to receive response from concentrator: {0}")] + RecvError(oneshot::error::RecvError), +} + +#[derive(Clone, Debug, Default)] +pub struct TracerMetadata { + // e.g. "python" + pub language: String, + // e.g. "3.11.0" + pub tracer_version: String, + // e.g. "f45568ad09d5480b99087d86ebda26e6" + pub runtime_id: String, + pub container_id: String, +} + +pub enum ConcentratorCommand { + SetTracerMetadata(TracerMetadata), + // Use a box to reduce the size of the command enum + Add(Box), + Flush(bool, oneshot::Sender>), +} + +pub struct StatsConcentratorHandle { + tx: mpsc::UnboundedSender, + is_tracer_metadata_set: AtomicBool, +} + +impl Clone for StatsConcentratorHandle { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + // Cloning this may cause trace metadata to be set multiple times, + // but it's okay because it's the same for all traces and we don't need to be perfect on dedup. 
+            is_tracer_metadata_set: AtomicBool::new(
+                self.is_tracer_metadata_set.load(Ordering::Acquire),
+            ),
+        }
+    }
+}
+
+impl StatsConcentratorHandle {
+    #[must_use]
+    pub fn new(tx: mpsc::UnboundedSender<ConcentratorCommand>) -> Self {
+        Self {
+            tx,
+            is_tracer_metadata_set: AtomicBool::new(false),
+        }
+    }
+
+    pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
+        // Set tracer metadata only once for the first trace because
+        // it is the same for all traces.
+        if !self.is_tracer_metadata_set.load(Ordering::Acquire) {
+            self.is_tracer_metadata_set.store(true, Ordering::Release);
+            let tracer_metadata = TracerMetadata {
+                language: trace.language_name.clone(),
+                tracer_version: trace.tracer_version.clone(),
+                runtime_id: trace.runtime_id.clone(),
+                container_id: trace.container_id.clone(),
+            };
+            self.tx
+                .send(ConcentratorCommand::SetTracerMetadata(tracer_metadata))
+                .map_err(StatsError::SendError)?;
+        }
+        Ok(())
+    }
+
+    pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> {
+        self.tx
+            .send(ConcentratorCommand::Add(Box::new(span.clone())))
+            .map_err(StatsError::SendError)?;
+        Ok(())
+    }
+
+    pub async fn flush(&self, force_flush: bool) -> Result<Option<ClientStatsPayload>, StatsError> {
+        let (response_tx, response_rx) = oneshot::channel();
+        self.tx
+            .send(ConcentratorCommand::Flush(force_flush, response_tx))
+            .map_err(StatsError::SendError)?;
+        response_rx.await.map_err(StatsError::RecvError)
+    }
+}
+
+pub struct StatsConcentratorService {
+    concentrator: SpanConcentrator,
+    rx: mpsc::UnboundedReceiver<ConcentratorCommand>,
+    tracer_metadata: TracerMetadata,
+    config: Arc<Config>,
+}
+
+// A service that handles add() and flush() requests in the same queue,
+// to avoid using a mutex, which may cause lock contention.
+impl StatsConcentratorService {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let handle = StatsConcentratorHandle::new(tx);
+        // TODO: set span_kinds_stats_computed and peer_tag_keys
+        let concentrator = SpanConcentrator::new(
+            Duration::from_nanos(BUCKET_DURATION_NS),
+            SystemTime::now(),
+            vec![],
+            vec![],
+        );
+        let service: StatsConcentratorService = Self {
+            concentrator,
+            rx,
+            // To be set when the first trace is received
+            tracer_metadata: TracerMetadata::default(),
+            config,
+        };
+        (service, handle)
+    }
+
+    pub async fn run(mut self) {
+        while let Some(command) = self.rx.recv().await {
+            match command {
+                ConcentratorCommand::SetTracerMetadata(tracer_metadata) => {
+                    self.tracer_metadata = tracer_metadata;
+                }
+                ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span),
+                ConcentratorCommand::Flush(force_flush, response_tx) => {
+                    self.handle_flush(force_flush, response_tx);
+                }
+            }
+        }
+    }
+
+    fn handle_flush(
+        &mut self,
+        force_flush: bool,
+        response_tx: oneshot::Sender<Option<ClientStatsPayload>>,
+    ) {
+        let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush);
+        let stats = if stats_buckets.is_empty() {
+            None
+        } else {
+            Some(ClientStatsPayload {
+                // Do not set hostname so the trace stats backend can aggregate stats properly
+                hostname: String::new(),
+                env: self.config.env.clone().unwrap_or("unknown-env".to_string()),
+                // Version is not in the trace payload. Need to read it from config.
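+                // unwrap_or_default() falls back to an empty string when DD_VERSION is unset.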
+ version: self.config.version.clone().unwrap_or_default(), + lang: self.tracer_metadata.language.clone(), + tracer_version: self.tracer_metadata.tracer_version.clone(), + runtime_id: self.tracer_metadata.runtime_id.clone(), + // Not supported yet + sequence: 0, + // Not supported yet + agent_aggregation: String::new(), + service: self + .config + .service + .clone() + .unwrap_or_default() + .to_lowercase(), + container_id: self.tracer_metadata.container_id.clone(), + // Not supported yet + tags: vec![], + // Not supported yet + git_commit_sha: String::new(), + // Not supported yet + image_tag: String::new(), + stats: stats_buckets, + // Not supported yet + process_tags: String::new(), + // Not supported yet + process_tags_hash: 0, + }) + }; + let response = response_tx.send(stats); + if let Err(e) = response { + error!("Failed to return trace stats: {e:?}"); + } + } +} diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs index 6c6e5805..e187a8d2 100644 --- a/crates/datadog-trace-agent/src/stats_flusher.rs +++ b/crates/datadog-trace-agent/src/stats_flusher.rs @@ -10,6 +10,7 @@ use libdd_trace_protobuf::pb; use libdd_trace_utils::stats_utils; use crate::config::Config; +use crate::stats_concentrator_service::StatsConcentratorHandle; #[async_trait] pub trait StatsFlusher { @@ -25,7 +26,9 @@ pub trait StatsFlusher { } #[derive(Clone)] -pub struct ServerlessStatsFlusher {} +pub struct ServerlessStatsFlusher { + pub stats_concentrator: Option, +} #[async_trait] impl StatsFlusher for ServerlessStatsFlusher { @@ -50,14 +53,31 @@ impl StatsFlusher for ServerlessStatsFlusher { tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await; let mut buffer = buffer_consumer.lock().await; - if !buffer.is_empty() { - self.flush_stats(config.clone(), buffer.to_vec()).await; - buffer.clear(); + let channel_stats = buffer.to_vec(); + buffer.clear(); + drop(buffer); + + let should_flush = !channel_stats.is_empty() || self.stats_concentrator.is_some(); + if should_flush { + self.flush_stats(config.clone(), channel_stats).await; } } } - async fn flush_stats(&self, config: Arc, stats: Vec) { + async fn flush_stats(&self, config: Arc, mut stats: Vec) { + if let Some(ref concentrator) = self.stats_concentrator { + match concentrator.flush(false).await { + Ok(Some(payload)) => { + debug!("Merged agent-generated stats from concentrator into flush batch"); + stats.push(payload); + } + Ok(None) => {} + Err(e) => { + error!("Failed to flush stats concentrator: {e}"); + } + } + } + if stats.is_empty() { return; } diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs new file mode 100644 index 00000000..2bd730e1 --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_generator.rs @@ -0,0 +1,49 @@ +use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError}; +use libdd_trace_utils::tracer_payload::TracerPayloadCollection; +use tracing::error; + +pub struct StatsGenerator { + stats_concentrator: StatsConcentratorHandle, +} + +#[derive(Debug, thiserror::Error)] +pub enum StatsGeneratorError { + #[error("Error sending trace stats to the stats concentrator: {0}")] + ConcentratorCommandError(StatsError), + #[error("Unsupported trace payload version. 
Failed to send trace stats.")] + TracePayloadVersionError, +} + +// Extracts information from traces related to stats and sends it to the stats concentrator +impl StatsGenerator { + #[must_use] + pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { + Self { stats_concentrator } + } + + pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> { + if let TracerPayloadCollection::V07(traces) = traces { + for trace in traces { + // Set tracer metadata + if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) { + error!("Failed to set tracer metadata: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + + // Generate stats for each span in the trace + for chunk in &trace.chunks { + for span in &chunk.spans { + if let Err(err) = self.stats_concentrator.add(span) { + error!("Failed to send trace stats: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + } + } + } + Ok(()) + } else { + error!("Unsupported trace payload version. Failed to send trace stats."); + Err(StatsGeneratorError::TracePayloadVersionError) + } + } +} diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 96f82098..afd5d5cd 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use hyper::{StatusCode, http}; use libdd_common::http_common; use tokio::sync::mpsc::Sender; -use tracing::debug; +use tracing::{debug, error}; use libdd_trace_obfuscation::obfuscate::obfuscate_span; use libdd_trace_protobuf::pb; @@ -18,6 +18,7 @@ use libdd_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollec use crate::{ config::Config, http_utils::{self, log_and_create_http_response, log_and_create_traces_success_http_response}, + stats_generator::StatsGenerator, }; const TRACER_PAYLOAD_FUNCTION_TAGS_TAG_KEY: &str = "_dd.tags.function"; @@ -65,7 +66,11 @@ impl TraceChunkProcessor for ChunkProcessor { } } #[derive(Clone)] -pub struct ServerlessTraceProcessor {} +pub struct ServerlessTraceProcessor { + /// The [`StatsGenerator`] to use for generating stats and sending them to + /// the stats concentrator. 
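+    /// `None` disables agent-side stats computation (the serverless entrypoint
+    /// passes `None` when DD_SERVERLESS_STATS_COMPUTATION_ENABLED is "false").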
+ pub stats_generator: Option>, +} #[async_trait] impl TraceProcessor for ServerlessTraceProcessor { @@ -139,6 +144,16 @@ impl TraceProcessor for ServerlessTraceProcessor { } } + if let Some(stats_generator) = self.stats_generator.as_ref() { + if tracer_header_tags.client_computed_stats { + debug!( + "Skipping agent-side stats generation: trace payload has Datadog-Client-Computed-Stats" + ); + } else if let Err(e) = stats_generator.send(&payload) { + error!("Stats generator error: {e}"); + } + } + let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); // send trace payload to our trace flusher @@ -219,6 +234,9 @@ mod tests { ..Default::default() }, tags: Tags::from_env_string("env:test,service:my-service"), + service: Some("test-service".to_string()), + env: Some("test-env".to_string()), + version: Some("1.0.0".to_string()), } } @@ -254,7 +272,9 @@ mod tests { .body(http_common::Body::from(bytes)) .unwrap(); - let trace_processor = trace_processor::ServerlessTraceProcessor {}; + let trace_processor = trace_processor::ServerlessTraceProcessor { + stats_generator: None, + }; let res = trace_processor .process_traces( Arc::new(create_test_config()), @@ -326,7 +346,9 @@ mod tests { .body(http_common::Body::from(bytes)) .unwrap(); - let trace_processor = trace_processor::ServerlessTraceProcessor {}; + let trace_processor = trace_processor::ServerlessTraceProcessor { + stats_generator: None, + }; let res = trace_processor .process_traces( Arc::new(create_test_config()), diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 1491954f..a71e8162 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -54,13 +54,17 @@ pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); MiniAgent { config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_processor: Arc::new(ServerlessTraceProcessor { + stats_generator: None, + }), trace_flusher: Arc::new(ServerlessTraceFlusher::new( aggregator.clone(), config.clone(), )), stats_processor: Arc::new(ServerlessStatsProcessor {}), - stats_flusher: Arc::new(ServerlessStatsFlusher {}), + stats_flusher: Arc::new(ServerlessStatsFlusher { + stats_concentrator: None, + }), env_verifier: Arc::new(MockEnvVerifier), proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), } @@ -110,7 +114,9 @@ async fn test_mini_agent_tcp_handles_requests() { let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_processor: Arc::new(ServerlessTraceProcessor { + stats_generator: None, + }), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), @@ -206,7 +212,9 @@ async fn test_mini_agent_named_pipe_handles_requests() { let mini_agent = MiniAgent { config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_processor: Arc::new(ServerlessTraceProcessor { + stats_generator: None, + }), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), From a42c962b6198ce20b3ae8dbad9de456e6a6bbda4 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Fri, 27 Mar 2026 14:57:58 -0400 Subject: [PATCH 02/13] support 
span derived primary tags --- Cargo.lock | 21 ++++++++------- crates/datadog-agent-config/Cargo.toml | 4 +-- crates/datadog-serverless-compat/Cargo.toml | 2 +- crates/datadog-trace-agent/Cargo.toml | 12 ++++----- crates/datadog-trace-agent/src/config.rs | 26 +++++++++++++++++++ .../src/stats_concentrator_service.rs | 1 + .../src/trace_processor.rs | 1 + 7 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8c582fc..b54abfd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,7 +1467,7 @@ dependencies = [ [[package]] name = "libdd-common" version = "3.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "anyhow", "bytes", @@ -1540,7 +1540,7 @@ dependencies = [ [[package]] name = "libdd-ddsketch" version = "1.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "prost 0.14.3", ] @@ -1596,7 +1596,7 @@ dependencies = [ [[package]] name = "libdd-tinybytes" version = "1.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "serde", ] @@ -1614,7 +1614,7 @@ dependencies = [ [[package]] name = "libdd-trace-normalization" version = "1.0.3" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "anyhow", "libdd-trace-protobuf 3.0.0", @@ -1623,7 +1623,7 @@ dependencies = [ [[package]] name = "libdd-trace-obfuscation" version = "1.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "anyhow", "fluent-uri", @@ -1651,7 +1651,7 @@ dependencies = [ [[package]] name = "libdd-trace-protobuf" version = "3.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "prost 0.14.3", "serde", @@ -1673,10 +1673,10 @@ dependencies = [ [[package]] name = "libdd-trace-stats" version = "1.0.4" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "hashbrown 0.15.5", - "libdd-ddsketch 1.0.1 (git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad)", + 
"libdd-ddsketch 1.0.1 (git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f)", "libdd-trace-protobuf 3.0.0", "libdd-trace-utils 3.0.0", ] @@ -1712,9 +1712,10 @@ dependencies = [ [[package]] name = "libdd-trace-utils" version = "3.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" dependencies = [ "anyhow", + "base64 0.22.1", "bytes", "cargo-platform", "cargo_metadata", @@ -1727,7 +1728,7 @@ dependencies = [ "hyper", "indexmap", "libdd-common 3.0.1", - "libdd-tinybytes 1.1.0 (git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad)", + "libdd-tinybytes 1.1.0 (git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f)", "libdd-trace-normalization 1.0.3", "libdd-trace-protobuf 3.0.0", "prost 0.14.3", diff --git a/crates/datadog-agent-config/Cargo.toml b/crates/datadog-agent-config/Cargo.toml index 222d7265..bbcc112d 100644 --- a/crates/datadog-agent-config/Cargo.toml +++ b/crates/datadog-agent-config/Cargo.toml @@ -9,8 +9,8 @@ path = "mod.rs" [dependencies] figment = { version = "0.10", default-features = false, features = ["yaml", "env"] } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } log = { version = "0.4", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive"] } serde-aux = { version = "4.7", default-features = false } diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index b84bb15f..a2afde11 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -12,7 +12,7 @@ windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] [dependencies] datadog-logs-agent = { path = "../datadog-logs-agent" } datadog-trace-agent = { path = "../datadog-trace-agent" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } datadog-fips = { path = "../datadog-fips", default-features = false } dogstatsd = { path = "../dogstatsd", default-features = true } reqwest = { version = "0.12.4", default-features = false } diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 88221059..d0e066a0 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -25,13 +25,13 @@ tracing = { version = "0.1", default-features = false } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0" thiserror = { version = "1.0.58", default-features = false } -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } -libdd-trace-protobuf = { git = 
"https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } -libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [ +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f", features = [ "mini_agent", ] } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } datadog-fips = { path = "../datadog-fips" } reqwest = { version = "0.12.23", features = [ "json", @@ -45,6 +45,6 @@ serial_test = "2.0.0" duplicate = "0.4.1" temp-env = "0.3.6" tempfile = "3.3.0" -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [ +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f", features = [ "test-utils", ] } diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index bf1d8445..f0af9d1a 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -14,6 +14,7 @@ use libdd_trace_utils::config_utils::{ trace_stats_url_prefixed, }; use libdd_trace_utils::trace_utils; +use tracing::debug; const DEFAULT_APM_RECEIVER_PORT: u16 = 8126; const DEFAULT_DOGSTATSD_PORT: u16 = 8125; @@ -112,6 +113,8 @@ pub struct Config { pub service: Option, pub env: Option, pub version: Option, + /// Span tag keys used as second primary tags + pub span_derived_primary_tags: Vec, } impl Config { @@ -215,6 +218,17 @@ impl Config { Tags::new() }; + let span_derived_primary_tags = match env::var("DD_APM_SPAN_DERIVED_PRIMARY_TAGS") { + Ok(env_tags) => parse_json_string_list(&env_tags)?, + Err(_) => vec![], + }; + if !span_derived_primary_tags.is_empty() { + debug!( + "span_derived_primary_tags configured: [{}]", + span_derived_primary_tags.join(" ") + ); + } + #[allow(clippy::unwrap_used)] Ok(Config { app_name: Some(app_name), @@ -257,10 +271,21 @@ impl Config { service: env::var("DD_SERVICE").ok(), env: env::var("DD_ENV").ok(), version: env::var("DD_VERSION").ok(), + span_derived_primary_tags, }) } } +/// Parses a JSON array of strings. Invalid JSON is an error and returns []. +fn parse_json_string_list(env_tags: &str) -> Result, Box> { + serde_json::from_str::>(env_tags).map_err(|e| { + anyhow::anyhow!( + "expected a JSON array of strings, e.g. 
[] or [\"http.url\",\"db.name\"]: {e}" + ) + .into() + }) +} + #[cfg(test)] mod tests { use duplicate::duplicate_item; @@ -730,6 +755,7 @@ pub mod test_helpers { service: None, env: None, version: None, + span_derived_primary_tags: vec![], } } } diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index 6523c856..5ea4ea65 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -119,6 +119,7 @@ impl StatsConcentratorService { SystemTime::now(), vec![], vec![], + config.span_derived_primary_tags.clone(), ); let service: StatsConcentratorService = Self { concentrator, diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index afd5d5cd..5faf5de2 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -237,6 +237,7 @@ mod tests { service: Some("test-service".to_string()), env: Some("test-env".to_string()), version: Some("1.0.0".to_string()), + span_derived_primary_tags: vec![], } } From c13b495482a11bb4965e1bf9503a92d50a0e95d3 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 31 Mar 2026 13:49:09 -0400 Subject: [PATCH 03/13] tests and minor clean up --- crates/datadog-serverless-compat/src/main.rs | 18 +-- crates/datadog-trace-agent/src/config.rs | 54 +++++-- .../datadog-trace-agent/src/stats_flusher.rs | 36 ++++- .../src/trace_processor.rs | 16 +- .../tests/common/helpers.rs | 5 + .../tests/common/mock_server.rs | 16 +- .../tests/integration_test.rs | 145 ++++++++++++++++-- 7 files changed, 241 insertions(+), 49 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 6fa432b6..7e723bcb 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -159,18 +159,17 @@ pub async fn main() { } }; - // Initialize stats concentrator service and generator conditionally let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled { - info!("Serverless stats computation enabled"); - let (stats_concentrator_service, stats_concentrator_handle) = + info!("serverless stats computation enabled"); + let (service, handle) = stats_concentrator_service::StatsConcentratorService::new(config.clone()); - tokio::spawn(stats_concentrator_service.run()); - let stats_generator = Arc::new(stats_generator::StatsGenerator::new( - stats_concentrator_handle.clone(), - )); - (Some(stats_concentrator_handle), Some(stats_generator)) + tokio::spawn(service.run()); + ( + Some(handle.clone()), + Some(Arc::new(stats_generator::StatsGenerator::new(handle))), + ) } else { - info!("Serverless stats computation disabled"); + info!("serverless stats computation disabled"); (None, None) }; @@ -180,6 +179,7 @@ pub async fn main() { let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher { stats_concentrator: stats_concentrator_handle.clone(), + force_flush_concentrator: false, }); let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index f0af9d1a..29008470 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -219,7 +219,7 @@ impl Config { }; let span_derived_primary_tags = match 
env::var("DD_APM_SPAN_DERIVED_PRIMARY_TAGS") { - Ok(env_tags) => parse_json_string_list(&env_tags)?, + Ok(env_tags) => parse_json_string_array(&env_tags)?, Err(_) => vec![], }; if !span_derived_primary_tags.is_empty() { @@ -276,18 +276,15 @@ impl Config { } } -/// Parses a JSON array of strings. Invalid JSON is an error and returns []. -fn parse_json_string_list(env_tags: &str) -> Result, Box> { - serde_json::from_str::>(env_tags).map_err(|e| { - anyhow::anyhow!( - "expected a JSON array of strings, e.g. [] or [\"http.url\",\"db.name\"]: {e}" - ) - .into() - }) +/// Parses a JSON array of strings. Returns an error if the JSON is invalid. +fn parse_json_string_array(env_tags: &str) -> Result, Box> { + serde_json::from_str::>(env_tags) + .map_err(|e| anyhow::anyhow!("expected a JSON array of strings: {e}").into()) } #[cfg(test)] mod tests { + use super::parse_json_string_array; use duplicate::duplicate_item; use serial_test::serial; use std::collections::HashMap; @@ -718,6 +715,45 @@ mod tests { }, ); } + + #[test] + fn test_parse_json_string_array_valid() { + // valid json + assert_eq!( + parse_json_string_array(r#"["custom.primary", "test"]"#).unwrap(), + vec!["custom.primary".to_string(), "test".to_string(),], + ); + + // empty json + assert!(parse_json_string_array("[]").unwrap().is_empty()); + } + + #[test] + fn test_parse_json_string_array_invalid() { + // invalid json + assert!( + parse_json_string_array("not a json array") + .unwrap_err() + .to_string() + .contains("expected a JSON array of strings"), + ); + + // empty json + assert!( + parse_json_string_array("") + .unwrap_err() + .to_string() + .contains("expected a JSON array of strings") + ); + + // valid json but wrong shape for Vec + assert!( + parse_json_string_array("[1, 2]") + .unwrap_err() + .to_string() + .contains("expected a JSON array of strings") + ); + } } /// Test helpers for creating Config instances in tests diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs index e187a8d2..e8ca2e70 100644 --- a/crates/datadog-trace-agent/src/stats_flusher.rs +++ b/crates/datadog-trace-agent/src/stats_flusher.rs @@ -12,6 +12,14 @@ use libdd_trace_utils::stats_utils; use crate::config::Config; use crate::stats_concentrator_service::StatsConcentratorHandle; +/// Whether the stats flusher should run `flush_stats` +fn should_flush_stats_buffer( + channel_has_tracer_stats: bool, + serverless_stats_enabled: bool, +) -> bool { + channel_has_tracer_stats || serverless_stats_enabled +} + #[async_trait] pub trait StatsFlusher { /// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver, @@ -28,6 +36,9 @@ pub trait StatsFlusher { #[derive(Clone)] pub struct ServerlessStatsFlusher { pub stats_concentrator: Option, + /// When false, flushes are done on completed buckets + /// When true, flushes are done on any in progress buckets, useful for integration tests + pub force_flush_concentrator: bool, } #[async_trait] @@ -42,6 +53,7 @@ impl StatsFlusher for ServerlessStatsFlusher { let buffer_producer = buffer.clone(); let buffer_consumer = buffer.clone(); + // drain the stats channel continuously into the buffer tokio::spawn(async move { while let Some(stats_payload) = rx.recv().await { let mut buffer = buffer_producer.lock().await; @@ -49,6 +61,7 @@ impl StatsFlusher for ServerlessStatsFlusher { } }); + // flush stats from the bufferon a fixed interval loop { tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await; @@ -57,7 
+70,10 @@ impl StatsFlusher for ServerlessStatsFlusher { buffer.clear(); drop(buffer); - let should_flush = !channel_stats.is_empty() || self.stats_concentrator.is_some(); + let should_flush = should_flush_stats_buffer( + !channel_stats.is_empty(), + self.stats_concentrator.is_some(), + ); if should_flush { self.flush_stats(config.clone(), channel_stats).await; } @@ -65,10 +81,10 @@ impl StatsFlusher for ServerlessStatsFlusher { } async fn flush_stats(&self, config: Arc, mut stats: Vec) { + // flush from stats concentrator if serverless stats are enabled and there are stats to flush if let Some(ref concentrator) = self.stats_concentrator { - match concentrator.flush(false).await { + match concentrator.flush(self.force_flush_concentrator).await { Ok(Some(payload)) => { - debug!("Merged agent-generated stats from concentrator into flush batch"); stats.push(payload); } Ok(None) => {} @@ -110,3 +126,17 @@ impl StatsFlusher for ServerlessStatsFlusher { } } } + +#[cfg(test)] +mod tests { + use super::should_flush_stats_buffer; + + #[test] + fn should_flush_stats_buffer_all_cases() { + // (stats channel empty, serverless stats enabled with concentrator) + assert!(!should_flush_stats_buffer(false, false)); + assert!(should_flush_stats_buffer(true, false)); + assert!(should_flush_stats_buffer(false, true)); + assert!(should_flush_stats_buffer(true, true)); + } +} diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 5faf5de2..82c00831 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -67,8 +67,7 @@ impl TraceChunkProcessor for ChunkProcessor { } #[derive(Clone)] pub struct ServerlessTraceProcessor { - /// The [`StatsGenerator`] to use for generating stats and sending them to - /// the stats concentrator. + /// The stats generator to use for generating stats and sending them to the stats concentrator. 
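+    /// Stats generation is skipped when the tracer reports client-computed
+    /// stats via the Datadog-Client-Computed-Stats header.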
pub stats_generator: Option>, } @@ -144,14 +143,11 @@ impl TraceProcessor for ServerlessTraceProcessor { } } - if let Some(stats_generator) = self.stats_generator.as_ref() { - if tracer_header_tags.client_computed_stats { - debug!( - "Skipping agent-side stats generation: trace payload has Datadog-Client-Computed-Stats" - ); - } else if let Err(e) = stats_generator.send(&payload) { - error!("Stats generator error: {e}"); - } + if let Some(stats_generator) = self.stats_generator.as_ref() + && !tracer_header_tags.client_computed_stats + && let Err(e) = stats_generator.send(&payload) + { + error!("Stats generator error: {e}"); } let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs index 6dd8d825..9808f474 100644 --- a/crates/datadog-trace-agent/tests/common/helpers.rs +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -23,6 +23,7 @@ pub async fn send_tcp_request( uri: &str, method: &str, body: Option>, + additional_headers: &[(&str, &str)], ) -> Result, Box> { let stream = timeout( Duration::from_secs(2), @@ -42,6 +43,10 @@ pub async fn send_tcp_request( .method(method) .header("Content-Type", "application/msgpack"); + for (name, value) in additional_headers { + request_builder = request_builder.header(*name, *value); + } + let response = if let Some(body_data) = body { let body_len = body_data.len(); request_builder = request_builder.header("Content-Length", body_len.to_string()); diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs index f1beb1ac..cd0cd6b9 100644 --- a/crates/datadog-trace-agent/tests/common/mock_server.rs +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -4,7 +4,7 @@ //! 
Simple mock HTTP server for testing flushers use http_body_util::BodyExt; -use hyper::{Request, Response, body::Incoming}; +use hyper::{Request, Response, StatusCode, body::Incoming}; use hyper_util::rt::TokioIo; use libdd_common::http_common; use std::net::SocketAddr; @@ -60,6 +60,7 @@ impl MockServer { // Capture the request let method = req.method().to_string(); let path = req.uri().path().to_string(); + let is_stats_intake = path.ends_with("/stats"); let headers: Vec<(String, String)> = req .headers() .iter() @@ -82,11 +83,18 @@ impl MockServer { body: body_bytes, }); - // Return 200 OK + // Trace intake accepts 2xx + // Stats intake accepts 202 + // see `libdd_trace_utils::stats_utils::send_stats_payload_with_client` + let (status, body) = if is_stats_intake { + (StatusCode::ACCEPTED, http_common::Body::empty()) + } else { + (StatusCode::OK, http_common::Body::from(r#"{"ok":true}"#)) + }; Ok::<_, hyper::http::Error>( Response::builder() - .status(200) - .body(http_common::Body::from(r#"{"ok":true}"#)) + .status(status) + .body(body) .unwrap(), ) } diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index a71e8162..fbda371a 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -28,7 +28,7 @@ const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); /// Helper to configure a config with mock server endpoints pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { let trace_url = format!("{}/api/v0.2/traces", mock_server_url); - let stats_url = format!("{}/api/v0.6/stats", mock_server_url); + let stats_url = format!("{}/api/v0.2/stats", mock_server_url); config.trace_intake = libdd_common::Endpoint { url: trace_url.parse().unwrap(), @@ -47,23 +47,31 @@ pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { /// Helper to create a mini agent with real flushers pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { use datadog_trace_agent::{ - aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + aggregator::TraceAggregator, stats_concentrator_service::StatsConcentratorService, + stats_flusher::ServerlessStatsFlusher, stats_generator::StatsGenerator, stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, }; + let (concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(config.clone()); + tokio::spawn(concentrator_service.run()); + + let stats_generator = Some(Arc::new(StatsGenerator::new( + stats_concentrator_handle.clone(), + ))); + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); MiniAgent { config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor { - stats_generator: None, - }), + trace_processor: Arc::new(ServerlessTraceProcessor { stats_generator }), trace_flusher: Arc::new(ServerlessTraceFlusher::new( aggregator.clone(), config.clone(), )), stats_processor: Arc::new(ServerlessStatsProcessor {}), stats_flusher: Arc::new(ServerlessStatsFlusher { - stats_concentrator: None, + stats_concentrator: Some(stats_concentrator_handle), + force_flush_concentrator: true, }), env_verifier: Arc::new(MockEnvVerifier), proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), @@ -106,6 +114,52 @@ pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { ); } +/// Helper to verify stats request sent to mock server +pub fn 
verify_stats_request(mock_server: &common::mock_server::MockServer) { + let stats_reqs = mock_server.get_requests_for_path("/api/v0.2/stats"); + + assert!( + !stats_reqs.is_empty(), + "Expected at least one stats request to mock server" + ); + + let stats_req = &stats_reqs[0]; + assert_eq!(stats_req.method, "POST", "Expected POST method"); + + let content_type = stats_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + assert_eq!( + content_type, + Some("application/msgpack"), + "Expected msgpack content-type" + ); + + let api_key = stats_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + assert!( + !stats_req.body.is_empty(), + "Expected non-empty stats payload" + ); +} + +/// Helper to verify stats request was not sent to mock server +pub fn verify_no_stats_request(mock_server: &common::mock_server::MockServer) { + let stats_reqs = mock_server.get_requests_for_path("/api/v0.2/stats"); + assert!( + stats_reqs.is_empty(), + "Expected no stats request to mock server, received {} request(s)", + stats_reqs.len() + ); +} + #[cfg(test)] #[tokio::test] #[serial] @@ -133,7 +187,7 @@ async fn test_mini_agent_tcp_handles_requests() { tokio::time::sleep(Duration::from_millis(100)).await; // Test /info endpoint - let info_response = send_tcp_request(test_port, "/info", "GET", None) + let info_response = send_tcp_request(test_port, "/info", "GET", None, &[]) .await .expect("Failed to send /info request"); assert_eq!( @@ -187,9 +241,10 @@ async fn test_mini_agent_tcp_handles_requests() { // Test /v0.4/traces endpoint with real trace data let trace_payload = create_test_trace_payload(); - let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) - .await - .expect("Failed to send /v0.4/traces request"); + let trace_response = + send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload), &[]) + .await + .expect("Failed to send /v0.4/traces request"); assert_eq!( trace_response.status(), StatusCode::OK, @@ -303,7 +358,59 @@ async fn test_mini_agent_tcp_with_real_flushers() { let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; - if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None, &[]).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); + + // Send trace data + let trace_payload = create_test_trace_payload(); + let trace_response = + send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload), &[]) + .await + .expect("Failed to send /v0.4/traces request"); + assert_eq!(trace_response.status(), StatusCode::OK); + + // Wait for flush + tokio::time::sleep(FLUSH_WAIT_DURATION).await; + + verify_trace_request(&mock_server); + verify_stats_request(&mock_server); // Stats generator should generate stats from trace payload + + // Clean up + agent_handle.abort(); +} + +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() { + let mock_server: MockServer = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let mut config = create_tcp_test_config(8128); // use different port to avoid race condition with other 
tests + configure_mock_endpoints(&mut config, &mock_server.url()); + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + let mini_agent = create_mini_agent_with_real_flushers(config); + + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None, &[]).await && response.status().is_success() { server_ready = true; @@ -317,16 +424,24 @@ async fn test_mini_agent_tcp_with_real_flushers() { // Send trace data let trace_payload = create_test_trace_payload(); - let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) - .await - .expect("Failed to send /v0.4/traces request"); + let trace_response = send_tcp_request( + test_port, + "/v0.4/traces", + "POST", + Some(trace_payload), + &[("Datadog-Client-Computed-Stats", "true")], + ) + .await + .expect("Failed to send /v0.4/traces request"); assert_eq!(trace_response.status(), StatusCode::OK); // Wait for flush tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); + verify_no_stats_request(&mock_server); // Stats generator should not generate stats from trace payload when Datadog-Client-Computed-Stats header is present in trace payload + // Clean up agent_handle.abort(); } @@ -378,6 +493,8 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); + verify_stats_request(&mock_server); + // Clean up agent_handle.abort(); } From c0b66711d1b2ca6637419f5e779f8ce58a2b8a4d Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 31 Mar 2026 15:07:56 -0400 Subject: [PATCH 04/13] remove span derived primary tags for now --- Cargo.lock | 21 +++---- crates/datadog-agent-config/Cargo.toml | 4 +- crates/datadog-serverless-compat/Cargo.toml | 2 +- crates/datadog-trace-agent/Cargo.toml | 12 ++-- crates/datadog-trace-agent/src/config.rs | 62 ------------------- .../src/stats_concentrator_service.rs | 1 - .../src/trace_processor.rs | 1 - 7 files changed, 19 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b54abfd2..c8c582fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,7 +1467,7 @@ dependencies = [ [[package]] name = "libdd-common" version = "3.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "anyhow", "bytes", @@ -1540,7 +1540,7 @@ dependencies = [ [[package]] name = "libdd-ddsketch" version = "1.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "prost 0.14.3", ] @@ -1596,7 +1596,7 @@ dependencies = [ [[package]] name = "libdd-tinybytes" version = "1.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = 
"git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "serde", ] @@ -1614,7 +1614,7 @@ dependencies = [ [[package]] name = "libdd-trace-normalization" version = "1.0.3" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "anyhow", "libdd-trace-protobuf 3.0.0", @@ -1623,7 +1623,7 @@ dependencies = [ [[package]] name = "libdd-trace-obfuscation" version = "1.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "anyhow", "fluent-uri", @@ -1651,7 +1651,7 @@ dependencies = [ [[package]] name = "libdd-trace-protobuf" version = "3.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "prost 0.14.3", "serde", @@ -1673,10 +1673,10 @@ dependencies = [ [[package]] name = "libdd-trace-stats" version = "1.0.4" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "hashbrown 0.15.5", - "libdd-ddsketch 1.0.1 (git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f)", + "libdd-ddsketch 1.0.1 (git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad)", "libdd-trace-protobuf 3.0.0", "libdd-trace-utils 3.0.0", ] @@ -1712,10 +1712,9 @@ dependencies = [ [[package]] name = "libdd-trace-utils" version = "3.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f#62181a8a2a766419a07cbecd7d6f87ef05dd166f" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" dependencies = [ "anyhow", - "base64 0.22.1", "bytes", "cargo-platform", "cargo_metadata", @@ -1728,7 +1727,7 @@ dependencies = [ "hyper", "indexmap", "libdd-common 3.0.1", - "libdd-tinybytes 1.1.0 (git+https://github.com/DataDog/libdatadog?rev=62181a8a2a766419a07cbecd7d6f87ef05dd166f)", + "libdd-tinybytes 1.1.0 (git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad)", "libdd-trace-normalization 1.0.3", "libdd-trace-protobuf 3.0.0", "prost 0.14.3", diff --git a/crates/datadog-agent-config/Cargo.toml b/crates/datadog-agent-config/Cargo.toml index bbcc112d..222d7265 100644 --- a/crates/datadog-agent-config/Cargo.toml +++ b/crates/datadog-agent-config/Cargo.toml @@ -9,8 +9,8 @@ path = "mod.rs" [dependencies] figment = { version = "0.10", default-features = false, features = ["yaml", "env"] } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", 
rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } log = { version = "0.4", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive"] } serde-aux = { version = "4.7", default-features = false } diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index a2afde11..b84bb15f 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -12,7 +12,7 @@ windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] [dependencies] datadog-logs-agent = { path = "../datadog-logs-agent" } datadog-trace-agent = { path = "../datadog-trace-agent" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } datadog-fips = { path = "../datadog-fips", default-features = false } dogstatsd = { path = "../dogstatsd", default-features = true } reqwest = { version = "0.12.4", default-features = false } diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index d0e066a0..88221059 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -25,13 +25,13 @@ tracing = { version = "0.1", default-features = false } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0" thiserror = { version = "1.0.58", default-features = false } -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } -libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } -libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f", features = [ +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [ "mini_agent", ] } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } datadog-fips = { path = "../datadog-fips" } reqwest = { version = "0.12.23", features = [ "json", @@ -45,6 +45,6 @@ serial_test = "2.0.0" duplicate = "0.4.1" temp-env = "0.3.6" tempfile = "3.3.0" -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "62181a8a2a766419a07cbecd7d6f87ef05dd166f", features = [ +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [ "test-utils", ] } diff 
--git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 29008470..bf1d8445 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -14,7 +14,6 @@ use libdd_trace_utils::config_utils::{ trace_stats_url_prefixed, }; use libdd_trace_utils::trace_utils; -use tracing::debug; const DEFAULT_APM_RECEIVER_PORT: u16 = 8126; const DEFAULT_DOGSTATSD_PORT: u16 = 8125; @@ -113,8 +112,6 @@ pub struct Config { pub service: Option, pub env: Option, pub version: Option, - /// Span tag keys used as second primary tags - pub span_derived_primary_tags: Vec, } impl Config { @@ -218,17 +215,6 @@ impl Config { Tags::new() }; - let span_derived_primary_tags = match env::var("DD_APM_SPAN_DERIVED_PRIMARY_TAGS") { - Ok(env_tags) => parse_json_string_array(&env_tags)?, - Err(_) => vec![], - }; - if !span_derived_primary_tags.is_empty() { - debug!( - "span_derived_primary_tags configured: [{}]", - span_derived_primary_tags.join(" ") - ); - } - #[allow(clippy::unwrap_used)] Ok(Config { app_name: Some(app_name), @@ -271,20 +257,12 @@ impl Config { service: env::var("DD_SERVICE").ok(), env: env::var("DD_ENV").ok(), version: env::var("DD_VERSION").ok(), - span_derived_primary_tags, }) } } -/// Parses a JSON array of strings. Returns an error if the JSON is invalid. -fn parse_json_string_array(env_tags: &str) -> Result, Box> { - serde_json::from_str::>(env_tags) - .map_err(|e| anyhow::anyhow!("expected a JSON array of strings: {e}").into()) -} - #[cfg(test)] mod tests { - use super::parse_json_string_array; use duplicate::duplicate_item; use serial_test::serial; use std::collections::HashMap; @@ -715,45 +693,6 @@ mod tests { }, ); } - - #[test] - fn test_parse_json_string_array_valid() { - // valid json - assert_eq!( - parse_json_string_array(r#"["custom.primary", "test"]"#).unwrap(), - vec!["custom.primary".to_string(), "test".to_string(),], - ); - - // empty json - assert!(parse_json_string_array("[]").unwrap().is_empty()); - } - - #[test] - fn test_parse_json_string_array_invalid() { - // invalid json - assert!( - parse_json_string_array("not a json array") - .unwrap_err() - .to_string() - .contains("expected a JSON array of strings"), - ); - - // empty json - assert!( - parse_json_string_array("") - .unwrap_err() - .to_string() - .contains("expected a JSON array of strings") - ); - - // valid json but wrong shape for Vec - assert!( - parse_json_string_array("[1, 2]") - .unwrap_err() - .to_string() - .contains("expected a JSON array of strings") - ); - } } /// Test helpers for creating Config instances in tests @@ -791,7 +730,6 @@ pub mod test_helpers { service: None, env: None, version: None, - span_derived_primary_tags: vec![], } } } diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index 5ea4ea65..6523c856 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -119,7 +119,6 @@ impl StatsConcentratorService { SystemTime::now(), vec![], vec![], - config.span_derived_primary_tags.clone(), ); let service: StatsConcentratorService = Self { concentrator, diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 82c00831..8b8a1b9b 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -233,7 +233,6 @@ mod tests { service: 
Some("test-service".to_string()), env: Some("test-env".to_string()), version: Some("1.0.0".to_string()), - span_derived_primary_tags: vec![], } } From 0bb1c11c913bd3f54db37586b03b569d06a342b5 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 31 Mar 2026 15:08:52 -0400 Subject: [PATCH 05/13] address clippy warnings --- crates/datadog-trace-agent/src/mini_agent.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 7fa5a92f..ae074810 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -11,7 +11,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::{debug, error, warn}; +use tracing::{debug, error}; use crate::http_utils::{log_and_create_http_response, verify_request_content_length}; use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; @@ -191,14 +191,14 @@ impl MiniAgent { let sentinel = std::path::Path::new(LAMBDA_LITE_SENTINEL_PATH); // SAFETY: LAMBDA_LITE_SENTINEL_PATH is a hard-coded absolute path, // so .parent() always returns Some. - if let Some(parent) = sentinel.parent() { - if let Err(e) = tokio::fs::create_dir_all(parent).await { - error!( - "Could not create parent directory for Lambda Lite sentinel \ + if let Some(parent) = sentinel.parent() + && let Err(e) = tokio::fs::create_dir_all(parent).await + { + error!( + "Could not create parent directory for Lambda Lite sentinel \ file at {}: {}.", - LAMBDA_LITE_SENTINEL_PATH, e - ); - } + LAMBDA_LITE_SENTINEL_PATH, e + ); } if let Err(e) = tokio::fs::write(sentinel, b"").await { error!( From 007babcc93f269169933897c9e7152d0582f5ee6 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 31 Mar 2026 15:15:19 -0400 Subject: [PATCH 06/13] clean up comments --- crates/datadog-trace-agent/src/stats_flusher.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs index e8ca2e70..f219b270 100644 --- a/crates/datadog-trace-agent/src/stats_flusher.rs +++ b/crates/datadog-trace-agent/src/stats_flusher.rs @@ -61,13 +61,16 @@ impl StatsFlusher for ServerlessStatsFlusher { } }); - // flush stats from the bufferon a fixed interval + // flush stats from the buffer on a fixed interval loop { tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await; let mut buffer = buffer_consumer.lock().await; + // copy the batch for this flush let channel_stats = buffer.to_vec(); + // reset the buffer so the next tick only sees new stats buffer.clear(); + // release the mutex before flushing stats drop(buffer); let should_flush = should_flush_stats_buffer( From 159b4d62962e7fab504a8ab128c77b55fb93bd9b Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Thu, 2 Apr 2026 13:00:36 -0400 Subject: [PATCH 07/13] use bounded channel for stats concentrator --- .../src/stats_concentrator_service.rs | 19 +++++++++++++------ .../src/stats_generator.rs | 6 +++--- .../src/trace_processor.rs | 2 +- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index 6523c856..c4191c7e 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -12,6 +12,10 
@@ use tracing::error; const S_TO_NS: u64 = 1_000_000_000; const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds +/// A bounded channel applies backpressure on the trace request path when the concentrator +/// cannot keep up, instead of growing without limit as an unbounded channel would. +const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192; + #[derive(Debug, thiserror::Error)] pub enum StatsError { #[error("Failed to send command to concentrator: {0}")] @@ -39,7 +43,7 @@ pub enum ConcentratorCommand { } pub struct StatsConcentratorHandle { - tx: mpsc::UnboundedSender, + tx: mpsc::Sender, is_tracer_metadata_set: AtomicBool, } @@ -58,14 +62,14 @@ impl Clone for StatsConcentratorHandle { impl StatsConcentratorHandle { #[must_use] - pub fn new(tx: mpsc::UnboundedSender) -> Self { + pub fn new(tx: mpsc::Sender) -> Self { Self { tx, is_tracer_metadata_set: AtomicBool::new(false), } } - pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> { + pub async fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> { // Set tracer metadata only once for the first trace because // it is the same for all traces. if !self.is_tracer_metadata_set.load(Ordering::Acquire) { @@ -78,14 +82,16 @@ impl StatsConcentratorHandle { }; self.tx .send(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) + .await .map_err(StatsError::SendError)?; } Ok(()) } - pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> { + pub async fn add(&self, span: &pb::Span) -> Result<(), StatsError> { self.tx .send(ConcentratorCommand::Add(Box::new(span.clone()))) + .await .map_err(StatsError::SendError)?; Ok(()) } @@ -94,6 +100,7 @@ impl StatsConcentratorHandle { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(ConcentratorCommand::Flush(force_flush, response_tx)) + .await .map_err(StatsError::SendError)?; response_rx.await.map_err(StatsError::RecvError) } @@ -101,7 +108,7 @@ impl StatsConcentratorHandle { pub struct StatsConcentratorService { concentrator: SpanConcentrator, - rx: mpsc::UnboundedReceiver, + rx: mpsc::Receiver, tracer_metadata: TracerMetadata, config: Arc, } @@ -111,7 +118,7 @@ pub struct StatsConcentratorService { impl StatsConcentratorService { #[must_use] pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::channel(CONCENTRATOR_COMMAND_CHANNEL_CAPACITY); let handle = StatsConcentratorHandle::new(tx); // TODO: set span_kinds_stats_computed and peer_tag_keys let concentrator = SpanConcentrator::new( diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs index 2bd730e1..1556e25f 100644 --- a/crates/datadog-trace-agent/src/stats_generator.rs +++ b/crates/datadog-trace-agent/src/stats_generator.rs @@ -21,11 +21,11 @@ impl StatsGenerator { Self { stats_concentrator } } - pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> { + pub async fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> { if let TracerPayloadCollection::V07(traces) = traces { for trace in traces { // Set tracer metadata - if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) { + if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace).await { error!("Failed to set tracer metadata: {err}"); return Err(StatsGeneratorError::ConcentratorCommandError(err)); } @@ -33,7 +33,7 @@ impl StatsGenerator { // Generate stats for each span in the trace for 
chunk in &trace.chunks { for span in &chunk.spans { - if let Err(err) = self.stats_concentrator.add(span) { + if let Err(err) = self.stats_concentrator.add(span).await { error!("Failed to send trace stats: {err}"); return Err(StatsGeneratorError::ConcentratorCommandError(err)); } diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 8b8a1b9b..a6b8cde2 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -145,7 +145,7 @@ impl TraceProcessor for ServerlessTraceProcessor { if let Some(stats_generator) = self.stats_generator.as_ref() && !tracer_header_tags.client_computed_stats - && let Err(e) = stats_generator.send(&payload) + && let Err(e) = stats_generator.send(&payload).await { error!("Stats generator error: {e}"); } From 45277b4c17e908c1376fd1b738975c373549948b Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Thu, 2 Apr 2026 17:11:13 -0400 Subject: [PATCH 08/13] disable tracer computed stats --- crates/datadog-trace-agent/src/mini_agent.rs | 2 +- crates/datadog-trace-agent/tests/integration_test.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index ae074810..021e736c 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -521,7 +521,7 @@ impl MiniAgent { INFO_ENDPOINT_PATH, PROFILING_ENDPOINT_PATH ], - "client_drop_p0s": true, + "client_drop_p0s": false, "config": config_json } ); diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index fbda371a..ea84f5ba 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -220,8 +220,8 @@ async fn test_mini_agent_tcp_handles_requests() { // Check client_drop_p0s flag assert_eq!( - json["client_drop_p0s"], true, - "Expected client_drop_p0s to be true" + json["client_drop_p0s"], false, + "Expected client_drop_p0s to be false" ); // Check config object From 842b489dd0196a70fcb10d9902b0ad0bc785433a Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Mon, 6 Apr 2026 16:04:53 -0400 Subject: [PATCH 09/13] replace channel based pattern with mutex, set tracer metadata on each payload, flush tracer and agent computed stats separately --- crates/datadog-serverless-compat/src/main.rs | 6 +- crates/datadog-trace-agent/src/lib.rs | 2 +- .../src/stats_concentrator.rs | 113 ++++++++++ .../src/stats_concentrator_service.rs | 201 ------------------ .../datadog-trace-agent/src/stats_flusher.rs | 93 ++++---- .../src/stats_generator.rs | 23 +- .../src/trace_processor.rs | 2 +- .../tests/integration_test.rs | 6 +- 8 files changed, 169 insertions(+), 277 deletions(-) create mode 100644 crates/datadog-trace-agent/src/stats_concentrator.rs delete mode 100644 crates/datadog-trace-agent/src/stats_concentrator_service.rs diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 7e723bcb..4c748e07 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,7 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, proxy_flusher, stats_concentrator_service, stats_flusher, + config, env_verifier, mini_agent, proxy_flusher, stats_concentrator, 
stats_flusher, stats_generator, stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, @@ -161,9 +161,7 @@ pub async fn main() { let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled { info!("serverless stats computation enabled"); - let (service, handle) = - stats_concentrator_service::StatsConcentratorService::new(config.clone()); - tokio::spawn(service.run()); + let handle = stats_concentrator::StatsConcentrator::new(config.clone()); ( Some(handle.clone()), Some(Arc::new(stats_generator::StatsGenerator::new(handle))), diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index daeed742..984957cd 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -13,7 +13,7 @@ pub mod env_verifier; pub mod http_utils; pub mod mini_agent; pub mod proxy_flusher; -pub mod stats_concentrator_service; +pub mod stats_concentrator; pub mod stats_flusher; pub mod stats_generator; pub mod stats_processor; diff --git a/crates/datadog-trace-agent/src/stats_concentrator.rs b/crates/datadog-trace-agent/src/stats_concentrator.rs new file mode 100644 index 00000000..93153624 --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_concentrator.rs @@ -0,0 +1,113 @@ +use crate::config::Config; +use libdd_trace_protobuf::pb; +use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; +use libdd_trace_stats::span_concentrator::SpanConcentrator; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +const S_TO_NS: u64 = 1_000_000_000; +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds + +#[derive(Clone, Debug, Default)] +pub struct TracerMetadata { + // e.g. "python" + pub language: String, + // e.g. "3.11.0" + pub tracer_version: String, + // e.g. "f45568ad09d5480b99087d86ebda26e6" + pub runtime_id: String, + pub container_id: String, +} + +struct ConcentratorState { + concentrator: SpanConcentrator, + tracer_metadata: TracerMetadata, + config: Arc, +} + +/// A cloneable handle to the stats concentrator, safe to share across async tasks. +/// +/// Uses std::sync::Mutex rather than tokio::sync::Mutex because none of the critical +/// sections (add_span, flush) contain await points. Holding a std::sync::Mutex without +/// yielding is safe and avoids the overhead of tokio's async mutex +#[derive(Clone)] +pub struct StatsConcentrator { + inner: Arc>, +} + +impl StatsConcentrator { + #[must_use] + pub fn new(config: Arc) -> Self { + // TODO: set span_kinds_stats_computed and peer_tag_keys + let concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_DURATION_NS), + SystemTime::now(), + vec![], + vec![], + ); + Self { + inner: Arc::new(Mutex::new(ConcentratorState { + concentrator, + tracer_metadata: TracerMetadata::default(), + config, + })), + } + } + + /// Updates tracer metadata from the payload. 
Called on every trace payload so that + /// metadata is never permanently stale from an incomplete first trace + pub fn set_tracer_metadata(&self, trace: &TracerPayload) { + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + inner.tracer_metadata = TracerMetadata { + language: trace.language_name.clone(), + tracer_version: trace.tracer_version.clone(), + runtime_id: trace.runtime_id.clone(), + container_id: trace.container_id.clone(), + }; + } + + /// Adds a span for stats computation + pub fn add(&self, span: &pb::Span) { + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + inner.concentrator.add_span(span); + } + + pub fn flush(&self, force_flush: bool) -> Option { + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + let stats_buckets = inner.concentrator.flush(SystemTime::now(), force_flush); + if stats_buckets.is_empty() { + return None; + } + Some(ClientStatsPayload { + // Do not set hostname so the trace stats backend can aggregate stats properly + hostname: String::new(), + env: inner + .config + .env + .clone() + .unwrap_or("unknown-env".to_string()), + // Version is not in the trace payload. Need to read it from config. + version: inner.config.version.clone().unwrap_or_default(), + lang: inner.tracer_metadata.language.clone(), + tracer_version: inner.tracer_metadata.tracer_version.clone(), + runtime_id: inner.tracer_metadata.runtime_id.clone(), + // Not supported yet + sequence: 0, + // Not supported yet + agent_aggregation: String::new(), + service: inner.config.service.clone().unwrap_or_default(), + container_id: inner.tracer_metadata.container_id.clone(), + // Not supported yet + tags: vec![], + // Not supported yet + git_commit_sha: String::new(), + // Not supported yet + image_tag: String::new(), + stats: stats_buckets, + // Not supported yet + process_tags: String::new(), + // Not supported yet + process_tags_hash: 0, + }) + } +} diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs deleted file mode 100644 index c4191c7e..00000000 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ /dev/null @@ -1,201 +0,0 @@ -use tokio::sync::{mpsc, oneshot}; - -use crate::config::Config; -use libdd_trace_protobuf::pb; -use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; -use libdd_trace_stats::span_concentrator::SpanConcentrator; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime}; -use tracing::error; - -const S_TO_NS: u64 = 1_000_000_000; -const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds - -/// A bounded channel applies backpressure on the trace request path when the concentrator -/// cannot keep up, instead of growing without limit as an unbounded channel would. -const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192; - -#[derive(Debug, thiserror::Error)] -pub enum StatsError { - #[error("Failed to send command to concentrator: {0}")] - SendError(mpsc::error::SendError), - #[error("Failed to receive response from concentrator: {0}")] - RecvError(oneshot::error::RecvError), -} - -#[derive(Clone, Debug, Default)] -pub struct TracerMetadata { - // e.g. "python" - pub language: String, - // e.g. "3.11.0" - pub tracer_version: String, - // e.g. 
"f45568ad09d5480b99087d86ebda26e6" - pub runtime_id: String, - pub container_id: String, -} - -pub enum ConcentratorCommand { - SetTracerMetadata(TracerMetadata), - // Use a box to reduce the size of the command enum - Add(Box), - Flush(bool, oneshot::Sender>), -} - -pub struct StatsConcentratorHandle { - tx: mpsc::Sender, - is_tracer_metadata_set: AtomicBool, -} - -impl Clone for StatsConcentratorHandle { - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - // Cloning this may cause trace metadata to be set multiple times, - // but it's okay because it's the same for all traces and we don't need to be perfect on dedup. - is_tracer_metadata_set: AtomicBool::new( - self.is_tracer_metadata_set.load(Ordering::Acquire), - ), - } - } -} - -impl StatsConcentratorHandle { - #[must_use] - pub fn new(tx: mpsc::Sender) -> Self { - Self { - tx, - is_tracer_metadata_set: AtomicBool::new(false), - } - } - - pub async fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> { - // Set tracer metadata only once for the first trace because - // it is the same for all traces. - if !self.is_tracer_metadata_set.load(Ordering::Acquire) { - self.is_tracer_metadata_set.store(true, Ordering::Release); - let tracer_metadata = TracerMetadata { - language: trace.language_name.clone(), - tracer_version: trace.tracer_version.clone(), - runtime_id: trace.runtime_id.clone(), - container_id: trace.container_id.clone(), - }; - self.tx - .send(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) - .await - .map_err(StatsError::SendError)?; - } - Ok(()) - } - - pub async fn add(&self, span: &pb::Span) -> Result<(), StatsError> { - self.tx - .send(ConcentratorCommand::Add(Box::new(span.clone()))) - .await - .map_err(StatsError::SendError)?; - Ok(()) - } - - pub async fn flush(&self, force_flush: bool) -> Result, StatsError> { - let (response_tx, response_rx) = oneshot::channel(); - self.tx - .send(ConcentratorCommand::Flush(force_flush, response_tx)) - .await - .map_err(StatsError::SendError)?; - response_rx.await.map_err(StatsError::RecvError) - } -} - -pub struct StatsConcentratorService { - concentrator: SpanConcentrator, - rx: mpsc::Receiver, - tracer_metadata: TracerMetadata, - config: Arc, -} - -// A service that handles add() and flush() requests in the same queue, -// to avoid using mutex, which may cause lock contention. 
-impl StatsConcentratorService {
-    #[must_use]
-    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
-        let (tx, rx) = mpsc::channel(CONCENTRATOR_COMMAND_CHANNEL_CAPACITY);
-        let handle = StatsConcentratorHandle::new(tx);
-        // TODO: set span_kinds_stats_computed and peer_tag_keys
-        let concentrator = SpanConcentrator::new(
-            Duration::from_nanos(BUCKET_DURATION_NS),
-            SystemTime::now(),
-            vec![],
-            vec![],
-        );
-        let service: StatsConcentratorService = Self {
-            concentrator,
-            rx,
-            // To be set when the first trace is received
-            tracer_metadata: TracerMetadata::default(),
-            config,
-        };
-        (service, handle)
-    }
-
-    pub async fn run(mut self) {
-        while let Some(command) = self.rx.recv().await {
-            match command {
-                ConcentratorCommand::SetTracerMetadata(tracer_metadata) => {
-                    self.tracer_metadata = tracer_metadata;
-                }
-                ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span),
-                ConcentratorCommand::Flush(force_flush, response_tx) => {
-                    self.handle_flush(force_flush, response_tx);
-                }
-            }
-        }
-    }
-
-    fn handle_flush(
-        &mut self,
-        force_flush: bool,
-        response_tx: oneshot::Sender<Option<ClientStatsPayload>>,
-    ) {
-        let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush);
-        let stats = if stats_buckets.is_empty() {
-            None
-        } else {
-            Some(ClientStatsPayload {
-                // Do not set hostname so the trace stats backend can aggregate stats properly
-                hostname: String::new(),
-                env: self.config.env.clone().unwrap_or("unknown-env".to_string()),
-                // Version is not in the trace payload. Need to read it from config.
-                version: self.config.version.clone().unwrap_or_default(),
-                lang: self.tracer_metadata.language.clone(),
-                tracer_version: self.tracer_metadata.tracer_version.clone(),
-                runtime_id: self.tracer_metadata.runtime_id.clone(),
-                // Not supported yet
-                sequence: 0,
-                // Not supported yet
-                agent_aggregation: String::new(),
-                service: self
-                    .config
-                    .service
-                    .clone()
-                    .unwrap_or_default()
-                    .to_lowercase(),
-                container_id: self.tracer_metadata.container_id.clone(),
-                // Not supported yet
-                tags: vec![],
-                // Not supported yet
-                git_commit_sha: String::new(),
-                // Not supported yet
-                image_tag: String::new(),
-                stats: stats_buckets,
-                // Not supported yet
-                process_tags: String::new(),
-                // Not supported yet
-                process_tags_hash: 0,
-            })
-        };
-        let response = response_tx.send(stats);
-        if let Err(e) = response {
-            error!("Failed to return trace stats: {e:?}");
-        }
-    }
-}
diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs
index f219b270..47f1bca0 100644
--- a/crates/datadog-trace-agent/src/stats_flusher.rs
+++ b/crates/datadog-trace-agent/src/stats_flusher.rs
@@ -10,7 +10,7 @@ use libdd_trace_protobuf::pb;
 use libdd_trace_utils::stats_utils;
 
 use crate::config::Config;
-use crate::stats_concentrator_service::StatsConcentratorHandle;
+use crate::stats_concentrator::StatsConcentrator;
 
 /// Whether the stats flusher should run `flush_stats`
 fn should_flush_stats_buffer(
@@ -20,6 +20,29 @@
     channel_has_tracer_stats || serverless_stats_enabled
 }
 
+/// Serializes and sends a single `StatsPayload` to the intake.
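For illustration (not part of the patch), the call shape this helper expects — `stats` is a hypothetical Vec<pb::ClientStatsPayload> assembled by the caller, exactly as the rewritten flush_stats below does:

    let payload = stats_utils::construct_stats_payload(stats);
    send_stats_payload(&config, payload).await;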
+async fn send_stats_payload(config: &Arc<Config>, payload: pb::StatsPayload) {
+    debug!("Stats payload to be sent: {payload:?}");
+    let serialized = match stats_utils::serialize_stats_payload(payload) {
+        Ok(res) => res,
+        Err(err) => {
+            error!("Failed to serialize stats payload, dropping stats: {err}");
+            return;
+        }
+    };
+    #[allow(clippy::unwrap_used)]
+    match stats_utils::send_stats_payload(
+        serialized,
+        &config.trace_stats_intake,
+        config.trace_stats_intake.api_key.as_ref().unwrap(),
+    )
+    .await
+    {
+        Ok(_) => debug!("Successfully flushed stats"),
+        Err(e) => error!("Error sending stats: {e:?}"),
+    }
+}
+
 #[async_trait]
 pub trait StatsFlusher {
     /// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver,
@@ -30,12 +53,12 @@ pub trait StatsFlusher {
         mut rx: Receiver<pb::ClientStatsPayload>,
     );
     /// Flushes stats to the Datadog trace stats intake.
-    async fn flush_stats(&self, config: Arc<Config>, traces: Vec<pb::ClientStatsPayload>);
+    async fn flush_stats(&self, config: Arc<Config>, client_stats: Vec<pb::ClientStatsPayload>);
 }
 
 #[derive(Clone)]
 pub struct ServerlessStatsFlusher {
-    pub stats_concentrator: Option<StatsConcentratorHandle>,
+    pub stats_concentrator: Option<StatsConcentrator>,
     /// When false, flushes are done on completed buckets
     /// When true, flushes are done on any in progress buckets, useful for integration tests
     pub force_flush_concentrator: bool,
@@ -53,7 +76,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
         let buffer_producer = buffer.clone();
         let buffer_consumer = buffer.clone();
 
-        // drain the stats channel continuously into the buffer
+        // Drain the stats channel continuously into the buffer
         tokio::spawn(async move {
             while let Some(stats_payload) = rx.recv().await {
                 let mut buffer = buffer_producer.lock().await;
@@ -61,16 +84,16 @@
             }
         });
 
-        // flush stats from the buffer on a fixed interval
+        // Flush stats from the buffer on a fixed interval
         loop {
             tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await;
 
             let mut buffer = buffer_consumer.lock().await;
-            // copy the batch for this flush
+            // Copy the batch for this flush
             let channel_stats = buffer.to_vec();
-            // reset the buffer so the next tick only sees new stats
+            // Reset the buffer so the next tick only sees new stats
             buffer.clear();
-            // release the mutex before flushing stats
+            // Release the mutex before flushing stats
             drop(buffer);
 
             let should_flush = should_flush_stats_buffer(
@@ -83,49 +106,21 @@
             }
         }
     }
 
-    async fn flush_stats(&self, config: Arc<Config>, mut stats: Vec<pb::ClientStatsPayload>) {
-        // flush from stats concentrator if serverless stats are enabled and there are stats to flush
-        if let Some(ref concentrator) = self.stats_concentrator {
-            match concentrator.flush(self.force_flush_concentrator).await {
-                Ok(Some(payload)) => {
-                    stats.push(payload);
-                }
-                Ok(None) => {}
-                Err(e) => {
-                    error!("Failed to flush stats concentrator: {e}");
-                }
-            }
+    /// Flushes client computed stats from the tracer and serverless computed stats as separate payloads
+    async fn flush_stats(&self, config: Arc<Config>, client_stats: Vec<pb::ClientStatsPayload>) {
+        // Flush client computed stats from the tracer
+        if !client_stats.is_empty() {
+            let payload = stats_utils::construct_stats_payload(client_stats);
+            send_stats_payload(&config, payload).await;
         }
 
-        if stats.is_empty() {
-            return;
-        }
-        debug!("Flushing {} stats", stats.len());
-
-        let stats_payload = stats_utils::construct_stats_payload(stats);
-
-        debug!("Stats payload to be sent: {stats_payload:?}");
-
-        let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) {
-            Ok(res) => res,
-            Err(err) => {
-                error!("Failed to serialize stats payload, dropping stats: {err}");
-                return;
-            }
-        };
-
-        #[allow(clippy::unwrap_used)]
-        match stats_utils::send_stats_payload(
-            serialized_stats_payload,
-            &config.trace_stats_intake,
-            config.trace_stats_intake.api_key.as_ref().unwrap(),
-        )
-        .await
+        // Flush serverless computed stats from the concentrator
+        if let Some(ref concentrator) = self.stats_concentrator
+            && let Some(agent_stats) = concentrator.flush(self.force_flush_concentrator)
         {
-            Ok(_) => debug!("Successfully flushed stats"),
-            Err(e) => {
-                error!("Error sending stats: {e:?}")
-            }
+            let mut payload = stats_utils::construct_stats_payload(vec![agent_stats]);
+            payload.client_computed = false;
+            send_stats_payload(&config, payload).await;
         }
     }
 }
@@ -136,7 +131,7 @@ mod tests {
 
     #[test]
     fn should_flush_stats_buffer_all_cases() {
-        // (stats channel empty, serverless stats enabled with concentrator)
+        // (stats channel empty, serverless computed stats enabled with concentrator)
         assert!(!should_flush_stats_buffer(false, false));
         assert!(should_flush_stats_buffer(true, false));
         assert!(should_flush_stats_buffer(false, true));
diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs
index 1556e25f..8cc54abf 100644
--- a/crates/datadog-trace-agent/src/stats_generator.rs
+++ b/crates/datadog-trace-agent/src/stats_generator.rs
@@ -1,15 +1,13 @@
-use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError};
+use crate::stats_concentrator::StatsConcentrator;
 use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
 use tracing::error;
 
 pub struct StatsGenerator {
-    stats_concentrator: StatsConcentratorHandle,
+    stats_concentrator: StatsConcentrator,
 }
 
 #[derive(Debug, thiserror::Error)]
 pub enum StatsGeneratorError {
-    #[error("Error sending trace stats to the stats concentrator: {0}")]
-    ConcentratorCommandError(StatsError),
     #[error("Unsupported trace payload version. Failed to send trace stats.")]
     TracePayloadVersionError,
 }
@@ -17,26 +15,17 @@ pub enum StatsGeneratorError {
 // Extracts information from traces related to stats and sends it to the stats concentrator
 impl StatsGenerator {
     #[must_use]
-    pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self {
+    pub fn new(stats_concentrator: StatsConcentrator) -> Self {
         Self { stats_concentrator }
     }
 
-    pub async fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
+    pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
         if let TracerPayloadCollection::V07(traces) = traces {
             for trace in traces {
-                // Set tracer metadata
-                if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace).await {
-                    error!("Failed to set tracer metadata: {err}");
-                    return Err(StatsGeneratorError::ConcentratorCommandError(err));
-                }
-
-                // Generate stats for each span in the trace
+                self.stats_concentrator.set_tracer_metadata(trace);
                 for chunk in &trace.chunks {
                     for span in &chunk.spans {
-                        if let Err(err) = self.stats_concentrator.add(span).await {
-                            error!("Failed to send trace stats: {err}");
-                            return Err(StatsGeneratorError::ConcentratorCommandError(err));
-                        }
+                        self.stats_concentrator.add(span);
                     }
                 }
             }
diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs
index a6b8cde2..8b8a1b9b 100644
--- a/crates/datadog-trace-agent/src/trace_processor.rs
+++ b/crates/datadog-trace-agent/src/trace_processor.rs
@@ -145,7 +145,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
 
         if let Some(stats_generator) = self.stats_generator.as_ref()
             && !tracer_header_tags.client_computed_stats
-            && let Err(e) = stats_generator.send(&payload).await
+            && let Err(e) = stats_generator.send(&payload)
         {
             error!("Stats generator error: {e}");
         }
diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs
index ea84f5ba..1a7ed5f4 100644
--- a/crates/datadog-trace-agent/tests/integration_test.rs
+++ b/crates/datadog-trace-agent/tests/integration_test.rs
@@ -47,14 +47,12 @@
 /// Helper to create a mini agent with real flushers
 pub fn create_mini_agent_with_real_flushers(config: Arc<Config>) -> MiniAgent {
     use datadog_trace_agent::{
-        aggregator::TraceAggregator, stats_concentrator_service::StatsConcentratorService,
+        aggregator::TraceAggregator, stats_concentrator::StatsConcentrator,
         stats_flusher::ServerlessStatsFlusher, stats_generator::StatsGenerator,
         stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher,
     };
 
-    let (concentrator_service, stats_concentrator_handle) =
-        StatsConcentratorService::new(config.clone());
-    tokio::spawn(concentrator_service.run());
+    let stats_concentrator_handle = StatsConcentrator::new(config.clone());
 
     let stats_generator = Some(Arc::new(StatsGenerator::new(
         stats_concentrator_handle.clone(),

From ee8cefbeb020fdda1b525adaa948875364ac868e Mon Sep 17 00:00:00 2001
From: Duncan Harvey
Date: Mon, 6 Apr 2026 16:18:08 -0400
Subject: [PATCH 10/13] clean up stats concentrator configuration

---
 crates/datadog-trace-agent/src/config.rs  |  3 ---
 .../src/stats_concentrator.rs             | 21 ++++++++++++-------
 .../src/trace_processor.rs                |  1 -
 3 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs
index bf1d8445..99b75b47 100644
--- a/crates/datadog-trace-agent/src/config.rs
+++ b/crates/datadog-trace-agent/src/config.rs
@@ -111,7 +111,6 @@ pub struct Config {
     pub proxy_url: Option<String>,
     pub service: Option<String>,
     pub env: Option<String>,
-    pub version: Option<String>,
 }
 
 impl Config {
@@ -256,7 +255,6 @@ impl Config {
             tags,
             service: env::var("DD_SERVICE").ok(),
             env: env::var("DD_ENV").ok(),
-            version: env::var("DD_VERSION").ok(),
         })
     }
 }
@@ -729,7 +727,6 @@ pub mod test_helpers {
             proxy_url: None,
             service: None,
             env: None,
-            version: None,
         }
     }
 }
diff --git a/crates/datadog-trace-agent/src/stats_concentrator.rs b/crates/datadog-trace-agent/src/stats_concentrator.rs
index 93153624..76052b6b 100644
--- a/crates/datadog-trace-agent/src/stats_concentrator.rs
+++ b/crates/datadog-trace-agent/src/stats_concentrator.rs
@@ -17,6 +17,10 @@ pub struct TracerMetadata {
     // e.g. "f45568ad09d5480b99087d86ebda26e6"
     pub runtime_id: String,
     pub container_id: String,
+    // e.g. "prod"
+    pub env: String,
+    // e.g. "1.0.0"
+    pub app_version: String,
 }
 
 struct ConcentratorState {
@@ -63,6 +67,8 @@ impl StatsConcentrator {
             tracer_version: trace.tracer_version.clone(),
             runtime_id: trace.runtime_id.clone(),
             container_id: trace.container_id.clone(),
+            env: trace.env.clone(),
+            app_version: trace.app_version.clone(),
         };
     }
 
@@ -81,13 +87,13 @@ impl StatsConcentrator {
         Some(ClientStatsPayload {
             // Do not set hostname so the trace stats backend can aggregate stats properly
             hostname: String::new(),
-            env: inner
-                .config
-                .env
-                .clone()
-                .unwrap_or("unknown-env".to_string()),
-            // Version is not in the trace payload. Need to read it from config.
-            version: inner.config.version.clone().unwrap_or_default(),
+            // Prefer env from the tracer payload, fall back to agent config
+            env: if !inner.tracer_metadata.env.is_empty() {
+                inner.tracer_metadata.env.clone()
+            } else {
+                inner.config.env.clone().unwrap_or_default()
+            },
+            version: inner.tracer_metadata.app_version.clone(),
             lang: inner.tracer_metadata.language.clone(),
             tracer_version: inner.tracer_metadata.tracer_version.clone(),
             runtime_id: inner.tracer_metadata.runtime_id.clone(),
@@ -95,6 +101,7 @@ impl StatsConcentrator {
             sequence: 0,
             // Not supported yet
             agent_aggregation: String::new(),
+            // One service per app for serverless
             service: inner.config.service.clone().unwrap_or_default(),
             container_id: inner.tracer_metadata.container_id.clone(),
             // Not supported yet
diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs
index 8b8a1b9b..bd385e01 100644
--- a/crates/datadog-trace-agent/src/trace_processor.rs
+++ b/crates/datadog-trace-agent/src/trace_processor.rs
@@ -232,7 +232,6 @@ mod tests {
             tags: Tags::from_env_string("env:test,service:my-service"),
             service: Some("test-service".to_string()),
             env: Some("test-env".to_string()),
-            version: Some("1.0.0".to_string()),
         }
     }

From c2191f862eae020d361f2c2b3c1cc65706278909 Mon Sep 17 00:00:00 2001
From: Duncan Harvey
Date: Tue, 7 Apr 2026 11:47:10 -0400
Subject: [PATCH 11/13] add shutdown channel to stats flusher

---
 crates/datadog-serverless-compat/src/main.rs  |  4 +-
 crates/datadog-trace-agent/src/mini_agent.rs  | 16 ++++++++-------
 .../datadog-trace-agent/src/stats_flusher.rs  | 90 +++++++++++--------
 .../datadog-trace-agent/tests/common/mocks.rs |  9 +-
 .../tests/integration_test.rs                 | 30 ++++---
 5 files changed, 87 insertions(+), 53 deletions(-)

diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs
index 4c748e07..ef385a34 100644
--- a/crates/datadog-serverless-compat/src/main.rs
+++ b/crates/datadog-serverless-compat/src/main.rs
@@ -177,7 +177,6 @@ pub async fn main() {
 
     let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
         stats_concentrator: stats_concentrator_handle.clone(),
-        force_flush_concentrator: false,
     });
     let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
@@ -199,8 +198,9 @@ pub async fn main() {
         proxy_flusher,
     });
 
+    let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
     tokio::spawn(async move {
-        let res = mini_agent.start_mini_agent().await;
+        let res = mini_agent.start_mini_agent(shutdown_rx).await;
         if let Err(e) = res {
             error!("Error when starting serverless trace mini agent: {e:?}");
         }
diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs
index 021e736c..8ed2ce67 100644
--- a/crates/datadog-trace-agent/src/mini_agent.rs
+++ b/crates/datadog-trace-agent/src/mini_agent.rs
@@ -50,7 +50,10 @@ pub struct MiniAgent {
 }
 
 impl MiniAgent {
-    pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> {
+    pub async fn start_mini_agent(
+        &self,
+        shutdown_rx: tokio::sync::oneshot::Receiver<()>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
         let now = Instant::now();
 
         // verify we are in a serverless function environment. if not, shut down the mini agent.
@@ -93,7 +96,7 @@
         let stats_config = self.config.clone();
         let stats_flusher_handle = tokio::spawn(async move {
             stats_flusher
-                .start_stats_flusher(stats_config, stats_rx)
+                .start_stats_flusher(stats_config, stats_rx, shutdown_rx)
                 .await;
         });
 
diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs
index 47f1bca0..e1953c6e 100644
--- a/crates/datadog-trace-agent/src/stats_flusher.rs
+++ b/crates/datadog-trace-agent/src/stats_flusher.rs
@@ -3,7 +3,8 @@
 use async_trait::async_trait;
 use std::{sync::Arc, time};
-use tokio::sync::{Mutex, mpsc::Receiver};
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::oneshot;
 use tracing::{debug, error};
 
 use libdd_trace_protobuf::pb;
@@ -46,22 +47,28 @@
 #[async_trait]
 pub trait StatsFlusher {
     /// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver,
-    /// implementing flushing logic that calls flush_stats.
+    /// implementing flushing logic that calls flush_stats. Runs until the shutdown signal fires,
+    /// at which point it performs a final force flush and returns.
     async fn start_stats_flusher(
         &self,
         config: Arc<Config>,
-        mut rx: Receiver<pb::ClientStatsPayload>,
+        rx: Receiver<pb::ClientStatsPayload>,
+        shutdown_rx: oneshot::Receiver<()>,
     );
     /// Flushes stats to the Datadog trace stats intake.
+    /// `force_flush` controls whether in-progress concentrator buckets are flushed (true on
+    /// shutdown, false on normal interval flushes).
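For illustration, the shutdown arm later in this patch drives this force-flush path with exactly one call, draining buffered client stats and closing in-progress buckets together:

    self.flush_stats(config.clone(), std::mem::take(&mut buffer), true).await;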
+ async fn flush_stats( + &self, + config: Arc, + client_stats: Vec, + force_flush: bool, + ); } #[derive(Clone)] pub struct ServerlessStatsFlusher { pub stats_concentrator: Option, - /// When false, flushes are done on completed buckets - /// When true, flushes are done on any in progress buckets, useful for integration tests - pub force_flush_concentrator: bool, } #[async_trait] @@ -70,44 +77,51 @@ impl StatsFlusher for ServerlessStatsFlusher { &self, config: Arc, mut rx: Receiver, + mut shutdown_rx: oneshot::Receiver<()>, ) { - let buffer: Arc>> = Arc::new(Mutex::new(Vec::new())); - - let buffer_producer = buffer.clone(); - let buffer_consumer = buffer.clone(); - - // Drain the stats channel continuously into the buffer - tokio::spawn(async move { - while let Some(stats_payload) = rx.recv().await { - let mut buffer = buffer_producer.lock().await; - buffer.push(stats_payload); - } - }); + let mut interval = + tokio::time::interval(time::Duration::from_secs(config.stats_flush_interval_secs)); + let mut buffer: Vec = Vec::new(); - // Flush stats from the buffer on a fixed interval loop { - tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await; - - let mut buffer = buffer_consumer.lock().await; - // Copy the batch for this flush - let channel_stats = buffer.to_vec(); - // Reset the buffer so the next tick only sees new stats - buffer.clear(); - // Release the mutex before flushing stats - drop(buffer); - - let should_flush = should_flush_stats_buffer( - !channel_stats.is_empty(), - self.stats_concentrator.is_some(), - ); - if should_flush { - self.flush_stats(config.clone(), channel_stats).await; + tokio::select! { + // Receive client stats and add them to the buffer + Some(stats) = rx.recv() => { + buffer.push(stats); + } + + // Drain client stats in buffer and stats from concentrator on interval + _ = interval.tick() => { + let client_stats = std::mem::take(&mut buffer); + let should_flush = should_flush_stats_buffer( + !client_stats.is_empty(), + self.stats_concentrator.is_some(), + ); + if should_flush { + self.flush_stats(config.clone(), client_stats, false).await; + } + } + + _ = &mut shutdown_rx => { + // Drain any client stats that arrived before the shutdown signal + while let Ok(stats) = rx.try_recv() { + buffer.push(stats); + } + // Force flush all in progress concentrator buckets on shutdown signal + self.flush_stats(config.clone(), std::mem::take(&mut buffer), true).await; + return; + } } } } /// Flushes client computed stats from the tracer and serverless computed stats as separate payloads - async fn flush_stats(&self, config: Arc, client_stats: Vec) { + async fn flush_stats( + &self, + config: Arc, + client_stats: Vec, + force_flush: bool, + ) { // Flush client computed stats from the tracer if !client_stats.is_empty() { let payload = stats_utils::construct_stats_payload(client_stats); @@ -116,7 +130,7 @@ impl StatsFlusher for ServerlessStatsFlusher { // Flush serverless computed stats from the concentrator if let Some(ref concentrator) = self.stats_concentrator - && let Some(agent_stats) = concentrator.flush(self.force_flush_concentrator) + && let Some(agent_stats) = concentrator.flush(force_flush) { let mut payload = stats_utils::construct_stats_payload(vec![agent_stats]); payload.client_computed = false; diff --git a/crates/datadog-trace-agent/tests/common/mocks.rs b/crates/datadog-trace-agent/tests/common/mocks.rs index 842c45f0..dfd98ccd 100644 --- a/crates/datadog-trace-agent/tests/common/mocks.rs +++ 
b/crates/datadog-trace-agent/tests/common/mocks.rs @@ -12,6 +12,7 @@ use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils::{self, MiniAgentMetadata, SendData}; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::oneshot; /// Mock trace processor that returns 200 OK for all requests #[allow(dead_code)] @@ -86,6 +87,7 @@ impl StatsFlusher for MockStatsFlusher { &self, _config: Arc, mut stats_rx: Receiver, + _shutdown_rx: oneshot::Receiver<()>, ) { // Consume messages from the channel without processing them while let Some(_stats) = stats_rx.recv().await { @@ -93,7 +95,12 @@ impl StatsFlusher for MockStatsFlusher { } } - async fn flush_stats(&self, _config: Arc, _traces: Vec) { + async fn flush_stats( + &self, + _config: Arc, + _traces: Vec, + _force_flush: bool, + ) { // Do nothing } } diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 1a7ed5f4..eb5fa6b8 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -69,7 +69,6 @@ pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { stats_processor: Arc::new(ServerlessStatsProcessor {}), stats_flusher: Arc::new(ServerlessStatsFlusher { stats_concentrator: Some(stats_concentrator_handle), - force_flush_concentrator: true, }), env_verifier: Arc::new(MockEnvVerifier), proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), @@ -178,7 +177,8 @@ async fn test_mini_agent_tcp_handles_requests() { // Start the mini agent let agent_handle = tokio::spawn(async move { - let _ = mini_agent.start_mini_agent().await; + let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let _ = mini_agent.start_mini_agent(shutdown_rx).await; }); // Give server time to start @@ -277,7 +277,8 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Start the mini agent let agent_handle = tokio::spawn(async move { - let _ = mini_agent.start_mini_agent().await; + let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let _ = mini_agent.start_mini_agent(shutdown_rx).await; }); // Give server time to create pipe @@ -348,8 +349,9 @@ async fn test_mini_agent_tcp_with_real_flushers() { let mini_agent = create_mini_agent_with_real_flushers(config); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let agent_handle = tokio::spawn(async move { - let _ = mini_agent.start_mini_agent().await; + let _ = mini_agent.start_mini_agent(shutdown_rx).await; }); // Wait for server to be ready @@ -376,10 +378,13 @@ async fn test_mini_agent_tcp_with_real_flushers() { .expect("Failed to send /v0.4/traces request"); assert_eq!(trace_response.status(), StatusCode::OK); - // Wait for flush + // Wait for trace flush tokio::time::sleep(FLUSH_WAIT_DURATION).await; - verify_trace_request(&mock_server); + + // Trigger shutdown to force flush in progress concentrator buckets + let _ = shutdown_tx.send(()); + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_stats_request(&mock_server); // Stats generator should generate stats from trace payload // Clean up @@ -401,7 +406,8 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() { let mini_agent = create_mini_agent_with_real_flushers(config); let agent_handle = tokio::spawn(async move { - let _ = mini_agent.start_mini_agent().await; + let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let _ = 
mini_agent.start_mini_agent(shutdown_rx).await; }); // Wait for server to be ready @@ -459,8 +465,9 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { let mini_agent = create_mini_agent_with_real_flushers(config); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let agent_handle = tokio::spawn(async move { - let _ = mini_agent.start_mini_agent().await; + let _ = mini_agent.start_mini_agent(shutdown_rx).await; }); // Wait for server to be ready @@ -487,10 +494,13 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { .expect("Failed to send /v0.4/traces request over named pipe"); assert_eq!(trace_response.status(), StatusCode::OK); - // Wait for flush + // Wait for trace flush tokio::time::sleep(FLUSH_WAIT_DURATION).await; - verify_trace_request(&mock_server); + + // Trigger shutdown to force flush in progress concentrator buckets + let _ = shutdown_tx.send(()); + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_stats_request(&mock_server); // Clean up From fadc3ab0a36899a4022d7afe0a0f429f2f58cb03 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 7 Apr 2026 17:14:12 -0400 Subject: [PATCH 12/13] use bounded channel and skip stats computation if channel is full, including debug log for the channel capacity --- crates/datadog-serverless-compat/src/main.rs | 6 +- crates/datadog-trace-agent/src/lib.rs | 2 +- .../src/stats_concentrator.rs | 120 ---------- .../src/stats_concentrator_service.rs | 221 ++++++++++++++++++ .../datadog-trace-agent/src/stats_flusher.rs | 24 +- .../src/stats_generator.rs | 22 +- .../tests/integration_test.rs | 5 +- 7 files changed, 260 insertions(+), 140 deletions(-) delete mode 100644 crates/datadog-trace-agent/src/stats_concentrator.rs create mode 100644 crates/datadog-trace-agent/src/stats_concentrator_service.rs diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index ef385a34..0aae8072 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,7 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, proxy_flusher, stats_concentrator, stats_flusher, + config, env_verifier, mini_agent, proxy_flusher, stats_concentrator_service, stats_flusher, stats_generator, stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, @@ -161,7 +161,9 @@ pub async fn main() { let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled { info!("serverless stats computation enabled"); - let handle = stats_concentrator::StatsConcentrator::new(config.clone()); + let (service, handle) = + stats_concentrator_service::StatsConcentratorService::new(config.clone()); + tokio::spawn(service.run()); ( Some(handle.clone()), Some(Arc::new(stats_generator::StatsGenerator::new(handle))), diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index 984957cd..daeed742 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -13,7 +13,7 @@ pub mod env_verifier; pub mod http_utils; pub mod mini_agent; pub mod proxy_flusher; -pub mod stats_concentrator; +pub mod stats_concentrator_service; pub mod stats_flusher; pub mod stats_generator; pub mod stats_processor; diff --git a/crates/datadog-trace-agent/src/stats_concentrator.rs b/crates/datadog-trace-agent/src/stats_concentrator.rs deleted file mode 100644 index 76052b6b..00000000 --- 
a/crates/datadog-trace-agent/src/stats_concentrator.rs +++ /dev/null @@ -1,120 +0,0 @@ -use crate::config::Config; -use libdd_trace_protobuf::pb; -use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; -use libdd_trace_stats::span_concentrator::SpanConcentrator; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; - -const S_TO_NS: u64 = 1_000_000_000; -const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds - -#[derive(Clone, Debug, Default)] -pub struct TracerMetadata { - // e.g. "python" - pub language: String, - // e.g. "3.11.0" - pub tracer_version: String, - // e.g. "f45568ad09d5480b99087d86ebda26e6" - pub runtime_id: String, - pub container_id: String, - // e.g. "prod" - pub env: String, - // e.g. "1.0.0" - pub app_version: String, -} - -struct ConcentratorState { - concentrator: SpanConcentrator, - tracer_metadata: TracerMetadata, - config: Arc, -} - -/// A cloneable handle to the stats concentrator, safe to share across async tasks. -/// -/// Uses std::sync::Mutex rather than tokio::sync::Mutex because none of the critical -/// sections (add_span, flush) contain await points. Holding a std::sync::Mutex without -/// yielding is safe and avoids the overhead of tokio's async mutex -#[derive(Clone)] -pub struct StatsConcentrator { - inner: Arc>, -} - -impl StatsConcentrator { - #[must_use] - pub fn new(config: Arc) -> Self { - // TODO: set span_kinds_stats_computed and peer_tag_keys - let concentrator = SpanConcentrator::new( - Duration::from_nanos(BUCKET_DURATION_NS), - SystemTime::now(), - vec![], - vec![], - ); - Self { - inner: Arc::new(Mutex::new(ConcentratorState { - concentrator, - tracer_metadata: TracerMetadata::default(), - config, - })), - } - } - - /// Updates tracer metadata from the payload. Called on every trace payload so that - /// metadata is never permanently stale from an incomplete first trace - pub fn set_tracer_metadata(&self, trace: &TracerPayload) { - let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); - inner.tracer_metadata = TracerMetadata { - language: trace.language_name.clone(), - tracer_version: trace.tracer_version.clone(), - runtime_id: trace.runtime_id.clone(), - container_id: trace.container_id.clone(), - env: trace.env.clone(), - app_version: trace.app_version.clone(), - }; - } - - /// Adds a span for stats computation - pub fn add(&self, span: &pb::Span) { - let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); - inner.concentrator.add_span(span); - } - - pub fn flush(&self, force_flush: bool) -> Option { - let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); - let stats_buckets = inner.concentrator.flush(SystemTime::now(), force_flush); - if stats_buckets.is_empty() { - return None; - } - Some(ClientStatsPayload { - // Do not set hostname so the trace stats backend can aggregate stats properly - hostname: String::new(), - // Prefer env from the tracer payload, fall back to agent config - env: if !inner.tracer_metadata.env.is_empty() { - inner.tracer_metadata.env.clone() - } else { - inner.config.env.clone().unwrap_or_default() - }, - version: inner.tracer_metadata.app_version.clone(), - lang: inner.tracer_metadata.language.clone(), - tracer_version: inner.tracer_metadata.tracer_version.clone(), - runtime_id: inner.tracer_metadata.runtime_id.clone(), - // Not supported yet - sequence: 0, - // Not supported yet - agent_aggregation: String::new(), - // One service per app for serverless - service: inner.config.service.clone().unwrap_or_default(), - container_id: 
-            // Not supported yet
-            tags: vec![],
-            // Not supported yet
-            git_commit_sha: String::new(),
-            // Not supported yet
-            image_tag: String::new(),
-            stats: stats_buckets,
-            // Not supported yet
-            process_tags: String::new(),
-            // Not supported yet
-            process_tags_hash: 0,
-        })
-    }
-}
diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs
new file mode 100644
index 00000000..74c9f0d1
--- /dev/null
+++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs
@@ -0,0 +1,221 @@
+// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+use tokio::sync::{mpsc, oneshot};
+
+use crate::config::Config;
+use libdd_trace_protobuf::pb;
+use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload};
+use libdd_trace_stats::span_concentrator::SpanConcentrator;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime};
+use tracing::{debug, error};
+
+const S_TO_NS: u64 = 1_000_000_000;
+const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds
+
+/// When the channel is full, spans are dropped from stats computation rather than blocking
+/// the trace request path or growing without bound.
+const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192;
+
+#[derive(Debug, thiserror::Error)]
+pub enum StatsError {
+    #[error("Failed to send command to concentrator: {0}")]
+    SendError(Box<mpsc::error::TrySendError<ConcentratorCommand>>),
+    #[error("Failed to receive response from concentrator: {0}")]
+    RecvError(oneshot::error::RecvError),
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct TracerMetadata {
+    // e.g. "python"
+    pub language: String,
+    // e.g. "3.11.0"
+    pub tracer_version: String,
+    // e.g. "f45568ad09d5480b99087d86ebda26e6"
+    pub runtime_id: String,
+    pub container_id: String,
+    // e.g. "prod"
+    pub env: String,
+    // e.g. "1.0.0"
+    pub app_version: String,
+}
+
+pub enum ConcentratorCommand {
+    SetTracerMetadata(TracerMetadata),
+    // Use a box to reduce the size of the command enum
+    Add(Box<pb::Span>),
+    Flush(bool, oneshot::Sender<Option<ClientStatsPayload>>),
+}
+
+/// A cloneable handle to the stats concentrator service, safe to share across async tasks.
+#[derive(Clone)]
+pub struct StatsConcentratorHandle {
+    tx: mpsc::Sender<ConcentratorCommand>,
+}
+
+impl StatsConcentratorHandle {
+    #[must_use]
+    pub fn new(tx: mpsc::Sender<ConcentratorCommand>) -> Self {
+        Self { tx }
+    }
+
+    /// Updates tracer metadata from the payload. Called on every trace payload so that
+    /// metadata is never permanently stale from an incomplete first trace.
+    pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
+        let tracer_metadata = TracerMetadata {
+            language: trace.language_name.clone(),
+            tracer_version: trace.tracer_version.clone(),
+            runtime_id: trace.runtime_id.clone(),
+            container_id: trace.container_id.clone(),
+            env: trace.env.clone(),
+            app_version: trace.app_version.clone(),
+        };
+        match self
+            .tx
+            .try_send(ConcentratorCommand::SetTracerMetadata(tracer_metadata))
+        {
+            Ok(()) => Ok(()),
+            Err(mpsc::error::TrySendError::Full(_)) => {
+                debug!("Stats concentrator channel full, skipping tracer metadata update");
+                Ok(())
+            }
+            Err(e) => Err(StatsError::SendError(Box::new(e))),
+        }
+    }
+
+    /// Adds a span for stats computation.
+    pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> {
+        match self
+            .tx
+            .try_send(ConcentratorCommand::Add(Box::new(span.clone())))
+        {
+            Ok(()) => Ok(()),
+            Err(mpsc::error::TrySendError::Full(_)) => {
+                debug!("Stats concentrator channel full, skipping span stats computation");
+                Ok(())
+            }
+            Err(e) => Err(StatsError::SendError(Box::new(e))),
+        }
+    }
+
+    pub async fn flush(&self, force_flush: bool) -> Result<Option<ClientStatsPayload>, StatsError> {
+        let (response_tx, response_rx) = oneshot::channel();
+        match self
+            .tx
+            .try_send(ConcentratorCommand::Flush(force_flush, response_tx))
+        {
+            Ok(()) => {}
+            Err(mpsc::error::TrySendError::Full(_)) => {
+                debug!("Stats concentrator channel full, skipping stats flush");
+                return Ok(None);
+            }
+            Err(e) => return Err(StatsError::SendError(Box::new(e))),
+        }
+        response_rx.await.map_err(StatsError::RecvError)
+    }
+}
+
+pub struct StatsConcentratorService {
+    concentrator: SpanConcentrator,
+    rx: mpsc::Receiver<ConcentratorCommand>,
+    tracer_metadata: TracerMetadata,
+    config: Arc<Config>,
+}
+
+impl StatsConcentratorService {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
+        let (tx, rx) = mpsc::channel(CONCENTRATOR_COMMAND_CHANNEL_CAPACITY);
+        let handle = StatsConcentratorHandle::new(tx);
+        // TODO: set span_kinds_stats_computed and peer_tag_keys
+        let concentrator = SpanConcentrator::new(
+            Duration::from_nanos(BUCKET_DURATION_NS),
+            SystemTime::now(),
+            vec![],
+            vec![],
+        );
+        let service = Self {
+            concentrator,
+            rx,
+            tracer_metadata: TracerMetadata::default(),
+            config,
+        };
+        (service, handle)
+    }
+
+    pub async fn run(mut self) {
+        let mut log_interval = tokio::time::interval(Duration::from_secs(1));
+        log_interval.tick().await; // discard first immediate tick
+        loop {
+            tokio::select! {
+                command = self.rx.recv() => {
+                    match command {
+                        Some(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) => {
+                            self.tracer_metadata = tracer_metadata;
+                        }
+                        Some(ConcentratorCommand::Add(span)) => self.concentrator.add_span(&*span),
+                        Some(ConcentratorCommand::Flush(force_flush, response_tx)) => {
+                            self.handle_flush(force_flush, response_tx);
+                        }
+                        None => return,
+                    }
+                }
+                _ = log_interval.tick() => {
+                    debug!(
+                        used = self.rx.len(),
+                        capacity = CONCENTRATOR_COMMAND_CHANNEL_CAPACITY,
+                        "Stats concentrator channel fill"
+                    );
+                }
+            }
+        }
+    }
+
+    fn handle_flush(
+        &mut self,
+        force_flush: bool,
+        response_tx: oneshot::Sender<Option<ClientStatsPayload>>,
+    ) {
+        let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush);
+        let stats = if stats_buckets.is_empty() {
+            None
+        } else {
+            Some(ClientStatsPayload {
+                // Do not set hostname so the trace stats backend can aggregate stats properly
+                hostname: String::new(),
+                // Prefer env from the tracer payload, fall back to agent config
+                env: if !self.tracer_metadata.env.is_empty() {
+                    self.tracer_metadata.env.clone()
+                } else {
+                    self.config.env.clone().unwrap_or_default()
+                },
+                version: self.tracer_metadata.app_version.clone(),
+                lang: self.tracer_metadata.language.clone(),
+                tracer_version: self.tracer_metadata.tracer_version.clone(),
+                runtime_id: self.tracer_metadata.runtime_id.clone(),
+                // Not supported yet
+                sequence: 0,
+                // Not supported yet
+                agent_aggregation: String::new(),
+                // One service per app for serverless
+                service: self.config.service.clone().unwrap_or_default(),
+                container_id: self.tracer_metadata.container_id.clone(),
+                // Not supported yet
+                tags: vec![],
+                // Not supported yet
+                git_commit_sha: String::new(),
+                // Not supported yet
+                image_tag: String::new(),
+                stats: stats_buckets,
+                // Not supported yet
+                process_tags: String::new(),
+                // Not supported yet
+                process_tags_hash: 0,
+            })
+        };
+        if let Err(e) = response_tx.send(stats) {
+            error!("Failed to return trace stats: {e:?}");
+        }
+    }
+}
diff --git a/crates/datadog-trace-agent/src/stats_flusher.rs b/crates/datadog-trace-agent/src/stats_flusher.rs
index e1953c6e..93052d22 100644
--- a/crates/datadog-trace-agent/src/stats_flusher.rs
+++ b/crates/datadog-trace-agent/src/stats_flusher.rs
@@ -11,7 +11,7 @@ use libdd_trace_protobuf::pb;
 use libdd_trace_utils::stats_utils;
 
 use crate::config::Config;
-use crate::stats_concentrator::StatsConcentrator;
+use crate::stats_concentrator_service::StatsConcentratorHandle;
 
 /// Whether the stats flusher should run `flush_stats`
 fn should_flush_stats_buffer(
@@ -68,7 +68,7 @@ pub trait StatsFlusher {
 
 #[derive(Clone)]
 pub struct ServerlessStatsFlusher {
-    pub stats_concentrator: Option<StatsConcentrator>,
+    pub stats_concentrator: Option<StatsConcentratorHandle>,
 }
 
@@ -107,7 +107,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
                 while let Ok(stats) = rx.try_recv() {
                     buffer.push(stats);
                 }
-                // Force flush all in progress concentrator buckets on shutdown signal
+                // Force flush all in progress concentrator stats buckets on shutdown signal
                 self.flush_stats(config.clone(), std::mem::take(&mut buffer), true).await;
                 return;
             }
@@ -128,13 +128,17 @@ impl StatsFlusher for ServerlessStatsFlusher {
             send_stats_payload(&config, payload).await;
         }
 
-        // Flush serverless computed stats from the concentrator
-        if let Some(ref concentrator) = self.stats_concentrator
-            && let Some(agent_stats) = concentrator.flush(force_flush)
-        {
-            let mut payload = stats_utils::construct_stats_payload(vec![agent_stats]);
-            payload.client_computed = false;
-            send_stats_payload(&config, payload).await;
+        // Flush concentrator stats
+        if let Some(ref concentrator) = self.stats_concentrator {
+            match concentrator.flush(force_flush).await {
+                Ok(Some(agent_stats)) => {
+                    let mut payload = stats_utils::construct_stats_payload(vec![agent_stats]);
+                    payload.client_computed = false;
+                    send_stats_payload(&config, payload).await;
+                }
+                Ok(None) => {}
+                Err(e) => error!("Failed to flush concentrator stats: {e}"),
+            }
         }
     }
 }
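Note (illustrative, not part of the patch): the flusher's new flush path is a
request-response round trip over the command channel, with a oneshot sender riding
inside the Flush command and the service answering through it. A minimal standalone
sketch of this mpsc-plus-oneshot pattern, with invented names rather than this
crate's API:

    use tokio::sync::{mpsc, oneshot};

    enum Command {
        // The reply channel travels with the request.
        Flush(oneshot::Sender<Option<String>>),
    }

    async fn actor(mut rx: mpsc::Receiver<Command>) {
        while let Some(Command::Flush(reply)) = rx.recv().await {
            // Ignore send errors: the requester may have given up and dropped its end.
            let _ = reply.send(Some("stats payload".to_string()));
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(8);
        tokio::spawn(actor(rx));

        // Send the command, then await the oneshot reply.
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(Command::Flush(reply_tx)).await.expect("actor alive");
        let stats = reply_rx.await.expect("actor replied");
        println!("flushed: {stats:?}");
    }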
diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs
index 8cc54abf..a7de3f1e 100644
--- a/crates/datadog-trace-agent/src/stats_generator.rs
+++ b/crates/datadog-trace-agent/src/stats_generator.rs
@@ -1,13 +1,18 @@
-use crate::stats_concentrator::StatsConcentrator;
+// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError};
 use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
 use tracing::error;
 
 pub struct StatsGenerator {
-    stats_concentrator: StatsConcentrator,
+    stats_concentrator: StatsConcentratorHandle,
 }
 
 #[derive(Debug, thiserror::Error)]
 pub enum StatsGeneratorError {
+    #[error("Failed to send command to stats concentrator: {0}")]
+    ConcentratorCommandError(StatsError),
     #[error("Unsupported trace payload version. Failed to send trace stats.")]
     TracePayloadVersionError,
 }
@@ -15,17 +20,24 @@ pub enum StatsGeneratorError {
 // Extracts information from traces related to stats and sends it to the stats concentrator
 impl StatsGenerator {
     #[must_use]
-    pub fn new(stats_concentrator: StatsConcentrator) -> Self {
+    pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self {
         Self { stats_concentrator }
     }
 
     pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
         if let TracerPayloadCollection::V07(traces) = traces {
             for trace in traces {
-                self.stats_concentrator.set_tracer_metadata(trace);
+                if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) {
+                    error!("Failed to set tracer metadata: {err}");
+                    return Err(StatsGeneratorError::ConcentratorCommandError(err));
+                }
+
                 for chunk in &trace.chunks {
                     for span in &chunk.spans {
-                        self.stats_concentrator.add(span);
+                        if let Err(err) = self.stats_concentrator.add(span) {
+                            error!("Failed to send trace stats: {err}");
+                            return Err(StatsGeneratorError::ConcentratorCommandError(err));
+                        }
                     }
                 }
             }
diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs
index eb5fa6b8..358f98cf 100644
--- a/crates/datadog-trace-agent/tests/integration_test.rs
+++ b/crates/datadog-trace-agent/tests/integration_test.rs
@@ -47,12 +47,13 @@ pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) {
 /// Helper to create a mini agent with real flushers
 pub fn create_mini_agent_with_real_flushers(config: Arc<Config>) -> MiniAgent {
     use datadog_trace_agent::{
-        aggregator::TraceAggregator, stats_concentrator::StatsConcentrator,
+        aggregator::TraceAggregator, stats_concentrator_service::StatsConcentratorService,
         stats_flusher::ServerlessStatsFlusher, stats_generator::StatsGenerator,
         stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher,
     };
 
-    let stats_concentrator_handle = StatsConcentrator::new(config.clone());
+    let (service, stats_concentrator_handle) = StatsConcentratorService::new(config.clone());
+    tokio::spawn(service.run());
 
     let stats_generator = Some(Arc::new(StatsGenerator::new(
         stats_concentrator_handle.clone(),
From 6c21e6170ac933a6b076e62bae5aadc25a5c8503 Mon Sep 17 00:00:00 2001
From: Duncan Harvey
Date: Wed, 8 Apr 2026 11:48:13 -0400
Subject: [PATCH 13/13] send traces to concentrator channel instead of
 individual spans

---
 .../src/stats_concentrator_service.rs | 73 +++++--------------
 .../src/stats_generator.rs            | 47 +++++++-----
 2 files changed, 44 insertions(+), 76 deletions(-)

diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs
index 74c9f0d1..db0bcac8 100644
--- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs
+++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs
@@ -1,22 +1,21 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
+use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};
 
 use crate::config::Config;
-use libdd_trace_protobuf::pb;
-use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload};
+use libdd_trace_protobuf::pb::{ClientStatsPayload, TraceChunk};
 use libdd_trace_stats::span_concentrator::SpanConcentrator;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use tracing::{debug, error};
 
 const S_TO_NS: u64 = 1_000_000_000;
 const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds
 
-/// When the channel is full, spans are dropped from stats computation rather than blocking
+/// When the channel is full, traces are dropped from stats computation rather than blocking
 /// the trace request path or growing without bound.
-const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192;
+const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 128;
 
 #[derive(Debug, thiserror::Error)]
 pub enum StatsError {
@@ -42,9 +41,7 @@ pub struct TracerMetadata {
 }
 
 pub enum ConcentratorCommand {
-    SetTracerMetadata(TracerMetadata),
-    // Use a box to reduce the size of the command enum
-    Add(Box<pb::Span>),
+    AddChunk(Box<TraceChunk>, TracerMetadata),
     Flush(bool, oneshot::Sender<Option<ClientStatsPayload>>),
 }
 
@@ -60,39 +57,15 @@ impl StatsConcentratorHandle {
         Self { tx }
     }
 
-    /// Updates tracer metadata from the payload. Called on every trace payload so that
-    /// metadata is never permanently stale from an incomplete first trace.
-    pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
-        let tracer_metadata = TracerMetadata {
-            language: trace.language_name.clone(),
-            tracer_version: trace.tracer_version.clone(),
-            runtime_id: trace.runtime_id.clone(),
-            container_id: trace.container_id.clone(),
-            env: trace.env.clone(),
-            app_version: trace.app_version.clone(),
-        };
-        match self
-            .tx
-            .try_send(ConcentratorCommand::SetTracerMetadata(tracer_metadata))
-        {
-            Ok(()) => Ok(()),
-            Err(mpsc::error::TrySendError::Full(_)) => {
-                debug!("Stats concentrator channel full, skipping tracer metadata update");
-                Ok(())
-            }
-            Err(e) => Err(StatsError::SendError(Box::new(e))),
-        }
-    }
-
-    /// Adds a span for stats computation.
-    pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> {
+    /// Adds a trace chunk for stats computation.
+    pub fn add_chunk(&self, chunk: TraceChunk, metadata: TracerMetadata) -> Result<(), StatsError> {
         match self
             .tx
-            .try_send(ConcentratorCommand::Add(Box::new(span.clone())))
+            .try_send(ConcentratorCommand::AddChunk(Box::new(chunk), metadata))
         {
             Ok(()) => Ok(()),
             Err(mpsc::error::TrySendError::Full(_)) => {
-                debug!("Stats concentrator channel full, skipping span stats computation");
+                debug!("Stats concentrator channel full, skipping stats computation");
                 Ok(())
             }
             Err(e) => Err(StatsError::SendError(Box::new(e))),
@@ -145,28 +118,16 @@ impl StatsConcentratorService {
     }
 
     pub async fn run(mut self) {
-        let mut log_interval = tokio::time::interval(Duration::from_secs(1));
-        log_interval.tick().await; // discard first immediate tick
-        loop {
-            tokio::select! {
-                command = self.rx.recv() => {
-                    match command {
-                        Some(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) => {
-                            self.tracer_metadata = tracer_metadata;
-                        }
-                        Some(ConcentratorCommand::Add(span)) => self.concentrator.add_span(&*span),
-                        Some(ConcentratorCommand::Flush(force_flush, response_tx)) => {
-                            self.handle_flush(force_flush, response_tx);
-                        }
-                        None => return,
+        while let Some(command) = self.rx.recv().await {
+            match command {
+                ConcentratorCommand::AddChunk(chunk, metadata) => {
+                    self.tracer_metadata = metadata;
+                    for span in &chunk.spans {
+                        self.concentrator.add_span(span);
                     }
                 }
-                _ = log_interval.tick() => {
-                    debug!(
-                        used = self.rx.len(),
-                        capacity = CONCENTRATOR_COMMAND_CHANNEL_CAPACITY,
-                        "Stats concentrator channel fill"
-                    );
+                ConcentratorCommand::Flush(force_flush, response_tx) => {
+                    self.handle_flush(force_flush, response_tx);
                 }
             }
         }
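Note (illustrative, not part of the patch): AddChunk still boxes its payload, for the
reason the removed comment gave in the original enum: a Rust enum is as large as its
largest variant, and every slot in a bounded channel holds a full enum value. A
standalone illustration with hypothetical stand-in types, not this repository's:

    use std::mem;

    #[allow(dead_code)]
    struct BigChunk {
        spans: Vec<[u64; 16]>,
        metadata: [u64; 32],
    }

    #[allow(dead_code)]
    enum UnboxedCommand {
        AddChunk(BigChunk),
        Flush,
    }

    #[allow(dead_code)]
    enum BoxedCommand {
        // Boxing the large variant keeps the enum near pointer size,
        // so queued commands stay cheap even when a chunk is big.
        AddChunk(Box<BigChunk>),
        Flush,
    }

    fn main() {
        println!("unboxed command: {} bytes", mem::size_of::<UnboxedCommand>());
        println!("boxed command:   {} bytes", mem::size_of::<BoxedCommand>());
    }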
diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs
index a7de3f1e..73af7755 100644
--- a/crates/datadog-trace-agent/src/stats_generator.rs
+++ b/crates/datadog-trace-agent/src/stats_generator.rs
@@ -1,7 +1,7 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError};
+use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError, TracerMetadata};
 use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
 use tracing::error;
 
@@ -13,38 +13,45 @@ pub struct StatsGenerator {
 pub enum StatsGeneratorError {
     #[error("Failed to send command to stats concentrator: {0}")]
     ConcentratorCommandError(StatsError),
-    #[error("Unsupported trace payload version. Failed to send trace stats.")]
-    TracePayloadVersionError,
+    #[error("Unsupported tracer payload version. Failed to send trace stats.")]
+    TracerPayloadVersionError,
 }
 
-// Extracts information from traces related to stats and sends it to the stats concentrator
+// Sends tracer payloads to the stats concentrator
 impl StatsGenerator {
     #[must_use]
     pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self {
         Self { stats_concentrator }
     }
 
-    pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
-        if let TracerPayloadCollection::V07(traces) = traces {
-            for trace in traces {
-                if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) {
-                    error!("Failed to set tracer metadata: {err}");
-                    return Err(StatsGeneratorError::ConcentratorCommandError(err));
-                }
-
-                for chunk in &trace.chunks {
-                    for span in &chunk.spans {
-                        if let Err(err) = self.stats_concentrator.add(span) {
-                            error!("Failed to send trace stats: {err}");
-                            return Err(StatsGeneratorError::ConcentratorCommandError(err));
-                        }
+    pub fn send(
+        &self,
+        tracer_payload_collection: &TracerPayloadCollection,
+    ) -> Result<(), StatsGeneratorError> {
+        if let TracerPayloadCollection::V07(tracer_payloads) = tracer_payload_collection {
+            for tracer_payload in tracer_payloads {
+                let metadata = TracerMetadata {
+                    language: tracer_payload.language_name.clone(),
+                    tracer_version: tracer_payload.tracer_version.clone(),
+                    runtime_id: tracer_payload.runtime_id.clone(),
+                    container_id: tracer_payload.container_id.clone(),
+                    env: tracer_payload.env.clone(),
+                    app_version: tracer_payload.app_version.clone(),
+                };
+                for chunk in &tracer_payload.chunks {
+                    if let Err(err) = self
+                        .stats_concentrator
+                        .add_chunk(chunk.clone(), metadata.clone())
+                    {
+                        error!("Failed to send trace stats to concentrator: {err}");
+                        return Err(StatsGeneratorError::ConcentratorCommandError(err));
                     }
                 }
             }
             Ok(())
         } else {
-            error!("Unsupported trace payload version. Failed to send trace stats.");
-            Err(StatsGeneratorError::TracePayloadVersionError)
+            error!("Unsupported tracer payload version. Failed to send trace stats.");
+            Err(StatsGeneratorError::TracerPayloadVersionError)
         }
     }
 }
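Note (illustrative, not part of the patch): a detail of the simplified run() loop
above is that tokio's Receiver::recv() returns None once every Sender clone (here,
every StatsConcentratorHandle) has been dropped, so the while-let loop doubles as the
service's shutdown path with no explicit stop command. A minimal sketch of that
behavior with generic types, not this crate's:

    use tokio::sync::mpsc;

    async fn actor(mut rx: mpsc::Receiver<u32>) {
        // recv() yields None after the last sender is dropped,
        // so falling out of this loop is the shutdown path.
        while let Some(n) = rx.recv().await {
            println!("got {n}");
        }
        println!("all senders dropped, actor exiting");
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(4);
        let task = tokio::spawn(actor(rx));

        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        drop(tx); // last sender gone, so the actor loop ends

        task.await.unwrap();
    }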