Skip to content

Commit f364242

Browse files
authored
xds: support xds retry policy (grpc#8304)
1 parent f2ed41a commit f364242

9 files changed

Lines changed: 601 additions & 114 deletions

File tree

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2321,53 +2321,6 @@ public void execute(Runnable command) {
23212321
}
23222322
}
23232323

2324-
/**
 * Service config parser that was nested inside {@code ManagedChannelImpl}; the lines below are
 * the ones this commit deletes from that file.
 *
 * <p>It combines the channel-wide retry/hedging settings with the load-balancing policy chosen by
 * an {@code AutoConfiguredLoadBalancerFactory} to build a {@code ManagedChannelServiceConfig}.
 */
@VisibleForTesting
2325-
static final class ScParser extends NameResolver.ServiceConfigParser {
2326-
2327-
private final boolean retryEnabled;
2328-
private final int maxRetryAttemptsLimit;
2329-
private final int maxHedgedAttemptsLimit;
2330-
private final AutoConfiguredLoadBalancerFactory autoLoadBalancerFactory;
2331-
2332-
// Stores the retry settings as given; only the load balancer factory is null-checked.
ScParser(
2333-
boolean retryEnabled,
2334-
int maxRetryAttemptsLimit,
2335-
int maxHedgedAttemptsLimit,
2336-
AutoConfiguredLoadBalancerFactory autoLoadBalancerFactory) {
2337-
this.retryEnabled = retryEnabled;
2338-
this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
2339-
this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit;
2340-
this.autoLoadBalancerFactory =
2341-
checkNotNull(autoLoadBalancerFactory, "autoLoadBalancerFactory");
2342-
}
2343-
2344-
/**
 * Parses the raw service config. Returns the load balancer factory's error unchanged if policy
 * parsing fails, and converts any RuntimeException into a Status.UNKNOWN error result.
 */
@Override
2345-
public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
2346-
try {
2347-
Object loadBalancingPolicySelection;
2348-
ConfigOrError choiceFromLoadBalancer =
2349-
autoLoadBalancerFactory.parseLoadBalancerPolicy(rawServiceConfig);
2350-
// A null result from the factory is passed on as a null policy selection.
if (choiceFromLoadBalancer == null) {
2351-
loadBalancingPolicySelection = null;
2352-
} else if (choiceFromLoadBalancer.getError() != null) {
2353-
return ConfigOrError.fromError(choiceFromLoadBalancer.getError());
2354-
} else {
2355-
loadBalancingPolicySelection = choiceFromLoadBalancer.getConfig();
2356-
}
2357-
return ConfigOrError.fromConfig(
2358-
ManagedChannelServiceConfig.fromServiceConfig(
2359-
rawServiceConfig,
2360-
retryEnabled,
2361-
maxRetryAttemptsLimit,
2362-
maxHedgedAttemptsLimit,
2363-
loadBalancingPolicySelection));
2364-
} catch (RuntimeException e) {
2365-
// Report parse failures as an error result rather than letting the exception propagate.
return ConfigOrError.fromError(
2366-
Status.UNKNOWN.withDescription("failed to parse service config").withCause(e));
2367-
}
2368-
}
2369-
}
2370-
23712324
/**
23722325
* A ResolutionState indicates the status of last name resolution.
23732326
*/

core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,23 @@ public boolean equals(Object o) {
218218
return false;
219219
}
220220
ManagedChannelServiceConfig that = (ManagedChannelServiceConfig) o;
221-
return Objects.equal(serviceMethodMap, that.serviceMethodMap)
221+
return Objects.equal(defaultMethodConfig, that.defaultMethodConfig)
222+
&& Objects.equal(serviceMethodMap, that.serviceMethodMap)
222223
&& Objects.equal(serviceMap, that.serviceMap)
223224
&& Objects.equal(retryThrottling, that.retryThrottling)
224225
&& Objects.equal(loadBalancingConfig, that.loadBalancingConfig);
225226
}
226227

227228
@Override
228229
public int hashCode() {
229-
return Objects.hashCode(serviceMethodMap, serviceMap, retryThrottling, loadBalancingConfig);
230+
return Objects.hashCode(
231+
defaultMethodConfig, serviceMethodMap, serviceMap, retryThrottling, loadBalancingConfig);
230232
}
231233

232234
@Override
233235
public String toString() {
234236
return MoreObjects.toStringHelper(this)
237+
.add("defaultMethodConfig", defaultMethodConfig)
235238
.add("serviceMethodMap", serviceMethodMap)
236239
.add("serviceMap", serviceMap)
237240
.add("retryThrottling", retryThrottling)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2019 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.internal;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import io.grpc.NameResolver;
23+
import io.grpc.NameResolver.ConfigOrError;
24+
import io.grpc.Status;
25+
import java.util.Map;
26+
27+
/** The library built-in implementation of service config parser. */
28+
@VisibleForTesting
29+
public final class ScParser extends NameResolver.ServiceConfigParser {
30+
31+
private final boolean retryEnabled;
32+
private final int maxRetryAttemptsLimit;
33+
private final int maxHedgedAttemptsLimit;
34+
private final AutoConfiguredLoadBalancerFactory autoLoadBalancerFactory;
35+
36+
/** Creates a parse with global retry settings and an auto configured lb factory. */
37+
public ScParser(
38+
boolean retryEnabled,
39+
int maxRetryAttemptsLimit,
40+
int maxHedgedAttemptsLimit,
41+
AutoConfiguredLoadBalancerFactory autoLoadBalancerFactory) {
42+
this.retryEnabled = retryEnabled;
43+
this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
44+
this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit;
45+
this.autoLoadBalancerFactory = checkNotNull(autoLoadBalancerFactory, "autoLoadBalancerFactory");
46+
}
47+
48+
@Override
49+
public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
50+
try {
51+
Object loadBalancingPolicySelection;
52+
ConfigOrError choiceFromLoadBalancer =
53+
autoLoadBalancerFactory.parseLoadBalancerPolicy(rawServiceConfig);
54+
if (choiceFromLoadBalancer == null) {
55+
loadBalancingPolicySelection = null;
56+
} else if (choiceFromLoadBalancer.getError() != null) {
57+
return ConfigOrError.fromError(choiceFromLoadBalancer.getError());
58+
} else {
59+
loadBalancingPolicySelection = choiceFromLoadBalancer.getConfig();
60+
}
61+
return ConfigOrError.fromConfig(
62+
ManagedChannelServiceConfig.fromServiceConfig(
63+
rawServiceConfig,
64+
retryEnabled,
65+
maxRetryAttemptsLimit,
66+
maxHedgedAttemptsLimit,
67+
loadBalancingPolicySelection));
68+
} catch (RuntimeException e) {
69+
return ConfigOrError.fromError(
70+
Status.UNKNOWN.withDescription("failed to parse service config").withCause(e));
71+
}
72+
}
73+
}

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@
112112
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
113113
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
114114
import io.grpc.internal.InternalSubchannel.TransportLogger;
115-
import io.grpc.internal.ManagedChannelImpl.ScParser;
116115
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
117116
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
118117
import io.grpc.internal.ManagedChannelImplBuilder.UnsupportedClientTransportFactoryBuilder;

xds/src/main/java/io/grpc/xds/ClientXdsClient.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import com.github.udpa.udpa.type.v1.TypedStruct;
2323
import com.google.common.annotations.VisibleForTesting;
2424
import com.google.common.base.Joiner;
25+
import com.google.common.base.Splitter;
2526
import com.google.common.base.Stopwatch;
2627
import com.google.common.base.Strings;
2728
import com.google.common.base.Supplier;
2829
import com.google.common.collect.ImmutableList;
2930
import com.google.protobuf.Any;
31+
import com.google.protobuf.Duration;
3032
import com.google.protobuf.InvalidProtocolBufferException;
3133
import com.google.protobuf.Message;
3234
import com.google.protobuf.util.Durations;
@@ -45,6 +47,7 @@
4547
import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
4648
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
4749
import io.envoyproxy.envoy.config.listener.v3.Listener;
50+
import io.envoyproxy.envoy.config.route.v3.RetryPolicy.RetryBackOff;
4851
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
4952
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
5053
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
@@ -55,6 +58,7 @@
5558
import io.grpc.EquivalentAddressGroup;
5659
import io.grpc.ManagedChannel;
5760
import io.grpc.Status;
61+
import io.grpc.Status.Code;
5862
import io.grpc.SynchronizationContext.ScheduledHandle;
5963
import io.grpc.internal.BackoffPolicy;
6064
import io.grpc.internal.TimeProvider;
@@ -77,6 +81,7 @@
7781
import io.grpc.xds.VirtualHost.Route.RouteAction;
7882
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
7983
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
84+
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
8085
import io.grpc.xds.VirtualHost.Route.RouteMatch;
8186
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
8287
import io.grpc.xds.XdsLogger.XdsLogLevel;
@@ -88,10 +93,12 @@
8893
import java.util.Arrays;
8994
import java.util.Collection;
9095
import java.util.Collections;
96+
import java.util.EnumSet;
9197
import java.util.HashMap;
9298
import java.util.HashSet;
9399
import java.util.LinkedHashMap;
94100
import java.util.List;
101+
import java.util.Locale;
95102
import java.util.Map;
96103
import java.util.Objects;
97104
import java.util.Set;
@@ -123,6 +130,10 @@ final class ClientXdsClient extends AbstractXdsClient {
123130
static boolean enableFaultInjection =
124131
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"))
125132
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"));
133+
// Opt-in switch consulted by parseRouteAction before parsing a route's retry policy: true only
// when the GRPC_XDS_EXPERIMENTAL_RETRY environment variable is set, non-empty, and parses as
// boolean true. Non-final, presumably so tests can toggle it — confirm against the tests.
@VisibleForTesting
134+
static boolean enableRetry =
135+
!Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_RETRY"))
136+
&& Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_RETRY"));
126137

127138
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
128139
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
@@ -142,6 +153,11 @@ final class ClientXdsClient extends AbstractXdsClient {
142153
"type.googleapis.com/udpa.type.v1.TypedStruct";
143154
private static final String TYPE_URL_FILTER_CONFIG =
144155
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
156+
// Status codes that parseRetryPolicy accepts in a retry policy's retry_on list. A retry_on entry
// that maps to any other code makes parseRetryPolicy return null, i.e. the policy is ignored.
// TODO(zdapeng): need to discuss how to handle unsupported values.
157+
private static final Set<Code> SUPPORTED_RETRYABLE_CODES =
158+
Collections.unmodifiableSet(EnumSet.of(
159+
Code.CANCELLED, Code.DEADLINE_EXCEEDED, Code.INTERNAL, Code.RESOURCE_EXHAUSTED,
160+
Code.UNAVAILABLE));
145161

146162
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
147163
private final Map<String, ResourceSubscriber> ldsResourceSubscribers = new HashMap<>();
@@ -948,6 +964,16 @@ static StructOrError<RouteAction> parseRouteAction(
948964
timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration());
949965
}
950966
}
967+
RetryPolicy retryPolicy = null;
968+
if (enableRetry && proto.hasRetryPolicy()) {
969+
StructOrError<RetryPolicy> retryPolicyOrError = parseRetryPolicy(proto.getRetryPolicy());
970+
if (retryPolicyOrError != null) {
971+
if (retryPolicyOrError.errorDetail != null) {
972+
return StructOrError.fromError(retryPolicyOrError.errorDetail);
973+
}
974+
retryPolicy = retryPolicyOrError.struct;
975+
}
976+
}
951977
List<HashPolicy> hashPolicies = new ArrayList<>();
952978
for (io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy config
953979
: proto.getHashPolicyList()) {
@@ -983,7 +1009,7 @@ static StructOrError<RouteAction> parseRouteAction(
9831009
switch (proto.getClusterSpecifierCase()) {
9841010
case CLUSTER:
9851011
return StructOrError.fromStruct(RouteAction.forCluster(
986-
proto.getCluster(), hashPolicies, timeoutNano));
1012+
proto.getCluster(), hashPolicies, timeoutNano, retryPolicy));
9871013
case CLUSTER_HEADER:
9881014
return null;
9891015
case WEIGHTED_CLUSTERS:
@@ -1005,14 +1031,76 @@ static StructOrError<RouteAction> parseRouteAction(
10051031
}
10061032
// TODO(chengyuanzhang): validate if the sum of weights equals to total weight.
10071033
return StructOrError.fromStruct(RouteAction.forWeightedClusters(
1008-
weightedClusters, hashPolicies, timeoutNano));
1034+
weightedClusters, hashPolicies, timeoutNano, retryPolicy));
10091035
case CLUSTERSPECIFIER_NOT_SET:
10101036
default:
10111037
return StructOrError.fromError(
10121038
"Unknown cluster specifier: " + proto.getClusterSpecifierCase());
10131039
}
10141040
}
10151041

1042+
@Nullable // Return null if we ignore the given policy.
1043+
private static StructOrError<RetryPolicy> parseRetryPolicy(
1044+
io.envoyproxy.envoy.config.route.v3.RetryPolicy retryPolicyProto) {
1045+
int maxAttempts = 2;
1046+
if (retryPolicyProto.hasNumRetries()) {
1047+
maxAttempts = retryPolicyProto.getNumRetries().getValue() + 1;
1048+
}
1049+
Duration initialBackoff = Durations.fromMillis(25);
1050+
Duration maxBackoff = Durations.fromMillis(250);
1051+
if (retryPolicyProto.hasRetryBackOff()) {
1052+
RetryBackOff retryBackOff = retryPolicyProto.getRetryBackOff();
1053+
if (!retryBackOff.hasBaseInterval()) {
1054+
return StructOrError.fromError("No base_interval specified in retry_backoff");
1055+
}
1056+
Duration originalInitialBackoff = initialBackoff = retryBackOff.getBaseInterval();
1057+
if (Durations.compare(initialBackoff, Durations.ZERO) <= 0) {
1058+
return StructOrError.fromError("base_interval in retry_backoff must be positive");
1059+
}
1060+
if (Durations.compare(initialBackoff, Durations.fromMillis(1)) < 0) {
1061+
initialBackoff = Durations.fromMillis(1);
1062+
}
1063+
if (retryBackOff.hasMaxInterval()) {
1064+
maxBackoff = retryPolicyProto.getRetryBackOff().getMaxInterval();
1065+
if (Durations.compare(maxBackoff, originalInitialBackoff) < 0) {
1066+
return StructOrError.fromError(
1067+
"max_interval in retry_backoff cannot be less than base_interval");
1068+
}
1069+
if (Durations.compare(maxBackoff, Durations.fromMillis(1)) < 0) {
1070+
maxBackoff = Durations.fromMillis(1);
1071+
}
1072+
} else {
1073+
maxBackoff = Durations.fromNanos(Durations.toNanos(initialBackoff) * 10);
1074+
}
1075+
}
1076+
Iterable<String> retryOns = Splitter.on(',').split(retryPolicyProto.getRetryOn());
1077+
ImmutableList.Builder<Code> retryableStatusCodesBuilder = ImmutableList.builder();
1078+
for (String retryOn : retryOns) {
1079+
Code code;
1080+
try {
1081+
code = Code.valueOf(retryOn.toUpperCase(Locale.US).replace('-', '_'));
1082+
} catch (IllegalArgumentException e) {
1083+
// TODO(zdapeng): TBD
1084+
// unsupported value, such as "5xx"
1085+
return null;
1086+
}
1087+
if (!SUPPORTED_RETRYABLE_CODES.contains(code)) {
1088+
// TODO(zdapeng): TBD
1089+
// unsupported value
1090+
return null;
1091+
}
1092+
retryableStatusCodesBuilder.add(code);
1093+
}
1094+
List<Code> retryableStatusCodes = retryableStatusCodesBuilder.build();
1095+
if (!retryableStatusCodes.isEmpty()) {
1096+
return StructOrError.fromStruct(
1097+
RetryPolicy.create(
1098+
maxAttempts, retryableStatusCodes, initialBackoff, maxBackoff,
1099+
/* perAttemptRecvTimeout= */ null));
1100+
}
1101+
return null;
1102+
}
1103+
10161104
@VisibleForTesting
10171105
static StructOrError<ClusterWeight> parseClusterWeight(
10181106
io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto,

0 commit comments

Comments
 (0)