diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 401ce316e..04894f9c6 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -43,16 +43,6 @@ pub struct Context { /// tracing. /// pub extracted_span_context: Option, - /// Whether the tracer has already generated stats for the dummy `aws.lambda` span - /// for this invocation (which only exists for universal instrumentation - /// languages). - /// - /// Set from the `Datadog-Client-Computed-Stats` header when the tracer's - /// dummy `aws.lambda` span (with `resource_name="dd-tracer-serverless-span"`) is received. - /// The value will be propagated to the same field of the real `aws.lambda` span generated by - /// the extension, which will be used to decide whether stats should be generated on extension - /// or backend. - pub client_computed_stats: bool, } /// Struct containing the information needed to reparent a span. @@ -104,7 +94,6 @@ impl Default for Context { snapstart_restore_span: None, tracer_span: None, extracted_span_context: None, - client_computed_stats: false, } } } @@ -519,12 +508,7 @@ impl ContextBuffer { /// Adds the tracer span to a `Context` in the buffer. 
/// - pub fn add_tracer_span( - &mut self, - request_id: &String, - tracer_span: &Span, - client_computed_stats: bool, - ) { + pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) { if let Some(context) = self .buffer .iter_mut() @@ -544,7 +528,6 @@ impl ContextBuffer { .extend(tracer_span.metrics.clone()); context.tracer_span = Some(tracer_span.clone()); - context.client_computed_stats = client_computed_stats; } else { debug!("Could not add tracer span - context not found"); } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 1d134f5ce..caf812fe0 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -653,16 +653,9 @@ impl Processor { trace_sender: &Arc, context: Context, ) { - let client_computed_stats = context.client_computed_stats; let (traces, body_size) = self.get_ctx_spans(context); - self.send_spans( - traces, - body_size, - client_computed_stats, - tags_provider, - trace_sender, - ) - .await; + self.send_spans(traces, body_size, tags_provider, trace_sender) + .await; } fn get_ctx_spans(&mut self, context: Context) -> (Vec, usize) { @@ -727,7 +720,7 @@ impl Processor { let traces = vec![cold_start_span.clone()]; let body_size = size_of_val(cold_start_span); - self.send_spans(traces, body_size, false, tags_provider, trace_sender) + self.send_spans(traces, body_size, tags_provider, trace_sender) .await; } } @@ -739,7 +732,6 @@ impl Processor { &mut self, traces: Vec, body_size: usize, - client_computed_stats: bool, tags_provider: &Arc, trace_sender: &Arc, ) { @@ -752,7 +744,7 @@ impl Processor { tracer_version: "", container_id: "", client_computed_top_level: false, - client_computed_stats, + client_computed_stats: false, dropped_p0_traces: 0, dropped_p0_spans: 0, }; @@ -1399,10 +1391,9 @@ impl Processor { /// /// This is used to enrich the invocation span with additional metadata from the tracers /// top 
level span, since we discard the tracer span when we create the invocation span. - pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) { + pub fn add_tracer_span(&mut self, span: &Span) { if let Some(request_id) = span.meta.get("request_id") { - self.context_buffer - .add_tracer_span(request_id, span, client_computed_stats); + self.context_buffer.add_tracer_span(request_id, span); } } @@ -2213,98 +2204,6 @@ mod tests { ); } - /// Verifies that `client_computed_stats` set on a context via `add_tracer_span` is - /// propagated all the way through `send_ctx_spans` to the `aws.lambda` payload sent - /// to the backend, so the extension does not generate duplicate stats. - #[tokio::test] - #[allow(clippy::unwrap_used)] - async fn test_client_computed_stats_propagated_to_aws_lambda_span() { - use crate::traces::stats_concentrator_service::StatsConcentratorService; - use crate::traces::stats_generator::StatsGenerator; - use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use tokio::sync::mpsc; - - let config = Arc::new(config::Config { - apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), - ..config::Config::default() - }); - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - let aws_config = Arc::new(AwsConfig { - region: "us-east-1".into(), - aws_lwa_proxy_lambda_runtime_api: None, - function_name: "test-function".into(), - sandbox_init_time: Instant::now(), - runtime_api: "***".into(), - exec_wrapper: None, - initialization_type: "on-demand".into(), - }); - let (aggregator_service, aggregator_handle) = - AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); - tokio::spawn(aggregator_service.run()); - let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); - let (durable_context_tx, _) = tokio::sync::mpsc::channel(1); - 
let mut p = Processor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - aws_config, - aggregator_handle, - propagator, - durable_context_tx, - ); - - let (trace_tx, mut trace_rx) = mpsc::channel(10); - let (stats_concentrator_service, stats_concentrator_handle) = - StatsConcentratorService::new(Arc::clone(&config)); - tokio::spawn(stats_concentrator_service.run()); - let trace_sender = Arc::new(SendingTraceProcessor { - appsec: None, - processor: Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }), - trace_tx, - stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), - }); - - let mut context = Context::from_request_id("req-1"); - context.invocation_span.trace_id = 1; - context.invocation_span.span_id = 2; - context.client_computed_stats = true; - - p.send_ctx_spans(&tags_provider, &trace_sender, context) - .await; - - let payload = trace_rx - .recv() - .await - .expect("expected payload from trace_tx"); - assert!( - payload.header_tags.client_computed_stats, - "client_computed_stats must be propagated to the aws.lambda span payload" - ); - - // Verify _dd.compute_stats is "0" in the built payload tags: client_computed_stats=true - // means the tracer has already computed stats, so neither extension nor backend should. 
- let send_data = payload.builder.build(); - let libdd_trace_utils::tracer_payload::TracerPayloadCollection::V07(payloads) = - send_data.get_payloads() - else { - panic!("expected V07 payload"); - }; - for p in payloads { - assert_eq!( - p.tags.get(crate::tags::lambda::tags::COMPUTE_STATS_KEY), - Some(&"0".to_string()), - "_dd.compute_stats must be 0 when client_computed_stats is true" - ); - } - } - fn make_trace_sender(config: Arc) -> Arc { use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; let (stats_concentrator_service, stats_concentrator_handle) = diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 299b477b3..ec2b8f9c2 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -109,7 +109,6 @@ pub enum ProcessorCommand { }, AddTracerSpan { span: Box, - client_computed_stats: bool, }, ForwardDurableContext { request_id: String, @@ -378,12 +377,10 @@ impl InvocationProcessorHandle { pub async fn add_tracer_span( &self, span: Span, - client_computed_stats: bool, ) -> Result<(), mpsc::error::SendError> { self.sender .send(ProcessorCommand::AddTracerSpan { span: Box::new(span), - client_computed_stats, }) .await } @@ -612,11 +609,8 @@ impl InvocationProcessorService { let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id)); let _ = response.send(result); } - ProcessorCommand::AddTracerSpan { - span, - client_computed_stats, - } => { - self.processor.add_tracer_span(&span, client_computed_stats); + ProcessorCommand::AddTracerSpan { span } => { + self.processor.add_tracer_span(&span); } ProcessorCommand::ForwardDurableContext { request_id, diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index 230a92634..7790af076 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -39,7 +39,7 @@ const VERSION_KEY: &str = 
"version"; const SERVICE_KEY: &str = "service"; // ComputeStatsKey is the tag key indicating whether trace stats should be computed -pub(crate) const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; +const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload const FUNCTION_TAGS_KEY: &str = "_dd.tags.function"; // TODO(astuyve) decide what to do with the version @@ -135,6 +135,12 @@ fn tags_from_env( tags_map.extend(config.tags.clone()); } + // The value of _dd.compute_stats is the opposite of config.compute_trace_stats_on_extension. + // "config.compute_trace_stats_on_extension == true" means computing stats on the extension side, + // so we set _dd.compute_stats to 0 so stats won't be computed on the backend side. + let compute_stats = i32::from(!config.compute_trace_stats_on_extension); + tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string()); + tags_map } @@ -294,7 +300,8 @@ mod tests { fn test_new_from_config() { let metadata = HashMap::new(); let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata); - assert_eq!(tags.tags_map.len(), 2); + assert_eq!(tags.tags_map.len(), 3); + assert_eq!(tags.tags_map.get(COMPUTE_STATS_KEY).unwrap(), "1"); let arch = arch_to_platform(); assert_eq!( tags.tags_map.get(ARCHITECTURE_KEY).unwrap(), @@ -429,7 +436,7 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 13); + assert_eq!(fn_tags_map.len(), 14); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), "value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); @@ -471,7 +478,7 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 13); + assert_eq!(fn_tags_map.len(), 14); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), 
"value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 5a046f62a..3ac37bc22 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -501,8 +501,7 @@ impl TraceAgent { ); } - let tracer_header_tags: libdd_trace_utils::tracer_header_tags::TracerHeaderTags<'_> = - (&parts.headers).into(); + let tracer_header_tags = (&parts.headers).into(); let (body_size, mut traces): (usize, Vec>) = match version { ApiVersion::V04 => { @@ -532,6 +531,7 @@ impl TraceAgent { } }, }; + let mut reparenting_info = match invocation_processor_handle.get_reparenting_info().await { Ok(info) => info, Err(e) => { @@ -586,7 +586,7 @@ impl TraceAgent { if span.resource == INVOCATION_SPAN_RESOURCE && let Err(e) = invocation_processor_handle - .add_tracer_span(span.clone(), tracer_header_tags.client_computed_stats) + .add_tracer_span(span.clone()) .await { error!("Failed to add tracer span to processor: {}", e); diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 3864e4c60..05e151c65 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -5,7 +5,6 @@ use crate::appsec::processor::Processor as AppSecProcessor; use crate::appsec::processor::context::HoldArguments; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; -use crate::tags::lambda::tags::COMPUTE_STATS_KEY; use crate::tags::provider; use crate::traces::span_pointers::{SpanPointer, attach_span_pointers_to_meta}; use crate::traces::{ @@ -350,19 +349,6 @@ impl TraceProcessor for ServerlessTraceProcessor { let tags = tags_provider.get_function_tags_map(); for tracer_payload in collection.iter_mut() { tracer_payload.tags.extend(tags.clone()); - // Tell the backend whether to compute stats: - // - "1" (compute on backend) if neither the tracer nor the extension is computing them 
- // - "0" (skip on backend) if the extension or the tracer has already computed them - let compute_stats = if !config.compute_trace_stats_on_extension - && !header_tags.client_computed_stats - { - "1" - } else { - "0" - }; - tracer_payload - .tags - .insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string()); } } let endpoint = Endpoint { @@ -516,7 +502,6 @@ impl SendingTraceProcessor { return Ok(()); } - let client_computed_stats = header_tags.client_computed_stats; let (payload, processed_traces) = self.processor.process_traces( config.clone(), tags_provider, @@ -528,9 +513,7 @@ impl SendingTraceProcessor { // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. - // Skip if the tracer has already computed stats (Datadog-Client-Computed-Stats header). if config.compute_trace_stats_on_extension - && !client_computed_stats && let Err(err) = self.stats_generator.send(&processed_traces) { // Just log the error. We don't think trace stats are critical, so we don't want to @@ -674,10 +657,6 @@ mod tests { ); let tracer_payload = tracer_payload.expect("expected Some payload"); - let mut expected_tags = tags_provider.get_function_tags_map(); - // process_traces always sets _dd.compute_stats:"1" - // because compute_trace_stats_on_extension is false and client_computed_stats is false. 
- expected_tags.insert(COMPUTE_STATS_KEY.to_string(), "1".to_string()); let expected_tracer_payload = pb::TracerPayload { container_id: "33".to_string(), language_name: "nodejs".to_string(), @@ -691,7 +670,7 @@ mod tests { tags: HashMap::new(), dropped_trace: false, }], - tags: expected_tags, + tags: tags_provider.get_function_tags_map(), env: "test-env".to_string(), hostname: String::new(), app_version: String::new(), @@ -1390,160 +1369,4 @@ mod tests { "body_size must be smaller than the original unfiltered request size" ); } - - /// Shared helper for the four `_dd.compute_stats` / stats-generation combination tests. - /// - /// Asserts: - /// - `_dd.compute_stats` tag value in the trace payload - /// - whether the extension generates stats via `send_processed_traces` - /// - /// | Input: `compute_trace_stats_on_extension` | Input: `client_computed_stats` | Expected: `_dd.compute_stats` | Expected: Extension generates stats? | - /// |-------------------------------------------|--------------------------------|-------------------------------|--------------------------------------| - /// | `false` | `false` | `"1"` | No | - /// | `false` | `true` | `"0"` | No | - /// | `true` | `false` | `"0"` | Yes | - /// | `true` | `true` | `"0"` | No | - #[allow(clippy::unwrap_used)] - #[allow(clippy::too_many_lines)] - async fn check_compute_stats_behavior( - compute_trace_stats_on_extension: bool, - client_computed_stats: bool, - ) { - use crate::traces::stats_concentrator_service::StatsConcentratorHandle; - use crate::traces::stats_generator::StatsGenerator; - use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use tokio::sync::mpsc; - - // "_dd.compute_stats" is "1" only when neither side computes stats (backend must do it). - let expected_tag = if !compute_trace_stats_on_extension && !client_computed_stats { - "1" - } else { - "0" - }; - // The extension generates stats only when it is configured to do so and the tracer hasn't. 
- let expect_stats = compute_trace_stats_on_extension && !client_computed_stats; - - let config = Arc::new(Config { - apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), - compute_trace_stats_on_extension, - ..Config::default() - }); - let tags_provider = Arc::new(Provider::new( - config.clone(), - "lambda".to_string(), - &std::collections::HashMap::from([( - "function_arn".to_string(), - "test-arn".to_string(), - )]), - )); - let processor = Arc::new(ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }); - let header_tags = tracer_header_tags::TracerHeaderTags { - lang: "rust", - lang_version: "1.0", - lang_interpreter: "", - lang_vendor: "", - tracer_version: "1.0", - container_id: "", - client_computed_top_level: false, - client_computed_stats, - dropped_p0_traces: 0, - dropped_p0_spans: 0, - }; - let span = pb::Span { - trace_id: 1, - span_id: 1, - parent_id: 0, - service: "svc".to_string(), - name: "op".to_string(), - resource: "res".to_string(), - ..Default::default() - }; - - let (stats_tx, mut stats_rx) = mpsc::unbounded_channel(); - let stats_handle = StatsConcentratorHandle::new(stats_tx); - let stats_generator = Arc::new(StatsGenerator::new(stats_handle)); - let (trace_tx, mut trace_rx) = mpsc::channel(10); - let sending_processor = SendingTraceProcessor { - appsec: None, - processor, - trace_tx, - stats_generator, - }; - - sending_processor - .send_processed_traces( - config, - tags_provider, - header_tags, - vec![vec![span]], - 0, - None, - ) - .await - .unwrap(); - - if expect_stats { - assert!( - stats_rx.try_recv().is_ok(), - "extension must generate stats when compute_trace_stats_on_extension=true \ - and client_computed_stats=false" - ); - } else { - assert!( - stats_rx.try_recv().is_err(), - "extension must not generate stats (compute_trace_stats_on_extension={compute_trace_stats_on_extension}, \ - client_computed_stats={client_computed_stats})" - ); 
- } - - let payload_info = trace_rx - .try_recv() - .expect("expected payload in trace_tx channel"); - let send_data = payload_info.builder.build(); - let TracerPayloadCollection::V07(payloads) = send_data.get_payloads() else { - panic!("expected V07"); - }; - for payload in payloads { - assert_eq!( - payload - .tags - .get(crate::tags::lambda::tags::COMPUTE_STATS_KEY), - Some(&expected_tag.to_string()), - "_dd.compute_stats must be {expected_tag} (compute_trace_stats_on_extension={compute_trace_stats_on_extension}, \ - client_computed_stats={client_computed_stats})" - ); - } - } - - #[tokio::test] - #[allow(clippy::unwrap_used)] - async fn test_compute_stats_tag_neither_side_computes() { - // Neither extension nor tracer computes stats → backend must compute → tag "1". - check_compute_stats_behavior(false, false).await; - } - - #[tokio::test] - #[allow(clippy::unwrap_used)] - async fn test_compute_stats_tag_tracer_computes() { - // Tracer computed stats (Datadog-Client-Computed-Stats header set) → tag "0". - check_compute_stats_behavior(false, true).await; - } - - #[tokio::test] - #[allow(clippy::unwrap_used)] - async fn test_compute_stats_tag_extension_computes() { - // Extension computes stats (DD_COMPUTE_TRACE_STATS_ON_EXTENSION=true) → tag "0". - check_compute_stats_behavior(true, false).await; - } - - #[tokio::test] - #[allow(clippy::unwrap_used)] - async fn test_compute_stats_tag_both_compute() { - // Both extension and tracer compute stats → tracer takes precedence, tag "0", no double-count. - check_compute_stats_behavior(true, true).await; - } } diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 3b40545e6..8f619fdb1 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -22,6 +22,7 @@ async fn test_logs() { // protobuf is using hashmap, can't set a btreemap to have sorted keys. 
Using multiple regexp since // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported let regexp_message = r#"[{"message":{"message":"START RequestId: 459921b5-681c-4a96-beb0-81e0aa586026 Version: $LATEST","lambda":{"arn":"test-arn","request_id":"459921b5-681c-4a96-beb0-81e0aa586026"},"timestamp":1666361103165,"status":"info"},"hostname":"test-arn","service":"","#; + let regexp_compute_stats = r#"_dd.compute_stats:1"#; let regexp_arch = format!(r#"architecture:{}"#, arch); let regexp_function_arn = r#"function_arn:test-arn"#; let regexp_extension_version = r#"dd_extension_version"#; @@ -33,6 +34,7 @@ async fn test_logs() { .header("DD-API-KEY", dd_api_key) .header("Content-Type", "application/json") .body_contains(regexp_message) + .body_contains(regexp_compute_stats) .body_contains(regexp_arch) .body_contains(regexp_function_arn) .body_contains(regexp_extension_version);