Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,13 +1375,15 @@ impl Processor {
request_id: &str,
execution_id: &str,
execution_name: &str,
first_invocation: Option<bool>,
) {
if let Err(e) = self
.durable_context_tx
.send(DurableContextUpdate {
request_id: request_id.to_owned(),
execution_id: execution_id.to_owned(),
execution_name: execution_name.to_owned(),
first_invocation,
})
.await
{
Expand Down
11 changes: 10 additions & 1 deletion bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub enum ProcessorCommand {
request_id: String,
execution_id: String,
execution_name: String,
first_invocation: Option<bool>,
},
OnOutOfMemoryError {
timestamp: i64,
Expand Down Expand Up @@ -392,12 +393,14 @@ impl InvocationProcessorHandle {
request_id: String,
execution_id: String,
execution_name: String,
first_invocation: Option<bool>,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
self.sender
.send(ProcessorCommand::ForwardDurableContext {
request_id,
execution_id,
execution_name,
first_invocation,
})
.await
}
Expand Down Expand Up @@ -619,9 +622,15 @@ impl InvocationProcessorService {
request_id,
execution_id,
execution_name,
first_invocation,
} => {
self.processor
.forward_durable_context(&request_id, &execution_id, &execution_name)
.forward_durable_context(
&request_id,
&execution_id,
&execution_name,
first_invocation,
)
.await;
}
ProcessorCommand::OnOutOfMemoryError { timestamp } => {
Expand Down
17 changes: 15 additions & 2 deletions bottlecap/src/logs/lambda/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub struct DurableContextUpdate {
pub request_id: String,
pub execution_id: String,
pub execution_name: String,
pub first_invocation: Option<bool>,
}

/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`.
///
/// Once a context is present for a `request_id`, its fields are copied onto the
/// `lambda` attributes of logs for that request before the logs are serialized
/// and queued as ready.
#[derive(Clone, Debug)]
pub struct DurableExecutionContext {
/// Durable execution id; copied into `Lambda::durable_execution_id` on matching logs.
pub execution_id: String,
/// Durable execution name; copied into `Lambda::durable_execution_name` on matching logs.
pub execution_name: String,
/// Whether this is the first invocation of the durable execution.
/// Only copied onto START/END/REPORT platform logs; other logs keep `None`.
/// NOTE(review): appears to be `None` when the originating span carried no
/// `first_invocation` tag — confirm against the trace agent.
pub first_invocation: Option<bool>,
}

///
Expand Down Expand Up @@ -49,10 +51,21 @@ pub struct Message {
pub struct Lambda {
pub arn: String,
pub request_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
rename = "durable_function.execution_id",
skip_serializing_if = "Option::is_none"
)]
pub durable_execution_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
rename = "durable_function.execution_name",
skip_serializing_if = "Option::is_none"
)]
pub durable_execution_name: Option<String>,
#[serde(
rename = "durable_function.first_invocation",
skip_serializing_if = "Option::is_none"
)]
pub first_invocation: Option<bool>,
}

impl Message {
Expand Down
69 changes: 41 additions & 28 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ fn is_oom_error(error_msg: &str) -> bool {
.any(|&oom_str| error_msg.contains(oom_str))
}

/// Tells whether `message` is one of the Lambda platform log lines.
///
/// A message qualifies when it begins with the START, END, or REPORT
/// `RequestId:` prefix. The match is case-sensitive and anchored at the very
/// first byte, so leading whitespace disqualifies a line. Only these logs
/// carry the `first_invocation` attribute.
fn is_platform_log(message: &str) -> bool {
    // Prefixes that identify a platform log line.
    const PLATFORM_PREFIXES: [&str; 3] = [
        "START RequestId:",
        "END RequestId:",
        "REPORT RequestId:",
    ];

    PLATFORM_PREFIXES
        .iter()
        .any(|&prefix| message.starts_with(prefix))
}

/// Parses a Lambda durable execution ARN and returns `(execution_id, execution_name)`.
///
/// Expected format:
Expand Down Expand Up @@ -538,9 +546,10 @@ impl LambdaProcessor {
/// `request_id`.
pub fn insert_to_durable_context_map(
&mut self,
request_id: &str, // key
execution_id: &str, // value
execution_name: &str, // value
request_id: &str, // key
execution_id: &str, // value
execution_name: &str, // value
first_invocation: Option<bool>, // value
) {
if self.durable_context_map.contains_key(request_id) {
error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map");
Expand All @@ -557,6 +566,7 @@ impl LambdaProcessor {
DurableExecutionContext {
execution_id: execution_id.to_string(),
execution_name: execution_name.to_string(),
first_invocation,
},
);
self.drain_held_logs_for_request_id(request_id);
Expand Down Expand Up @@ -589,13 +599,8 @@ impl LambdaProcessor {
self.held_logs_order.retain(|r| r != request_id);
let durable_ctx = self.durable_context_map.get(request_id).cloned();
if let Some(ctx) = durable_ctx {
for mut log in held {
log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone());
log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone());
if let Ok(s) = serde_json::to_string(&log) {
drop(log);
self.ready_logs.push(s);
}
for log in held {
self.set_durable_context_and_mark_ready(log, &ctx);
}
}
}
Expand All @@ -614,13 +619,8 @@ impl LambdaProcessor {
if let Some(ctx) = durable_ctx {
// If the request_id is in the durable context map, set durable execution id
// and execution name, and add logs to ready_logs.
for mut log in logs {
log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone());
log.message.lambda.durable_execution_name =
Some(ctx.execution_name.clone());
if let Ok(s) = serde_json::to_string(&log) {
self.ready_logs.push(s);
}
for log in logs {
self.set_durable_context_and_mark_ready(log, &ctx);
}
} else {
// No context yet — keep logs in held_logs until the aws.lambda span arrives.
Expand Down Expand Up @@ -648,6 +648,26 @@ impl LambdaProcessor {
}
}

/// Stamps `log` with the durable execution context and queues it in `ready_logs`.
///
/// The execution id and name from `ctx` are always written to the log's
/// `lambda` attributes. `first_invocation` is written only when the log is a
/// platform log (START/END/REPORT); any other log keeps its existing value.
/// A log that fails to serialize is discarded without being queued.
fn set_durable_context_and_mark_ready(
    &mut self,
    mut log: IntakeLog,
    ctx: &DurableExecutionContext,
) {
    // Decide up front whether this log is eligible for `first_invocation`.
    let carries_first_invocation = is_platform_log(&log.message.message);

    let lambda = &mut log.message.lambda;
    lambda.durable_execution_id = Some(ctx.execution_id.clone());
    lambda.durable_execution_name = Some(ctx.execution_name.clone());
    if carries_first_invocation {
        lambda.first_invocation = ctx.first_invocation;
    }

    let Ok(serialized) = serde_json::to_string(&log) else {
        return;
    };
    // explicitly drop log so we don't accidentally re-use it and push
    // duplicate logs to the aggregator
    drop(log);
    self.ready_logs.push(serialized);
}

/// Stashes a log in `held_logs` under `request_id`, waiting for durable context.
///
/// If `held_logs` is at capacity and `request_id` is a new key, the oldest key is evicted:
Expand Down Expand Up @@ -685,7 +705,7 @@ impl LambdaProcessor {
/// - `Some(false)` → serialize and push straight to `ready_logs`.
/// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map`
/// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`.
fn queue_log_after_rules(&mut self, mut log: IntakeLog) {
fn queue_log_after_rules(&mut self, log: IntakeLog) {
// Durable execution SDK logs already carry execution context extracted from executionArn.
if log.message.lambda.durable_execution_id.is_some() {
if let Ok(serialized_log) = serde_json::to_string(&log) {
Expand Down Expand Up @@ -730,14 +750,7 @@ impl LambdaProcessor {

match durable_ctx {
Some(ctx) => {
log.message.lambda.durable_execution_id = Some(ctx.execution_id);
log.message.lambda.durable_execution_name = Some(ctx.execution_name);
if let Ok(serialized_log) = serde_json::to_string(&log) {
// explicitly drop log so we don't accidentally re-use it and push
// duplicate logs to the aggregator
drop(log);
self.ready_logs.push(serialized_log);
}
self.set_durable_context_and_mark_ready(log, &ctx);
}
None => {
if let Some(rid) = log.message.lambda.request_id.clone() {
Expand Down Expand Up @@ -2515,11 +2528,11 @@ mod tests {
assert_eq!(batches.len(), 1);
let logs: Vec<serde_json::Value> = serde_json::from_slice(&batches[0]).unwrap();
assert_eq!(
logs[0]["message"]["lambda"]["durable_execution_id"],
logs[0]["message"]["lambda"]["durable_function.execution_id"],
"my-id"
);
assert_eq!(
logs[0]["message"]["lambda"]["durable_execution_name"],
logs[0]["message"]["lambda"]["durable_function.execution_name"],
"my-name"
);
}
Expand Down
9 changes: 8 additions & 1 deletion bottlecap/src/logs/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl LogsProcessor {
&update.request_id,
&update.execution_id,
&update.execution_name,
update.first_invocation,
);
let ready_logs = self.take_ready_logs();
if !ready_logs.is_empty()
Expand All @@ -66,10 +67,16 @@ impl LogsProcessor {
request_id: &str,
execution_id: &str,
execution_name: &str,
first_invocation: Option<bool>,
) {
match self {
LogsProcessor::Lambda(p) => {
p.insert_to_durable_context_map(request_id, execution_id, execution_name);
p.insert_to_durable_context_map(
request_id,
execution_id,
execution_name,
first_invocation,
);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const ACCOUNT_ID_KEY: &str = "account_id";
/// Tag key for the Lambda request id.
pub const REQUEST_ID_KEY: &str = "request_id";
/// Tag key for the durable function execution id.
pub const DURABLE_EXECUTION_ID_KEY: &str = "aws_lambda.durable_function.execution_id";
/// Tag key for the durable function execution name; the trace agent looks it up in span meta.
pub const DURABLE_EXECUTION_NAME_KEY: &str = "aws_lambda.durable_function.execution_name";
/// Tag key for the durable function first-invocation flag.
/// The trace agent looks it up in span meta and treats the value `"true"` as
/// `true`, any other value as `false`, and a missing tag as `None`.
pub const DURABLE_FUNCTION_FIRST_INVOCATION_KEY: &str =
"aws_lambda.durable_function.first_invocation";

const AWS_ACCOUNT_KEY: &str = "aws_account";
const RESOURCE_KEY: &str = "resource";
Expand Down
13 changes: 10 additions & 3 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,15 +599,22 @@ impl TraceAgent {
span.meta
.get(tags::lambda::tags::DURABLE_EXECUTION_NAME_KEY),
)
&& let Err(e) = invocation_processor_handle
{
let first_invocation = span
.meta
.get(tags::lambda::tags::DURABLE_FUNCTION_FIRST_INVOCATION_KEY)
.map(|v| v == "true");
if let Err(e) = invocation_processor_handle
.forward_durable_context(
request_id.clone(),
execution_id.clone(),
execution_name.clone(),
first_invocation,
)
.await
{
error!("Failed to forward durable context to processor: {e}");
{
error!("Failed to forward durable context to processor: {e}");
}
}

handle_reparenting(&mut reparenting_info, &mut span);
Expand Down
Loading