Skip to content

Commit 2faa748

Browse files
authored
census: Fix retry stats data race (grpc#8459)
There is a data race in `CensusStatsModule.CallAttemptsTracerFactory`: If a client call is cancelled while an active stream on the transport is not committed, then a [noop substream](https://github.com/grpc/grpc-java/blob/v1.40.0/core/src/main/java/io/grpc/internal/RetriableStream.java#L486) will be committed and the active stream will be cancelled. Because the active stream cancellation triggers the stream listener closed() on the _transport_ thread, the closed() method can be invoked concurrently with the call listener onClose(). Therefore, one `CallAttemptsTracerFactory.attemptEnded()` can be called concurrently with `CallAttemptsTracerFactory.callEnded()`, and there could be a data race on RETRY_DELAY_PER_CALL. See also the regression test added. The same data race can happen in the hedging case: when one of the hedges is committed and completes the call, the other uncommitted hedges would cancel themselves and trigger their stream listeners' closed() on the transport thread concurrently. Fix the race by recording RETRY_DELAY_PER_CALL only once both of the following conditions are met: - callEnded is true - the number of active streams is 0.
1 parent 522b37b commit 2faa748

File tree

2 files changed

+123
-52
lines changed

2 files changed

+123
-52
lines changed

census/src/main/java/io/grpc/census/CensusStatsModule.java

Lines changed: 59 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@
5555
import io.opencensus.tags.unsafe.ContextUtils;
5656
import java.util.concurrent.TimeUnit;
5757
import java.util.concurrent.atomic.AtomicBoolean;
58-
import java.util.concurrent.atomic.AtomicInteger;
5958
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
6059
import java.util.concurrent.atomic.AtomicLong;
6160
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
6261
import java.util.logging.Level;
6362
import java.util.logging.Logger;
6463
import javax.annotation.Nullable;
64+
import javax.annotation.concurrent.GuardedBy;
6565

6666
/**
6767
* Provides factories for {@link StreamTracer} that records stats to Census.
@@ -356,12 +356,12 @@ public void streamClosed(Status status) {
356356
if (module.recordFinishedRpcs) {
357357
// Stream is closed early. So no need to record metrics for any inbound events after this
358358
// point.
359-
recordFinishedRpc();
359+
recordFinishedAttempt();
360360
}
361361
} // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
362362
}
363363

364-
void recordFinishedRpc() {
364+
void recordFinishedAttempt() {
365365
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
366366
// TODO(songya): remove the deprecated measure constants once they are completed removed.
367367
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
@@ -405,48 +405,34 @@ static final class CallAttemptsTracerFactory extends
405405
Measure.MeasureDouble.create(
406406
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
407407

408-
@Nullable
409-
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
410-
411-
/**
412-
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
413-
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
414-
* (potentially racy) direct updates of the volatile variables.
415-
*/
416-
static {
417-
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
418-
try {
419-
tmpCallEndedUpdater =
420-
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
421-
} catch (Throwable t) {
422-
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
423-
tmpCallEndedUpdater = null;
424-
}
425-
callEndedUpdater = tmpCallEndedUpdater;
426-
}
427-
428408
ClientTracer inboundMetricTracer;
429409
private final CensusStatsModule module;
430410
private final Stopwatch stopwatch;
431-
private volatile int callEnded;
411+
@GuardedBy("lock")
412+
private boolean callEnded;
432413
private final TagContext parentCtx;
433414
private final TagContext startCtx;
434415
private final String fullMethodName;
435416

436417
// TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
437418
private final AtomicLong attemptsPerCall = new AtomicLong();
438419
private final AtomicLong transparentRetriesPerCall = new AtomicLong();
439-
private final AtomicLong retryDelayNanos = new AtomicLong();
440-
private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
441-
private final AtomicInteger activeStreams = new AtomicInteger();
442-
private final AtomicBoolean activated = new AtomicBoolean();
420+
// write happens before read
421+
private Status status;
422+
private final Object lock = new Object();
423+
// write @GuardedBy("lock") and happens before read
424+
private long retryDelayNanos;
425+
@GuardedBy("lock")
426+
private int activeStreams;
427+
@GuardedBy("lock")
428+
private boolean finishedCallToBeRecorded;
443429

444430
CallAttemptsTracerFactory(
445431
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
446432
this.module = checkNotNull(module, "module");
447433
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
448434
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
449-
this.stopwatch = module.stopwatchSupplier.get().start();
435+
this.stopwatch = module.stopwatchSupplier.get();
450436
TagValue methodTag = TagValue.create(fullMethodName);
451437
startCtx = module.tagger.toBuilder(parentCtx)
452438
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
@@ -461,10 +447,14 @@ static final class CallAttemptsTracerFactory extends
461447

462448
@Override
463449
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
464-
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
465-
if (activeStreams.incrementAndGet() == 1) {
466-
if (!activated.compareAndSet(false, true)) {
467-
retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
450+
synchronized (lock) {
451+
if (finishedCallToBeRecorded) {
452+
// This can be the case when the call is cancelled but a retry attempt is created.
453+
return new ClientStreamTracer() {};
454+
}
455+
if (++activeStreams == 1 && stopwatch.isRunning()) {
456+
stopwatch.stop();
457+
retryDelayNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
468458
}
469459
}
470460
if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
@@ -477,42 +467,59 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
477467
} else {
478468
attemptsPerCall.incrementAndGet();
479469
}
480-
return tracer;
470+
return new ClientTracer(this, module, parentCtx, startCtx, info);
481471
}
482472

483473
// Called whenever each attempt is ended.
484474
void attemptEnded() {
485-
if (activeStreams.decrementAndGet() == 0) {
486-
// Race condition between two extremely close events does not matter because the difference
487-
// in the result would be very small.
488-
long lastInactiveTimeStamp =
489-
this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
490-
retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
475+
if (!module.recordFinishedRpcs) {
476+
return;
477+
}
478+
boolean shouldRecordFinishedCall = false;
479+
synchronized (lock) {
480+
if (--activeStreams == 0) {
481+
stopwatch.start();
482+
if (callEnded && !finishedCallToBeRecorded) {
483+
shouldRecordFinishedCall = true;
484+
finishedCallToBeRecorded = true;
485+
}
486+
}
487+
}
488+
if (shouldRecordFinishedCall) {
489+
recordFinishedCall();
491490
}
492491
}
493492

494493
void callEnded(Status status) {
495-
if (callEndedUpdater != null) {
496-
if (callEndedUpdater.getAndSet(this, 1) != 0) {
494+
if (!module.recordFinishedRpcs) {
495+
return;
496+
}
497+
this.status = status;
498+
boolean shouldRecordFinishedCall = false;
499+
synchronized (lock) {
500+
if (callEnded) {
501+
// FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
497502
return;
498503
}
499-
} else {
500-
if (callEnded != 0) {
501-
return;
504+
callEnded = true;
505+
if (activeStreams == 0 && !finishedCallToBeRecorded) {
506+
shouldRecordFinishedCall = true;
507+
finishedCallToBeRecorded = true;
502508
}
503-
callEnded = 1;
504509
}
505-
if (!module.recordFinishedRpcs) {
506-
return;
510+
if (shouldRecordFinishedCall) {
511+
recordFinishedCall();
507512
}
508-
stopwatch.stop();
513+
}
514+
515+
void recordFinishedCall() {
509516
if (attemptsPerCall.get() == 0) {
510517
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
511518
tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
512519
tracer.statusCode = status.getCode();
513-
tracer.recordFinishedRpc();
520+
tracer.recordFinishedAttempt();
514521
} else if (inboundMetricTracer != null) {
515-
inboundMetricTracer.recordFinishedRpc();
522+
inboundMetricTracer.recordFinishedAttempt();
516523
}
517524

518525
long retriesPerCall = 0;
@@ -523,7 +530,7 @@ void callEnded(Status status) {
523530
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
524531
.put(RETRIES_PER_CALL, retriesPerCall)
525532
.put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
526-
.put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
533+
.put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI);
527534
TagValue methodTag = TagValue.create(fullMethodName);
528535
TagValue statusTag = TagValue.create(status.getCode().toString());
529536
measureMap.record(

interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,70 @@ public void statsRecorded() throws Exception {
356356
assertRetryStatsRecorded(1, 0, 10_000);
357357
}
358358

359+
@Test
360+
public void statsRecorde_callCancelledBeforeCommit() throws Exception {
361+
startNewServer();
362+
retryPolicy = ImmutableMap.<String, Object>builder()
363+
.put("maxAttempts", 4D)
364+
.put("initialBackoff", "10s")
365+
.put("maxBackoff", "10s")
366+
.put("backoffMultiplier", 1D)
367+
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
368+
.build();
369+
createNewChannel();
370+
371+
// We will have streamClosed return at a particular moment that we want.
372+
final CountDownLatch streamClosedLatch = new CountDownLatch(1);
373+
ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
374+
@Override
375+
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
376+
return new ClientStreamTracer() {
377+
@Override
378+
public void streamClosed(Status status) {
379+
if (status.getCode().equals(Code.CANCELLED)) {
380+
try {
381+
streamClosedLatch.await();
382+
} catch (InterruptedException e) {
383+
Thread.currentThread().interrupt();
384+
throw new AssertionError("streamClosedLatch interrupted", e);
385+
}
386+
}
387+
}
388+
};
389+
}
390+
};
391+
ClientCall<String, Integer> call = channel.newCall(
392+
clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
393+
call.start(mockCallListener, new Metadata());
394+
assertRpcStartedRecorded();
395+
fakeClock.forwardTime(5, SECONDS);
396+
String message = "String of length 20.";
397+
call.sendMessage(message);
398+
assertOutboundMessageRecorded();
399+
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
400+
serverCall.request(2);
401+
assertOutboundWireSizeRecorded(message.length());
402+
// trigger retry
403+
serverCall.close(
404+
Status.UNAVAILABLE.withDescription("original attempt failed"),
405+
new Metadata());
406+
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
407+
elapseBackoff(10, SECONDS);
408+
assertRpcStartedRecorded();
409+
assertOutboundMessageRecorded();
410+
serverCall = serverCalls.poll(5, SECONDS);
411+
serverCall.request(2);
412+
assertOutboundWireSizeRecorded(message.length());
413+
fakeClock.forwardTime(7, SECONDS);
414+
call.cancel("Cancelled before commit", null); // A noop substream will commit.
415+
// The call listener is closed, but the netty substream listener is not yet closed.
416+
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
417+
// Let the netty substream listener be closed.
418+
streamClosedLatch.countDown();
419+
assertRetryStatsRecorded(1, 0, 10_000);
420+
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
421+
}
422+
359423
@Test
360424
public void serverCancelledAndClientDeadlineExceeded() throws Exception {
361425
startNewServer();

0 commit comments

Comments
 (0)