Skip to content

Commit 2142902

Browse files
authored
core: fix retry flow control issue (grpc#8401)
There has been an issue about flow control when retry is enabled. Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called. The user's `onReady()` implementation might do ``` while(observer.isReady()) { // send one more message. } ``` However, currently if the `RetriableStream` is still draining, `isReady()` is false, and user's `onReady()` exits immediately. And because `substreamListener.onReady()` is already called, it may not be called again after drained. This PR fixes the issue by - Use a SerializeExecutor to call all `masterListener` callbacks. - Once `RetriableStream` is drained, check `isReady()` and if so call `onReady()`. - Once `substreamListener.onReady()` is called, check `isReady()` and only if so we call `masterListener.onReady()`.
1 parent fd2a58a commit 2142902

2 files changed

Lines changed: 170 additions & 18 deletions

File tree

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import io.grpc.Metadata;
3131
import io.grpc.MethodDescriptor;
3232
import io.grpc.Status;
33+
import io.grpc.SynchronizationContext;
3334
import io.grpc.internal.ClientStreamListener.RpcProgress;
3435
import java.io.InputStream;
36+
import java.lang.Thread.UncaughtExceptionHandler;
3537
import java.util.ArrayList;
3638
import java.util.Collection;
3739
import java.util.Collections;
@@ -64,6 +66,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
6466

6567
private final MethodDescriptor<ReqT, ?> method;
6668
private final Executor callExecutor;
69+
private final Executor listenerSerializeExecutor = new SynchronizationContext(
70+
new UncaughtExceptionHandler() {
71+
@Override
72+
public void uncaughtException(Thread t, Throwable e) {
73+
throw Status.fromThrowable(e)
74+
.withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
75+
.asRuntimeException();
76+
}
77+
}
78+
);
6779
private final ScheduledExecutorService scheduledExecutorService;
6880
// Must not modify it.
6981
private final Metadata headers;
@@ -105,6 +117,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
105117
private FutureCanceller scheduledHedging;
106118
private long nextBackoffIntervalNanos;
107119
private Status cancellationStatus;
120+
private boolean isClosed;
108121

109122
RetriableStream(
110123
MethodDescriptor<ReqT, ?> method, Metadata headers,
@@ -247,6 +260,7 @@ private void drain(Substream substream) {
247260
int chunk = 0x80;
248261
List<BufferEntry> list = null;
249262
boolean streamStarted = false;
263+
Runnable onReadyRunnable = null;
250264

251265
while (true) {
252266
State savedState;
@@ -264,7 +278,18 @@ private void drain(Substream substream) {
264278
}
265279
if (index == savedState.buffer.size()) { // I'm drained
266280
state = savedState.substreamDrained(substream);
267-
return;
281+
if (!isReady()) {
282+
return;
283+
}
284+
onReadyRunnable = new Runnable() {
285+
@Override
286+
public void run() {
287+
if (!isClosed) {
288+
masterListener.onReady();
289+
}
290+
}
291+
};
292+
break;
268293
}
269294

270295
if (substream.closed) {
@@ -299,6 +324,11 @@ private void drain(Substream substream) {
299324
}
300325
}
301326

327+
if (onReadyRunnable != null) {
328+
listenerSerializeExecutor.execute(onReadyRunnable);
329+
return;
330+
}
331+
302332
substream.stream.cancel(
303333
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
304334
}
@@ -450,14 +480,22 @@ public void run() {
450480
}
451481

452482
@Override
453-
public final void cancel(Status reason) {
483+
public final void cancel(final Status reason) {
454484
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
455485
noopSubstream.stream = new NoopClientStream();
456486
Runnable runnable = commit(noopSubstream);
457487

458488
if (runnable != null) {
459-
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
460489
runnable.run();
490+
listenerSerializeExecutor.execute(
491+
new Runnable() {
492+
@Override
493+
public void run() {
494+
isClosed = true;
495+
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
496+
497+
}
498+
});
461499
return;
462500
}
463501

@@ -771,18 +809,25 @@ private final class Sublistener implements ClientStreamListener {
771809
}
772810

773811
@Override
774-
public void headersRead(Metadata headers) {
812+
public void headersRead(final Metadata headers) {
775813
commitAndRun(substream);
776814
if (state.winningSubstream == substream) {
777-
masterListener.headersRead(headers);
778815
if (throttle != null) {
779816
throttle.onSuccess();
780817
}
818+
listenerSerializeExecutor.execute(
819+
new Runnable() {
820+
@Override
821+
public void run() {
822+
masterListener.headersRead(headers);
823+
}
824+
});
781825
}
782826
}
783827

784828
@Override
785-
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
829+
public void closed(
830+
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
786831
synchronized (lock) {
787832
state = state.substreamClosed(substream);
788833
closedSubstreamsInsight.append(status.getCode());
@@ -793,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
793838
if (substream.bufferLimitExceeded) {
794839
commitAndRun(substream);
795840
if (state.winningSubstream == substream) {
796-
masterListener.closed(status, rpcProgress, trailers);
841+
listenerSerializeExecutor.execute(
842+
new Runnable() {
843+
@Override
844+
public void run() {
845+
isClosed = true;
846+
masterListener.closed(status, rpcProgress, trailers);
847+
}
848+
});
797849
}
798850
return;
799851
}
@@ -900,7 +952,14 @@ public void run() {
900952

901953
commitAndRun(substream);
902954
if (state.winningSubstream == substream) {
903-
masterListener.closed(status, rpcProgress, trailers);
955+
listenerSerializeExecutor.execute(
956+
new Runnable() {
957+
@Override
958+
public void run() {
959+
isClosed = true;
960+
masterListener.closed(status, rpcProgress, trailers);
961+
}
962+
});
904963
}
905964
}
906965

@@ -970,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) {
9701029
}
9711030

9721031
@Override
973-
public void messagesAvailable(MessageProducer producer) {
1032+
public void messagesAvailable(final MessageProducer producer) {
9741033
State savedState = state;
9751034
checkState(
9761035
savedState.winningSubstream != null, "Headers should be received prior to messages.");
9771036
if (savedState.winningSubstream != substream) {
9781037
return;
9791038
}
980-
masterListener.messagesAvailable(producer);
1039+
listenerSerializeExecutor.execute(
1040+
new Runnable() {
1041+
@Override
1042+
public void run() {
1043+
masterListener.messagesAvailable(producer);
1044+
}
1045+
});
9811046
}
9821047

9831048
@Override
9841049
public void onReady() {
9851050
// FIXME(#7089): hedging case is broken.
986-
// TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
987-
// drained and if is still ready.
988-
masterListener.onReady();
1051+
if (!isReady()) {
1052+
return;
1053+
}
1054+
listenerSerializeExecutor.execute(
1055+
new Runnable() {
1056+
@Override
1057+
public void run() {
1058+
if (!isClosed) {
1059+
masterListener.onReady();
1060+
}
1061+
}
1062+
});
9891063
}
9901064
}
9911065

0 commit comments

Comments
 (0)