Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ddtrace/contrib/internal/google_cloud_pubsub/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _supported_versions() -> dict[str, str]:
return {"google.cloud.pubsub_v1": ">=2.10.0"}


def _traced_subscribe_callback(callback, project_id, subscription_id, message):
def _traced_subscribe_callback(callback, project_id, subscription_id, subscription, message):
propagated_context = None
if config.google_cloud_pubsub.distributed_tracing_enabled and message.attributes:
ctx = HTTPPropagator.extract(dict(message.attributes))
Expand All @@ -97,7 +97,8 @@ def _traced_subscribe_callback(callback, project_id, subscription_id, message):
project_id=project_id,
subscription_id=subscription_id,
message=message,
):
) as ctx:
core.dispatch("google_cloud_pubsub.receive.pre", (subscription, message, ctx.span))
callback(message)


Expand Down Expand Up @@ -222,6 +223,7 @@ def _traced_publish(func, instance, args, kwargs):
topic_id=topic_id,
publish_kwargs=kwargs,
) as ctx:
core.dispatch("google_cloud_pubsub.send.pre", (args, kwargs, ctx.span))
try:
result = func(*args, **kwargs)
except BaseException as e:
Expand All @@ -243,6 +245,6 @@ def _traced_subscribe(func, instance, args, kwargs):
subscription = get_argument_value(args, kwargs, 0, "subscription")
callback = get_argument_value(args, kwargs, 1, "callback")
project_id, subscription_id = parse_resource_path(subscription)
traced_callback = partial(_traced_subscribe_callback, callback, project_id, subscription_id)
traced_callback = partial(_traced_subscribe_callback, callback, project_id, subscription_id, subscription)
args, kwargs = set_argument_value(args, kwargs, 1, "callback", traced_callback)
return func(*args, **kwargs)
4 changes: 3 additions & 1 deletion ddtrace/internal/datastreams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from ...internal.utils.importlib import require_modules


required_modules = ["confluent_kafka", "botocore", "kombu", "aiokafka"]
required_modules = ["confluent_kafka", "botocore", "kombu", "aiokafka", "google.cloud.pubsub_v1"]
_processor = None

if config._data_streams_enabled:
Expand All @@ -16,6 +16,8 @@
from . import botocore # noqa:F401
if "kombu" not in missing_modules:
from . import kombu # noqa:F401
if "google.cloud.pubsub_v1" not in missing_modules:
from . import google_cloud_pubsub # noqa:F401


def data_streams_processor(reset=False):
Expand Down
69 changes: 69 additions & 0 deletions ddtrace/internal/datastreams/google_cloud_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Any, Dict, Optional, Tuple

from ddtrace import config
from ddtrace.internal import core
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.utils import _calculate_byte_size
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils import get_argument_value


log = get_logger(__name__)

# Named parameters of ``Publisher.publish``. When a caller passes one of these by
# keyword it appears in ``kwargs`` but binds to the method's own parameter rather
# than to ``**attrs``, so it must never be treated as a message attribute.
# ``topic`` is included because ``publish(topic=..., data=...)`` is a valid call.
_PUBLISH_RESERVED_KWARGS = frozenset({"topic", "data", "ordering_key", "retry", "timeout"})


def _extract_publish_attributes(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return the message attributes found in ``Publisher.publish`` keyword arguments.

    Args:
        kwargs: Keyword arguments of the ``publish`` call.

    Returns:
        A new dict holding every entry of ``kwargs`` whose key is not one of the
        named ``publish`` parameters in ``_PUBLISH_RESERVED_KWARGS``. ``kwargs``
        itself is left untouched.
    """
    return {k: v for k, v in kwargs.items() if k not in _PUBLISH_RESERVED_KWARGS}


def dsm_pubsub_send(args: Tuple[Any, ...], kwargs: Dict[str, Any], span: Optional[Any]) -> None:
    """Record the producer-side DSM checkpoint for a ``Publisher.publish`` call.

    The payload size handed to the checkpoint is the sum of the byte sizes of the
    message data, the ordering key and the message attributes. The pathway context
    returned by the checkpoint is then encoded into ``kwargs``.

    Args:
        args: Positional arguments of the ``publish`` call (``topic`` first, then ``data``).
        kwargs: Keyword arguments of the ``publish`` call. Mutated in place: the
            encoded pathway context is added to it.
        span: Span forwarded to ``set_checkpoint``; may be ``None``.
    """
    from . import data_streams_processor as processor

    topic = get_argument_value(args, kwargs, 0, "topic")
    data = get_argument_value(args, kwargs, 1, "data", optional=True)
    # ``ordering_key`` follows ``topic`` and ``data`` in the ``publish`` signature, so it
    # may arrive positionally as well as by keyword. Looking only at ``kwargs`` would
    # silently drop a positional ordering key from the payload size.
    # NOTE(review): relies on ``optional=True`` yielding ``None`` when the argument is absent.
    ordering_key = get_argument_value(args, kwargs, 2, "ordering_key", optional=True) or ""
    attributes = _extract_publish_attributes(kwargs)

    payload_size = sum(_calculate_byte_size(part) for part in (data, ordering_key, attributes))

    edge_tags = ["direction:out", f"topic:{topic}", "type:google-pubsub"]
    ctx = processor().set_checkpoint(edge_tags, payload_size=payload_size, span=span)
    # Pub/Sub message attributes are passed as **kwargs to Publisher.publish().
    # Python's varkwargs accepts hyphenated keys like "dd-pathway-ctx-base64"
    # when unpacked, so injecting into the kwargs dict propagates the pathway
    # context to the broker. The existing distributed-tracing HTTPPropagator.inject
    # call uses the same mechanism (see _on_pubsub_send_start in trace_handlers.py).
    DsmPathwayCodec.encode(ctx, kwargs)


def dsm_pubsub_receive(subscription: str, message: Any, span: Optional[Any]) -> None:
    """Record the consumer-side DSM checkpoint for a received Pub/Sub message.

    Decodes the pathway context from the message attributes and sets an inbound
    checkpoint on it. The payload size is the sum of the byte sizes of the message
    data, its ordering key and its attributes.

    Args:
        subscription: Subscription path the message was received on; used as the
            value of the ``topic:`` edge tag.
        message: Received message; ``attributes`` is read directly, ``data`` and
            ``ordering_key`` are read if present.
        span: Span forwarded to ``set_checkpoint``; may be ``None``.
    """
    from . import data_streams_processor as processor

    raw_attributes = message.attributes
    attributes = dict(raw_attributes) if raw_attributes else {}

    sized_parts = (
        getattr(message, "data", None),
        getattr(message, "ordering_key", "") or "",
        attributes,
    )
    payload_size = sum(_calculate_byte_size(part) for part in sized_parts)

    pathway_ctx = DsmPathwayCodec.decode(attributes, processor())
    # AIDEV-NOTE: every dd-trace-py messaging integration (Kafka, Kinesis, SQS, SNS,
    # RabbitMQ) uses the `topic:` tag key as its generic destination identifier, and
    # this one follows suit. On the consumer side the *value* is the Pub/Sub
    # subscription path: several subscriptions on one topic therefore each yield
    # their own pathway node, so fan-out stays visible while the tag schema matches
    # the other Python DSM integrations. dd-trace-java uses a dedicated
    # `subscription:` tag key here instead; that difference does not affect the
    # wire-format pathway hash.
    edge_tags = ["direction:in", f"topic:{subscription}", "type:google-pubsub"]
    pathway_ctx.set_checkpoint(edge_tags, payload_size=payload_size, span=span)


# Subscribe the DSM handlers to the "send.pre" / "receive.pre" events that the
# Pub/Sub integration dispatches around publish and subscriber-callback calls.
# Registration runs at import time and only when Data Streams Monitoring is enabled.
if config._data_streams_enabled:
core.on("google_cloud_pubsub.send.pre", dsm_pubsub_send)
core.on("google_cloud_pubsub.receive.pre", dsm_pubsub_receive)
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,9 @@ disallow_untyped_calls = false
disallow_untyped_defs = false
disallow_incomplete_defs = false

[mypy-ddtrace.internal.datastreams.google_cloud_pubsub]
disallow_untyped_calls = false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to have this? Wouldn't it be preferable to keep the check enabled? Or is the point that the existing untyped code now triggers errors because it is being type-checked now that it has changed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — I've addressed this in fd1fb45. The new module now has full type annotations on all three function signatures (_extract_publish_attributes, dsm_pubsub_send, dsm_pubsub_receive), so I was able to drop disallow_untyped_defs and disallow_incomplete_defs from the stanza. The one remaining exemption (disallow_untyped_calls = false) is still needed because the functions call into untyped helpers like data_streams_processor() and DsmPathwayCodec — the same reason the other DSM integrations (kafka, kombu, botocore) carry that flag.

[mypy-ddtrace.internal.datastreams.processor]
disallow_any_generics = false
disallow_untyped_calls = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
features:
- |
google_cloud_pubsub: This introduces Data Streams Monitoring (DSM) context
propagation for the Google Cloud Pub/Sub integration. Producer publish
operations inject the DSM pathway context into message attributes, and
subscriber callbacks extract it and record a consume checkpoint. To enable,
set ``DD_DATA_STREAMS_ENABLED=true``.
128 changes: 128 additions & 0 deletions tests/contrib/google_cloud_pubsub/test_google_cloud_pubsub_dsm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import threading

import pytest

from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64
from ddtrace.internal.datastreams.processor import DataStreamsCtx
from ddtrace.internal.native import DDSketch
from tests.datastreams.utils import all_pathway_stat_keys


DSM_TEST_PATH_HEADER_SIZE = 28


@pytest.fixture
def dsm_processor():
    """Yield a freshly reset Data Streams processor and shut it down after the test.

    Fails immediately when no processor is available, i.e. when Data Streams
    Monitoring is not enabled for the test run.
    """
    dsm = data_streams_processor(reset=True)
    assert dsm is not None, "Data Streams Monitoring is not enabled"
    yield dsm
    # Teardown: stop the processor, waiting at most five seconds.
    dsm.shutdown(timeout=5)


def _wait_for_pathway_directions(processor, *required, timeout=10.0):
    """Poll DSM buckets until checkpoints for each required direction tag appear.

    Args:
        processor: Data Streams processor whose pathway stat keys are inspected.
        *required: Substrings (e.g. ``"direction:out"``) that must each occur in the
            space-joined tags of at least one recorded pathway.
        timeout: Maximum number of seconds to keep polling.

    Raises:
        AssertionError: If the deadline passes before every required substring has
            been seen. The message lists the tag strings observed by the last poll.
    """
    import time

    # Bound before the loop so the failure message is well-formed even when the
    # deadline has already passed before the first poll (e.g. ``timeout=0``).
    tag_strs = []
    # Monotonic clock: the wait is not affected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        tag_strs = [" ".join(key[0]) for key in all_pathway_stat_keys(processor)]
        if all(any(req in tags for tags in tag_strs) for req in required):
            return
        time.sleep(0.1)
    raise AssertionError(f"timed out waiting for DSM checkpoints {required}; saw: {tag_strs}")


def test_dsm_payload_size_produce(dsm_processor, publisher, topic_path):
    """Publishing records a payload-size sample on a google-pubsub producer pathway.

    Only the presence of a sample (count >= 1) and the pathway's tags are checked
    here, not the exact number of bytes.
    """
    body = b"data streams hello"
    attrs = {"custom_key": "custom_value"}
    publish_future = publisher.publish(topic_path, body, **attrs)
    publish_future.result(timeout=10)

    _wait_for_pathway_directions(dsm_processor, "direction:out")

    # Gather every outbound pathway that carries a topic tag, then validate each one
    # while still holding the processor lock.
    with dsm_processor._lock:
        producer_entries = [
            (key[0], stats)
            for bucket in dsm_processor._buckets.values()
            for key, stats in bucket.pathway_stats.items()
            if "direction:out" in key[0] and any(tag.startswith("topic:") for tag in key[0])
        ]
        for tags, stats in producer_entries:
            assert "type:google-pubsub" in tags
            assert stats.payload_size.count >= 1
    assert producer_entries, "expected producer pathway sketch not recorded"


def test_dsm_pathway_linkage(dsm_processor, publisher, topic_path, subscriber, subscription_path):
    """A publish followed by a subscribe yields a consumer hash chained to the producer hash.

    The expected hashes are recomputed from the tag schema: the producer node is
    tagged with the topic path, the consumer node with the subscription path.
    """
    delivered = threading.Event()

    def _ack_and_signal(message):
        message.ack()
        delivered.set()

    streaming_future = subscriber.subscribe(subscription_path, callback=_ack_and_signal)
    try:
        publisher.publish(topic_path, b"data streams hello").result(timeout=10)
        assert delivered.wait(timeout=10), "timed out waiting for subscriber callback"
    finally:
        # Stop the streaming pull and wait for its shutdown to finish.
        streaming_future.cancel()
        streaming_future.result(timeout=5)

    _wait_for_pathway_directions(dsm_processor, "direction:out", "direction:in")

    producer_tags = sorted(["direction:out", f"topic:{topic_path}", "type:google-pubsub"])
    consumer_tags = sorted(["direction:in", f"topic:{subscription_path}", "type:google-pubsub"])

    hasher = DataStreamsCtx(dsm_processor, 0, 0, 0)
    parent_hash = hasher._compute_hash(producer_tags, 0)
    child_hash = hasher._compute_hash(consumer_tags, parent_hash)

    hash_pairs = set()
    for key in all_pathway_stat_keys(dsm_processor):
        hash_pairs.add((key[1], key[2]))
    assert (parent_hash, 0) in hash_pairs, f"producer hash missing; saw {hash_pairs}"
    assert (child_hash, parent_hash) in hash_pairs, f"consumer hash missing; saw {hash_pairs}"


def test_dsm_pathway_header_injected_on_publish(dsm_processor, publisher, topic_path, subscriber, subscription_path):
    """A published message carries a non-empty pathway attribute next to the user's own attributes."""
    publish_future = publisher.publish(topic_path, b"data streams hello", custom_key="custom_value")
    publish_future.result(timeout=10)

    pulled = subscriber.pull(subscription=subscription_path, max_messages=1, timeout=10)
    messages = pulled.received_messages
    assert len(messages) == 1

    attrs = dict(messages[0].message.attributes)
    # The pathway key must be present and carry a non-empty value.
    assert PROPAGATION_KEY_BASE_64 in attrs
    assert attrs[PROPAGATION_KEY_BASE_64]
    # Injection must not drop attributes supplied by the caller.
    assert attrs["custom_key"] == "custom_value"


def test_dsm_payload_size_matches_expected(dsm_processor, publisher, topic_path):
    """With distributed tracing disabled, the recorded producer payload size is exact.

    Expected bytes = data + attribute keys and values + injected pathway key name +
    pathway header (``DSM_TEST_PATH_HEADER_SIZE``).
    """
    from tests.utils import override_config

    payload = b"abcdef"  # 6 bytes
    test_attrs = {"k1": "v1"}  # 2 + 2 = 4 bytes of attribute content
    with override_config("google_cloud_pubsub", dict(distributed_tracing_enabled=False)):
        publisher.publish(topic_path, payload, **test_attrs).result(timeout=10)

    _wait_for_pathway_directions(dsm_processor, "direction:out")

    attr_bytes = sum(len(name) + len(value) for name, value in test_attrs.items())
    expected_payload_size = float(
        len(payload) + attr_bytes + len(PROPAGATION_KEY_BASE_64) + DSM_TEST_PATH_HEADER_SIZE
    )
    reference_sketch = DDSketch()
    reference_sketch.add(expected_payload_size)
    expected_proto = reference_sketch.to_proto()

    produce_stats = []
    with dsm_processor._lock:
        for bucket in dsm_processor._buckets.values():
            for key, stats in bucket.pathway_stats.items():
                if "direction:out" in key[0]:
                    produce_stats.append(stats)
    assert len(produce_stats) >= 1
    for stats in produce_stats:
        assert stats.payload_size.count >= 1
        assert stats.payload_size.to_proto() == expected_proto
1 change: 1 addition & 0 deletions tests/contrib/suitespec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ components:
- ddtrace/contrib/internal/gevent/*
google_cloud_pubsub:
- ddtrace/contrib/internal/google_cloud_pubsub/*
- ddtrace/internal/datastreams/google_cloud_pubsub.py
graphql:
- ddtrace/contrib/internal/graphql/*
grpc:
Expand Down
Loading