-
-
Notifications
You must be signed in to change notification settings - Fork 468
feat(spring-jakarta): [Queue Instrumentation 4] Add Kafka consumer instrumentation #5255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
adinauer
wants to merge
6
commits into
feat/queue-instrumentation-producer
Choose a base branch
from
feat/queue-instrumentation-consumer
base: feat/queue-instrumentation-producer
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
6099047
feat(spring-jakarta): Add Kafka consumer instrumentation
adinauer 1f00027
changelog
adinauer 2b74bab
Merge branch 'feat/queue-instrumentation-producer' into feat/queue-in…
adinauer be3a2ba
fix(spring-jakarta): Update consumer references and add reflection wa…
adinauer 6450f63
Merge branch 'feat/queue-instrumentation-producer' into feat/queue-in…
adinauer f92f47c
fix(spring-jakarta): Initialize Sentry in consumer test, fix API file…
adinauer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
...ta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.ScopesAdapter; | ||
| import io.sentry.SentryLevel; | ||
| import java.lang.reflect.Field; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
| import org.springframework.beans.BeansException; | ||
| import org.springframework.beans.factory.config.BeanPostProcessor; | ||
| import org.springframework.core.Ordered; | ||
| import org.springframework.core.PriorityOrdered; | ||
| import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; | ||
| import org.springframework.kafka.listener.RecordInterceptor; | ||
|
|
||
| /** | ||
| * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} | ||
| * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. | ||
| */ | ||
| @ApiStatus.Internal | ||
| public final class SentryKafkaConsumerBeanPostProcessor | ||
| implements BeanPostProcessor, PriorityOrdered { | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public @NotNull Object postProcessAfterInitialization( | ||
| final @NotNull Object bean, final @NotNull String beanName) throws BeansException { | ||
| if (bean instanceof AbstractKafkaListenerContainerFactory) { | ||
| final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory = | ||
| (AbstractKafkaListenerContainerFactory<?, ?, ?>) bean; | ||
|
|
||
| final @Nullable RecordInterceptor<?, ?> existing = getExistingInterceptor(factory); | ||
| if (existing instanceof SentryKafkaRecordInterceptor) { | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| final RecordInterceptor sentryInterceptor = | ||
| new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); | ||
| factory.setRecordInterceptor(sentryInterceptor); | ||
| } | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private @Nullable RecordInterceptor<?, ?> getExistingInterceptor( | ||
| final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) { | ||
| try { | ||
| final @NotNull Field field = | ||
| AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); | ||
| field.setAccessible(true); | ||
| return (RecordInterceptor<?, ?>) field.get(factory); | ||
| } catch (NoSuchFieldException | IllegalAccessException e) { | ||
| ScopesAdapter.getInstance() | ||
| .getOptions() | ||
| .getLogger() | ||
| .log( | ||
| SentryLevel.WARNING, | ||
| "Unable to read existing recordInterceptor from " | ||
| + "AbstractKafkaListenerContainerFactory via reflection. " | ||
| + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", | ||
| e); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public int getOrder() { | ||
| return Ordered.LOWEST_PRECEDENCE; | ||
| } | ||
| } | ||
201 changes: 201 additions & 0 deletions
201
...ng-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.BaggageHeader; | ||
| import io.sentry.IScopes; | ||
| import io.sentry.ISentryLifecycleToken; | ||
| import io.sentry.ITransaction; | ||
| import io.sentry.SentryTraceHeader; | ||
| import io.sentry.SpanDataConvention; | ||
| import io.sentry.SpanStatus; | ||
| import io.sentry.TransactionContext; | ||
| import io.sentry.TransactionOptions; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import org.apache.kafka.clients.consumer.Consumer; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.apache.kafka.common.header.Header; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
| import org.springframework.kafka.listener.RecordInterceptor; | ||
|
|
||
/**
 * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
 * records with distributed tracing support.
 *
 * <p>Lifecycle: {@link #intercept} forks scopes, continues the incoming trace, starts a
 * transaction, and stashes both the lifecycle token and the transaction in a {@link ThreadLocal};
 * {@link #success} / {@link #failure} finish the transaction and close the token. An optional
 * delegate interceptor (e.g. a user-configured one that was already present on the factory) is
 * invoked from every callback.
 */
@ApiStatus.Internal
public final class SentryKafkaRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

  // Trace origin reported on transactions created by this interceptor.
  static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer";

  private final @NotNull IScopes scopes;
  // Previously configured interceptor, chained so user behavior is preserved; may be null.
  private final @Nullable RecordInterceptor<K, V> delegate;

  // One in-flight record context per consumer thread. Assumes each listener thread processes
  // records sequentially (intercept -> success/failure before the next intercept) — TODO confirm.
  // NOTE(review): if delegateIntercept() throws inside intercept(), or success()/failure() is
  // never invoked for a record, the entry set here is not cleaned up on this thread and the
  // lifecycle token stays open until the next finishSpan() — verify Spring Kafka's callback
  // guarantees cover this.
  private static final @NotNull ThreadLocal<SentryRecordContext> currentContext =
      new ThreadLocal<>();

  public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) {
    this(scopes, null);
  }

  public SentryKafkaRecordInterceptor(
      final @NotNull IScopes scopes, final @Nullable RecordInterceptor<K, V> delegate) {
    this.scopes = scopes;
    this.delegate = delegate;
  }

  /**
   * Called before the listener receives the record: continues the incoming trace from the record
   * headers and starts a {@code queue.process} transaction bound to forked scopes.
   */
  @Override
  public @Nullable ConsumerRecord<K, V> intercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (!scopes.getOptions().isEnableQueueTracing()) {
      // Feature disabled: skip all Sentry work but still honor a configured delegate.
      return delegateIntercept(record, consumer);
    }

    // Fork so scope mutations during record processing don't leak into other consumers/threads.
    final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
    final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

    continueTrace(forkedScopes, record);

    // May be null when tracing is disabled or the transaction is sampled out (no-op).
    final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
    currentContext.set(new SentryRecordContext(lifecycleToken, transaction));

    return delegateIntercept(record, consumer);
  }

  /** Called after the listener processed the record without error; finishes the transaction OK. */
  @Override
  public void success(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.success(record, consumer);
      }
    } finally {
      // Always finish, even if the delegate throws.
      finishSpan(SpanStatus.OK, null);
    }
  }

  /** Called when the listener threw; finishes the transaction with INTERNAL_ERROR + throwable. */
  @Override
  public void failure(
      final @NotNull ConsumerRecord<K, V> record,
      final @NotNull Exception exception,
      final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.failure(record, exception, consumer);
      }
    } finally {
      // Always finish, even if the delegate throws.
      finishSpan(SpanStatus.INTERNAL_ERROR, exception);
    }
  }

  /** Pure pass-through to the delegate; no Sentry state is touched here. */
  @Override
  public void afterRecord(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      delegate.afterRecord(record, consumer);
    }
  }

  // Forwards intercept() to the delegate if present; otherwise returns the record unchanged.
  private @Nullable ConsumerRecord<K, V> delegateIntercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      return delegate.intercept(record, consumer);
    }
    return record;
  }

  // Reads sentry-trace/baggage headers from the record and continues the distributed trace.
  private void continueTrace(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
    final @Nullable List<String> baggageHeaders =
        baggage != null ? Collections.singletonList(baggage) : null;
    forkedScopes.continueTrace(sentryTrace, baggageHeaders);
  }

  /**
   * Starts a {@code queue.process} transaction bound to the forked scopes and annotates it with
   * messaging data (system, topic, message id, receive latency).
   *
   * @return the started transaction, or {@code null} when tracing is disabled or sampled out
   */
  private @Nullable ITransaction startTransaction(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    if (!forkedScopes.getOptions().isTracingEnabled()) {
      return null;
    }

    final @NotNull TransactionOptions txOptions = new TransactionOptions();
    txOptions.setOrigin(TRACE_ORIGIN);
    // Bind to scope so spans created inside the listener attach to this transaction.
    txOptions.setBindToScope(true);

    final @NotNull ITransaction transaction =
        forkedScopes.startTransaction(
            new TransactionContext("queue.process", "queue.process"), txOptions);

    if (transaction.isNoOp()) {
      // Sampled out — treat as no transaction so finishSpan() doesn't finish a no-op.
      return null;
    }

    transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());

    // Message id is read from a header — presumably set by the producer-side instrumentation;
    // absent for records produced without it.
    final @Nullable String messageId = headerValue(record, "messaging.message.id");
    if (messageId != null) {
      transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
    }

    // Receive latency = now - producer enqueue time. NOTE(review): relies on producer and
    // consumer clocks being in sync; negative values (clock skew) are dropped below.
    final @Nullable String enqueuedTimeStr =
        headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
    if (enqueuedTimeStr != null) {
      try {
        final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
        final long latencyMs = System.currentTimeMillis() - enqueuedTime;
        if (latencyMs >= 0) {
          transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
        }
      } catch (NumberFormatException ignored) {
        // ignore malformed header
      }
    }

    return transaction;
  }

  // Finishes the current thread's transaction (if any) with the given status and always closes
  // the lifecycle token, restoring the previous scopes on this thread.
  private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
    final @Nullable SentryRecordContext ctx = currentContext.get();
    if (ctx == null) {
      // Nothing in flight (queue tracing disabled, or success/failure without intercept).
      return;
    }
    // Clear before finishing so a throwing finish() can't leave a stale entry behind.
    currentContext.remove();

    try {
      final @Nullable ITransaction transaction = ctx.transaction;
      if (transaction != null) {
        transaction.setStatus(status);
        if (throwable != null) {
          transaction.setThrowable(throwable);
        }
        transaction.finish();
      }
    } finally {
      // Token must close even if finishing the transaction throws.
      ctx.lifecycleToken.close();
    }
  }

  // Returns the last value of the named header decoded as UTF-8, or null if absent.
  private @Nullable String headerValue(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
    final @Nullable Header header = record.headers().lastHeader(headerName);
    if (header == null || header.value() == null) {
      return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
  }

  // Immutable pair of (lifecycle token, optional transaction) stored per thread between
  // intercept() and success()/failure().
  private static final class SentryRecordContext {
    final @NotNull ISentryLifecycleToken lifecycleToken;
    final @Nullable ITransaction transaction;

    SentryRecordContext(
        final @NotNull ISentryLifecycleToken lifecycleToken,
        final @Nullable ITransaction transaction) {
      this.lifecycleToken = lifecycleToken;
      this.transaction = transaction;
    }
  }
}
58 changes: 58 additions & 0 deletions
58
...rc/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| package io.sentry.spring.jakarta.kafka | ||
|
|
||
| import kotlin.test.Test | ||
| import kotlin.test.assertSame | ||
| import kotlin.test.assertTrue | ||
| import org.mockito.kotlin.mock | ||
| import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory | ||
| import org.springframework.kafka.core.ConsumerFactory | ||
|
|
||
class SentryKafkaConsumerBeanPostProcessorTest {

  // Builds a minimal container factory backed by a mocked consumer factory.
  private fun newListenerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = mock<ConsumerFactory<String, String>>()
    return factory
  }

  // Reads the private recordInterceptor field via reflection (no public getter exists).
  private fun currentInterceptor(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>
  ): Any? {
    val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
    field.isAccessible = true
    return field.get(factory)
  }

  @Test
  fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() {
    val listenerFactory = newListenerFactory()

    SentryKafkaConsumerBeanPostProcessor()
      .postProcessAfterInitialization(listenerFactory, "kafkaListenerContainerFactory")

    assertTrue(currentInterceptor(listenerFactory) is SentryKafkaRecordInterceptor<*, *>)
  }

  @Test
  fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() {
    val listenerFactory = newListenerFactory()
    val processor = SentryKafkaConsumerBeanPostProcessor()

    // First pass installs the interceptor.
    processor.postProcessAfterInitialization(listenerFactory, "kafkaListenerContainerFactory")
    val afterFirstPass = currentInterceptor(listenerFactory)

    // Second pass must be idempotent and leave the same instance in place.
    processor.postProcessAfterInitialization(listenerFactory, "kafkaListenerContainerFactory")
    val afterSecondPass = currentInterceptor(listenerFactory)

    assertSame(afterFirstPass, afterSecondPass)
  }

  @Test
  fun `does not wrap non-factory beans`() {
    val unrelatedBean = "not a factory"

    val result =
      SentryKafkaConsumerBeanPostProcessor()
        .postProcessAfterInitialization(unrelatedBean, "someBean")

    assertSame(unrelatedBean, result)
  }
}
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason, we are going the composite route in the producer but adding the existing interceptor as a delegate here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a
CompositeRecordInterceptor