Skip to content

Commit 2aece76

Browse files
authored
Merge e86169f into 6d91bdc
2 parents 6d91bdc + e86169f commit 2aece76

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.sentry.TransactionContext;
1212
import io.sentry.TransactionOptions;
1313
import io.sentry.util.SpanUtils;
14+
import java.nio.ByteBuffer;
1415
import java.nio.charset.StandardCharsets;
1516
import java.util.Collections;
1617
import java.util.List;
@@ -21,6 +22,7 @@
2122
import org.jetbrains.annotations.NotNull;
2223
import org.jetbrains.annotations.Nullable;
2324
import org.springframework.kafka.listener.RecordInterceptor;
25+
import org.springframework.kafka.support.KafkaHeaders;
2426

2527
/**
2628
* A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
@@ -161,6 +163,11 @@ private boolean isIgnored() {
161163
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
162164
}
163165

166+
final @Nullable Integer retryCount = retryCount(record);
167+
if (retryCount != null) {
168+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);
169+
}
170+
164171
final @Nullable String enqueuedTimeStr =
165172
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
166173
if (enqueuedTimeStr != null) {
@@ -178,6 +185,25 @@ private boolean isIgnored() {
178185
return transaction;
179186
}
180187

188+
private @Nullable Integer retryCount(final @NotNull ConsumerRecord<K, V> record) {
189+
final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
190+
if (header == null) {
191+
return null;
192+
}
193+
194+
final byte[] value = header.value();
195+
if (value == null || value.length != Integer.BYTES) {
196+
return null;
197+
}
198+
199+
final int attempt = ByteBuffer.wrap(value).getInt();
200+
if (attempt <= 0) {
201+
return null;
202+
}
203+
204+
return attempt - 1;
205+
}
206+
181207
private void finishStaleContext() {
182208
if (currentContext.get() != null) {
183209
finishSpan(SpanStatus.UNKNOWN, null);

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import io.sentry.Sentry
77
import io.sentry.SentryOptions
88
import io.sentry.SentryTraceHeader
99
import io.sentry.SentryTracer
10+
import io.sentry.SpanDataConvention
1011
import io.sentry.TransactionContext
1112
import io.sentry.test.initForTest
13+
import java.nio.ByteBuffer
1214
import java.nio.charset.StandardCharsets
1315
import kotlin.test.AfterTest
1416
import kotlin.test.BeforeTest
1517
import kotlin.test.Test
1618
import kotlin.test.assertEquals
19+
import kotlin.test.assertNull
1720
import org.apache.kafka.clients.consumer.Consumer
1821
import org.apache.kafka.clients.consumer.ConsumerRecord
1922
import org.apache.kafka.common.header.internals.RecordHeaders
@@ -24,6 +27,7 @@ import org.mockito.kotlin.never
2427
import org.mockito.kotlin.verify
2528
import org.mockito.kotlin.whenever
2629
import org.springframework.kafka.listener.RecordInterceptor
30+
import org.springframework.kafka.support.KafkaHeaders
2731

2832
class SentryKafkaRecordInterceptorTest {
2933

@@ -32,6 +36,7 @@ class SentryKafkaRecordInterceptorTest {
3236
private lateinit var options: SentryOptions
3337
private lateinit var consumer: Consumer<String, String>
3438
private lateinit var lifecycleToken: ISentryLifecycleToken
39+
private lateinit var transaction: SentryTracer
3540

3641
@BeforeTest
3742
fun setup() {
@@ -52,8 +57,9 @@ class SentryKafkaRecordInterceptorTest {
5257
whenever(forkedScopes.options).thenReturn(options)
5358
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
5459

55-
val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
56-
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
60+
transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
61+
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any()))
62+
.thenReturn(transaction)
5763
}
5864

5965
@AfterTest
@@ -81,6 +87,7 @@ class SentryKafkaRecordInterceptorTest {
8187
sentryTrace: String? = null,
8288
baggage: String? = null,
8389
enqueuedTime: Long? = null,
90+
deliveryAttempt: Int? = null,
8491
): ConsumerRecord<String, String> {
8592
val headers = RecordHeaders()
8693
sentryTrace?.let {
@@ -95,6 +102,12 @@ class SentryKafkaRecordInterceptorTest {
95102
it.toString().toByteArray(StandardCharsets.UTF_8),
96103
)
97104
}
105+
deliveryAttempt?.let {
106+
headers.add(
107+
KafkaHeaders.DELIVERY_ATTEMPT,
108+
ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(),
109+
)
110+
}
98111
val record = ConsumerRecord<String, String>("my-topic", 0, 0L, "key", "value")
99112
headers.forEach { record.headers().add(it) }
100113
return record
@@ -132,6 +145,26 @@ class SentryKafkaRecordInterceptorTest {
132145
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
133146
}
134147

148+
@Test
149+
fun `sets retry count from delivery attempt header`() {
150+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
151+
val record = createRecordWithHeaders(deliveryAttempt = 3)
152+
153+
withMockSentry { interceptor.intercept(record, consumer) }
154+
155+
assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
156+
}
157+
158+
@Test
159+
fun `does not set retry count when delivery attempt header is missing`() {
160+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
161+
val record = createRecord()
162+
163+
withMockSentry { interceptor.intercept(record, consumer) }
164+
165+
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
166+
}
167+
135168
@Test
136169
fun `does not create span when queue tracing is disabled`() {
137170
options.isEnableQueueTracing = false

0 commit comments

Comments
 (0)