Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions datadog_lambda/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ def patch_all():

if config.trace_enabled:
patch_all_dd()
# AIDEV-NOTE: Until the aws_durable_execution_sdk_python integration
# ships in a stable ddtrace release, this branch wires it up directly.
# ddtrace.patch(aws_durable_execution_sdk_python=True) becomes a no-op
# against PyPI ddtrace because _monkey.py doesn't know the name yet.
# Remove once the integration is GA.
try:
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
patch as _patch_ade,
)

_patch_ade()
logger.debug("aws_durable_execution_sdk_python integration patched")
except Exception as e:
logger.debug("Failed to patch aws_durable_execution_sdk_python: %s", e)
else:
_patch_http()
_ensure_patch_requests()
Expand Down
232 changes: 224 additions & 8 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,207 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
logger.debug("Failed to check if invocated by an authorizer. error %s", e)


def is_durable_execution_replay(event):
    """Return True when this invocation resumes a prior durable execution.

    A replay means ``InitialExecutionState`` already carries operations from
    an earlier checkpoint, so the invocation continues an existing trace.
    Callers use this to skip inferred-span creation, which would otherwise
    duplicate the span minted on the first invocation.

    Returns:
        True for a replay invocation (skip the inferred span).
        False for a first invocation or a non-durable-execution event.
    """
    # Non-dict events and events without the durable-execution marker can
    # never be replays.
    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
        return False

    operations = event.get("InitialExecutionState", {}).get("Operations", [])

    # The SDK always records the EXECUTION operation itself, so a fresh start
    # has exactly one entry; anything beyond that is previously checkpointed
    # work being replayed (mirrors the SDK's ReplayStatus logic).
    return len(operations) > 1


_TRACE_CHECKPOINT_PREFIX = "_datadog_"


def _extract_from_datadog_checkpoint(operations):
"""Priority 1: highest-numbered ``_datadog_{N}`` STEP operation.

Returns a Context, or None if no usable checkpoint is present.
"""
candidates = []
for operation in operations:
op_name = operation.get("Name")
if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
continue
suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX) :]
try:
number = int(suffix)
except ValueError:
continue
candidates.append((number, operation))

if not candidates:
return None

candidates.sort(key=lambda t: t[0])
_, operation = candidates[-1]

payload_str = (operation.get("StepDetails") or {}).get("Result")
if not payload_str:
return None

try:
payload = json.loads(payload_str)
except (ValueError, TypeError):
return None

if not isinstance(payload, dict):
return None

context = propagator.extract(payload)
if context and context.trace_id:
return context
return None


def _extract_from_input_payload(operations):
"""Priority 2: Datadog headers in the original event's InputPayload.

The first operation in ``InitialExecutionState.Operations`` is the EXECUTION
operation; its ``ExecutionDetails.InputPayload`` is the original event the
durable function was invoked with — immutable across replays. If the caller
embedded Datadog headers (typical for API Gateway, direct invoke from
another instrumented service, or a chained durable invoke), use them.
"""
if not operations:
return None

first_op = operations[0]
payload_str = (first_op.get("ExecutionDetails") or {}).get("InputPayload")
if not payload_str:
return None

try:
payload = json.loads(payload_str)
except (ValueError, TypeError):
return None

if not isinstance(payload, dict):
return None

# Try the wrapping conventions used by other instrumented sources.
for carrier in (
payload.get("_datadog"),
payload.get("headers"),
payload,
):
if not isinstance(carrier, dict):
continue
context = propagator.extract(carrier)
if context and context.trace_id:
return context
return None


def extract_context_from_durable_execution(event, lambda_context):
    """
    Extract Datadog trace context from an AWS Lambda Durable Execution event.

    Two-tier priority:

    1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (set by the
       ``aws_durable_execution_sdk_python`` integration on a prior invocation).
    2. Datadog headers found in the original event's ``InputPayload`` —
       carries upstream context when an instrumented service invoked us.

    If neither yields a context, returns ``None`` and the caller falls through
    to the rest of the extraction chain (Lambda context, X-Ray, etc.). On the
    very first invocation of a durable execution with no upstream context, the
    tracer will simply mint a fresh trace; subsequent invocations recover that
    same trace via the priority-1 checkpoint.
    """
    try:
        # Guard: only dict events carrying both durable-execution markers
        # are candidates for extraction.
        if not isinstance(event, dict):
            return None
        if "DurableExecutionArn" not in event:
            return None
        if "InitialExecutionState" not in event:
            return None

        operations = event.get("InitialExecutionState", {}).get("Operations", [])

        checkpoint_ctx = _extract_from_datadog_checkpoint(operations)
        if checkpoint_ctx is not None:
            return checkpoint_ctx

        # Fall back to upstream headers embedded in the original input.
        return _extract_from_input_payload(operations)

    except Exception as e:
        logger.debug("Failed to extract trace context from durable execution: %s", e)
        return None


def create_durable_execution_root_span(event):
    """
    Create the durable execution root span on the FIRST invocation only.

    - First invocation (no prior operations): creates the root span and returns
      it. The span gets a fresh random ``span_id`` from the tracer; dd-trace-py's
      checkpoint writer reads that id back via the live span tree (grandparent
      walk) and persists it so subsequent invocations parent off the same root.
    - Replay invocations: returns ``None`` — trace context is restored from the
      ``_datadog_{N}`` checkpoint by ``extract_context_from_durable_execution``.

    Returns the root span (caller must call ``span.finish()`` when the
    invocation ends), or ``None`` if not a durable execution or if this is a
    replay.
    """
    try:
        if not isinstance(event, dict):
            return None

        execution_arn = event.get("DurableExecutionArn")
        # Both the ARN and the initial state must be present for a durable
        # execution event.
        if not execution_arn or "InitialExecutionState" not in event:
            return None

        # Replays continue the trace from a checkpoint; no new root span.
        if is_durable_execution_replay(event):
            return None

        service_name = (
            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
        )
        # Resource is the last ARN segment (the whole string when there is
        # no colon).
        resource = execution_arn.rsplit(":", 1)[-1]

        span = tracer.trace(
            "aws.durable-execution",
            service=service_name,
            resource=resource,
            span_type="serverless",
        )
        if span is not None:
            span.set_tag("durable.execution_arn", execution_arn)
            return span
        return None

    except Exception as e:
        logger.debug("Failed to create durable execution root span: %s", e)
        return None


def extract_dd_trace_context(
event, lambda_context, extractor=None, decode_authorizer_context: bool = True
):
Expand All @@ -634,6 +835,18 @@ def extract_dd_trace_context(
trace_context_source = None
event_source = parse_event_source(event)

# Check for AWS Lambda Durable Execution events first (before other checks)
# This ensures trace context is properly continued across durable invocations
durable_context = extract_context_from_durable_execution(event, lambda_context)
if _is_context_complete(durable_context):
logger.debug("Extracted Datadog trace context from durable execution")
dd_trace_context = durable_context
trace_context_source = TraceContextSource.EVENT
logger.debug(
"extracted dd trace context from durable execution: %s", dd_trace_context
)
return dd_trace_context, trace_context_source, event_source

if extractor is not None:
context = extract_context_custom_extractor(extractor, event, lambda_context)
elif isinstance(event, (set, dict)) and "request" in event:
Expand Down Expand Up @@ -977,12 +1190,16 @@ def process_injected_data(event, request_time_epoch_ms, args, tags):
start_time_ns = int(
injected_authorizer_data.get(Headers.Parent_Span_Finish_Time)
)
integration_latency = int(
event["requestContext"]["authorizer"].get("integrationLatency", 0)
)
finish_time_ns = max(
start_time_ns, (request_time_epoch_ms + integration_latency) * 1e6
)
finish_time_ns = (
request_time_epoch_ms
+ (
int(
event["requestContext"]["authorizer"].get(
"integrationLatency", 0
)
)
)
) * 1e6
upstream_authorizer_span = insert_upstream_authorizer_span(
args, tags, start_time_ns, finish_time_ns
)
Expand Down Expand Up @@ -1445,9 +1662,9 @@ def create_function_execution_span(
trace_context_source,
merge_xray_traces,
trigger_tags,
durable_function_tags=None,
parent_span=None,
span_pointers=None,
durable_function_tags=None,
):
tags = None
if context:
Expand All @@ -1456,7 +1673,6 @@ def create_function_execution_span(
function_arn = ":".join(tk[0:7]) if len(tk) > 7 else function_arn
function_version = tk[7] if len(tk) > 7 else "$LATEST"
tags = {
"span.kind": "server",
"cold_start": str(is_cold_start).lower(),
"function_arn": function_arn,
"function_version": function_version,
Expand Down
Loading
Loading