feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce|Consume metrics#4044
feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce|Consume metrics#4044shashank-reddy-nr wants to merge 19 commits into
Conversation
Producer: hoist app name lookup outside the message loop, then stamp
each outgoing message header with 'producerServiceName' alongside the
existing DT headers in both send() and sendBatch() paths.
Consumer: read 'producerServiceName' from incoming message headers in
recordDataMetrics and
- add kafka.producer.serviceName TRANS_EVENT attribute
- add messaging.destination.name TRANS_EVENT attribute (topic name)
- emit Message/Kafka/Topic/Named/{topic}/Producer/{name} metric to
support entity relationship rules between consumer and topic entities
Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.
Metric format:
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume
The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.
Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
per-message vs per-poll counting, auth config passthrough)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
|
- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with simple string extraction for newrelic header parsing — no external dependency - Node.js: Fix lint violations — empty catch blocks now log debug messages, extract injectHeaders() helper to reduce cognitive complexity, remove unused variables from test, add JSDoc @param description - Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix race condition where daemon thread overwrote seeded test fixture values; reactive fetch on first consumed message is sufficient Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Node.js: catch (err) → catch {} (no-variable) to satisfy sonarjs
no-ignored-exceptions rule; .catch() comment for non-fatal cluster
ID fetch failure
- .NET: remove redundant segment.End() in auto-created transaction path
— GetDelegateFor(segment) already calls segment.End() internally,
so the manual wrapper must not call it again
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The cluster ID belongs only in the metric name for relationship building. It does not need to be set as a span/transaction attribute. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tale tests Java: - consumer-2.0.0: remove out-of-scope producer identity (accountId/appId) and group_id capture; restore DT header acceptance - consumer-3.7.0: add DT header acceptance (was missing, regression vs 2.0.0); remove dead ClusterIdHelper.java (3.7.0 uses Weaver field access, not reflection) - spring-kafka-2.2.0: revert to original (producer identity extraction out of scope) Python: - fix add_messagebroker_info regression: was incorrectly inside 'if cluster_id:' block, meaning library-version attribute dropped until cluster ID cached - remove test_cluster_id_agent_attribute (consumer) and test_cluster_id_attribute_on_transaction (producer) — these tested a span attribute that was already removed from production code Node.js: - remove stale 'send injects kafka.cluster.id header' test — the header injection code was removed; this test was asserting the old behavior .NET: - replace undefined 'InternalApi.RecordMetric' with correct fully-qualified 'NewRelic.Api.Agent.NewRelic.RecordMetric()' — the previous name would have been a compile error Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
_fetchAndCacheClusterId used unsorted join while getClusterIdFromCache
used sorted — causing cache misses when brokers are listed in different
orders. Align both to brokers.slice().sort().join(',').
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The root timer's hrDuration was unset, so getDurationInMillis() fell through to process.hrtime(hrstart) returning real elapsed time (~5ms). _computeTotalTime only updates the root when computedDuration > currentDuration, so 4ms > 5ms was false and the root was never set. Setting hrDuration = [0, 0] makes getDurationInMillis() return 0, so the computed 4ms correctly overwrites it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
sendBatch can include messages for multiple topics in one call via topicMessages[]. Previously only topicMessages[0].topic got a metric; all subsequent topics were silently unmetered. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…rMethod Add missing @param description for injectHeaders. Extract recordClusterProduceMetrics() helper to bring #wrapProducerMethod cognitive complexity from 18 back to ≤15. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ptions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ter metrics - Skip cluster ID fetch when brokers is a function (dynamic config); prevents TypeError crash from calling .slice() on a function - Unified _brokersKey() helper so getClusterIdFromCache and _fetchAndCacheClusterId use the same key for all broker types - feature_flags.js: add kafka_cluster_id_in_headers prerelease flag (default false) - record-data-metrics.js: guard cluster metric emission behind clusterIdEnabled Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove the `kafka_cluster_id_in_headers` prerelease feature flag and all `clusterIdEnabled` guards. Cluster-level metrics now fire unconditionally whenever a cluster ID is available — no configuration required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@shashank-reddy-nr do you mind sharing the context of this PR. I see it's adding a bunch of attributes and metrics but what is driving this? |
|
Hi @bizob2828, I made code changes to add a couple of new Kafka metrics to establish a relationship between our self-hosted Kafka and the APM. Here is the work request I raised for the APM team to review the changes and design, which has the complete context and explains why this is needed. |
|
Please update the PR description here with details on why this PR is needed and how this PR solves those needs. |
There was a problem hiding this comment.
Why are the changes in this file not part of the exported class? Are the two caches really necessary? What is the expected size of these caches in a typical, or extreme, deployment scenario?
There was a problem hiding this comment.
Hi @jsumners-nr, I'm new to this repo and leaned heavily on AI assistance to implement this, so I apologize upfront for anything that doesn't follow conventions here.
The two caches serve different lifecycles:
cluster-id-cache.jsis a module-level map (brokers string → cluster UUID). It's intentionally process-global because the cluster UUID is the same for every producer/consumer connecting to the same Kafka cluster - it makes no sense to refetch it per client instance._clusterIdByInstancedoesn't exist here - the only per-instance storage is via the existing kafkaCtx symbol already set on each client/consumer.
In a typical deployment you'd have 1–3 distinct Kafka clusters, so the cache holds 1–3 entries. In an extreme multi-tenant scenario (many distinct broker strings) it could grow unbounded - that's a fair concern. Happy to add a size cap or TTL if you'd prefer.
Open to moving the fetch logic inside the class if that's the convention - I'll follow your guidance. Can you please guide me?
| _fetchAndCacheClusterId(client, client[kafkaCtx].brokers).then((id) => { | ||
| if (id) { | ||
| // Update the shared client context. Producers share this object by reference so they | ||
| // see the update immediately. Consumers spread-copy the context at creation time, so |
There was a problem hiding this comment.
Where is this "spread-copy" happening? Consumers should not have access to our internal symbol and context object.
…unrelated test change
- Remove kafka.consume.group_id span attribute (not part of cluster metrics goal)
- Remove rawNr block that manually decoded DT payload d.ac/d.ap (DT framework
already handles the newrelic header via tools.startConsumeSegment)
- Remove groupId from consumer kafkaCtx spread-copy (no longer stored)
- Revert segment.test.js verbose comment + hrDuration line (unrelated to Kafka)
- Remove cosmetic section-divider comments in kafka.test.js
The cluster metrics feature itself is unchanged:
MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce|Consume
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…s lint rule Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4044 +/- ##
==========================================
- Coverage 97.34% 97.32% -0.03%
==========================================
Files 506 515 +9
Lines 61476 61851 +375
Branches 1 1
==========================================
+ Hits 59845 60195 +350
- Misses 1631 1656 +25
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Why
Customers running self-hosted Kafka can't see producer → topic → consumer topology in New Relic - there's no relationship linking a producer APM service, a Kafka topic, and a consumer service.
The blocker is that today's instrumentation keys off broker address (
MessageBroker/Kafka/Nodes/{broker_address}/...). That's unique by luck in managed Kafka (MSK, Confluent Cloud), but self-hosted clusters routinely reuse identical broker addresses across VPCs (e.g.kafka-broker-1:9092in both prod and staging), so address can't reliably join a service to a specific cluster's topic.The fix is Kafka's native cluster UUID (KIP-78): stable, globally unique (even for self-hosted), and already present in the broker metadata response. Emitting it in the metric name gives the APM metric and the existing Kafka Topic entity (
kafka.cluster.idtag, from OTel infra / nri-kafka) a shared key, so the relationship rule can fire and form the producer → topic → consumer chain.Purely additive - no existing metrics, distributed-tracing behavior, or instrumentation is changed or removed.
Summary
Adds two new agent-level timeslice metrics per Kafka message produced or consumed:
MessageBroker/Kafka/Cluster/{clusterId}/Topic/{topic}/ProduceMessageBroker/Kafka/Cluster/{clusterId}/Topic/{topic}/ConsumeThese complement the existing
Message/Kafka/Topic/Named/{topic}/Received/*metrics by adding the cluster UUID dimension, enabling cluster-level traffic segmentation in NRDB.How it works
Cluster UUID resolution - Obtained asynchronously via a KafkaJS admin client calling
describeCluster()once per unique broker set. The UUID is never injected into Kafka wire headers. Resolves within 5 seconds; silently skipped on error (best-effort, always-on).Caching -
cluster-id-cache.jsholds a module-level map keyed by the sorted broker string. Deduplicates concurrent fetches via an in-flight Set.Consumer spread-copy - Each consumer gets its own
kafkaCtxsnapshot at construction (spread-copy) so per-consumerclientIdupdates do not clobber a shared object. Because the UUID fetch is async,record-data-metrics.jsfalls back togetClusterIdFromCache(brokers)whenkafkaCtx.clusterIdis not yet set.Metric recording - Uses
agentMetrics.getOrCreateMetric(...).incrementCallCount()so metrics aggregate at agent level, not per-transaction.Description
Please provide a brief description of the changes introduced in this pull request.
What problem does it solve? What is the context of this change?
How to Test
Please describe how you have tested these changes. Have you run the code against an example application?
What steps did you take to ensure that the changes are working correctly?
Related Issues
Please include any related issues or pull requests in this section, using the format
Closes #<issue number>orFixes #<issue number>if applicable.