fix(aiokafka): collect kafka cluster_id for DSM #18272
The aiokafka integration was not collecting the Kafka cluster_id, while the
confluent_kafka integration was. This caused Data Streams Monitoring edges
and offset commits from aiokafka producers/consumers to be missing the
kafka_cluster_id tag, preventing cluster-level aggregation.
Fetch cluster_id by sending a MetadataRequest_v5 to a broker the first time
a producer/consumer is traced, cache the result on the AIOKafkaClient, and
use a 5 minute failure cache to avoid repeated slow lookups (matching the
confluent_kafka integration). The id is exposed both as the kafka.cluster_id
span tag and via core.set_item("kafka_cluster_id", ...) so the DSM hooks
include it in checkpoint edge tags and produce/commit tracking.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
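The caching behaviour described above can be sketched roughly as follows. This is not the PR's code: `fetch` is a hypothetical stand-in for the MetadataRequest_v5 round-trip, `_dd_cluster_id_failed_at` is an assumed attribute name, and only `_dd_cluster_id` is a name taken from this thread.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional

FAILURE_TTL_SECONDS = 300.0  # the 5 minute failure cache mentioned in the description


async def get_cluster_id(
    client: Any,
    fetch: Callable[[], Awaitable[Optional[str]]],  # hypothetical broker lookup
    now: Callable[[], float] = time.monotonic,
) -> Optional[str]:
    """Return the cluster id for `client`, retrying a failed lookup only after the TTL."""
    cached = getattr(client, "_dd_cluster_id", None)
    if cached:
        return cached  # a successful lookup is reused for the life of the client

    failed_at = getattr(client, "_dd_cluster_id_failed_at", None)  # assumed name
    if failed_at is not None and now() - failed_at < FAILURE_TTL_SECONDS:
        return None  # a recent lookup failed, so skip the slow path for now

    try:
        cluster_id = await fetch()
    except Exception:
        cluster_id = None  # tracing must never break the traced call

    if cluster_id:
        client._dd_cluster_id = cluster_id
    else:
        client._dd_cluster_id_failed_at = now()
    return cluster_id
```

Injecting `fetch` and `now` keeps the sketch independent of aiokafka internals and makes the failure window easy to exercise in a test.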
💡 Codex Review
Reviewed commit: 446e7971d0
    async def traced_commit(func, instance, args, kwargs):
        result = await func(*args, **kwargs)
        cluster_id = await _get_cluster_id(getattr(instance, "_client", None), None)
Avoid all-topics metadata lookup in commit path
traced_commit always calls _get_cluster_id(..., None), and _get_cluster_id builds MetadataRequest_v5([], False) when topic is None. In Kafka, an empty topic list requests metadata for the entire cluster, so the first commit on each consumer can trigger a large broker round-trip and add avoidable latency in a hot path (especially with many topics). This should use a concrete topic from the commit offsets (as the confluent integration does) or skip the cluster-id lookup when no topic is available.
Addressed in 9239ad4 — traced_commit now reads only the cached _dd_cluster_id and never issues a metadata request from the commit path.
For reference: an empty topics=[] in MetadataRequest_v5 actually means "no topic metadata, brokers/cluster only" — verified by encoding:
- [] → \x00\x00\x00\x00 (array length 0, no topics)
- None → \xff\xff\xff\xff (null, all topics)
So the produce/getone path doesn't fan out across all topics either. But the commit hot-path concern is fair — switched it to cached-only.
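The encoding difference quoted above can be reproduced with a few lines of standard-library code. This is a hand-rolled sketch of the non-flexible Kafka wire format for a nullable array of strings (INT32 element count, -1 for null, then an INT16-length-prefixed string per element), not the encoder aiokafka itself uses; `encode_topics` is a hypothetical name.

```python
import struct
from typing import List, Optional


def encode_topics(topics: Optional[List[str]]) -> bytes:
    """Encode a nullable ARRAY of STRING as used by non-flexible Kafka requests."""
    if topics is None:
        return struct.pack(">i", -1)  # null array: "all topics" in Metadata v1+
    out = struct.pack(">i", len(topics))  # big-endian INT32 element count
    for name in topics:
        raw = name.encode("utf-8")
        out += struct.pack(">h", len(raw)) + raw  # INT16 length, then the bytes
    return out


print(encode_topics([]))    # b'\x00\x00\x00\x00'
print(encode_topics(None))  # b'\xff\xff\xff\xff'
```

The two cases differ only in the length prefix, which is why an empty list and a null list mean different things to the broker.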
Tracking calls and DSM edge tags now include kafka_cluster_id, so the existing offset-lookup keys (with cluster_id="") and pathway-hash expectations no longer matched. Read the cluster id from the AIOKafkaClient and use it in PartitionKey / pathway tag expectations. Snapshots regenerated to include the new kafka.cluster_id span tag.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per PR review: the commit handler should not issue a metadata round-trip. Use only the cached cluster id on the commit path. It is normally populated by a prior produce/consume; if not yet cached, skip the tag for that commit rather than block on a broker request.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
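A cached-only read on the commit path might look like the sketch below. The `_client` and `_dd_cluster_id` attribute names come from this thread; `cached_cluster_id` and `commit_tags` are hypothetical helpers, and the real handler sets the tag on a span rather than returning a dict.

```python
from typing import Any, Dict, Optional


def cached_cluster_id(instance: Any) -> Optional[str]:
    """Read the cluster id cached by an earlier produce/consume; never contact a broker."""
    client = getattr(instance, "_client", None)
    return getattr(client, "_dd_cluster_id", None) or None


def commit_tags(instance: Any) -> Dict[str, str]:
    """Build the tags for one commit, omitting the cluster id when it is not cached yet."""
    tags: Dict[str, str] = {}
    cluster_id = cached_cluster_id(instance)
    if cluster_id:
        tags["kafka.cluster_id"] = cluster_id
    return tags
```

Because both lookups go through `getattr` with a default, a consumer without a client, or a client that has not been traced yet, yields no tag instead of an error.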
Summary
- The aiokafka integration was missing kafka_cluster_id collection while confluent_kafka already collects it. As a result, DSM edges and produce/commit tracking from aiokafka producers/consumers were missing the kafka_cluster_id tag, breaking cluster-level aggregation in Data Streams Monitoring.
- Add a _get_cluster_id helper that sends a MetadataRequest_v5 to a connected broker and caches the result on the AIOKafkaClient (success + 5 min failure cache, mirroring the confluent_kafka helper).
- Set the kafka.cluster_id span tag and propagate it to the DSM hooks via core.set_item("kafka_cluster_id", ...), so consume/produce edge tags and track_kafka_produce / track_kafka_commit calls include it.
Test plan
- Tested with aiokafka apps against confluentinc/cp-kafka:7.5.0.
- _get_cluster_id(p.client, "demo-orders") returned the broker's CLUSTER_ID (Mka3OEVBNTcwNTJENDM2Qg), and the Datadog agent successfully forwarded traces and data_streams_messages to datad0g.com.
🤖 Generated with Claude Code