-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdurable.py
More file actions
63 lines (51 loc) · 2.05 KB
/
durable.py
File metadata and controls
63 lines (51 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import logging
import re
logger = logging.getLogger(__name__)
def _parse_durable_execution_arn(arn):
    """
    Parses a DurableExecutionArn to extract execution name and ID.

    ARN format:
    arn:aws:lambda:{region}:{account}:function:{func}:{version}/durable-execution/{name}/{id}

    Only the trailing "/durable-execution/{name}/{id}" segment is inspected;
    the prefix of the ARN is not validated.

    Returns (execution_name, execution_id) or None if parsing fails,
    including when arn is not a string.
    """
    # re.search raises TypeError for non-string input (None, bytes, ...);
    # treat that as a parse failure so the documented contract holds.
    if not isinstance(arn, str):
        return None

    match = re.search(r"/durable-execution/([^/]+)/([^/]+)$", arn)
    if not match:
        return None

    # Both groups use "[^/]+", so a successful match guarantees non-empty
    # values; no additional emptiness check is required.
    return match.group(1), match.group(2)
def extract_durable_function_tags(event):
    """
    Builds the durable function tags for a Lambda event payload.

    The event is only considered when it is a dict whose "DurableExecutionArn"
    entry is a string. If that ARN parses, the result holds the execution
    name and execution ID tags. In every other case an empty dict is
    returned; an ARN string that cannot be parsed is also logged as an error.
    """
    # Anything that is not a dict cannot carry the ARN, so treat it as absent.
    arn = event.get("DurableExecutionArn") if isinstance(event, dict) else None
    if not isinstance(arn, str):
        return {}

    name_and_id = _parse_durable_execution_arn(arn)
    if name_and_id:
        name, identifier = name_and_id
        return {
            "aws_lambda.durable_function.execution_name": name,
            "aws_lambda.durable_function.execution_id": identifier,
        }

    logger.error("Failed to parse DurableExecutionArn: %s", arn)
    return {}
# The only "Status" values extract_durable_execution_status will report.
VALID_DURABLE_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT"}


def extract_durable_execution_status(response, event):
    """
    Extracts the durable execution status from the Lambda response.

    A status is only reported when the event is a dict containing a
    "DurableExecutionArn" key, the response is a dict, and its "Status"
    entry is one of VALID_DURABLE_STATUSES.

    Returns the status string, or None in every other case.
    """
    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
        return None
    if not isinstance(response, dict):
        return None

    status = response.get("Status")
    # Set membership hashes the candidate, so an unhashable "Status" (e.g. a
    # list or dict) would raise TypeError. Check the type first so malformed
    # responses yield None instead of an exception.
    if not isinstance(status, str) or status not in VALID_DURABLE_STATUSES:
        return None
    return status