5555import io .opencensus .tags .unsafe .ContextUtils ;
5656import java .util .concurrent .TimeUnit ;
5757import java .util .concurrent .atomic .AtomicBoolean ;
58- import java .util .concurrent .atomic .AtomicInteger ;
5958import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
6059import java .util .concurrent .atomic .AtomicLong ;
6160import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
6261import java .util .logging .Level ;
6362import java .util .logging .Logger ;
6463import javax .annotation .Nullable ;
64+ import javax .annotation .concurrent .GuardedBy ;
6565
6666/**
6767 * Provides factories for {@link StreamTracer} that records stats to Census.
@@ -356,12 +356,12 @@ public void streamClosed(Status status) {
356356 if (module .recordFinishedRpcs ) {
357357 // Stream is closed early. So no need to record metrics for any inbound events after this
358358 // point.
359- recordFinishedRpc ();
359+ recordFinishedAttempt ();
360360 }
361361 } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
362362 }
363363
364- void recordFinishedRpc () {
364+ void recordFinishedAttempt () {
365365 MeasureMap measureMap = module .statsRecorder .newMeasureMap ()
366366 // TODO(songya): remove the deprecated measure constants once they are completely removed.
367367 .put (DeprecatedCensusConstants .RPC_CLIENT_FINISHED_COUNT , 1 )
@@ -405,48 +405,34 @@ static final class CallAttemptsTracerFactory extends
405405 Measure .MeasureDouble .create (
406406 "grpc.io/client/retry_delay_per_call" , "Retry delay per call" , "ms" );
407407
408- @ Nullable
409- private static final AtomicIntegerFieldUpdater <CallAttemptsTracerFactory > callEndedUpdater ;
410-
411- /**
412- * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
413- * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
414- * (potentially racy) direct updates of the volatile variables.
415- */
416- static {
417- AtomicIntegerFieldUpdater <CallAttemptsTracerFactory > tmpCallEndedUpdater ;
418- try {
419- tmpCallEndedUpdater =
420- AtomicIntegerFieldUpdater .newUpdater (CallAttemptsTracerFactory .class , "callEnded" );
421- } catch (Throwable t ) {
422- logger .log (Level .SEVERE , "Creating atomic field updaters failed" , t );
423- tmpCallEndedUpdater = null ;
424- }
425- callEndedUpdater = tmpCallEndedUpdater ;
426- }
427-
428408 ClientTracer inboundMetricTracer ;
429409 private final CensusStatsModule module ;
430410 private final Stopwatch stopwatch ;
431- private volatile int callEnded ;
411+ @ GuardedBy ("lock" )
412+ private boolean callEnded ;
432413 private final TagContext parentCtx ;
433414 private final TagContext startCtx ;
434415 private final String fullMethodName ;
435416
436417 // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
437418 private final AtomicLong attemptsPerCall = new AtomicLong ();
438419 private final AtomicLong transparentRetriesPerCall = new AtomicLong ();
439- private final AtomicLong retryDelayNanos = new AtomicLong ();
440- private final AtomicLong lastInactiveTimeStamp = new AtomicLong ();
441- private final AtomicInteger activeStreams = new AtomicInteger ();
442- private final AtomicBoolean activated = new AtomicBoolean ();
420+ // Written in callEnded(); that write happens before the read in recordFinishedCall().
421+ private Status status ;
422+ private final Object lock = new Object ();
423+ // write @GuardedBy("lock") and happens before read
424+ private long retryDelayNanos ;
425+ @ GuardedBy ("lock" )
426+ private int activeStreams ;
427+ @ GuardedBy ("lock" )
428+ private boolean finishedCallToBeRecorded ;
443429
444430 CallAttemptsTracerFactory (
445431 CensusStatsModule module , TagContext parentCtx , String fullMethodName ) {
446432 this .module = checkNotNull (module , "module" );
447433 this .parentCtx = checkNotNull (parentCtx , "parentCtx" );
448434 this .fullMethodName = checkNotNull (fullMethodName , "fullMethodName" );
449- this .stopwatch = module .stopwatchSupplier .get (). start () ;
435+ this .stopwatch = module .stopwatchSupplier .get ();
450436 TagValue methodTag = TagValue .create (fullMethodName );
451437 startCtx = module .tagger .toBuilder (parentCtx )
452438 .putLocal (RpcMeasureConstants .GRPC_CLIENT_METHOD , methodTag )
@@ -461,10 +447,14 @@ static final class CallAttemptsTracerFactory extends
461447
462448 @ Override
463449 public ClientStreamTracer newClientStreamTracer (StreamInfo info , Metadata metadata ) {
464- ClientTracer tracer = new ClientTracer (this , module , parentCtx , startCtx , info );
465- if (activeStreams .incrementAndGet () == 1 ) {
466- if (!activated .compareAndSet (false , true )) {
467- retryDelayNanos .addAndGet (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
450+ synchronized (lock ) {
451+ if (finishedCallToBeRecorded ) {
452+ // This can be the case when the call is cancelled but a retry attempt is created.
453+ return new ClientStreamTracer () {};
454+ }
455+ if (++activeStreams == 1 && stopwatch .isRunning ()) {
456+ stopwatch .stop ();
457+ retryDelayNanos = stopwatch .elapsed (TimeUnit .NANOSECONDS );
468458 }
469459 }
470460 if (module .recordStartedRpcs && attemptsPerCall .get () > 0 ) {
@@ -477,42 +467,59 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
477467 } else {
478468 attemptsPerCall .incrementAndGet ();
479469 }
480- return tracer ;
470+ return new ClientTracer ( this , module , parentCtx , startCtx , info ) ;
481471 }
482472
483473 // Called whenever an attempt ends.
484474 void attemptEnded () {
485- if (activeStreams .decrementAndGet () == 0 ) {
486- // Race condition between two extremely close events does not matter because the difference
487- // in the result would be very small.
488- long lastInactiveTimeStamp =
489- this .lastInactiveTimeStamp .getAndSet (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
490- retryDelayNanos .addAndGet (-lastInactiveTimeStamp );
475+ if (!module .recordFinishedRpcs ) {
476+ return ;
477+ }
478+ boolean shouldRecordFinishedCall = false ;
479+ synchronized (lock ) {
480+ if (--activeStreams == 0 ) {
481+ stopwatch .start ();
482+ if (callEnded && !finishedCallToBeRecorded ) {
483+ shouldRecordFinishedCall = true ;
484+ finishedCallToBeRecorded = true ;
485+ }
486+ }
487+ }
488+ if (shouldRecordFinishedCall ) {
489+ recordFinishedCall ();
491490 }
492491 }
493492
494493 void callEnded (Status status ) {
495- if (callEndedUpdater != null ) {
496- if (callEndedUpdater .getAndSet (this , 1 ) != 0 ) {
494+ if (!module .recordFinishedRpcs ) {
495+ return ;
496+ }
497+ this .status = status ;
498+ boolean shouldRecordFinishedCall = false ;
499+ synchronized (lock ) {
500+ if (callEnded ) {
501+ // FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
497502 return ;
498503 }
499- } else {
500- if (callEnded != 0 ) {
501- return ;
504+ callEnded = true ;
505+ if (activeStreams == 0 && !finishedCallToBeRecorded ) {
506+ shouldRecordFinishedCall = true ;
507+ finishedCallToBeRecorded = true ;
502508 }
503- callEnded = 1 ;
504509 }
505- if (! module . recordFinishedRpcs ) {
506- return ;
510+ if (shouldRecordFinishedCall ) {
511+ recordFinishedCall () ;
507512 }
508- stopwatch .stop ();
513+ }
514+
515+ void recordFinishedCall () {
509516 if (attemptsPerCall .get () == 0 ) {
510517 ClientTracer tracer = new ClientTracer (this , module , parentCtx , startCtx , null );
511518 tracer .roundtripNanos = stopwatch .elapsed (TimeUnit .NANOSECONDS );
512519 tracer .statusCode = status .getCode ();
513- tracer .recordFinishedRpc ();
520+ tracer .recordFinishedAttempt ();
514521 } else if (inboundMetricTracer != null ) {
515- inboundMetricTracer .recordFinishedRpc ();
522+ inboundMetricTracer .recordFinishedAttempt ();
516523 }
517524
518525 long retriesPerCall = 0 ;
@@ -523,7 +530,7 @@ void callEnded(Status status) {
523530 MeasureMap measureMap = module .statsRecorder .newMeasureMap ()
524531 .put (RETRIES_PER_CALL , retriesPerCall )
525532 .put (TRANSPARENT_RETRIES_PER_CALL , transparentRetriesPerCall .get ())
526- .put (RETRY_DELAY_PER_CALL , retryDelayNanos . get () / NANOS_PER_MILLI );
533+ .put (RETRY_DELAY_PER_CALL , retryDelayNanos / NANOS_PER_MILLI );
527534 TagValue methodTag = TagValue .create (fullMethodName );
528535 TagValue statusTag = TagValue .create (status .getCode ().toString ());
529536 measureMap .record (
0 commit comments