Open
Changes from 20 commits
49 commits
2f4d4fe
ci retry
pablomartinezbernardo May 5, 2026
abd524e
feat(aws_durable_execution): persist trace context across suspend/resume
joeyzhao2018 Apr 30, 2026
f0388f1
replace the root span with the anchor span
joeyzhao2018 May 6, 2026
8ef5d5b
small comment update
joeyzhao2018 May 8, 2026
b52e654
style fixes
joeyzhao2018 May 8, 2026
1eb7ec2
resolve style checks
joeyzhao2018 May 8, 2026
bfcea7c
use json instead
joeyzhao2018 May 11, 2026
1e9d9af
simplification
joeyzhao2018 May 11, 2026
d1d16dc
simplify AI generated code
joeyzhao2018 May 11, 2026
cfa5832
formatting
joeyzhao2018 May 11, 2026
082fb8d
remove useless variables
joeyzhao2018 May 11, 2026
66074eb
fix tests
joeyzhao2018 May 11, 2026
8a59dcd
remove an irrelevant change
joeyzhao2018 May 11, 2026
7eef743
tracecontext diff logic clean up
joeyzhao2018 May 11, 2026
5b22143
remove another unnecessary cache
joeyzhao2018 May 11, 2026
d283ef5
update comments and reformat
joeyzhao2018 May 11, 2026
cfc0070
normalize w3c parent id
joeyzhao2018 May 13, 2026
eba598d
mark the extra checkpoint steps as visited
joeyzhao2018 May 13, 2026
9a8c244
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 13, 2026
1427e96
change to sync
joeyzhao2018 May 13, 2026
89e7ca6
add one integration test to make sure the checkpoint survives
joeyzhao2018 May 13, 2026
fa1de48
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 13, 2026
30c47b4
typing fixes
joeyzhao2018 May 13, 2026
e6b737b
simplify it further
joeyzhao2018 May 13, 2026
17bb8f3
format
joeyzhao2018 May 13, 2026
ff9ca4a
add DD_DURABLE_CROSS_INVOCATION_TRACING_ENABLED config
joeyzhao2018 May 14, 2026
f9846a9
written by datadog only; read by datadog only; so use datadog style only
joeyzhao2018 May 14, 2026
76769d0
small refactor
joeyzhao2018 May 15, 2026
b97ea32
add instrumentation telemetry
joeyzhao2018 May 15, 2026
216a133
add a snitch test to keep an eye on the _parent_id behavior of the sdk
joeyzhao2018 May 15, 2026
49ca382
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 15, 2026
593188b
nosec
joeyzhao2018 May 15, 2026
dc28e47
adding extra info to locate exceptions
joeyzhao2018 May 15, 2026
590a5d3
update tests
joeyzhao2018 May 15, 2026
821f9d4
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 15, 2026
faf0c2e
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 20, 2026
35fe7d5
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 21, 2026
53c99b5
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 21, 2026
19492a6
add more useful information in debug logs
joeyzhao2018 May 22, 2026
a7908fe
@durable_execution sdk contract guarantees position 1 is always a Dur…
joeyzhao2018 May 24, 2026
c2450e8
simplify the comments
joeyzhao2018 May 24, 2026
fad8927
we should avoid using the threading library directly, this can cause s…
joeyzhao2018 May 25, 2026
88784f9
further simplify
joeyzhao2018 May 25, 2026
7b4e63b
use the constant HTTP_HEADER_PARENT_ID
joeyzhao2018 May 25, 2026
0c948a7
update release notes
joeyzhao2018 May 25, 2026
3e59a47
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 25, 2026
43365c2
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 26, 2026
8eefe97
types
joeyzhao2018 May 26, 2026
130bfad
Merge branch 'main' into joey/cross-invocation-tracecontext-propagation
joeyzhao2018 May 26, 2026
@@ -16,6 +16,12 @@
from ddtrace.contrib._events.aws_durable import AwsDurableExecuteEvent
from ddtrace.contrib._events.aws_durable import AwsDurableInvokeEvent
from ddtrace.contrib._events.aws_durable import AwsDurableOperationEvent
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.trace_checkpoint import (
    mark_trace_context_checkpoints_visited,
)
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.trace_checkpoint import (
    maybe_save_trace_context_checkpoint,
)
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import wrap
from ddtrace.internal import core
@@ -102,6 +108,15 @@ def traced_user_func(*inner_args, **inner_kwargs):
durable_context = get_argument_value(inner_args, inner_kwargs, 1, "durable_context", optional=True)
arn, is_replay = _read_execution_state(durable_context)

# Pre-visit our prior ``_datadog_*`` checkpoints so the SDK's
# replay tracker can transition out of REPLAY when the user code
# catches up. Without this, our synthetic ops sit in
# state.operations as completed-but-unvisited and the SDK never
# un-suppresses context.logger.
state = getattr(durable_context, "state", None)
if state is not None:
    mark_trace_context_checkpoints_visited(state)

event = AwsDurableExecuteEvent(
    component=config.aws_durable_execution_sdk_python.integration_name,
    integration_config=config.aws_durable_execution_sdk_python,
@@ -117,6 +132,10 @@ def traced_user_func(*inner_args, **inner_kwargs):
# Dispatch without exc_info so __exit__ skips auto-dispatch
# and the span is not tagged with the exception.
ctx.event.suspended = True
# Workflow is pausing; another invocation will resume it. This
# is the only branch where it's worth persisting trace context.
if ctx.span is not None:
    maybe_save_trace_context_checkpoint(durable_context, ctx.span)
ctx.dispatch_ended_event()
raise

@@ -0,0 +1,347 @@
"""Persist Datadog trace context as a STEP operation in the durable execution log.

When a durable handler suspends (``SuspendExecution``) the integration appends
a synthetic STEP operation named ``_datadog_{N}`` whose payload is a JSON dict
of the propagation headers for the current trace context. On the next
invocation, ``datadog-lambda-python`` reads the highest ``N`` out of
``InitialExecutionState.Operations`` and re-activates the trace.

The save runs only on the suspend path: a workflow that returns or fails for
good has no further invocations to read the checkpoint, so writing one would
be wasted work and would be rejected by the SDK's terminating checkpointer.
The save is also a no-op when the new headers match the most recent prior
checkpoint (after stripping the per-span ``x-datadog-parent-id`` and the
``dd=p:`` entry of ``tracestate``) — every replay would otherwise rewrite
identical context and pile up redundant operations.

AIDEV-NOTE: ``_datadog_*`` is a reserved step name. Users must not create
steps with this prefix; the SDK does not enforce this — we rely on it being
unusual enough not to collide.
"""

from __future__ import annotations

import hashlib
import json
import re
import threading

Contributor

We should avoid using the threading library directly; this can cause stability issues in forked environments. We should use this module instead: https://github.com/DataDog/dd-trace-py/blob/main/ddtrace/internal/threads.py

Contributor Author

Fixed: swapped to ddtrace.internal.threads.Lock.

Quick follow-up: is this something most Python devs would catch on instinct, or is it pretty specific to ddtrace's constraints (gevent monkey-patching, forked workers)? If it's the latter, would it be worth a lint rule or CI grep that flags import threading / threading.Lock in new code under ddtrace/? Let me know and I can open a PR for that.
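
A rough sketch of what such a check could look like, using only the standard-library `ast` module. The function name `find_threading_imports` is hypothetical and nothing like it is part of this PR; a real lint rule would also need to walk the files under ddtrace/ and decide on an allowlist.

```python
import ast
from typing import List


def find_threading_imports(source: str, filename: str = "<string>") -> List[int]:
    """Return the line numbers of direct ``threading`` imports in ``source``.

    Hypothetical helper for illustration: flags ``import threading`` and
    ``from threading import ...``; it does not follow aliases or re-exports.
    """
    hits = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if isinstance(node, ast.Import):
            # ``import threading`` or ``import threading as t``
            if any(alias.name == "threading" for alias in node.names):
                hits.append(node.lineno)
        elif isinstance(node, ast.ImportFrom):
            # ``from threading import Lock`` (absolute imports only)
            if node.level == 0 and node.module == "threading":
                hits.append(node.lineno)
    return sorted(hits)


sample = "import threading\nimport json\nfrom threading import Lock\n"
print(find_threading_imports(sample))
```

An AST walk avoids the false positives a plain grep would produce on comments and string literals that merely mention `threading`.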

from typing import TYPE_CHECKING
from typing import Optional

from aws_durable_execution_sdk_python.identifier import OperationIdentifier
from aws_durable_execution_sdk_python.lambda_service import OperationUpdate

from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator


if TYPE_CHECKING:
    from aws_durable_execution_sdk_python.context import DurableContext

    from ddtrace._trace.span import Span


log = get_logger(__name__)


_CHECKPOINT_NAME_PREFIX = "_datadog_"

Contributor

Do we have to deal with size limits for checkpoint names or headers? Would using _dd be more efficient?

Contributor Author

I think we benefit from being a bit more explicit here. One reason is that customers will look at these operation names directly, not just happen to see them. I can even picture customers looking up trace IDs in these checkpoints and jumping straight to our traces. Another reason is that _dd actually has a higher chance of collision: I once looked into a support case where the customer's services also had a lot of names starting with dd*.

As for the size limits, I put more details in my design doc. In short, I think the size of our data is negligible.
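
For a sense of scale, here is a small sketch that measures a payload encoded the same way the diff does (`json.dumps(headers, separators=(",", ":"))`). The header values are made up for illustration, and `checkpoint_payload_size` is a hypothetical helper; the sketch says nothing about what limits the service actually enforces.

```python
import json

# Hypothetical header values, for illustration only; real values come from
# whatever the propagator injects for the active trace.
headers = {
    "x-datadog-trace-id": "1234567890123456789",
    "x-datadog-parent-id": "9876543210987654321",
    "x-datadog-sampling-priority": "1",
    "x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=6650f1a200000000",
    "traceparent": "00-6650f1a2000000001122334455667788-88e8b2a1c3d4e5f6-01",
    "tracestate": "dd=p:88e8b2a1c3d4e5f6;s:1;t.dm:-0",
}


def checkpoint_payload_size(headers: dict) -> int:
    """Size in bytes of the compact JSON payload for ``headers``."""
    return len(json.dumps(headers, separators=(",", ":")).encode("utf-8"))


# Compare the name length for the two candidate prefixes with the payload size.
print(len("_datadog_0"), len("_dd_0"), checkpoint_payload_size(headers))
```

With headers of this shape the payload is a few hundred bytes, so the handful of characters saved by a shorter prefix is small next to the payload itself.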

_STATE_NEXT_N_ATTR = "_dd_next_checkpoint_n"
# Module-global lock serializing checkpoint-N allocation across all states.
# ExecutionState is shared across worker threads (parallel/map), so two threads
# can race on the counter. The critical section is microseconds and the SDK
# already serializes per-state work in practice, so a single lock is simpler
# than per-state locks and not a measurable bottleneck.
_COUNTER_LOCK = threading.Lock()

# `traceparent` format: <version>-<trace_id>-<parent_id>-<flags>
_TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def _strip_parent_from_traceparent(value: str) -> str:
    """Zero out the ``parent_id`` segment of a W3C ``traceparent``.

    ``traceparent`` is ``<version>-<trace_id>-<parent_id>-<flags>``; the
    parent-id rotates per span and would otherwise dominate the diff. We
    replace it with all-zeros so the comparison only catches *real* trace
    context changes (trace_id, flags). The normalized form is used only as a
    diff key and is never persisted.
    """
    m = _TRACEPARENT_RE.match(value)
    if m is None:
        return value
    return f"{m.group(1)}-{m.group(2)}-{'0' * 16}-{m.group(4)}"


def _strip_dd_parent_from_tracestate(value: str) -> str:
    """Drop the ``p:`` entry from the ``dd=`` vendor section to avoid creating spurious diffs.
    ``p:`` carries the per-span parent id and rotates every span. Other vendors' segments pass
    through untouched.
    """
    out_segments = []
    for seg in (s.strip() for s in value.split(",")):
        if not seg:
            continue
        if not seg.startswith("dd="):
            out_segments.append(seg)
            continue
        kvs = seg[len("dd=") :]
        kept = [kv for kv in (k.strip() for k in kvs.split(";")) if kv and not kv.startswith("p:")]
        if kept:
            out_segments.append("dd=" + ";".join(kept))
        # If only `p:` was present, drop the whole `dd=` segment.
    return ",".join(out_segments)


def _stable_headers(headers: dict) -> dict:
    """Strip per-span volatile fields so the diff is meaningful.

    Three values rotate per span and must be normalized away or every
    invocation would look like a real context change:
    ``x-datadog-parent-id``, the ``dd=p:`` segment of ``tracestate``, and the
    ``parent_id`` segment of W3C ``traceparent``.
    """
    out = {}
    for k, v in headers.items():
        kl = k.lower()
        if kl == "x-datadog-parent-id":
            continue
        if kl == "tracestate" and isinstance(v, str):
            normalized = _strip_dd_parent_from_tracestate(v)
            if normalized:
                out[k] = normalized
            continue
        if kl == "traceparent" and isinstance(v, str):
            out[k] = _strip_parent_from_traceparent(v)
            continue
        out[k] = v
    return out


def _max_existing_checkpoint_n(state) -> int:
    """The highest ``N`` already present in ``state.operations``, or -1 if none."""
    operations = getattr(state, "operations", None) or {}
    highest = -1
    for op in operations.values():
        name = getattr(op, "name", None)
        if not name or not name.startswith(_CHECKPOINT_NAME_PREFIX):
            continue
        suffix = name[len(_CHECKPOINT_NAME_PREFIX) :]
        try:
            n = int(suffix)
        except ValueError:
            continue
        if n > highest:
            highest = n
    return highest


def mark_trace_context_checkpoints_visited(state) -> None:
    """Tell the SDK our ``_datadog_*`` ops are already visited.

    The SDK's ``track_replay`` transitions REPLAY → NEW only when every
    completed op in ``state.operations`` is in ``_visited_operations``.
    Our checkpoints are completed STEP ops that user code never visits,
    so without this they would keep the SDK in REPLAY for the rest of the
    invocation — which silently suppresses ``context.logger`` output and
    anything else gated on ``is_replaying()``.

    Mirrors what would happen if the customer had written these steps:
    ``DurableContext.*`` methods all call ``state.track_replay(op_id)``
    before doing anything else; we do the same for our ops at invocation
    start. ``track_replay`` itself acquires ``_replay_status_lock`` and
    runs the subset check, so we don't need to touch SDK internals.
    """
    operations = getattr(state, "operations", None) or {}
    track_replay = getattr(state, "track_replay", None)
    if track_replay is None:
        return
    for op_id, op in list(operations.items()):
        name = getattr(op, "name", None)
        if isinstance(name, str) and name.startswith(_CHECKPOINT_NAME_PREFIX):
            try:
                track_replay(op_id)
            except Exception:
                log.debug("track_replay failed for %s", op_id, exc_info=True)


def _allocate_checkpoint_n(state) -> int:
    """Atomically reserve the next ``N`` for a checkpoint write.

    The SDK batches checkpoints for up to ~1s before sending; if two threads
    both reuse the same ``N`` the resulting blake2b operation_ids collide and
    AWS rejects the batch with "Cannot update the same operation twice in a
    single request". Counting only ``state.operations`` would also collide,
    because AWS doesn't update that until the batch lands. So we keep a
    process-local counter on the state itself.
    """
    with _COUNTER_LOCK:
        n = getattr(state, _STATE_NEXT_N_ATTR, None)
        if n is None:
            n = _max_existing_checkpoint_n(state) + 1
        try:
            setattr(state, _STATE_NEXT_N_ATTR, n + 1)
        except Exception:
            log.debug("Could not advance checkpoint counter", exc_info=True)

Contributor

I'm seeing a bunch of debug logs on fatal exceptions. Should they be debug, considering that level will not show by default? I understand the fear of spamming warn lines, but I think I would find it even more confusing for a feature to fail completely with no error or warning log at all.

This comment is on this specific line, but it applies to a bunch of the debug logs.

Contributor Author

joeyzhao2018 May 15, 2026

Allow me to push back a bit on this one. There are a few main reasons I simply cannot ignore.

  1. After all, we're not customer code. I don't want WARN/ERROR lines from us to land in their application logs and page their on-call for things they can't act on. The unwritten contract for a library running in someone else's runtime is to stay quiet unless something the customer can act on is broken.
  2. Graceful degradation, not feature loss. If these paths fail, the worst outcome is that a Datadog observability feature degrades: a checkpoint doesn't get pre-marked, a tag doesn't get set. Customer workflow correctness is untouched. WARN/ERROR for "an observability library lost some observability" is louder than the impact warrants.
  3. The "no signal at all" concern has a better mitigation than log level: telemetry. If we want Datadog to see when an integration degrades, that's what integration-error telemetry is for. It is visible to us and not piped into customer logs.
    • But we haven't turned on instrumentation telemetry for serverless yet.
      • That said, we are working on it, and @emmettbutler actually has a few PRs for that. We also need it to see how customers use the integration, so I added them here.

Contributor

To add to this, can we make the debug logs a bit more actionable? From the message alone it's difficult to debug and dig into root causes. We should include some info about the state and, in this case, why we could not advance (this could be captured in exc_info, but that might not be the case everywhere).

Contributor Author

Updated with some details to help debugging.
In general, the split we are aiming for is:

  • the metric tags of integration telemetry carry the dimensions we want to aggregate on
  • the logs carry the runtime values needed to reproduce locally
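
A minimal sketch of the logging half of that split, using the standard-library `logging` module. `advance_counter` and the logger name are hypothetical stand-ins, not the code that landed in the PR, and the telemetry half is left out because its API is not shown here.

```python
import logging

# Hypothetical logger name, for illustration only.
log = logging.getLogger("trace_checkpoint_sketch")


def advance_counter(state, attr: str, n: int) -> int:
    """Store ``n + 1`` on ``state`` under ``attr``; return ``n``, or -1 on failure."""
    try:
        setattr(state, attr, n + 1)
    except Exception:
        # Stay at DEBUG, but include the runtime values needed to reproduce
        # the failure locally: the state type, the attribute, and the counter.
        log.debug(
            "Could not advance checkpoint counter: state_type=%s attr=%s n=%d",
            type(state).__name__,
            attr,
            n,
            exc_info=True,
        )
        return -1
    return n


class FrozenState:
    """Stand-in for a state object that rejects new attributes."""

    __slots__ = ()


class OpenState:
    """Stand-in for a state object that accepts new attributes."""


print(advance_counter(FrozenState(), "_dd_next_checkpoint_n", 3))
print(advance_counter(OpenState(), "_dd_next_checkpoint_n", 3))
```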

            return -1
        return n


def _step_id(name: str, execution_arn: str) -> str:
    """Deterministic blake2b-based step id so re-runs don't duplicate."""
    digest = hashlib.blake2b(f"{name}:{execution_arn}".encode("utf-8"), digest_size=16).hexdigest()
    return digest


def _read_prior_checkpoint_payload(state) -> Optional[dict]:
    """Parsed headers dict from the highest-N ``_datadog_*`` operation, or ``None``.

    On replay invocations, ``state.operations`` already contains the
    checkpoints written by previous invocations. We use the highest-numbered
    one for two purposes:

    - **Parent id reuse** — its ``x-datadog-parent-id`` is the value to stamp
      into the new save (so all checkpoints across all invocations parent off
      the same execute-span anchor; see ``_resolve_override_parent_id``).
    - **Diff suppression** — comparing its stable headers to ours tells us
      whether the trace context actually changed since the last save.
    """
    operations = getattr(state, "operations", None) or {}
    best_op = None
    best_n = -1
    for op in operations.values():
        name = getattr(op, "name", None)
        if not name or not name.startswith(_CHECKPOINT_NAME_PREFIX):
            continue
        suffix = name[len(_CHECKPOINT_NAME_PREFIX) :]
        try:
            n = int(suffix)
        except ValueError:
            continue
        if n > best_n:
            best_n = n
            best_op = op
    if best_op is None:
        return None
    step_details = getattr(best_op, "step_details", None)
    payload_str = getattr(step_details, "result", None) if step_details is not None else None
    if not payload_str:
        return None
    try:
        payload = json.loads(payload_str)
    except Exception:
        return None
    return payload if isinstance(payload, dict) else None


def _resolve_override_parent_id(span, prior_payload: Optional[dict]) -> Optional[str]:
    """Resolve the parent id to stamp into the saved checkpoint.

    1. **Prior checkpoint** — if any ``_datadog_*`` already exists on the
       state (i.e. this is a replay invocation), reuse its saved parent id
       verbatim. Across all invocations of the same durable execution this
       stays stable.
    2. **Current execute span** — on the very first save, anchor to the
       current ``aws.durable.execute`` span id.
    """
    if prior_payload is not None:
        pid = prior_payload.get("x-datadog-parent-id")
        if pid is not None:
            return str(pid)
    anchor_span_id = getattr(span, "span_id", None)
    return str(anchor_span_id) if anchor_span_id is not None else None


def _override_parent_id(headers: dict, parent_id: str) -> None:
    """Stamp ``parent_id`` into the Datadog and W3C parent-id fields.

    ``HTTPPropagator.inject`` writes the *current* span id; we replace it
    with the resolved anchor id so all replays parent off the same span.
    ``tracestate``'s ``dd=`` segment also carries a parent id, but rewriting
    it would mean re-encoding the vendor section — the Datadog extractor on
    the other end uses ``x-datadog-parent-id`` as the source of truth, so we
    leave ``tracestate`` alone (and the diff layer drops it anyway).
    """
    if "x-datadog-parent-id" in headers:
        headers["x-datadog-parent-id"] = parent_id
    tp = headers.get("traceparent")
    if isinstance(tp, str):
        m = _TRACEPARENT_RE.match(tp)
        if m is not None:
            try:
                new_span = format(int(parent_id), "016x")
            except ValueError:
                return
            headers["traceparent"] = f"{m.group(1)}-{m.group(2)}-{new_span}-{m.group(4)}"


def maybe_save_trace_context_checkpoint(durable_context: "DurableContext", span: "Span") -> None:
    """Append a ``_datadog_{N}`` STEP if propagation headers changed.

    Called once per invocation, on the ``SuspendExecution`` path of the
    durable-execution wrapper. No-op when the propagation headers match the
    most recent existing ``_datadog_*`` operation in ``state.operations``
    (after stripping per-span volatile fields), so identical context across
    replays does not pile up redundant checkpoints.

    Failure here must never break the workflow; all errors are swallowed.
    """
    try:
        state = getattr(durable_context, "state", None)
        if state is None:
            return
        execution_arn = getattr(state, "durable_execution_arn", None)
        if not execution_arn:
            return

        headers: dict = {}
        try:
            HTTPPropagator.inject(span, headers)
        except Exception:
            log.debug("HTTPPropagator.inject failed", exc_info=True)
            return
        if not headers:
            return

        prior_payload = _read_prior_checkpoint_payload(state)

        # ``_stable_headers`` strips ``x-datadog-parent-id`` and the
        # ``dd=p:`` segment, so the override does not affect the diff —
        # defer it until we know we're actually saving. Suppress when prior
        # matches: trace context hasn't changed.
        stable = _stable_headers(headers)
        if prior_payload is not None and _stable_headers(prior_payload) == stable:
            return

        # Stamp the anchor parent id so every checkpoint across replays
        # references the same execute span. Without this, each invocation
        # would inject its own current span id and fragment the trace tree.
        override_pid = _resolve_override_parent_id(span, prior_payload)
        if override_pid is not None:
            _override_parent_id(headers, override_pid)

        # Allocate ``N`` only after the diff so no-op calls don't burn numbers.
        n = _allocate_checkpoint_n(state)
        if n < 0:
            return
        name = f"{_CHECKPOINT_NAME_PREFIX}{n}"
        operation_id = _step_id(name, execution_arn)

        # AWS validates parent_id against real operations in the execution,
        # so we mirror what the SDK does for the user's own steps: parent off
        # the current durable context's parent (None at the top level, which
        # AWS accepts as "child of the EXECUTION"). Using a Datadog span id
        # here would fail with InvalidParameterValueException.
        parent_id: Optional[str] = getattr(durable_context, "_parent_id", None)

        identifier = OperationIdentifier(operation_id=operation_id, parent_id=parent_id, name=name)
        payload = json.dumps(headers, separators=(",", ":"))
        update = OperationUpdate.create_step_succeed(identifier, payload)

        # Use Sync to avoid being abandoned: the SDK drops unflushed async
        # checkpoints on suspend. Cost is negligible — it's suspending anyway.
        try:
            state.create_checkpoint(update, is_sync=True)
        except Exception:
            log.debug("Failed to write trace-context checkpoint", exc_info=True)
    except Exception:
        log.debug("maybe_save_trace_context_checkpoint failed", exc_info=True)
@@ -0,0 +1,9 @@
---
features:
  - |
    aws_durable_execution_sdk_python: Propagate trace context across durable execution
    suspend/resume cycles. When a workflow suspends (``SuspendExecution``), the
    integration appends a single ``_datadog_{N}`` STEP operation containing the
    current trace's propagation headers; on the next invocation the headers are
    read back so the trace continues across replays. No checkpoint is written for
    workflows that complete or fail terminally.
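
The read-back step described in this note happens outside this diff (the module docstring attributes it to ``datadog-lambda-python``). As a rough sketch of the idea only, the following picks the highest-numbered ``_datadog_{N}`` entry from a list of ``(name, payload)`` pairs and decodes its JSON payload. The function name and the pair-based input shape are assumptions for illustration, not the reader's actual interface.

```python
import json
from typing import Dict, Iterable, Optional, Tuple

PREFIX = "_datadog_"


def latest_checkpoint_headers(ops: Iterable[Tuple[str, str]]) -> Optional[Dict[str, str]]:
    """Return the decoded headers of the highest-N ``_datadog_{N}`` entry, if any.

    Hypothetical helper: ``ops`` is a plain iterable of (operation name,
    JSON payload) pairs standing in for the operations of a real execution.
    """
    best_n, best_payload = -1, None
    for name, payload in ops:
        if not name.startswith(PREFIX):
            continue
        try:
            n = int(name[len(PREFIX):])
        except ValueError:
            continue  # e.g. "_datadog_x": not one of the numbered checkpoints
        if n > best_n:
            best_n, best_payload = n, payload
    if best_payload is None:
        return None
    try:
        headers = json.loads(best_payload)
    except ValueError:
        return None
    return headers if isinstance(headers, dict) else None


ops = [
    ("_datadog_0", '{"x-datadog-trace-id":"1"}'),
    ("user_step", '"unrelated"'),
    ("_datadog_10", '{"x-datadog-trace-id":"2"}'),
    ("_datadog_x", "{}"),
]
print(latest_checkpoint_headers(ops))
```

Comparing ``N`` as an integer matters here: a string comparison would rank ``_datadog_9`` above ``_datadog_10``.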