Skip to content

Commit 860e97d

Browse files
authored
all: API refactoring in preparation to support retry stats (grpc#8355)
Rebased PR grpc#8343 into the first commit of this PR, then (the 2nd commit) reverted the part for metric recording of retry attempts. The PR as a whole is mechanical refactoring. No behavior change (except that some of the old code path when tracer is created is moved into the new method `streamCreated()`). The API change is documented in go/grpc-stats-api-change-for-retry-java
1 parent b276459 commit 860e97d

55 files changed

Lines changed: 1210 additions & 563 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/src/main/java/io/grpc/ClientStreamTracer.java

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

2121
import com.google.common.base.MoreObjects;
22-
import io.grpc.Grpc;
2322
import javax.annotation.concurrent.ThreadSafe;
2423

2524
/**
@@ -28,6 +27,18 @@
2827
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
2928
@ThreadSafe
3029
public abstract class ClientStreamTracer extends StreamTracer {
30+
31+
/**
32+
* The stream is being created on a ready transport.
33+
*
34+
* @param headers the mutable initial metadata. Modifications to it will be sent to the socket but
35+
* not be seen by client interceptors and the application.
36+
*
37+
* @since 1.40.0
38+
*/
39+
public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) {
40+
}
41+
3142
/**
3243
* Headers has been sent to the socket.
3344
*/
@@ -54,22 +65,6 @@ public void inboundTrailers(Metadata trailers) {
5465
* Factory class for {@link ClientStreamTracer}.
5566
*/
5667
public abstract static class Factory {
57-
/**
58-
* Creates a {@link ClientStreamTracer} for a new client stream.
59-
*
60-
* @param callOptions the effective CallOptions of the call
61-
* @param headers the mutable headers of the stream. It can be safely mutated within this
62-
* method. It should not be saved because it is not safe for read or write after the
63-
* method returns.
64-
*
65-
* @deprecated use {@link
66-
* #newClientStreamTracer(io.grpc.ClientStreamTracer.StreamInfo, io.grpc.Metadata)} instead.
67-
*/
68-
@Deprecated
69-
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
70-
throw new UnsupportedOperationException("Not implemented");
71-
}
72-
7368
/**
7469
* Creates a {@link ClientStreamTracer} for a new client stream. This is called inside the
7570
* transport when it's creating the stream.
@@ -81,12 +76,15 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat
8176
*
8277
* @since 1.20.0
8378
*/
84-
@SuppressWarnings("deprecation")
8579
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
86-
return newClientStreamTracer(info.getCallOptions(), headers);
80+
throw new UnsupportedOperationException("Not implemented");
8781
}
8882
}
8983

84+
/** An abstract class for internal use only. */
85+
@Internal
86+
public abstract static class InternalLimitedInfoFactory extends Factory {}
87+
9088
/**
9189
* Information about a stream.
9290
*
@@ -99,15 +97,21 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
9997
public static final class StreamInfo {
10098
private final Attributes transportAttrs;
10199
private final CallOptions callOptions;
100+
private final boolean isTransparentRetry;
102101

103-
StreamInfo(Attributes transportAttrs, CallOptions callOptions) {
102+
StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) {
104103
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs");
105104
this.callOptions = checkNotNull(callOptions, "callOptions");
105+
this.isTransparentRetry = isTransparentRetry;
106106
}
107107

108108
/**
109109
* Returns the attributes of the transport that this stream was created on.
110+
*
111+
* @deprecated Use {@link ClientStreamTracer#streamCreated(Attributes, Metadata)} to handle
112+
* the transport Attributes instead.
110113
*/
114+
@Deprecated
111115
@Grpc.TransportAttr
112116
public Attributes getTransportAttrs() {
113117
return transportAttrs;
@@ -120,16 +124,25 @@ public CallOptions getCallOptions() {
120124
return callOptions;
121125
}
122126

127+
/**
128+
* Whether the stream is a transparent retry.
129+
*
130+
* @since 1.40.0
131+
*/
132+
public boolean isTransparentRetry() {
133+
return isTransparentRetry;
134+
}
135+
123136
/**
124137
* Converts this StreamInfo into a new Builder.
125138
*
126139
* @since 1.21.0
127140
*/
128141
public Builder toBuilder() {
129-
Builder builder = new Builder();
130-
builder.setTransportAttrs(transportAttrs);
131-
builder.setCallOptions(callOptions);
132-
return builder;
142+
return new Builder()
143+
.setCallOptions(callOptions)
144+
.setTransportAttrs(transportAttrs)
145+
.setIsTransparentRetry(isTransparentRetry);
133146
}
134147

135148
/**
@@ -146,6 +159,7 @@ public String toString() {
146159
return MoreObjects.toStringHelper(this)
147160
.add("transportAttrs", transportAttrs)
148161
.add("callOptions", callOptions)
162+
.add("isTransparentRetry", isTransparentRetry)
149163
.toString();
150164
}
151165

@@ -157,16 +171,20 @@ public String toString() {
157171
public static final class Builder {
158172
private Attributes transportAttrs = Attributes.EMPTY;
159173
private CallOptions callOptions = CallOptions.DEFAULT;
174+
private boolean isTransparentRetry;
160175

161176
Builder() {
162177
}
163178

164179
/**
165180
* Sets the attributes of the transport that this stream was created on. This field is
166181
* optional.
182+
*
183+
* @deprecated Use {@link ClientStreamTracer#streamCreated(Attributes, Metadata)} to handle
184+
* the transport Attributes instead.
167185
*/
168-
@Grpc.TransportAttr
169-
public Builder setTransportAttrs(Attributes transportAttrs) {
186+
@Deprecated
187+
public Builder setTransportAttrs(@Grpc.TransportAttr Attributes transportAttrs) {
170188
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs cannot be null");
171189
return this;
172190
}
@@ -179,11 +197,21 @@ public Builder setCallOptions(CallOptions callOptions) {
179197
return this;
180198
}
181199

200+
/**
201+
* Sets whether the stream is a transparent retry.
202+
*
203+
* @since 1.40.0
204+
*/
205+
public Builder setIsTransparentRetry(boolean isTransparentRetry) {
206+
this.isTransparentRetry = isTransparentRetry;
207+
return this;
208+
}
209+
182210
/**
183211
* Builds a new StreamInfo.
184212
*/
185213
public StreamInfo build() {
186-
return new StreamInfo(transportAttrs, callOptions);
214+
return new StreamInfo(transportAttrs, callOptions, isTransparentRetry);
187215
}
188216
}
189217
}

api/src/test/java/io/grpc/CallOptionsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.mockito.Mockito.mock;
3131

3232
import com.google.common.base.Objects;
33+
import io.grpc.ClientStreamTracer.StreamInfo;
3334
import io.grpc.internal.SerializingExecutor;
3435
import java.util.concurrent.Executor;
3536
import java.util.concurrent.TimeUnit;
@@ -271,16 +272,15 @@ public void increment(long period, TimeUnit unit) {
271272
}
272273
}
273274

274-
private static class FakeTracerFactory extends ClientStreamTracer.Factory {
275+
private static class FakeTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
275276
final String name;
276277

277278
FakeTracerFactory(String name) {
278279
this.name = name;
279280
}
280281

281282
@Override
282-
public ClientStreamTracer newClientStreamTracer(
283-
ClientStreamTracer.StreamInfo info, Metadata headers) {
283+
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
284284
return new ClientStreamTracer() {};
285285
}
286286

binder/src/main/java/io/grpc/binder/internal/BinderTransport.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.common.util.concurrent.ListenableFuture;
3333
import io.grpc.Attributes;
3434
import io.grpc.CallOptions;
35+
import io.grpc.ClientStreamTracer;
3536
import io.grpc.Grpc;
3637
import io.grpc.Internal;
3738
import io.grpc.InternalChannelz.SocketStats;
@@ -632,28 +633,28 @@ public synchronized Runnable start(ManagedClientTransport.Listener clientTranspo
632633
public synchronized ClientStream newStream(
633634
final MethodDescriptor<?, ?> method,
634635
final Metadata headers,
635-
final CallOptions callOptions) {
636+
final CallOptions callOptions,
637+
ClientStreamTracer[] tracers) {
636638
if (isShutdown()) {
637-
return newFailingClientStream(shutdownStatus, callOptions, attributes, headers);
639+
return newFailingClientStream(shutdownStatus, attributes, headers, tracers);
638640
} else {
639641
int callId = latestCallId++;
640642
if (latestCallId == LAST_CALL_ID) {
641643
latestCallId = FIRST_CALL_ID;
642644
}
645+
StatsTraceContext statsTraceContext =
646+
StatsTraceContext.newClientContext(tracers, attributes, headers);
643647
Inbound.ClientInbound inbound =
644648
new Inbound.ClientInbound(
645649
this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions));
646650
if (ongoingCalls.putIfAbsent(callId, inbound) != null) {
647651
Status failure = Status.INTERNAL.withDescription("Clashing call IDs");
648652
shutdownInternal(failure, true);
649-
return newFailingClientStream(failure, callOptions, attributes, headers);
653+
return newFailingClientStream(failure, attributes, headers, tracers);
650654
} else {
651655
if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) {
652656
clientTransportListener.transportInUse(true);
653657
}
654-
StatsTraceContext statsTraceContext =
655-
StatsTraceContext.newClientContext(callOptions, attributes, headers);
656-
657658
Outbound.ClientOutbound outbound =
658659
new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext);
659660
if (method.getType().clientSendsOneMessage()) {
@@ -763,12 +764,12 @@ protected void handlePingResponse(Parcel parcel) {
763764
}
764765

765766
private static ClientStream newFailingClientStream(
766-
Status failure, CallOptions callOptions, Attributes attributes, Metadata headers) {
767+
Status failure, Attributes attributes, Metadata headers,
768+
ClientStreamTracer[] tracers) {
767769
StatsTraceContext statsTraceContext =
768-
StatsTraceContext.newClientContext(callOptions, attributes, headers);
770+
StatsTraceContext.newClientContext(tracers, attributes, headers);
769771
statsTraceContext.clientOutboundHeaders();
770-
statsTraceContext.streamClosed(failure);
771-
return new FailingClientStream(failure);
772+
return new FailingClientStream(failure, tracers);
772773
}
773774

774775
private static InternalLogId buildLogId(

0 commit comments

Comments
 (0)