diff --git a/lib/feature_flags.js b/lib/feature_flags.js index e2ea83264c..e37f9b0290 100644 --- a/lib/feature_flags.js +++ b/lib/feature_flags.js @@ -12,7 +12,7 @@ exports.prerelease = { internal_test_only: false, reverse_naming_rules: false, unresolved_promise_cleanup: true, - kafkajs_instrumentation: false, + kafkajs_instrumentation: true, undici_error_tracking: true } diff --git a/lib/subscribers/kafkajs/client-constructor.js b/lib/subscribers/kafkajs/client-constructor.js index a9ad91e82c..bcdf797e6a 100644 --- a/lib/subscribers/kafkajs/client-constructor.js +++ b/lib/subscribers/kafkajs/client-constructor.js @@ -14,6 +14,33 @@ const { kafkaCtx } = require('#agentlib/symbols.js') const recordDataMetrics = require('./utils/record-data-metrics.js') const recordLinkingMetrics = require('./utils/record-linking-metrics.js') const recordMethodMetric = require('./utils/record-method-metric.js') +const { getClusterIdFromCache, _fetchAndCacheClusterId } = require('./utils/cluster-id-cache.js') + +/** + * Records MessageBroker/Kafka/Cluster cluster-level produce metrics. + * For send() records one metric for the single topic. + * For sendBatch() records one metric per distinct topic in the batch. + * + * @param {object} metrics The agent metrics aggregator. + * @param {string} clusterId Kafka cluster UUID. + * @param {boolean} batch Whether this is a sendBatch call. + * @param {object} data The send/sendBatch arguments object. + */ +function recordClusterProduceMetrics(metrics, clusterId, batch, data) { + if (batch === false) { + metrics + .getOrCreateMetric(`MessageBroker/Kafka/Cluster/${clusterId}/Topic/${data.topic}/Produce`) + .incrementCallCount(data.messages.length) + } else { + for (const topicMessage of data.topicMessages) { + metrics + .getOrCreateMetric( + `MessageBroker/Kafka/Cluster/${clusterId}/Topic/${topicMessage.topic}/Produce` + ) + .incrementCallCount(topicMessage.messages.length) + } + } +} const CONSUMER_METHODS = [ 'commitOffsets', @@ -76,15 +103,28 @@ module.exports = class ConstructorSubscriber extends Subscriber { end(data, ctx) { const self = this const { arguments: args, self: client } = data - client[kafkaCtx] = { brokers: args[0].brokers ?? ['none'] } + const rawBrokers = args[0]?.brokers + client[kafkaCtx] = { brokers: typeof rawBrokers === 'function' ? [] : (rawBrokers ?? ['none']) } + + _fetchAndCacheClusterId(client, client[kafkaCtx].brokers).then((id) => { + if (id) client[kafkaCtx].clusterId = id + }).catch(() => {}) const origConsumer = client.consumer client.consumer = function nrConsumer(...args) { const consumer = origConsumer.apply(client, args) - consumer[kafkaCtx] = client[kafkaCtx] + // Capture clientId at consumer creation so it is available on every + // transaction without waiting for the async REQUEST event. + consumer[kafkaCtx] = { + ...client[kafkaCtx], + clientId: args[0]?.clientId ?? client[kafkaCtx]?.clientId ?? null + } consumer.on(consumer.events.REQUEST, function nrListener(data) { - consumer[kafkaCtx].clientId = data?.payload?.clientId + // REQUEST event may refine clientId from the live connection payload + if (data?.payload?.clientId) { + consumer[kafkaCtx].clientId = data.payload.clientId + } }) for (const method of CONSUMER_METHODS) { self.#wrapConsumerMethod(consumer, method) @@ -213,6 +253,10 @@ module.exports = class ConstructorSubscriber extends Subscriber { } } + if (instance[kafkaCtx].clusterId) { + recordClusterProduceMetrics(self.agent.metrics, instance[kafkaCtx].clusterId, batch, data) + } + return self.agent.tracer.runInContext({ handler: orig, context: ctx, full: true, thisArg: instance, args }) } } @@ -285,6 +329,7 @@ module.exports = class ConstructorSubscriber extends Subscriber { recordDataMetrics({ tx: ctx.transaction, kafkaCtx: instance[kafkaCtx], + agentMetrics: self.agent.metrics, data }) @@ -335,12 +380,21 @@ module.exports = class ConstructorSubscriber extends Subscriber { const eachBatch = args[0].eachBatch args[0].eachBatch = function nrWrappedEachBatch() { recordMethodMetric({ agent: self.agent, name: 'eachBatch' }) + const { batch } = arguments[0] recordLinkingMetrics({ brokers: instance[kafkaCtx].brokers, agent: self.agent, - topic: arguments[0].batch.topic, + topic: batch.topic, producer: false }) + // Emit one cluster Consume metric per message in the batch. + const clusterId = instance[kafkaCtx]?.clusterId ?? getClusterIdFromCache(instance[kafkaCtx]?.brokers) + if (clusterId) { + const m = self.agent.metrics.getOrCreateMetric( + `MessageBroker/Kafka/Cluster/${clusterId}/Topic/${batch.topic}/Consume` + ) + for (let i = 0; i < batch.messages.length; i++) m.incrementCallCount() + } return eachBatch.apply(instance, arguments) } return self.agent.tracer.runInContext({ handler: orig, context: ctx, full: true, thisArg: instance, args }) diff --git a/lib/subscribers/kafkajs/utils/cluster-id-cache.js b/lib/subscribers/kafkajs/utils/cluster-id-cache.js new file mode 100644 index 0000000000..bef3a51b7a --- /dev/null +++ b/lib/subscribers/kafkajs/utils/cluster-id-cache.js @@ -0,0 +1,56 @@ +/* + * Copyright 2026 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const _clusterIdCache = new Map() +const _clusterIdInFlight = new Map() + +function getClusterIdFromCache(brokers) { + if (!Array.isArray(brokers)) return undefined + const key = brokers.slice().sort().join(',') + return _clusterIdCache.get(key) +} + +function _fetchAndCacheClusterId(client, brokers) { + const key = Array.isArray(brokers) ? brokers.slice().sort().join(',') : String(brokers ?? 'none') + + if (_clusterIdCache.has(key)) return Promise.resolve(_clusterIdCache.get(key)) + if (_clusterIdInFlight.has(key)) return _clusterIdInFlight.get(key) + + const TIMEOUT_MS = 5000 + let admin + let timeoutHandle + const promise = (async () => { + try { + admin = client.admin() + const timeout = new Promise((_resolve, reject) => { + timeoutHandle = setTimeout( + () => reject(new Error('NR Kafka cluster ID fetch timed out')), + TIMEOUT_MS + ) + if (typeof timeoutHandle.unref === 'function') timeoutHandle.unref() + }) + await Promise.race([admin.connect(), timeout]) + const { clusterId } = await Promise.race([admin.describeCluster(), timeout]) + admin.on?.('error', () => {}) + await admin.disconnect() + admin = null + if (clusterId) _clusterIdCache.set(key, clusterId) + return clusterId ?? null + } catch { + try { if (admin) await admin.disconnect() } catch { /* ignore */ } + return null + } finally { + clearTimeout(timeoutHandle) + _clusterIdInFlight.delete(key) + } + })() + + _clusterIdInFlight.set(key, promise) + return promise +} + +module.exports = { getClusterIdFromCache, _fetchAndCacheClusterId } diff --git a/lib/subscribers/kafkajs/utils/record-data-metrics.js b/lib/subscribers/kafkajs/utils/record-data-metrics.js index f425e4658a..e61ae624fa 100644 --- a/lib/subscribers/kafkajs/utils/record-data-metrics.js +++ b/lib/subscribers/kafkajs/utils/record-data-metrics.js @@ -6,6 +6,7 @@ 'use strict' const { DESTINATIONS } = require('#agentlib/config/attribute-filter.js') +const { getClusterIdFromCache } = require('./cluster-id-cache.js') /** * Inspects the `kafkajs` data object for information we are interested in @@ -17,8 +18,9 @@ const { DESTINATIONS } = require('#agentlib/config/attribute-filter.js') * @param {object} params.kafkaCtx The local context store we add to the * consumer client. * @param {Transaction} params.tx The current transaction. + * @param {object} params.agentMetrics Agent-level metrics aggregator (for cluster metrics). */ -module.exports = function recordDataMetrics({ data, kafkaCtx, tx }) { +module.exports = function recordDataMetrics({ data, kafkaCtx, tx, agentMetrics }) { if (!tx) { return } @@ -45,4 +47,16 @@ module.exports = function recordDataMetrics({ data, kafkaCtx, tx }) { kafkaCtx.clientId ) } + // kafkaCtx.clusterId may be absent if the consumer was created before the async + // describeCluster() resolved (consumers spread-copy the context at creation time). + // Fall back to the module-level cache using the broker list as the key. + // Use agentMetrics (global) so cluster metrics are not scoped to a single transaction. + const clusterId = kafkaCtx?.clusterId ?? getClusterIdFromCache(kafkaCtx?.brokers) + if (clusterId && agentMetrics) { + agentMetrics + .getOrCreateMetric( + `MessageBroker/Kafka/Cluster/${clusterId}/Topic/${topic}/Consume` + ) + .incrementCallCount() + } } diff --git a/test/versioned/kafkajs/kafka.test.js b/test/versioned/kafkajs/kafka.test.js index 931d2673ea..5a7d3ce495 100644 --- a/test/versioned/kafkajs/kafka.test.js +++ b/test/versioned/kafkajs/kafka.test.js @@ -443,3 +443,73 @@ test('consume batch inside of a transaction', async (t) => { await plan.completed }) + +test('send records cluster-level produce metric', async (t) => { + // Verifies MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce is recorded + // for the single-message send() path. + const plan = tspl(t, { plan: 1 }) + const { agent, producer, topic } = t.nr + + const { kafkaCtx } = require('../../../lib/symbols') + const testClusterId = 'test-cluster-produce-metric' + if (producer[kafkaCtx]) { + producer[kafkaCtx].clusterId = testClusterId + } + + const expectedMetricName = `MessageBroker/Kafka/Cluster/${testClusterId}/Topic/${topic}/Produce` + + agent.on('transactionFinished', () => {}) + helper.runInTransaction(agent, async (tx) => { + await producer.send({ + acks: 1, + topic, + messages: [{ key: 'k', value: 'v' }] + }) + tx.end() + + const metric = agent.metrics.getMetric(expectedMetricName) + plan.ok(metric && metric.callCount > 0, `Expected metric ${expectedMetricName} to be recorded`) + }) + + await plan.completed +}) + +test('consume records cluster-level consume metric', async (t) => { + // Verifies MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Consume is recorded + const plan = tspl(t, { plan: 1 }) + const { agent, consumer, producer, topic } = t.nr + + const { kafkaCtx } = require('../../../lib/symbols') + const testClusterId = 'test-cluster-consume-metric' + // Set cluster ID on the consumer's context + if (consumer[kafkaCtx]) { + consumer[kafkaCtx].clusterId = testClusterId + } + + const expectedMetricName = `MessageBroker/Kafka/Cluster/${testClusterId}/Topic/${topic}/Consume` + + const txPromise = new Promise((resolve) => { + agent.on('transactionFinished', (tx) => { + if (tx.name && tx.name.includes(topic)) { + const metric = agent.metrics.getMetric(expectedMetricName) + plan.ok(metric && metric.callCount > 0, `Expected metric ${expectedMetricName}`) + resolve() + } + }) + }) + + await consumer.subscribe({ topics: [topic], fromBeginning: true }) + const msgPromise = new Promise((resolve) => { + consumer.run({ + eachMessage: async () => { resolve() } + }) + }) + await utils.waitForConsumersToJoinGroup({ consumer }) + await producer.send({ + acks: 1, + topic, + messages: [{ key: 'k', value: 'consume-test' }] + }) + await Promise.all([msgPromise, txPromise]) + await plan.completed +})