Skip to content
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
'use strict'

const TracingPlugin = require('../../dd-trace/src/plugins/tracing')
const { getEnvironmentVariable } = require('../../dd-trace/src/config/helper')
const { isFalse } = require('../../dd-trace/src/util')
const { saveTraceContextCheckpointIfUpdated } = require('./trace-checkpoint')

// Termination reasons that indicate the execution is suspending rather than exiting permanently.
// Sourced from (`@aws/durable-execution-sdk-js`'s termination-manager/types.ts).
const PENDING_TERMINATION_REASONS = new Set([
'OPERATION_TERMINATED',
'RETRY_SCHEDULED',
'RETRY_INTERRUPTED_STEP',
'WAIT_SCHEDULED',
'CALLBACK_PENDING',
'CUSTOM',
])

const DEFAULT_TERMINATION_REASON = 'OPERATION_TERMINATED'

// Default on; users opt out by setting to false.
function isCrossInvocationTracingEnabled () {
return !isFalse(getEnvironmentVariable('DD_DURABLE_CROSS_INVOCATION_TRACING_ENABLED'))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use !this._tracerConfig.DD_DURABLE_CROSS_INVOCATION_TRACING_ENABLED. The config should be available in the plugin, see comment below

}

class AwsDurableExecutionSdkJsHandlerPlugin extends TracingPlugin {
static id = 'aws-durable-execution-sdk-js'
Expand Down Expand Up @@ -28,9 +49,56 @@ class AwsDurableExecutionSdkJsHandlerPlugin extends TracingPlugin {
meta,
}, ctx)

this._installTerminationCheckpointHook(ctx, event)

return ctx.currentStore
}

// Wrap the user handler so we can capture the SDK's DurableContext, and
// install a hook on the termination manager so that when the execution
// suspends (PENDING) we persist the current trace context as a `_datadog`
// checkpoint, which subsequent invocations consume to extract the parent
// trace context.
_installTerminationCheckpointHook (ctx, event) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a private property or a helper method outside of the class to prevent adding the underscored method which is accessible from the outside

if (!isCrossInvocationTracingEnabled()) return
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!isCrossInvocationTracingEnabled()) return
if (!this._tracerConfig.DD_DURABLE_CROSS_INVOCATION_TRACING_ENABLED) return


const args = ctx.arguments || []
if (args.length < 6 || typeof args[5] !== 'function') return

const executionContext = args[2]
const terminationManager = executionContext?.terminationManager
if (!terminationManager || typeof terminationManager.terminate !== 'function') return
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!terminationManager || typeof terminationManager.terminate !== 'function') return
if (typeof terminationManager?.terminate !== 'function') return


const span = ctx.currentStore?.span
if (!span) return

const state = {
durableContext: undefined,
firstExecutionSpanId: span.context?.()?.toSpanId?.(),
invocationEvent: event,
savePromise: null,
saved: false,
span,
tracer: this._tracer,
}

const originalHandler = args[5]
args[5] = function (...handlerArgs) {
state.durableContext = handlerArgs[1]
return originalHandler.apply(this, handlerArgs)
}

const originalTerminate = terminationManager.terminate
terminationManager.terminate = function (...terminateArgs) {
const reason = terminateArgs[0]?.reason ?? DEFAULT_TERMINATION_REASON
if (PENDING_TERMINATION_REASONS.has(reason)) {
// Must enqueue checkpoint updates before the checkpoint manager flips to terminating.
void maybeSaveCheckpoint(state)
}
return originalTerminate.apply(this, terminateArgs)
}
}

asyncEnd (ctx) {
const span = ctx?.currentStore?.span
const status = ctx?.result?.Status
Expand Down Expand Up @@ -58,4 +126,24 @@ function finishOpenChildSpans (executeSpan) {
}
}

function maybeSaveCheckpoint (state) {
if (!state || state.saved || state.savePromise) return state.savePromise
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state must be truthy as it seems, otherwise state.savePromise would fail (if it is not false or 0).

Suggested change
if (!state || state.saved || state.savePromise) return state.savePromise
if (state.saved || state.savePromise) return state.savePromise

if (!state.tracer || !state.span || !state.durableContext) return null

state.savePromise = saveTraceContextCheckpointIfUpdated(
state.tracer,
state.span,
state.durableContext,
state.firstExecutionSpanId,
state.invocationEvent,
).catch(() => {
// Best-effort — never break customer workloads.
}).finally(() => {
state.saved = true
state.savePromise = null
})

return state.savePromise
}

module.exports = AwsDurableExecutionSdkJsHandlerPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
'use strict'

const crypto = require('crypto')
const log = require('../../dd-trace/src/log')
const TextMapPropagator = require('../../dd-trace/src/opentracing/propagation/text_map')

const CHECKPOINT_NAME_PREFIX = '_datadog_'

// Per-tracer-config cache for a propagator that injects only Datadog-style
// headers (`x-datadog-*`) regardless of the user's `DD_TRACE_PROPAGATION_STYLE_INJECT`.
// Checkpoints are written and read entirely by Datadog code, so honoring user
// style preferences would only complicate the payload contract.
const datadogOnlyPropagatorCache = new WeakMap()

function getDatadogOnlyPropagator (tracer) {
const config = tracer?._tracer?._config ?? tracer?._config
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the tracer ever not be defined? Similar with the properties that are accessed from it here.

I believe we also only need one of the two

if (!config) return null
const cached = datadogOnlyPropagatorCache.get(config)
if (cached) return cached
// Shadow `tracePropagationStyle.inject` while inheriting every other field
// (x-datadog-tags length cap, etc.) from the live config. Force-disable
// `legacyBaggageEnabled` too: it injects `ot-baggage-*` headers independently
// of the inject style, which would leak baggage into the checkpoint payload.
const shadowConfig = Object.create(config)
shadowConfig.tracePropagationStyle = {
...config.tracePropagationStyle,
inject: ['datadog'],
}
Comment thread
joeyzhao2018 marked this conversation as resolved.
shadowConfig.legacyBaggageEnabled = false
const propagator = new TextMapPropagator(shadowConfig)
datadogOnlyPropagatorCache.set(config, propagator)
return propagator
}

/**
* Build the Datadog-format headers dict from a span context.
* Mirrors ddtrace-py HTTPPropagator.inject output so the same payload
* can be consumed by either language's datadog-lambda wrapper.
* @param {object} tracer
* @param {object} span
* @returns {Record<string, string>}
*/
function injectHeaders (tracer, span) {
const headers = {}
try {
const propagator = getDatadogOnlyPropagator(tracer)
if (propagator) {
const ctx = typeof span?.context === 'function' ? span.context() : span
propagator.inject(ctx, headers)
} else {
// Test environments pass a tracer mock without `_config`. Fall back to
// its own `inject` so unit tests can assert on the shape they control.
tracer.inject?.(span, 'http_headers', headers)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please adjust the tracer mock instead of writing source code in a particular way for making the test pass

}
} catch (e) {
log.debug('Failed to inject trace context', e)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we guard against something here? I am unsure if this could ever somehow fail

}
return headers
}

/**
* Mutate headers in-place to set parent_id to the provided value.
* @param {Record<string, string>} headers
* @param {string | number | undefined} parentId
*/
function overrideParentId (headers, parentId) {
if (parentId === undefined || parentId === null) return
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit

Suggested change
if (parentId === undefined || parentId === null) return
if (!parentId) return

If parentId could be 0, I would use parentId == null

if ('x-datadog-trace-id' in headers) {
headers['x-datadog-parent-id'] = String(parentId)
}
}

/**
* Find the checkpoint with the highest N for _datadog_{N} in the event's operations.
* @param {unknown} event
* @returns {{ number: number, operation: object } | null}
*/
function findLastCheckpointOrNull (event) {
if (!event || typeof event !== 'object') return null

const operations = event.InitialExecutionState?.Operations
if (!Array.isArray(operations)) return null

let best = null
for (const op of operations) {
const name = op?.Name
if (typeof name !== 'string') continue

if (!name.startsWith(CHECKPOINT_NAME_PREFIX)) continue
Comment on lines +87 to +89
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (typeof name !== 'string') continue
if (!name.startsWith(CHECKPOINT_NAME_PREFIX)) continue
if (typeof name !== 'string' || !name.startsWith(CHECKPOINT_NAME_PREFIX)) continue

const suffix = name.slice(CHECKPOINT_NAME_PREFIX.length)
const n = Number.parseInt(suffix, 10)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess n is standing for number? Would you mind using a more readable name? :)

if (Number.isNaN(n) || String(n) !== suffix) continue

if (!best || n > best.number) {
best = { number: n, operation: op }
}
}

return best
}

/**
* Parse the JSON payload from a checkpoint STEP operation's Payload or StepDetails.Result.
* @param {object} op
* @returns {Record<string, string> | null}
*/
function parseCheckpointPayload (op) {
try {
const raw = op?.Payload ?? op?.StepDetails?.Result
if (!raw || typeof raw !== 'string') return null
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we normally return undefined instead of null. I believe we could change these everywhere?

const parsed = JSON.parse(raw)
return parsed && typeof parsed === 'object' ? parsed : null
} catch {
log.debug('Failed to parse checkpoint payload')
return null
}
}

/**
* Save a _datadog_{number} STEP operation via the SDK's checkpoint manager.
* Uses a deterministic blake2b hash of (name:arn) as the stepId so the save is
* idempotent within an execution.
* @param {object} checkpointManager
* @param {string} executionArn
* @param {number} number
* @param {Record<string, string>} headers
* @returns {Promise<void>}
*/
async function saveCheckpoint (checkpointManager, executionArn, number, headers) {
const name = `${CHECKPOINT_NAME_PREFIX}${number}`
const stepId = crypto
.createHash('blake2b512')
.update(`${name}:${executionArn}`)
.digest('hex')
.slice(0, 64)
const payload = JSON.stringify(headers)

// Queue START and SUCCEED back-to-back before awaiting. This allows callers
// to trigger save right before termination without losing the second update.
const startPromise = checkpointManager.checkpoint(stepId, {
Id: stepId,
Action: 'START',
Type: 'STEP',
SubType: 'STEP',
Name: name,
})
const succeedPromise = checkpointManager.checkpoint(stepId, {
Id: stepId,
Action: 'SUCCEED',
Type: 'STEP',
SubType: 'STEP',
Name: name,
Payload: payload,
})
await startPromise
await succeedPromise
Comment on lines +155 to +156
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await startPromise
await succeedPromise
await Promise.all([
startPromise,
succeedPromise,
])

This prevents a potential unhandled rejection

log.debug('Saved trace context checkpoint %s', name)
}

/**
* Save a new trace-context checkpoint when the current context differs from
* the most recent `_datadog_{N}` operation already in the event.
*
* Every checkpoint across the durable execution carries the same
* `x-datadog-parent-id` so all resumed invocations attach to the same anchor:
* - First checkpoint (no previous): anchor at `firstExecutionSpanId`.
* - Subsequent: reuse the prior checkpoint's `x-datadog-parent-id` verbatim —
* that value originated from the first save and is the anchor we've been
* carrying forward.
*
* Caller is responsible for invoking this only when a save is appropriate — i.e.
* the SDK is about to return Status: PENDING (see PENDING_TERMINATION_REASONS in
* handler.js). This function does not re-check that.
*
* @param {object} tracer
* @param {object} span - aws.durable.execute span
* @param {object} durableContext - SDK's DurableContextImpl
* @param {string | undefined} firstExecutionSpanId - span id of the first
* invocation's `aws.durable.execute` span. Only consulted on the very
* first save; ignored once a prior `_datadog_{N}` exists. We anchor at
* this span (which this integration owns) rather than its parent so the
* anchor doesn't depend on whatever upstream context happens to be
* active when `bindStart` fires.
* @param {unknown} event - raw invocation event (has InitialExecutionState)
* @returns {Promise<void>}
*/
async function saveTraceContextCheckpointIfUpdated (
tracer, span, durableContext, firstExecutionSpanId, event,
) {
try {
const checkpointManager = durableContext.checkpoint ?? durableContext.checkpointManager
if (typeof checkpointManager?.checkpoint !== 'function') return

const currentHeaders = injectHeaders(tracer, span)
if (!currentHeaders || Object.keys(currentHeaders).length === 0) return

const latest = findLastCheckpointOrNull(event)

let newNumber
if (latest) {
const latestHeaders = parseCheckpointPayload(latest.operation)
if (!latestHeaders) return

Comment thread
joeyzhao2018 marked this conversation as resolved.
// Compare trace contexts ignoring x-datadog-parent-id, which always changes
// since it reflects the active span at save time. The propagator emits keys
// in a deterministic order, so JSON.stringify is a stable equality check.
const anchoredSpanId = latestHeaders['x-datadog-parent-id']
delete currentHeaders['x-datadog-parent-id']
delete latestHeaders['x-datadog-parent-id']
if (JSON.stringify(currentHeaders) === JSON.stringify(latestHeaders)) return

newNumber = latest.number + 1
overrideParentId(currentHeaders, anchoredSpanId)
} else {
newNumber = 0
if (firstExecutionSpanId) overrideParentId(currentHeaders, firstExecutionSpanId)
}

const executionArn = event?.DurableExecutionArn || ''
await saveCheckpoint(checkpointManager, executionArn, newNumber, currentHeaders)
} catch (e) {
log.debug('Failed to save trace context checkpoint', e)
}
}

module.exports = {
CHECKPOINT_NAME_PREFIX,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CHECKPOINT_NAME_PREFIX,

saveTraceContextCheckpointIfUpdated,
}
Loading
Loading