feat(google_cloud_pubsub): add Data Streams Monitoring support #18271

Open: J0ns0x wants to merge 4 commits into main from feat/google-cloud-pubsub-dsm

@J0ns0x J0ns0x commented May 26, 2026

Description

Adds Data Streams Monitoring (DSM) context propagation to the Google Cloud Pub/Sub integration, closing the parity gap with dd-trace-java (shipped 2023-11-17, #6156) and dd-trace-js (shipped 2024-10-30, #3855).

Without this, Python customers using Pub/Sub see broken DSM topologies where producer and subscriber appear as disconnected nodes, and any multi-language pipeline involving a Python hop loses pathway continuity.

The DSM pathway context is injected into Pub/Sub message attributes on publish (key dd-pathway-ctx-base64, the standard cross-language carrier), then extracted and checkpointed on the subscriber side. The wire format matches Java and Node.js, so cross-language pathways stitch correctly. The local tag schema follows the existing Python DSM convention: topic: as the generic destination tag with type:google-pubsub.
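
For readers unfamiliar with the carrier, here is a minimal sketch of the inject/extract round trip. It is not the code in this PR: `inject_pathway` and `extract_pathway` are hypothetical helpers, the pathway context is treated as opaque bytes (the real encoding is handled by the tracer's codec), and only the attribute key comes from the description above.

```python
import base64
from typing import Dict, Optional

# Attribute key named in this PR description.
PROPAGATION_KEY = "dd-pathway-ctx-base64"


def inject_pathway(attributes: Dict[str, str], encoded_ctx: bytes) -> None:
    """Store an already-encoded pathway context as a message attribute.

    Pub/Sub attributes are string-to-string, so the raw bytes are carried
    as base64 text.
    """
    attributes[PROPAGATION_KEY] = base64.b64encode(encoded_ctx).decode("ascii")


def extract_pathway(attributes: Dict[str, str]) -> Optional[bytes]:
    """Return the raw pathway bytes, or None if the key is absent or malformed."""
    value = attributes.get(PROPAGATION_KEY)
    if value is None:
        return None
    try:
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(value, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None
```

On the subscriber side, a missing or malformed attribute is treated as "no upstream pathway" in this sketch rather than as an error, so messages from uninstrumented producers still get processed.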

Gated by DD_DATA_STREAMS_ENABLED (the global flag) — no new per-integration toggle, matching the Kombu and Botocore patterns.

Testing

  • New file tests/contrib/google_cloud_pubsub/test_google_cloud_pubsub_dsm.py covering:
    • Producer pathway checkpoint recording with the correct tag schema
    • Producer→consumer pathway hash linkage end-to-end
    • Wire-level injection of dd-pathway-ctx-base64 survives the round trip
    • Exact payload-size accounting (with distributed tracing disabled, for determinism)
  • Tests run via scripts/run-tests --venv <hash> against the Pub/Sub emulator container provided by the existing tests/contrib/google_cloud_pubsub/conftest.py.
  • Static validation: scripts/lint fmt, scripts/lint typing, scripts/lint suitespec-check, and scripts/import-analysis/cycles.py analyze all pass.
  • Note: integration tests were not executed locally (Docker unavailable on dev machine). CI will exercise them.
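
To illustrate what an exact payload-size assertion can look like, here is a sketch under one assumption: that payload size is the message body plus the UTF-8 byte length of every attribute key and value. That formula is a guess for illustration, not something this PR description confirms, and `payload_size` is a hypothetical helper.

```python
from typing import Mapping


def payload_size(data: bytes, attributes: Mapping[str, str]) -> int:
    """Assumed accounting: body bytes plus all attribute key/value bytes."""
    size = len(data)
    for key, value in attributes.items():
        size += len(key.encode("utf-8")) + len(value.encode("utf-8"))
    return size
```

Under this accounting, any extra attribute added at publish time changes the total, which is one plausible reason to disable distributed tracing when asserting an exact number.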

Risks

  • Low. The new DSM module only registers core.on() listeners when DD_DATA_STREAMS_ENABLED=true; with DSM off, core.dispatch(...) is a silent no-op (same pattern as Kafka and Kombu).
  • The _traced_subscribe_callback signature gained one extra positional argument (the full subscription path), but it's an internal symbol and is only called via functools.partial from _traced_subscribe in the same module.
  • Synchronous subscriber.pull() and push subscriptions remain uninstrumented for DSM (same coverage as the existing distributed tracing for this integration; out of scope for this PR).
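
The first two points can be sketched together. This is a toy model, not the ddtrace implementation: `EventHub` stands in for an event dispatcher, the event name `pubsub.receive` is invented, and `register_dsm` and `traced_callback` are hypothetical names. It shows listeners being registered only when the feature flag is on, and the subscription path being bound as an extra positional argument with `functools.partial`.

```python
import functools
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class EventHub:
    """Toy stand-in for an event dispatcher; not the ddtrace core API."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[..., None]]] = defaultdict(list)

    def on(self, event: str, listener: Callable[..., None]) -> None:
        self._listeners[event].append(listener)

    def dispatch(self, event: str, *args: Any) -> None:
        # With no listeners registered, nothing is called.
        for listener in self._listeners.get(event, []):
            listener(*args)


def register_dsm(hub: EventHub, enabled: bool, checkpoints: List[str]) -> None:
    """Register the DSM listener only when the feature flag is on."""
    if not enabled:
        return
    hub.on("pubsub.receive", lambda subscription: checkpoints.append(subscription))


def traced_callback(
    hub: EventHub,
    user_callback: Callable[[Any], Any],
    subscription_path: str,  # the extra positional argument
    message: Any,
) -> Any:
    hub.dispatch("pubsub.receive", subscription_path)
    return user_callback(message)


# The wrapper binds everything except the message up front, so the
# subscriber still sees a one-argument callback.
hub = EventHub()
seen: List[str] = []
register_dsm(hub, enabled=True, checkpoints=seen)
wrapped = functools.partial(traced_callback, hub, print, "projects/p/subscriptions/s")
```

Because the extra argument is bound inside the wrapper, user-supplied callbacks keep their original one-argument shape, which is why the signature change stays internal.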

Additional Notes

  • Cross-language compatibility: Java tags consume checkpoints with subscription: rather than topic:. The encoded pathway context (the wire format) is identical across all three tracers, so pathway hashes flow through correctly; only the local DSM stats labels differ. The backend tolerates both shapes today.
  • Release note at releasenotes/notes/google-cloud-pubsub-dsm-ad88122f16faba8a.yaml.

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@J0ns0x J0ns0x requested review from a team as code owners May 26, 2026 20:12
pr-commenter Bot commented May 26, 2026

Benchmarks

Benchmark execution time: 2026-05-26 20:19:28

Comparing candidate commit 94b0c4f in PR branch feat/google-cloud-pubsub-dsm with baseline commit 829f5ae in branch main.

Found 0 performance improvements and 6 performance regressions! Performance is the same for 614 metrics, 10 unstable metrics.

scenario:iastaspects-capitalize_noaspect

  • 🟥 execution_time [+18.445µs; +22.935µs] or [+7.396%; +9.196%]

scenario:iastaspects-ljust_aspect

  • 🟥 execution_time [+84.161µs; +90.475µs] or [+17.056%; +18.336%]

scenario:iastaspects-stringio_aspect

  • 🟥 execution_time [+640.943µs; +679.524µs] or [+16.600%; +17.599%]

scenario:iastaspectsospath-ospathbasename_aspect

  • 🟥 execution_time [+107.201µs; +114.949µs] or [+25.119%; +26.935%]

scenario:span-start

  • 🟥 execution_time [+1.353ms; +1.544ms] or [+8.579%; +9.791%]

scenario:telemetryaddmetric-1-count-metric-1-times

  • 🟥 execution_time [+236.494ns; +271.140ns] or [+11.033%; +12.649%]

Comment thread mypy.ini
disallow_untyped_calls = false
disallow_untyped_defs = false
disallow_incomplete_defs = false

Contributor

Any reason to have this? Wouldn't it be preferable to keep these checks enabled? Or is the point that the existing untyped code is now raising errors because mypy checks it now that it has been changed?

Author

Good catch — I've addressed this in fd1fb45. The new module now has full type annotations on all three function signatures (_extract_publish_attributes, dsm_pubsub_send, dsm_pubsub_receive), so I was able to drop disallow_untyped_defs and disallow_incomplete_defs from the stanza. The one remaining exemption (disallow_untyped_calls = false) is still needed because the functions call into untyped helpers like data_streams_processor() and DsmPathwayCodec — the same reason the other DSM integrations (kafka, kombu, botocore) carry that flag.

- Add type annotations to all three functions in
  ddtrace/internal/datastreams/google_cloud_pubsub.py so mypy can
  verify them without blanket exemptions
- Trim mypy.ini stanza to only disallow_untyped_calls=false (we still
  call into the untyped processor/codec helpers, but our own defs are
  now typed so the other two flags are no longer needed)
- Add ddtrace/internal/datastreams/google_cloud_pubsub.py to the
  google_cloud_pubsub component in suitespec.yml so changes to the DSM
  module correctly trigger the Pub/Sub test suite in CI

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@cit-pr-commenter-54b7da
Codeowners resolved as

ddtrace/internal/datastreams/google_cloud_pubsub.py                     @DataDog/data-streams-monitoring
mypy.ini                                                                @DataDog/python-guild @DataDog/apm-core-python
tests/contrib/suitespec.yml                                             @DataDog/python-guild

Collaborator

@emmettbutler emmettbutler left a comment

This is a nice way to take advantage of the core dispatch API.
