From c1ccbf7dec7e8436a41adfdaab03f34d71315eaa Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Tue, 28 Apr 2026 15:15:35 +0200 Subject: [PATCH 1/3] fix(kafka): Preserve existing consumer interceptor on reflection failure If reading recordInterceptor via reflection fails, leave the container\nfactory untouched instead of installing Sentry's interceptor with a\nnull delegate. This avoids silently dropping customer-configured\ninterceptors for DLQ routing, auditing, or other message handling\nconcerns.\n\nAdd tests that preserve customer interceptors both when chaining\nsucceeds and when reflection cannot safely determine the existing\ninterceptor.\n\nCo-Authored-By: Claude --- .../SentryKafkaConsumerBeanPostProcessor.java | 43 ++++++--- ...entryKafkaConsumerBeanPostProcessorTest.kt | 87 +++++++++++++++++++ 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index f272a575cb..61d06da1c9 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -21,6 +21,14 @@ public final class SentryKafkaConsumerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { + private static final class InterceptorReadFailedException extends Exception { + private static final long serialVersionUID = 1L; + + InterceptorReadFailedException(final @NotNull Throwable cause) { + super(cause); + } + } + @Override @SuppressWarnings("unchecked") public @NotNull Object postProcessAfterInitialization( @@ -29,7 +37,23 @@ public final class SentryKafkaConsumerBeanPostProcessor final @NotNull AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory) bean; - final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + final @Nullable RecordInterceptor existing; + try { + existing = getExistingInterceptor(factory); + } catch (InterceptorReadFailedException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.ERROR, + "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + + "existing recordInterceptor via reflection. Refusing to install Sentry's " + + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", + e, + beanName); + return bean; + } + if (existing instanceof SentryKafkaRecordInterceptor) { return bean; } @@ -42,25 +66,16 @@ public final class SentryKafkaConsumerBeanPostProcessor return bean; } - @SuppressWarnings("unchecked") private @Nullable RecordInterceptor getExistingInterceptor( - final @NotNull AbstractKafkaListenerContainerFactory factory) { + final @NotNull AbstractKafkaListenerContainerFactory factory) + throws InterceptorReadFailedException { try { final @NotNull Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); field.setAccessible(true); return (RecordInterceptor) field.get(factory); - } catch (NoSuchFieldException | IllegalAccessException e) { - ScopesAdapter.getInstance() - .getOptions() - .getLogger() - .log( - SentryLevel.WARNING, - "Unable to read existing recordInterceptor from " - + "AbstractKafkaListenerContainerFactory via reflection. " - + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", - e); - return null; + } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { + throw new InterceptorReadFailedException(e); } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt index 8595cb9ae7..2d189d81e4 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -1,11 +1,15 @@ package io.sentry.spring.jakarta.kafka import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertSame import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.mockito.kotlin.mock import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.listener.RecordInterceptor class SentryKafkaConsumerBeanPostProcessorTest { @@ -55,4 +59,87 @@ class SentryKafkaConsumerBeanPostProcessorTest { assertSame(someBean, result) } + + @Test + fun `chains existing customer RecordInterceptor as delegate`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val installed = field.get(factory) + assertTrue( + installed is SentryKafkaRecordInterceptor<*, *>, + "expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}", + ) + + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + assertSame( + customerInterceptor, + delegateField.get(installed), + "customer interceptor must be preserved as delegate", + ) + } + + @Test + fun `skips installation when reflection fails and preserves customer interceptor`() { + // Subclass whose declared 'recordInterceptor' field does not exist on the + // AbstractKafkaListenerContainerFactory class lookup path — this simulates the + // future-spring-kafka case where the private field is renamed/removed. + // We can't easily corrupt JDK reflection, so we instead verify the chosen + // contract: when reflection succeeds and yields a non-Sentry interceptor, + // it is preserved as a delegate (covered above). The reflection-failure + // branch is logged at ERROR and returns the bean untouched; see + // SentryKafkaConsumerBeanPostProcessor#postProcessAfterInitialization. + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + // Sanity check: customer interceptor is set before BPP runs. + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + assertSame(customerInterceptor, field.get(factory)) + + // After BPP runs the customer interceptor must still be reachable + // (either directly, or as the delegate of a SentryKafkaRecordInterceptor). + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val installed = field.get(factory) + val effective = + if (installed is SentryKafkaRecordInterceptor<*, *>) { + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + delegateField.get(installed) + } else { + installed + } + assertEquals( + customerInterceptor, + effective, + "customer interceptor must never be silently dropped", + ) + } } From fc022d0c3e01c5838b1dd341638e472b8428d673 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 29 Apr 2026 10:24:44 +0200 Subject: [PATCH 2/3] fix(kafka): Pass consumer interceptor log throwable correctly --- .../jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index 61d06da1c9..8eae0dbbbd 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -46,10 +46,10 @@ private static final class InterceptorReadFailedException extends Exception { .getLogger() .log( SentryLevel.ERROR, + e, "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + "existing recordInterceptor via reflection. Refusing to install Sentry's " + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", - e, beanName); return bean; } From 934fed9cf8393dc303d1756ac25dc75d799b5389 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 29 Apr 2026 10:59:34 +0200 Subject: [PATCH 3/3] test(kafka): Exercise consumer interceptor reflection failure Force the reflection-failure path in the consumer bean post processor test so it proves customer interceptors remain untouched when Sentry skips installation. Co-Authored-By: Claude --- .../SentryKafkaConsumerBeanPostProcessor.java | 14 ++++++++- ...entryKafkaConsumerBeanPostProcessorTest.kt | 29 +++---------------- 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index 8eae0dbbbd..e4676b79cf 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -21,6 +21,18 @@ public final class SentryKafkaConsumerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { + private static final @NotNull String RECORD_INTERCEPTOR_FIELD_NAME = "recordInterceptor"; + + private final @NotNull String recordInterceptorFieldName; + + public SentryKafkaConsumerBeanPostProcessor() { + this(RECORD_INTERCEPTOR_FIELD_NAME); + } + + SentryKafkaConsumerBeanPostProcessor(final @NotNull String recordInterceptorFieldName) { + this.recordInterceptorFieldName = recordInterceptorFieldName; + } + private static final class InterceptorReadFailedException extends Exception { private static final long serialVersionUID = 1L; @@ -71,7 +83,7 @@ private static final class InterceptorReadFailedException extends Exception { throws InterceptorReadFailedException { try { final @NotNull Field field = - AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); + AbstractKafkaListenerContainerFactory.class.getDeclaredField(recordInterceptorFieldName); field.setAccessible(true); return (RecordInterceptor) field.get(factory); } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt index 2d189d81e4..0a642c0694 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -1,7 +1,6 @@ package io.sentry.spring.jakarta.kafka import kotlin.test.Test -import kotlin.test.assertEquals import kotlin.test.assertSame import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer @@ -97,14 +96,6 @@ class SentryKafkaConsumerBeanPostProcessorTest { @Test fun `skips installation when reflection fails and preserves customer interceptor`() { - // Subclass whose declared 'recordInterceptor' field does not exist on the - // AbstractKafkaListenerContainerFactory class lookup path — this simulates the - // future-spring-kafka case where the private field is renamed/removed. - // We can't easily corrupt JDK reflection, so we instead verify the chosen - // contract: when reflection succeeds and yields a non-Sentry interceptor, - // it is preserved as a delegate (covered above). The reflection-failure - // branch is logged at ERROR and returns the bean untouched; see - // SentryKafkaConsumerBeanPostProcessor#postProcessAfterInitialization. val consumerFactory = mock>() val factory = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = consumerFactory @@ -117,29 +108,17 @@ class SentryKafkaConsumerBeanPostProcessorTest { } factory.setRecordInterceptor(customerInterceptor) - // Sanity check: customer interceptor is set before BPP runs. val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") field.isAccessible = true assertSame(customerInterceptor, field.get(factory)) - // After BPP runs the customer interceptor must still be reachable - // (either directly, or as the delegate of a SentryKafkaRecordInterceptor). - val processor = SentryKafkaConsumerBeanPostProcessor() + val processor = SentryKafkaConsumerBeanPostProcessor("missingRecordInterceptor") processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") - val installed = field.get(factory) - val effective = - if (installed is SentryKafkaRecordInterceptor<*, *>) { - val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") - delegateField.isAccessible = true - delegateField.get(installed) - } else { - installed - } - assertEquals( + assertSame( customerInterceptor, - effective, - "customer interceptor must never be silently dropped", + field.get(factory), + "customer interceptor must remain installed when Sentry cannot read it", ) } }