Skip to content

Commit 4429ec2

Browse files
authored
core: add perAttemptRecvTimeout to retry policy (grpc#8301)
1 parent 9b55aed commit 4429ec2

6 files changed

Lines changed: 122 additions & 9 deletions

File tree

core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import io.grpc.InternalConfigSelector;
2828
import io.grpc.LoadBalancer.PickSubchannelArgs;
2929
import io.grpc.MethodDescriptor;
30+
import io.grpc.Status.Code;
3031
import io.grpc.internal.RetriableStream.Throttle;
3132
import java.util.Collections;
3233
import java.util.HashMap;
3334
import java.util.List;
3435
import java.util.Map;
36+
import java.util.Set;
3537
import javax.annotation.Nullable;
3638

3739
/**
@@ -354,9 +356,22 @@ private static RetryPolicy retryPolicy(Map<String, ?> retryPolicy, int maxAttemp
354356
"backoffMultiplier must be greater than 0: %s",
355357
backoffMultiplier);
356358

359+
Long perAttemptRecvTimeout =
360+
ServiceConfigUtil.getPerAttemptRecvTimeoutNanosFromRetryPolicy(retryPolicy);
361+
checkArgument(
362+
perAttemptRecvTimeout == null || perAttemptRecvTimeout >= 0,
363+
"perAttemptRecvTimeout cannot be negative: %s",
364+
perAttemptRecvTimeout);
365+
366+
Set<Code> retryableCodes =
367+
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
368+
checkArgument(
369+
perAttemptRecvTimeout != null || !retryableCodes.isEmpty(),
370+
"retryableStatusCodes cannot be empty without perAttemptRecvTimeout");
371+
357372
return new RetryPolicy(
358373
maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
359-
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy));
374+
perAttemptRecvTimeout, retryableCodes);
360375
}
361376

362377
private static HedgingPolicy hedgingPolicy(

core/src/main/java/io/grpc/internal/RetryPolicy.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.grpc.Status.Code;
2323
import java.util.Set;
2424
import javax.annotation.Nonnull;
25+
import javax.annotation.Nullable;
2526
import javax.annotation.concurrent.Immutable;
2627

2728
/**
@@ -33,6 +34,8 @@ final class RetryPolicy {
3334
final long initialBackoffNanos;
3435
final long maxBackoffNanos;
3536
final double backoffMultiplier;
37+
@Nullable
38+
final Long perAttemptRecvTimeoutNanos;
3639
final Set<Code> retryableStatusCodes;
3740

3841
/**
@@ -44,11 +47,13 @@ final class RetryPolicy {
4447
long initialBackoffNanos,
4548
long maxBackoffNanos,
4649
double backoffMultiplier,
50+
@Nullable Long perAttemptRecvTimeoutNanos,
4751
@Nonnull Set<Code> retryableStatusCodes) {
4852
this.maxAttempts = maxAttempts;
4953
this.initialBackoffNanos = initialBackoffNanos;
5054
this.maxBackoffNanos = maxBackoffNanos;
5155
this.backoffMultiplier = backoffMultiplier;
56+
this.perAttemptRecvTimeoutNanos = perAttemptRecvTimeoutNanos;
5257
this.retryableStatusCodes = ImmutableSet.copyOf(retryableStatusCodes);
5358
}
5459

@@ -59,6 +64,7 @@ public int hashCode() {
5964
initialBackoffNanos,
6065
maxBackoffNanos,
6166
backoffMultiplier,
67+
perAttemptRecvTimeoutNanos,
6268
retryableStatusCodes);
6369
}
6470

@@ -72,6 +78,7 @@ public boolean equals(Object other) {
7278
&& this.initialBackoffNanos == that.initialBackoffNanos
7379
&& this.maxBackoffNanos == that.maxBackoffNanos
7480
&& Double.compare(this.backoffMultiplier, that.backoffMultiplier) == 0
81+
&& Objects.equal(this.perAttemptRecvTimeoutNanos, that.perAttemptRecvTimeoutNanos)
7582
&& Objects.equal(this.retryableStatusCodes, that.retryableStatusCodes);
7683
}
7784

@@ -82,6 +89,7 @@ public String toString() {
8289
.add("initialBackoffNanos", initialBackoffNanos)
8390
.add("maxBackoffNanos", maxBackoffNanos)
8491
.add("backoffMultiplier", backoffMultiplier)
92+
.add("perAttemptRecvTimeoutNanos", perAttemptRecvTimeoutNanos)
8593
.add("retryableStatusCodes", retryableStatusCodes)
8694
.toString();
8795
}

core/src/main/java/io/grpc/internal/ServiceConfigUtil.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ static Double getBackoffMultiplierFromRetryPolicy(Map<String, ?> retryPolicy) {
140140
return JsonUtil.getNumber(retryPolicy, "backoffMultiplier");
141141
}
142142

143+
@Nullable
144+
static Long getPerAttemptRecvTimeoutNanosFromRetryPolicy(Map<String, ?> retryPolicy) {
145+
return JsonUtil.getStringAsDuration(retryPolicy, "perAttemptRecvTimeout");
146+
}
147+
143148
private static Set<Status.Code> getListOfStatusCodesAsSet(Map<String, ?> obj, String key) {
144149
List<?> statuses = JsonUtil.getList(obj, key);
145150
if (statuses == null) {
@@ -178,7 +183,6 @@ static Set<Status.Code> getRetryableStatusCodesFromRetryPolicy(Map<String, ?> re
178183
String retryableStatusCodesKey = "retryableStatusCodes";
179184
Set<Status.Code> codes = getListOfStatusCodesAsSet(retryPolicy, retryableStatusCodesKey);
180185
verify(codes != null, "%s is required in retry policy", retryableStatusCodesKey);
181-
verify(!codes.isEmpty(), "%s must not be empty", retryableStatusCodesKey);
182186
verify(!codes.contains(Status.Code.OK), "%s must not contain OK", retryableStatusCodesKey);
183187
return codes;
184188
}

core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.grpc.MethodDescriptor.MethodType.UNARY;
2121
import static io.grpc.Status.Code.UNAVAILABLE;
2222
import static java.util.concurrent.TimeUnit.MILLISECONDS;
23+
import static org.junit.Assert.fail;
2324

2425
import com.google.common.collect.ImmutableList;
2526
import com.google.common.collect.ImmutableMap;
@@ -155,13 +156,14 @@ public void managedChannelServiceConfig_parseMethodConfig() {
155156
"name", ImmutableList.of(name1, name2),
156157
"timeout", "1.234s",
157158
"retryPolicy",
158-
ImmutableMap.of(
159-
"maxAttempts", 3.0D,
160-
"initialBackoff", "1s",
161-
"maxBackoff", "10s",
162-
"backoffMultiplier", 1.5D,
163-
"retryableStatusCodes", ImmutableList.of("UNAVAILABLE")
164-
));
159+
ImmutableMap.builder()
160+
.put("maxAttempts", 3.0D)
161+
.put("initialBackoff", "1s")
162+
.put("maxBackoff", "10s")
163+
.put("backoffMultiplier", 1.5D)
164+
.put("perAttemptRecvTimeout", "2.5s")
165+
.put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE"))
166+
.build());
165167
Map<String, ?> defaultMethodConfig = ImmutableMap.of(
166168
"name", ImmutableList.of(ImmutableMap.of()),
167169
"timeout", "4.321s");
@@ -187,6 +189,8 @@ public void managedChannelServiceConfig_parseMethodConfig() {
187189
methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "method1"));
188190
assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(1234));
189191
assertThat(methodInfo.retryPolicy.maxAttempts).isEqualTo(2);
192+
assertThat(methodInfo.retryPolicy.perAttemptRecvTimeoutNanos)
193+
.isEqualTo(MILLISECONDS.toNanos(2500));
190194
assertThat(methodInfo.retryPolicy.retryableStatusCodes).containsExactly(UNAVAILABLE);
191195
methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "methodX"));
192196
assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(4321));
@@ -212,6 +216,84 @@ public void getDefaultConfigSelectorFromConfig() {
212216
.isEqualTo(serviceConfig.getMethodConfig(method));
213217
}
214218

219+
@Test
220+
public void retryConfig_emptyRetriableStatusCodesAllowedWithPerAttemptRecvTimeoutGiven() {
221+
Map<String, ?> retryPolicy = ImmutableMap.<String, Object>builder()
222+
.put("maxAttempts", 3.0D)
223+
.put("initialBackoff", "1s")
224+
.put("maxBackoff", "10s")
225+
.put("backoffMultiplier", 1.5D)
226+
.put("perAttemptRecvTimeout", "2.5s")
227+
.put("retryableStatusCodes", ImmutableList.of())
228+
.build();
229+
Map<String, ?> methodConfig = ImmutableMap.of(
230+
"name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy);
231+
Map<String, ?> rawServiceConfig =
232+
ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig));
233+
assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null))
234+
.isNotNull();
235+
}
236+
237+
@Test
238+
public void retryConfig_PerAttemptRecvTimeoutUnsetAllowedIfRetryableStatusCodesNonempty() {
239+
Map<String, ?> retryPolicy = ImmutableMap.<String, Object>builder()
240+
.put("maxAttempts", 3.0D)
241+
.put("initialBackoff", "1s")
242+
.put("maxBackoff", "10s")
243+
.put("backoffMultiplier", 1.5D)
244+
.put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE"))
245+
.build();
246+
Map<String, ?> methodConfig = ImmutableMap.of(
247+
"name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy);
248+
Map<String, ?> rawServiceConfig =
249+
ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig));
250+
assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null))
251+
.isNotNull();
252+
}
253+
254+
@Test
255+
public void retryConfig_emptyRetriableStatusCodesNotAllowedWithPerAttemptRecvTimeoutUnset() {
256+
Map<String, ?> retryPolicy = ImmutableMap.<String, Object>builder()
257+
.put("maxAttempts", 3.0D)
258+
.put("initialBackoff", "1s")
259+
.put("maxBackoff", "10s")
260+
.put("backoffMultiplier", 1.5D)
261+
.put("retryableStatusCodes", ImmutableList.of())
262+
.build();
263+
Map<String, ?> methodConfig = ImmutableMap.of(
264+
"name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy);
265+
Map<String, ?> rawServiceConfig =
266+
ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig));
267+
try {
268+
ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null);
269+
fail("The expected IllegalArgumentException is not thrown");
270+
} catch (IllegalArgumentException e) {
271+
assertThat(e).hasMessageThat().contains(
272+
"retryableStatusCodes cannot be empty without perAttemptRecvTimeout");
273+
}
274+
}
275+
276+
// For now we allow perAttemptRecvTimeout being zero although it does not make sense.
277+
// TODO(zdapeng): disallow zero perAttemptRecvTimeout if hedging is not enabled once we support
278+
// hedge_on_per_try_timeout.
279+
@Test
280+
public void retryConfig_AllowPerAttemptRecvTimeoutZero() {
281+
Map<String, ?> retryPolicy = ImmutableMap.<String, Object>builder()
282+
.put("maxAttempts", 3.0D)
283+
.put("initialBackoff", "1s")
284+
.put("maxBackoff", "10s")
285+
.put("backoffMultiplier", 1.5D)
286+
.put("perAttemptRecvTimeout", "0s")
287+
.put("retryableStatusCodes", ImmutableList.of())
288+
.build();
289+
Map<String, ?> methodConfig = ImmutableMap.of(
290+
"name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy);
291+
Map<String, ?> rawServiceConfig =
292+
ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig));
293+
assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null))
294+
.isNotNull();
295+
}
296+
215297
private static MethodDescriptor<?, ?> methodForName(String service, String method) {
216298
return MethodDescriptor.<Void, Void>newBuilder()
217299
.setFullMethodName(service + "/" + method)

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public double nextDouble() {
120120
TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS),
121121
TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS),
122122
BACKOFF_MULTIPLIER,
123+
null,
123124
ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
124125
private static final HedgingPolicy HEDGING_POLICY =
125126
new HedgingPolicy(

core/src/test/java/io/grpc/internal/RetryPolicyTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public void getRetryPolicies() throws Exception {
7878
TimeUnit.MILLISECONDS.toNanos(2100),
7979
TimeUnit.MILLISECONDS.toNanos(2200),
8080
parseDouble("3"),
81+
null,
8182
ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)));
8283

8384
method = builder.setFullMethodName("SimpleService1/Foo1").build();
@@ -87,6 +88,7 @@ public void getRetryPolicies() throws Exception {
8788
TimeUnit.MILLISECONDS.toNanos(100),
8889
TimeUnit.MILLISECONDS.toNanos(1000),
8990
parseDouble("2"),
91+
null,
9092
ImmutableSet.of(Code.UNAVAILABLE)));
9193

9294
method = builder.setFullMethodName("SimpleService2/not_exist").build();
@@ -99,6 +101,7 @@ public void getRetryPolicies() throws Exception {
99101
TimeUnit.MILLISECONDS.toNanos(100),
100102
TimeUnit.MILLISECONDS.toNanos(1000),
101103
parseDouble("2"),
104+
null,
102105
ImmutableSet.of(Code.UNAVAILABLE)));
103106
} finally {
104107
if (reader != null) {

0 commit comments

Comments
 (0)