Skip to content

Commit 76b8c09

Browse files
authored
Merge 2501e57 into 24348b2
2 parents 24348b2 + 2501e57 commit 76b8c09

3 files changed

Lines changed: 74 additions & 0 deletions

File tree

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
260260
public fun <init> (Lio/sentry/IScopes;)V
261261
public fun <init> (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V
262262
public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
263+
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
263264
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
264265
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
265266
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public SentryKafkaRecordInterceptor(
5252
return delegateIntercept(record, consumer);
5353
}
5454

55+
finishStaleContext();
56+
5557
final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
5658
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();
5759

@@ -98,6 +100,11 @@ public void afterRecord(
98100
}
99101
}
100102

103+
@Override
104+
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
105+
finishStaleContext();
106+
}
107+
101108
private @Nullable ConsumerRecord<K, V> delegateIntercept(
102109
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
103110
if (delegate != null) {
@@ -165,6 +172,12 @@ public void afterRecord(
165172
return transaction;
166173
}
167174

175+
private void finishStaleContext() {
176+
if (currentContext.get() != null) {
177+
finishSpan(SpanStatus.UNKNOWN, null);
178+
}
179+
}
180+
168181
private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
169182
final @Nullable SentryRecordContext ctx = currentContext.get();
170183
if (ctx == null) {

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,64 @@ class SentryKafkaRecordInterceptorTest {
208208
SentryKafkaRecordInterceptor.TRACE_ORIGIN,
209209
)
210210
}
211+
212+
@Test
213+
fun `clearThreadState cleans up stale context`() {
214+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
215+
val record = createRecord()
216+
217+
// intercept sets up context in ThreadLocal
218+
interceptor.intercept(record, consumer)
219+
220+
// clearThreadState should clean up without success/failure being called
221+
interceptor.clearThreadState(consumer)
222+
223+
// lifecycle token should have been closed
224+
verify(lifecycleToken).close()
225+
}
226+
227+
@Test
228+
fun `clearThreadState is no-op when no context exists`() {
229+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
230+
231+
// should not throw
232+
interceptor.clearThreadState(consumer)
233+
}
234+
235+
@Test
236+
fun `intercept cleans up stale context from previous record`() {
237+
val lifecycleToken2 = mock<ISentryLifecycleToken>()
238+
val forkedScopes2 = mock<IScopes>()
239+
whenever(forkedScopes2.options).thenReturn(options)
240+
whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2)
241+
val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2)
242+
whenever(forkedScopes2.startTransaction(any<TransactionContext>(), any())).thenReturn(tx2)
243+
244+
var callCount = 0
245+
whenever(scopes.forkedScopes(any())).thenAnswer {
246+
callCount++
247+
if (callCount == 1) {
248+
val forkedScopes1 = mock<IScopes>()
249+
whenever(forkedScopes1.options).thenReturn(options)
250+
whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken)
251+
val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1)
252+
whenever(forkedScopes1.startTransaction(any<TransactionContext>(), any())).thenReturn(tx1)
253+
forkedScopes1
254+
} else {
255+
forkedScopes2
256+
}
257+
}
258+
259+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
260+
val record = createRecord()
261+
262+
// First intercept sets up context
263+
interceptor.intercept(record, consumer)
264+
265+
// Second intercept without success/failure — should clean up stale context first
266+
interceptor.intercept(record, consumer)
267+
268+
// First lifecycle token should have been closed by the defensive cleanup
269+
verify(lifecycleToken).close()
270+
}
211271
}

0 commit comments

Comments
 (0)