-
Notifications
You must be signed in to change notification settings - Fork 51
durable: extract trace context from checkpoints #818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
20b0054
604a2a1
4db58ed
2aedd77
499aa45
f11ab2a
b3f83df
62d8b69
42247d0
958daf0
bb9e643
9c4e70c
806de56
205dd2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ | |
| DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000" | ||
| HIGHER_64_BITS = "HIGHER_64_BITS" | ||
| LOWER_64_BITS = "LOWER_64_BITS" | ||
| _TRACE_CHECKPOINT_PREFIX = "_datadog_" | ||
|
|
||
|
|
||
| def _dsm_set_checkpoint(context_json, event_type, arn): | ||
|
|
@@ -546,6 +547,121 @@ def extract_context_from_step_functions(event, lambda_context): | |
| return extract_context_from_lambda_context(lambda_context) | ||
|
|
||
|
|
||
| def _durable_operations(event): | ||
| if not isinstance(event, dict): | ||
| return [] | ||
|
|
||
| operations = event.get("InitialExecutionState", {}).get("Operations") | ||
| if isinstance(operations, list): | ||
| return operations | ||
| if not isinstance(operations, dict): | ||
| return [] | ||
|
|
||
| numeric_keys = [] | ||
| other_keys = [] | ||
| for key, value in operations.items(): | ||
| if not isinstance(value, dict): | ||
| continue | ||
| try: | ||
| numeric_keys.append((int(key), value)) | ||
| except (TypeError, ValueError): | ||
| other_keys.append((str(key), value)) | ||
|
|
||
| numeric_keys.sort(key=lambda item: item[0]) | ||
| other_keys.sort(key=lambda item: item[0]) | ||
| return [value for _, value in numeric_keys + other_keys] | ||
|
|
||
|
|
||
| def _extract_context_from_durable_checkpoint(operation): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should these functions live in datadog_lambda/durable.py? |
||
| if not isinstance(operation, dict): | ||
| return None | ||
|
|
||
| step_details = operation.get("StepDetails") | ||
| if not isinstance(step_details, dict): | ||
| return None | ||
|
|
||
| result = step_details.get("Result") | ||
| if isinstance(result, str): | ||
| try: | ||
| result = json.loads(result) | ||
| except Exception: | ||
| return None | ||
|
|
||
| if not isinstance(result, dict): | ||
| return None | ||
|
|
||
| return propagator.extract(result) | ||
|
|
||
|
|
||
| def _extract_context_from_durable_input_payload(operation): | ||
| if not isinstance(operation, dict): | ||
| return None | ||
|
|
||
| execution_details = operation.get("ExecutionDetails") | ||
| if not isinstance(execution_details, dict): | ||
| return None | ||
|
|
||
| input_payload = execution_details.get("InputPayload") | ||
| if isinstance(input_payload, str): | ||
| try: | ||
| input_payload = json.loads(input_payload) | ||
| except Exception: | ||
| return None | ||
|
|
||
| if not isinstance(input_payload, dict): | ||
| return None | ||
|
|
||
| headers = input_payload.get("headers") | ||
| if isinstance(headers, dict): | ||
| return propagator.extract(headers) | ||
|
|
||
| dd_data = input_payload.get("_datadog") | ||
| if isinstance(dd_data, dict): | ||
| return propagator.extract(dd_data) | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def extract_context_from_durable_execution(event): | ||
| if not isinstance(event, dict): | ||
| return None | ||
| if not isinstance(event.get("DurableExecutionArn"), str): | ||
| return None | ||
|
|
||
| operations = _durable_operations(event) | ||
| if not operations: | ||
| return None | ||
|
|
||
| best_context = None | ||
| best_number = -1 | ||
| for operation in operations: | ||
| if not isinstance(operation, dict): | ||
| continue | ||
| name = operation.get("Name") | ||
| if not isinstance(name, str) or not name.startswith(_TRACE_CHECKPOINT_PREFIX): | ||
| continue | ||
| suffix = name[len(_TRACE_CHECKPOINT_PREFIX) :] | ||
| try: | ||
| number = int(suffix) | ||
| except (TypeError, ValueError): | ||
| continue | ||
| if number < best_number: | ||
| continue | ||
| context = _extract_context_from_durable_checkpoint(operation) | ||
| if _is_context_complete(context): | ||
| best_context = context | ||
| best_number = number | ||
|
|
||
| if best_context is not None: | ||
| return best_context | ||
|
|
||
| upstream_context = _extract_context_from_durable_input_payload(operations[0]) | ||
| if _is_context_complete(upstream_context): | ||
| return upstream_context | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def extract_context_custom_extractor(extractor, event, lambda_context): | ||
| """ | ||
| Extract Datadog trace context using a custom trace extractor function | ||
|
|
@@ -633,29 +749,34 @@ def extract_dd_trace_context( | |
| global dd_trace_context | ||
| trace_context_source = None | ||
| event_source = parse_event_source(event) | ||
| context = None | ||
|
|
||
| if extractor is not None: | ||
| context = extract_context_custom_extractor(extractor, event, lambda_context) | ||
| elif isinstance(event, (set, dict)) and "request" in event: | ||
| context = extract_context_from_request_header_or_context( | ||
| event, lambda_context, event_source | ||
| ) | ||
| elif isinstance(event, (set, dict)) and "headers" in event: | ||
| context = extract_context_from_http_event_or_context( | ||
| event, lambda_context, event_source, decode_authorizer_context | ||
| ) | ||
| elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS): | ||
| context = extract_context_from_sqs_or_sns_event_or_context( | ||
| event, lambda_context, event_source | ||
| ) | ||
| elif event_source.equals(EventTypes.EVENTBRIDGE): | ||
| context = extract_context_from_eventbridge_event(event, lambda_context) | ||
| elif event_source.equals(EventTypes.KINESIS): | ||
| context = extract_context_from_kinesis_event(event, lambda_context) | ||
| elif event_source.equals(EventTypes.STEPFUNCTIONS): | ||
| context = extract_context_from_step_functions(event, lambda_context) | ||
| else: | ||
| context = extract_context_from_lambda_context(lambda_context) | ||
| elif isinstance(event, (set, dict)) and "DurableExecutionArn" in event: | ||
| context = extract_context_from_durable_execution(event) | ||
|
Comment on lines
+695
to
+696
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens in the first invocation here? Do we expect the first invocation to extract the context with the other methods? |
||
|
|
||
| if context is None: | ||
| if isinstance(event, (set, dict)) and "request" in event: | ||
| context = extract_context_from_request_header_or_context( | ||
| event, lambda_context, event_source | ||
| ) | ||
| elif isinstance(event, (set, dict)) and "headers" in event: | ||
| context = extract_context_from_http_event_or_context( | ||
| event, lambda_context, event_source, decode_authorizer_context | ||
| ) | ||
| elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS): | ||
| context = extract_context_from_sqs_or_sns_event_or_context( | ||
| event, lambda_context, event_source | ||
| ) | ||
| elif event_source.equals(EventTypes.EVENTBRIDGE): | ||
| context = extract_context_from_eventbridge_event(event, lambda_context) | ||
| elif event_source.equals(EventTypes.KINESIS): | ||
| context = extract_context_from_kinesis_event(event, lambda_context) | ||
| elif event_source.equals(EventTypes.STEPFUNCTIONS): | ||
| context = extract_context_from_step_functions(event, lambda_context) | ||
| else: | ||
| context = extract_context_from_lambda_context(lambda_context) | ||
|
|
||
| if _is_context_complete(context): | ||
| logger.debug("Extracted Datadog trace context from event or context") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.