Open
Changes from 14 commits
19 commits
51f04bf
feat(kafka): inject producerServiceName header and read on consumer side
shashank-reddy-nr May 15, 2026
8929d42
fix(kafka): fall back to NEW_RELIC_APP_NAME env var if config returns…
shashank-reddy-nr May 15, 2026
7c23c5e
feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics
shashank-reddy-nr Jun 1, 2026
ce8bb6f
fix(kafka): fix CI test failures
shashank-reddy-nr Jun 1, 2026
051f987
fix(kafka): fix bugs found in final review
shashank-reddy-nr Jun 1, 2026
8e31e5a
fix(kafka): remove kafka.cluster.id custom span attribute
shashank-reddy-nr Jun 4, 2026
b8101aa
fix(kafka): scope cleanup — metrics only, fix DT regression, remove s…
shashank-reddy-nr Jun 4, 2026
1e721b9
fix(kafka): normalize broker cache key to sorted order
shashank-reddy-nr Jun 4, 2026
4e48d09
fix(test): make segment truncation timer test deterministic
shashank-reddy-nr Jun 4, 2026
2387cf8
fix(kafka): record cluster metric for every topic in sendBatch
shashank-reddy-nr Jun 4, 2026
b081071
fix(kafka): fix JSDoc and reduce cognitive complexity in #wrapProduce…
shashank-reddy-nr Jun 4, 2026
88116eb
fix(kafka): use argumentless catch to satisfy sonarjs/no-ignored-exce…
shashank-reddy-nr Jun 4, 2026
cdb2795
fix(kafka): handle function-valued brokers, fix cache key, guard clus…
shashank-reddy-nr Jun 17, 2026
ad6a8ec
feat(kafka): always emit cluster metrics without opt-in flag
shashank-reddy-nr Jun 17, 2026
4e030ed
fix(kafka): scope cleanup — remove groupId/rawNr scope creep, revert …
shashank-reddy-nr Jun 18, 2026
8b53a69
fix(kafka): add missing cluster-id-cache.js file
shashank-reddy-nr Jun 18, 2026
4a0ef43
fix(kafka): remove verbose comments, revert injectHeaders refactor
shashank-reddy-nr Jun 18, 2026
fedfdff
fix(kafka): rename promise param _ to _resolve for promise/param-name…
shashank-reddy-nr Jun 18, 2026
2f977a5
feat(kafka): enable kafkajs_instrumentation feature flag by default
shashank-reddy-nr Jun 18, 2026
149 changes: 141 additions & 8 deletions lib/subscribers/kafkajs/client-constructor.js

Contributor

Why are the changes in this file not part of the exported class? Are the two caches really necessary? What is the expected size of these caches in a typical, or extreme, deployment scenario?

@shashank-reddy-nr shashank-reddy-nr Jun 18, 2026

Author

Hi @jsumners-nr, I'm new to this repo and leaned heavily on AI assistance to implement this, so I apologize upfront for anything that doesn't follow conventions here.

The two caches serve different lifecycles:

  • cluster-id-cache.js is a module-level map (brokers string → cluster UUID). It's intentionally process-global because the cluster UUID is the same for every producer/consumer connecting to the same Kafka cluster, so it makes no sense to refetch it per client instance.
  • _clusterIdByInstance doesn't exist here. The only per-instance storage is the existing kafkaCtx symbol already set on each client/consumer.

In a typical deployment you'd have 1–3 distinct Kafka clusters, so the cache holds 1–3 entries. In an extreme multi-tenant scenario (many distinct broker strings) it could grow without bound, which is a fair concern. Happy to add a size cap or TTL if you'd prefer.

Open to moving the fetch logic inside the class if that's the convention - I'll follow your guidance. Can you please guide me?
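
The size cap mentioned above could look roughly like the following. This is a minimal sketch, not code from the PR: `BoundedCache` and `maxEntries` are hypothetical names, and eviction is plain oldest-first, relying on `Map` preserving insertion order.

```javascript
'use strict'

// Hypothetical sketch: a Map wrapper that evicts the oldest entry once a cap is exceeded.
// `BoundedCache` and `maxEntries` are illustrative names, not part of the agent.
class BoundedCache {
  constructor(maxEntries = 100) {
    this.maxEntries = maxEntries
    this.map = new Map()
  }

  get(key) {
    return this.map.get(key)
  }

  set(key, value) {
    // Re-inserting an existing key moves it to the end of the iteration order.
    this.map.delete(key)
    this.map.set(key, value)
    if (this.map.size > this.maxEntries) {
      // Map iterates in insertion order, so the first key is the oldest.
      const oldest = this.map.keys().next().value
      this.map.delete(oldest)
    }
  }

  get size() {
    return this.map.size
  }
}

const cache = new BoundedCache(2)
cache.set('broker-a:9092', 'cluster-1')
cache.set('broker-b:9092', 'cluster-2')
cache.set('broker-c:9092', 'cluster-3') // evicts the 'broker-a:9092' entry
```

A TTL would need a timestamp stored next to each value; the cap alone already bounds memory in the multi-tenant case.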

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,116 @@ const recordDataMetrics = require('./utils/record-data-metrics.js')
const recordLinkingMetrics = require('./utils/record-linking-metrics.js')
const recordMethodMetric = require('./utils/record-method-metric.js')

// Cache: brokers-key → resolved clusterId string.
// Kafka cluster IDs are stable for the lifetime of a cluster, so this cache persists
// for the process lifetime.
const _clusterIdCache = new Map()

/**
* Look up the cluster ID for a set of brokers from the module-level cache.
* Used by record-data-metrics.js as a fallback when kafkaCtx.clusterId is
* absent because the consumer spread-copied the context before the async
* describeCluster() resolved.
*
* @param {string[]} brokers List of broker addresses used as the cache key.
* @returns {string|undefined} Cached cluster UUID, or undefined if not yet resolved.
*/
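function getClusterIdFromCache(brokers) {
  // Non-array values (for example function-valued brokers) have no slice(); treat them as a miss.
  if (!Array.isArray(brokers) || brokers.length === 0) return undefined
  // Sort a copy so the key matches the one written by _fetchAndCacheClusterId.
  const key = brokers.slice().sort().join(',')
  return _clusterIdCache.get(key)
}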

// In-flight cache: brokers-key → Promise<string|null>.
// Coalesces concurrent describeCluster() calls so that N Kafka clients created with
// the same brokers share a single admin connection rather than each opening their own.
const _clusterIdInFlight = new Map()

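/**
* Fetches the Kafka cluster UUID via the kafkajs admin API and caches it.
*
* Multiple callers with the same broker set share a single in-flight Promise: the
* second and subsequent callers await the first caller's fetch instead of opening
* duplicate admin connections. A non-empty result is cached for the process lifetime.
*
* Note: fire-and-forget from the caller's perspective. Sends that occur before the
* promise resolves will miss the cluster-level metric. This is an accepted trade-off,
* since blocking the producer path is not an option.
*
* @param {object} client The kafkajs client instance; its admin() method is used for the lookup.
* @param {string[]|*} brokers Broker list used to build the cache key. Non-array values are stringified.
* @returns {Promise<string|null>} Resolves to the cluster UUID, or null if the lookup fails or yields no ID.
*/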
function _fetchAndCacheClusterId(client, brokers) {
const key = Array.isArray(brokers) ? brokers.slice().sort().join(',') : String(brokers ?? 'none')

// Return already-resolved value immediately.
if (_clusterIdCache.has(key)) return Promise.resolve(_clusterIdCache.get(key))

// Coalesce: return the existing in-flight promise if one exists.
if (_clusterIdInFlight.has(key)) return _clusterIdInFlight.get(key)

let admin
const promise = (async () => {
try {
admin = client.admin()
await admin.connect()
const { clusterId } = await admin.describeCluster()
await admin.disconnect()
admin = null
if (clusterId) _clusterIdCache.set(key, clusterId)
return clusterId ?? null
} catch {
// Cluster ID fetch failure is non-fatal — metrics simply won't include the
// cluster ID for this session. Best-effort cleanup of the admin connection.
try { await admin?.disconnect() } catch { /* ignore disconnect error */ }
return null
} finally {
_clusterIdInFlight.delete(key)
}
})()

_clusterIdInFlight.set(key, promise)
return promise
}
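
The coalescing described in the JSDoc above can be illustrated in isolation. The sketch below is a simplified stand-in, not the PR's code: `fetchOnce`, `fakeDescribeCluster`, and the broker strings are hypothetical, and the real admin connection is replaced by a counter so the sharing is observable.

```javascript
'use strict'

const resolved = new Map() // key -> resolved cluster id
const inFlight = new Map() // key -> pending Promise

let fetchCount = 0
// Hypothetical stand-in for admin.describeCluster(); counts how often it is invoked.
async function fakeDescribeCluster() {
  fetchCount += 1
  return { clusterId: 'example-cluster-id' }
}

function fetchOnce(brokers) {
  // Sort a copy so ['b', 'a'] and ['a', 'b'] map to the same key.
  const key = brokers.slice().sort().join(',')
  if (resolved.has(key)) return Promise.resolve(resolved.get(key))
  if (inFlight.has(key)) return inFlight.get(key)

  const promise = fakeDescribeCluster()
    .then(({ clusterId }) => {
      if (clusterId) resolved.set(key, clusterId)
      return clusterId ?? null
    })
    .catch(() => null) // a failed lookup is treated as "no cluster id"
    .finally(() => inFlight.delete(key))

  inFlight.set(key, promise)
  return promise
}

// Two callers with the same brokers (in a different order) share one fetch.
const first = fetchOnce(['b:9092', 'a:9092'])
const second = fetchOnce(['a:9092', 'b:9092'])
const sharedPromise = first === second
```

The in-flight entry is registered synchronously, before any `.finally` callback can run, so a second caller arriving in the same tick always finds it.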

/**
* Injects DT headers into a single message object.
* Extracted to reduce cognitive complexity of #wrapProducerMethod.
*
* @param {object} params Named parameters.
* @param {object} params.self The ConstructorSubscriber instance.
* @param {object} params.ctx The current tracer context.
* @param {object} params.msg The kafkajs message object (mutated in place).
*/
function injectHeaders({ self, ctx, msg }) {
const headers = msg.headers ?? {}
self.insertDTHeaders({ ctx, headers, useMqNames: true })
msg.headers = headers
}

/**
* Records cluster-level produce metrics named
* MessageBroker/Kafka/Cluster/{clusterId}/Topic/{topic}/Produce.
* For send(), increments one metric for the single topic.
* For sendBatch(), increments the topic's metric once per entry in data.topicMessages,
* so a topic listed in several entries is counted several times.
*
* @param {object} metrics The agent metrics aggregator.
* @param {string} clusterId Kafka cluster UUID.
* @param {boolean} batch Whether this is a sendBatch call.
* @param {object} data The send/sendBatch arguments object.
*/
function recordClusterProduceMetrics(metrics, clusterId, batch, data) {
if (batch === false) {
metrics
.getOrCreateMetric(`MessageBroker/Kafka/Cluster/${clusterId}/Topic/${data.topic}/Produce`)
.incrementCallCount()
} else {
for (const topicMessage of data.topicMessages) {
metrics
.getOrCreateMetric(
`MessageBroker/Kafka/Cluster/${clusterId}/Topic/${topicMessage.topic}/Produce`
)
.incrementCallCount()
}
}
}
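
A `sendBatch()` payload may list the same topic in more than one `topicMessages` entry, so counting once per distinct topic would need a de-duplication step. A minimal sketch under that assumption follows; `produceMetricNames` is a hypothetical helper, not part of the PR, and it only builds the metric names.

```javascript
'use strict'

// Hypothetical helper: returns one Produce metric name per distinct topic.
function produceMetricNames(clusterId, batch, data) {
  const topics = batch === false
    ? [data.topic]
    : [...new Set(data.topicMessages.map((tm) => tm.topic))]
  return topics.map(
    (topic) => `MessageBroker/Kafka/Cluster/${clusterId}/Topic/${topic}/Produce`
  )
}

// 'orders' appears twice in the batch but yields a single metric name.
const names = produceMetricNames('abc123', true, {
  topicMessages: [
    { topic: 'orders', messages: [] },
    { topic: 'orders', messages: [] },
    { topic: 'payments', messages: [] }
  ]
})
```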

const CONSUMER_METHODS = [
'commitOffsets',
'connect',
Expand Down Expand Up @@ -78,13 +188,34 @@ module.exports = class ConstructorSubscriber extends Subscriber {
const { arguments: args, self: client } = data
client[kafkaCtx] = { brokers: args[0].brokers ?? ['none'] }

_fetchAndCacheClusterId(client, client[kafkaCtx].brokers).then((id) => {
if (id) {
// Update the shared client context. Producers share this object by reference so they
// see the update immediately. Consumers spread-copy the context at creation time, so

Contributor

Where is this "spread-copy" happening? Consumers should not have access to our internal symbol and context object.

// we also store the id in the module-level cache; record-data-metrics reads from that
// cache as a fallback when kafkaCtx.clusterId is absent on the consumer side.
client[kafkaCtx].clusterId = id
}
}).catch(() => {
// Cluster ID fetch failure is non-fatal — metrics simply omit the cluster ID.
})

const origConsumer = client.consumer
client.consumer = function nrConsumer(...args) {
const consumer = origConsumer.apply(client, args)
- consumer[kafkaCtx] = client[kafkaCtx]
// Capture groupId and clientId at consumer creation so they are available
// on every transaction without waiting for the async REQUEST event.
consumer[kafkaCtx] = {
...client[kafkaCtx],
groupId: args[0]?.groupId ?? null,
clientId: args[0]?.clientId ?? client[kafkaCtx]?.clientId ?? null
}

consumer.on(consumer.events.REQUEST, function nrListener(data) {
- consumer[kafkaCtx].clientId = data?.payload?.clientId
// REQUEST event may refine clientId from the live connection payload
if (data?.payload?.clientId) {
consumer[kafkaCtx].clientId = data.payload.clientId
}
})
for (const method of CONSUMER_METHODS) {
self.#wrapConsumerMethod(consumer, method)
Expand Down Expand Up @@ -199,20 +330,20 @@ module.exports = class ConstructorSubscriber extends Subscriber {

if (batch === false) {
for (const msg of data.messages) {
- const headers = msg.headers ?? {}
- self.insertDTHeaders({ ctx, headers, useMqNames: true })
- msg.headers = headers
injectHeaders({ self, ctx, msg })
}
} else {
for (const topicMessage of data.topicMessages) {
for (const msg of topicMessage.messages) {
- const headers = msg.headers ?? {}
- self.insertDTHeaders({ ctx, headers, useMqNames: true })
- msg.headers = headers
injectHeaders({ self, ctx, msg })
}
}
}

if (instance[kafkaCtx].clusterId) {
recordClusterProduceMetrics(self.agent.metrics, instance[kafkaCtx].clusterId, batch, data)
}

return self.agent.tracer.runInContext({ handler: orig, context: ctx, full: true, thisArg: instance, args })
}
}
Expand Down Expand Up @@ -347,3 +478,5 @@ module.exports = class ConstructorSubscriber extends Subscriber {
}
}
}

module.exports.getClusterIdFromCache = getClusterIdFromCache
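
The consumer context in this file is built with object spread, which takes a shallow snapshot of the client context. The sketch below uses plain objects and a hypothetical `ctxKey` symbol in place of the agent's `kafkaCtx`; it shows why a property added to the client context later is visible through a shared reference but not through the copy.

```javascript
'use strict'

const ctxKey = Symbol('exampleCtx') // hypothetical stand-in for the agent's kafkaCtx symbol

const client = { [ctxKey]: { brokers: ['a:9092'] } }

// Shared reference: both objects point at the same context object.
const producer = { [ctxKey]: client[ctxKey] }
// Spread copy: a new object holding only the properties that existed at copy time.
const consumer = { [ctxKey]: { ...client[ctxKey], groupId: 'example-group' } }

// Simulates the async cluster ID lookup resolving after both were created.
client[ctxKey].clusterId = 'example-cluster-id'

const producerSees = producer[ctxKey].clusterId
const consumerSees = consumer[ctxKey].clusterId
```

Here `producerSees` holds the new ID while `consumerSees` stays `undefined`, which is the gap the module-level cache lookup is meant to cover.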
54 changes: 54 additions & 0 deletions lib/subscribers/kafkajs/utils/record-data-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
'use strict'

const { DESTINATIONS } = require('#agentlib/config/attribute-filter.js')
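// Fallback lookup: consumers spread-copy kafkaCtx at creation time, so
// kafkaCtx.clusterId may be absent if describeCluster() hadn't resolved yet.
// getClusterIdFromCache lets us find the ID by broker key after it resolves.
// client-constructor.js also requires this module, so the two files form a require
// cycle; destructuring at load time can capture `undefined`. Resolve the export lazily.
const getClusterIdFromCache = (brokers) =>
  require('../client-constructor.js').getClusterIdFromCache?.(brokers)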

/**
* Inspects the `kafkajs` data object for information we are interested in
Expand Down Expand Up @@ -45,4 +49,54 @@ module.exports = function recordDataMetrics({ data, kafkaCtx, tx }) {
kafkaCtx.clientId
)
}
if (kafkaCtx?.groupId) {
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'kafka.consume.group_id',
kafkaCtx.groupId
)
}
const rawNr = data?.message?.headers?.newrelic
if (rawNr) {
try {
const encoded = Buffer.isBuffer(rawNr) ? rawNr.toString() : String(rawNr)
const payload = JSON.parse(Buffer.from(encoded, 'base64').toString())
const accountId = payload?.d?.ac
const appId = payload?.d?.ap
if (accountId) {
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'kafka.producer.accountId',
accountId
)
}
if (appId) {
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'kafka.producer.appId',
appId
)
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'messaging.destination.name',
topic
)
}
} catch {
// malformed header — skip
}
}

// kafkaCtx.clusterId may be absent if the consumer was created before the async
// describeCluster() resolved (consumers spread-copy the context at creation time).
// Fall back to the module-level cache using the broker list as the key.
const clusterId = kafkaCtx?.clusterId ?? getClusterIdFromCache(kafkaCtx?.brokers)
if (clusterId) {
// One entry per cluster+topic per harvest period, regardless of broker count.
metrics
.getOrCreateMetric(
`MessageBroker/Kafka/Cluster/${clusterId}/Topic/${topic}/Consume`
)
.incrementCallCount()
}
}
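
The header handling above treats the `newrelic` header value as base64-encoded JSON and reads the account and application IDs from `d.ac` and `d.ap`. A standalone sketch of those decoding steps with a made-up payload follows; `decodeProducerIds` is a hypothetical name, and the payload shape is assumed only from what the diff reads.

```javascript
'use strict'

// Hypothetical helper mirroring the decoding steps in the diff.
function decodeProducerIds(rawHeader) {
  if (!rawHeader) return null
  try {
    // Header values may arrive as Buffers or strings; normalize to a base64 string.
    const encoded = Buffer.isBuffer(rawHeader) ? rawHeader.toString() : String(rawHeader)
    const payload = JSON.parse(Buffer.from(encoded, 'base64').toString())
    return { accountId: payload?.d?.ac ?? null, appId: payload?.d?.ap ?? null }
  } catch {
    return null // malformed header: skip
  }
}

// Made-up payload for illustration only.
const header = Buffer.from(
  Buffer.from(JSON.stringify({ d: { ac: '1', ap: '2' } })).toString('base64')
)
const ids = decodeProducerIds(header)
const bad = decodeProducerIds('not base64 json')
```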
6 changes: 5 additions & 1 deletion test/unit/transaction/trace/segment.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,12 @@ test('TraceSegment', async (t) => {
sinon.stub(segment.timer, 'softEnd').returns(true)
sinon.stub(segment.timer, 'endsAfter').returns(true)

- // Make root duration calculation predictable
// Make root duration calculation predictable.
// Set hrDuration to 0 so getDurationInMillis() returns 0 — this ensures
// _computeTotalTime's condition (computedDuration > currentDuration) holds and
// the root gets updated to exactly 4ms (1001+3-1000).
root.timer.start = 1000
root.timer.hrDuration = [0, 0]
segment.timer.start = 1001
segment.overwriteDurationInMillis(3)

Expand Down
74 changes: 74 additions & 0 deletions test/versioned/kafkajs/kafka.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,77 @@ test('consume batch inside of a transaction', async (t) => {

await plan.completed
})

// ---------------------------------------------------------------------------
// Cluster-ID metric and header tests
// ---------------------------------------------------------------------------

test('send records cluster-level produce metric', async (t) => {
// Verifies MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce is recorded
// for the single-message send() path.
const plan = tspl(t, { plan: 1 })
const { agent, producer, topic } = t.nr

const { kafkaCtx } = require('../../../lib/symbols')
const testClusterId = 'test-cluster-produce-metric'
if (producer[kafkaCtx]) {
producer[kafkaCtx].clusterId = testClusterId
}

const expectedMetricName = `MessageBroker/Kafka/Cluster/${testClusterId}/Topic/${topic}/Produce`

agent.on('transactionFinished', () => {})
helper.runInTransaction(agent, async (tx) => {
await producer.send({
acks: 1,
topic,
messages: [{ key: 'k', value: 'v' }]
})
tx.end()

const metric = agent.metrics.getMetric(expectedMetricName)
plan.ok(metric && metric.callCount > 0, `Expected metric ${expectedMetricName} to be recorded`)
})

await plan.completed
})

test('consume records cluster-level consume metric', async (t) => {
// Verifies MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Consume is recorded
const plan = tspl(t, { plan: 1 })
const { agent, consumer, producer, topic } = t.nr

const { kafkaCtx } = require('../../../lib/symbols')
const testClusterId = 'test-cluster-consume-metric'
// Set cluster ID on the consumer's context
if (consumer[kafkaCtx]) {
consumer[kafkaCtx].clusterId = testClusterId
}

const expectedMetricName = `MessageBroker/Kafka/Cluster/${testClusterId}/Topic/${topic}/Consume`

const txPromise = new Promise((resolve) => {
agent.on('transactionFinished', (tx) => {
if (tx.name && tx.name.includes(topic)) {
const metric = agent.metrics.getMetric(expectedMetricName)
plan.ok(metric && metric.callCount > 0, `Expected metric ${expectedMetricName}`)
resolve()
}
})
})

await consumer.subscribe({ topics: [topic], fromBeginning: true })
const msgPromise = new Promise((resolve) => {
consumer.run({
eachMessage: async () => { resolve() }
})
})
await utils.waitForConsumersToJoinGroup({ consumer })
await producer.send({
acks: 1,
topic,
messages: [{ key: 'k', value: 'consume-test' }]
})
await Promise.all([msgPromise, txPromise])
await plan.completed
})