diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 8dbd04042..6859da0d2 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1375,6 +1375,7 @@ impl Processor { request_id: &str, execution_id: &str, execution_name: &str, + first_invocation: Option, ) { if let Err(e) = self .durable_context_tx @@ -1382,6 +1383,7 @@ impl Processor { request_id: request_id.to_owned(), execution_id: execution_id.to_owned(), execution_name: execution_name.to_owned(), + first_invocation, }) .await { diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 3a796e117..299b477b3 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -115,6 +115,7 @@ pub enum ProcessorCommand { request_id: String, execution_id: String, execution_name: String, + first_invocation: Option, }, OnOutOfMemoryError { timestamp: i64, @@ -392,12 +393,14 @@ impl InvocationProcessorHandle { request_id: String, execution_id: String, execution_name: String, + first_invocation: Option, ) -> Result<(), mpsc::error::SendError> { self.sender .send(ProcessorCommand::ForwardDurableContext { request_id, execution_id, execution_name, + first_invocation, }) .await } @@ -619,9 +622,15 @@ impl InvocationProcessorService { request_id, execution_id, execution_name, + first_invocation, } => { self.processor - .forward_durable_context(&request_id, &execution_id, &execution_name) + .forward_durable_context( + &request_id, + &execution_id, + &execution_name, + first_invocation, + ) .await; } ProcessorCommand::OnOutOfMemoryError { timestamp } => { diff --git a/bottlecap/src/logs/lambda/mod.rs b/bottlecap/src/logs/lambda/mod.rs index 0a49c9418..61d899f8f 100644 --- a/bottlecap/src/logs/lambda/mod.rs +++ b/bottlecap/src/logs/lambda/mod.rs @@ -8,6 +8,7 @@ pub struct DurableContextUpdate { pub request_id: String, pub execution_id: String, pub execution_name: String, + pub first_invocation: Option, } /// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`. @@ -15,6 +16,7 @@ pub struct DurableContextUpdate { pub struct DurableExecutionContext { pub execution_id: String, pub execution_name: String, + pub first_invocation: Option, } /// @@ -49,10 +51,21 @@ pub struct Message { pub struct Lambda { pub arn: String, pub request_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde( + rename = "durable_function.execution_id", + skip_serializing_if = "Option::is_none" + )] pub durable_execution_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde( + rename = "durable_function.execution_name", + skip_serializing_if = "Option::is_none" + )] pub durable_execution_name: Option, + #[serde( + rename = "durable_function.first_invocation", + skip_serializing_if = "Option::is_none" + )] + pub first_invocation: Option, } impl Message { diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 1c0adff20..e8a536492 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -86,6 +86,14 @@ fn is_oom_error(error_msg: &str) -> bool { .any(|&oom_str| error_msg.contains(oom_str)) } +/// Returns `true` for START, END, and REPORT platform logs. +/// These are the only logs that carry the `first_invocation` attribute. +fn is_platform_log(message: &str) -> bool { + message.starts_with("START RequestId:") + || message.starts_with("END RequestId:") + || message.starts_with("REPORT RequestId:") +} + /// Parses a Lambda durable execution ARN and returns `(execution_id, execution_name)`. /// /// Expected format: @@ -538,9 +546,10 @@ impl LambdaProcessor { /// `request_id`. pub fn insert_to_durable_context_map( &mut self, - request_id: &str, // key - execution_id: &str, // value - execution_name: &str, // value + request_id: &str, // key + execution_id: &str, // value + execution_name: &str, // value + first_invocation: Option, // value ) { if self.durable_context_map.contains_key(request_id) { error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map"); @@ -557,6 +566,7 @@ impl LambdaProcessor { DurableExecutionContext { execution_id: execution_id.to_string(), execution_name: execution_name.to_string(), + first_invocation, }, ); self.drain_held_logs_for_request_id(request_id); @@ -589,13 +599,8 @@ impl LambdaProcessor { self.held_logs_order.retain(|r| r != request_id); let durable_ctx = self.durable_context_map.get(request_id).cloned(); if let Some(ctx) = durable_ctx { - for mut log in held { - log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone()); - log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone()); - if let Ok(s) = serde_json::to_string(&log) { - drop(log); - self.ready_logs.push(s); - } + for log in held { + self.set_durable_context_and_mark_ready(log, &ctx); } } } @@ -614,13 +619,8 @@ impl LambdaProcessor { if let Some(ctx) = durable_ctx { // If the request_id is in the durable context map, set durable execution id // and execution name, and add logs to ready_logs. - for mut log in logs { - log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone()); - log.message.lambda.durable_execution_name = - Some(ctx.execution_name.clone()); - if let Ok(s) = serde_json::to_string(&log) { - self.ready_logs.push(s); - } + for log in logs { + self.set_durable_context_and_mark_ready(log, &ctx); } } else { // No context yet — keep logs in held_logs until the aws.lambda span arrives. @@ -648,6 +648,26 @@ impl LambdaProcessor { } } + /// Applies durable execution context to a log and pushes it to `ready_logs`. + /// `first_invocation` is set only for platform logs (START/END/REPORT). + fn set_durable_context_and_mark_ready( + &mut self, + mut log: IntakeLog, + ctx: &DurableExecutionContext, + ) { + log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone()); + log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone()); + if is_platform_log(&log.message.message) { + log.message.lambda.first_invocation = ctx.first_invocation; + } + if let Ok(s) = serde_json::to_string(&log) { + // explicitly drop log so we don't accidentally re-use it and push + // duplicate logs to the aggregator + drop(log); + self.ready_logs.push(s); + } + } + /// Stashes a log in `held_logs` under `request_id`, waiting for durable context. /// /// If `held_logs` is at capacity and `request_id` is a new key, the oldest key is evicted: @@ -685,7 +705,7 @@ impl LambdaProcessor { /// - `Some(false)` → serialize and push straight to `ready_logs`. /// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map` /// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`. - fn queue_log_after_rules(&mut self, mut log: IntakeLog) { + fn queue_log_after_rules(&mut self, log: IntakeLog) { // Durable execution SDK logs already carry execution context extracted from executionArn. if log.message.lambda.durable_execution_id.is_some() { if let Ok(serialized_log) = serde_json::to_string(&log) { @@ -730,14 +750,7 @@ impl LambdaProcessor { match durable_ctx { Some(ctx) => { - log.message.lambda.durable_execution_id = Some(ctx.execution_id); - log.message.lambda.durable_execution_name = Some(ctx.execution_name); - if let Ok(serialized_log) = serde_json::to_string(&log) { - // explicitly drop log so we don't accidentally re-use it and push - // duplicate logs to the aggregator - drop(log); - self.ready_logs.push(serialized_log); - } + self.set_durable_context_and_mark_ready(log, &ctx); } None => { if let Some(rid) = log.message.lambda.request_id.clone() { @@ -2515,11 +2528,11 @@ mod tests { assert_eq!(batches.len(), 1); let logs: Vec = serde_json::from_slice(&batches[0]).unwrap(); assert_eq!( - logs[0]["message"]["lambda"]["durable_execution_id"], + logs[0]["message"]["lambda"]["durable_function.execution_id"], "my-id" ); assert_eq!( - logs[0]["message"]["lambda"]["durable_execution_name"], + logs[0]["message"]["lambda"]["durable_function.execution_name"], "my-name" ); } diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index 71a28609f..e71cb1bd3 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -52,6 +52,7 @@ impl LogsProcessor { &update.request_id, &update.execution_id, &update.execution_name, + update.first_invocation, ); let ready_logs = self.take_ready_logs(); if !ready_logs.is_empty() @@ -66,10 +67,16 @@ impl LogsProcessor { request_id: &str, execution_id: &str, execution_name: &str, + first_invocation: Option, ) { match self { LogsProcessor::Lambda(p) => { - p.insert_to_durable_context_map(request_id, execution_id, execution_name); + p.insert_to_durable_context_map( + request_id, + execution_id, + execution_name, + first_invocation, + ); } } } diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index 1745d4273..230a92634 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -54,6 +54,8 @@ const ACCOUNT_ID_KEY: &str = "account_id"; pub const REQUEST_ID_KEY: &str = "request_id"; pub const DURABLE_EXECUTION_ID_KEY: &str = "aws_lambda.durable_function.execution_id"; pub const DURABLE_EXECUTION_NAME_KEY: &str = "aws_lambda.durable_function.execution_name"; +pub const DURABLE_FUNCTION_FIRST_INVOCATION_KEY: &str = + "aws_lambda.durable_function.first_invocation"; const AWS_ACCOUNT_KEY: &str = "aws_account"; const RESOURCE_KEY: &str = "resource"; diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 876528d6d..30c13f2b8 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -599,15 +599,22 @@ impl TraceAgent { span.meta .get(tags::lambda::tags::DURABLE_EXECUTION_NAME_KEY), ) - && let Err(e) = invocation_processor_handle + { + let first_invocation = span + .meta + .get(tags::lambda::tags::DURABLE_FUNCTION_FIRST_INVOCATION_KEY) + .map(|v| v == "true"); + if let Err(e) = invocation_processor_handle .forward_durable_context( request_id.clone(), execution_id.clone(), execution_name.clone(), + first_invocation, ) .await - { - error!("Failed to forward durable context to processor: {e}"); + { + error!("Failed to forward durable context to processor: {e}"); + } } handle_reparenting(&mut reparenting_info, &mut span);