3030import io .grpc .Metadata ;
3131import io .grpc .MethodDescriptor ;
3232import io .grpc .Status ;
33+ import io .grpc .SynchronizationContext ;
3334import io .grpc .internal .ClientStreamListener .RpcProgress ;
3435import java .io .InputStream ;
36+ import java .lang .Thread .UncaughtExceptionHandler ;
3537import java .util .ArrayList ;
3638import java .util .Collection ;
3739import java .util .Collections ;
@@ -64,6 +66,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
6466
6567 private final MethodDescriptor <ReqT , ?> method ;
6668 private final Executor callExecutor ;
69+ private final Executor listenerSerializeExecutor = new SynchronizationContext (
70+ new UncaughtExceptionHandler () {
71+ @ Override
72+ public void uncaughtException (Thread t , Throwable e ) {
73+ throw Status .fromThrowable (e )
74+ .withDescription ("Uncaught exception in the SynchronizationContext. Re-thrown." )
75+ .asRuntimeException ();
76+ }
77+ }
78+ );
6779 private final ScheduledExecutorService scheduledExecutorService ;
6880 // Must not modify it.
6981 private final Metadata headers ;
@@ -105,6 +117,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
105117 private FutureCanceller scheduledHedging ;
106118 private long nextBackoffIntervalNanos ;
107119 private Status cancellationStatus ;
120+ private boolean isClosed ;
108121
109122 RetriableStream (
110123 MethodDescriptor <ReqT , ?> method , Metadata headers ,
@@ -247,6 +260,7 @@ private void drain(Substream substream) {
247260 int chunk = 0x80 ;
248261 List <BufferEntry > list = null ;
249262 boolean streamStarted = false ;
263+ Runnable onReadyRunnable = null ;
250264
251265 while (true ) {
252266 State savedState ;
@@ -264,7 +278,18 @@ private void drain(Substream substream) {
264278 }
265279 if (index == savedState .buffer .size ()) { // I'm drained
266280 state = savedState .substreamDrained (substream );
267- return ;
281+ if (!isReady ()) {
282+ return ;
283+ }
284+ onReadyRunnable = new Runnable () {
285+ @ Override
286+ public void run () {
287+ if (!isClosed ) {
288+ masterListener .onReady ();
289+ }
290+ }
291+ };
292+ break ;
268293 }
269294
270295 if (substream .closed ) {
@@ -299,6 +324,11 @@ private void drain(Substream substream) {
299324 }
300325 }
301326
327+ if (onReadyRunnable != null ) {
328+ listenerSerializeExecutor .execute (onReadyRunnable );
329+ return ;
330+ }
331+
302332 substream .stream .cancel (
303333 state .winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED );
304334 }
@@ -450,14 +480,22 @@ public void run() {
450480 }
451481
452482 @ Override
453- public final void cancel (Status reason ) {
483+ public final void cancel (final Status reason ) {
454484 Substream noopSubstream = new Substream (0 /* previousAttempts doesn't matter here */ );
455485 noopSubstream .stream = new NoopClientStream ();
456486 Runnable runnable = commit (noopSubstream );
457487
458488 if (runnable != null ) {
459- masterListener .closed (reason , RpcProgress .PROCESSED , new Metadata ());
460489 runnable .run ();
490+ listenerSerializeExecutor .execute (
491+ new Runnable () {
492+ @ Override
493+ public void run () {
494+ isClosed = true ;
495+ masterListener .closed (reason , RpcProgress .PROCESSED , new Metadata ());
496+
497+ }
498+ });
461499 return ;
462500 }
463501
@@ -771,18 +809,25 @@ private final class Sublistener implements ClientStreamListener {
771809 }
772810
773811 @ Override
774- public void headersRead (Metadata headers ) {
812+ public void headersRead (final Metadata headers ) {
775813 commitAndRun (substream );
776814 if (state .winningSubstream == substream ) {
777- masterListener .headersRead (headers );
778815 if (throttle != null ) {
779816 throttle .onSuccess ();
780817 }
818+ listenerSerializeExecutor .execute (
819+ new Runnable () {
820+ @ Override
821+ public void run () {
822+ masterListener .headersRead (headers );
823+ }
824+ });
781825 }
782826 }
783827
784828 @ Override
785- public void closed (Status status , RpcProgress rpcProgress , Metadata trailers ) {
829+ public void closed (
830+ final Status status , final RpcProgress rpcProgress , final Metadata trailers ) {
786831 synchronized (lock ) {
787832 state = state .substreamClosed (substream );
788833 closedSubstreamsInsight .append (status .getCode ());
@@ -793,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
793838 if (substream .bufferLimitExceeded ) {
794839 commitAndRun (substream );
795840 if (state .winningSubstream == substream ) {
796- masterListener .closed (status , rpcProgress , trailers );
841+ listenerSerializeExecutor .execute (
842+ new Runnable () {
843+ @ Override
844+ public void run () {
845+ isClosed = true ;
846+ masterListener .closed (status , rpcProgress , trailers );
847+ }
848+ });
797849 }
798850 return ;
799851 }
@@ -900,7 +952,14 @@ public void run() {
900952
901953 commitAndRun (substream );
902954 if (state .winningSubstream == substream ) {
903- masterListener .closed (status , rpcProgress , trailers );
955+ listenerSerializeExecutor .execute (
956+ new Runnable () {
957+ @ Override
958+ public void run () {
959+ isClosed = true ;
960+ masterListener .closed (status , rpcProgress , trailers );
961+ }
962+ });
904963 }
905964 }
906965
@@ -970,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) {
9701029 }
9711030
9721031 @ Override
973- public void messagesAvailable (MessageProducer producer ) {
1032+ public void messagesAvailable (final MessageProducer producer ) {
9741033 State savedState = state ;
9751034 checkState (
9761035 savedState .winningSubstream != null , "Headers should be received prior to messages." );
9771036 if (savedState .winningSubstream != substream ) {
9781037 return ;
9791038 }
980- masterListener .messagesAvailable (producer );
1039+ listenerSerializeExecutor .execute (
1040+ new Runnable () {
1041+ @ Override
1042+ public void run () {
1043+ masterListener .messagesAvailable (producer );
1044+ }
1045+ });
9811046 }
9821047
9831048 @ Override
9841049 public void onReady () {
9851050 // FIXME(#7089): hedging case is broken.
986- // TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
987- // drained and if is still ready.
988- masterListener .onReady ();
1051+ if (!isReady ()) {
1052+ return ;
1053+ }
1054+ listenerSerializeExecutor .execute (
1055+ new Runnable () {
1056+ @ Override
1057+ public void run () {
1058+ if (!isClosed ) {
1059+ masterListener .onReady ();
1060+ }
1061+ }
1062+ });
9891063 }
9901064 }
9911065
0 commit comments