2222import com .github .udpa .udpa .type .v1 .TypedStruct ;
2323import com .google .common .annotations .VisibleForTesting ;
2424import com .google .common .base .Joiner ;
25+ import com .google .common .base .Splitter ;
2526import com .google .common .base .Stopwatch ;
2627import com .google .common .base .Strings ;
2728import com .google .common .base .Supplier ;
2829import com .google .common .collect .ImmutableList ;
2930import com .google .protobuf .Any ;
31+ import com .google .protobuf .Duration ;
3032import com .google .protobuf .InvalidProtocolBufferException ;
3133import com .google .protobuf .Message ;
3234import com .google .protobuf .util .Durations ;
4547import io .envoyproxy .envoy .config .core .v3 .TrafficDirection ;
4648import io .envoyproxy .envoy .config .endpoint .v3 .ClusterLoadAssignment ;
4749import io .envoyproxy .envoy .config .listener .v3 .Listener ;
50+ import io .envoyproxy .envoy .config .route .v3 .RetryPolicy .RetryBackOff ;
4851import io .envoyproxy .envoy .config .route .v3 .RouteConfiguration ;
4952import io .envoyproxy .envoy .extensions .filters .network .http_connection_manager .v3 .HttpConnectionManager ;
5053import io .envoyproxy .envoy .extensions .filters .network .http_connection_manager .v3 .Rds ;
5558import io .grpc .EquivalentAddressGroup ;
5659import io .grpc .ManagedChannel ;
5760import io .grpc .Status ;
61+ import io .grpc .Status .Code ;
5862import io .grpc .SynchronizationContext .ScheduledHandle ;
5963import io .grpc .internal .BackoffPolicy ;
6064import io .grpc .internal .TimeProvider ;
7781import io .grpc .xds .VirtualHost .Route .RouteAction ;
7882import io .grpc .xds .VirtualHost .Route .RouteAction .ClusterWeight ;
7983import io .grpc .xds .VirtualHost .Route .RouteAction .HashPolicy ;
84+ import io .grpc .xds .VirtualHost .Route .RouteAction .RetryPolicy ;
8085import io .grpc .xds .VirtualHost .Route .RouteMatch ;
8186import io .grpc .xds .VirtualHost .Route .RouteMatch .PathMatcher ;
8287import io .grpc .xds .XdsLogger .XdsLogLevel ;
8893import java .util .Arrays ;
8994import java .util .Collection ;
9095import java .util .Collections ;
96+ import java .util .EnumSet ;
9197import java .util .HashMap ;
9298import java .util .HashSet ;
9399import java .util .LinkedHashMap ;
94100import java .util .List ;
101+ import java .util .Locale ;
95102import java .util .Map ;
96103import java .util .Objects ;
97104import java .util .Set ;
@@ -123,6 +130,10 @@ final class ClientXdsClient extends AbstractXdsClient {
123130 static boolean enableFaultInjection =
124131 Strings .isNullOrEmpty (System .getenv ("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION" ))
125132 || Boolean .parseBoolean (System .getenv ("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION" ));
133+ @ VisibleForTesting
134+ static boolean enableRetry =
135+ !Strings .isNullOrEmpty (System .getenv ("GRPC_XDS_EXPERIMENTAL_RETRY" ))
136+ && Boolean .parseBoolean (System .getenv ("GRPC_XDS_EXPERIMENTAL_RETRY" ));
126137
127138 private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
128139 "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
@@ -142,6 +153,11 @@ final class ClientXdsClient extends AbstractXdsClient {
142153 "type.googleapis.com/udpa.type.v1.TypedStruct" ;
143154 private static final String TYPE_URL_FILTER_CONFIG =
144155 "type.googleapis.com/envoy.config.route.v3.FilterConfig" ;
156+ // TODO(zdapeng): need to discuss how to handle unsupported values.
157+ private static final Set <Code > SUPPORTED_RETRYABLE_CODES =
158+ Collections .unmodifiableSet (EnumSet .of (
159+ Code .CANCELLED , Code .DEADLINE_EXCEEDED , Code .INTERNAL , Code .RESOURCE_EXHAUSTED ,
160+ Code .UNAVAILABLE ));
145161
146162 private final FilterRegistry filterRegistry = FilterRegistry .getDefaultRegistry ();
147163 private final Map <String , ResourceSubscriber > ldsResourceSubscribers = new HashMap <>();
@@ -948,6 +964,16 @@ static StructOrError<RouteAction> parseRouteAction(
948964 timeoutNano = Durations .toNanos (maxStreamDuration .getMaxStreamDuration ());
949965 }
950966 }
967+ RetryPolicy retryPolicy = null ;
968+ if (enableRetry && proto .hasRetryPolicy ()) {
969+ StructOrError <RetryPolicy > retryPolicyOrError = parseRetryPolicy (proto .getRetryPolicy ());
970+ if (retryPolicyOrError != null ) {
971+ if (retryPolicyOrError .errorDetail != null ) {
972+ return StructOrError .fromError (retryPolicyOrError .errorDetail );
973+ }
974+ retryPolicy = retryPolicyOrError .struct ;
975+ }
976+ }
951977 List <HashPolicy > hashPolicies = new ArrayList <>();
952978 for (io .envoyproxy .envoy .config .route .v3 .RouteAction .HashPolicy config
953979 : proto .getHashPolicyList ()) {
@@ -983,7 +1009,7 @@ static StructOrError<RouteAction> parseRouteAction(
9831009 switch (proto .getClusterSpecifierCase ()) {
9841010 case CLUSTER :
9851011 return StructOrError .fromStruct (RouteAction .forCluster (
986- proto .getCluster (), hashPolicies , timeoutNano ));
1012+ proto .getCluster (), hashPolicies , timeoutNano , retryPolicy ));
9871013 case CLUSTER_HEADER :
9881014 return null ;
9891015 case WEIGHTED_CLUSTERS :
@@ -1005,14 +1031,76 @@ static StructOrError<RouteAction> parseRouteAction(
10051031 }
10061032 // TODO(chengyuanzhang): validate if the sum of weights equals to total weight.
10071033 return StructOrError .fromStruct (RouteAction .forWeightedClusters (
1008- weightedClusters , hashPolicies , timeoutNano ));
1034+ weightedClusters , hashPolicies , timeoutNano , retryPolicy ));
10091035 case CLUSTERSPECIFIER_NOT_SET :
10101036 default :
10111037 return StructOrError .fromError (
10121038 "Unknown cluster specifier: " + proto .getClusterSpecifierCase ());
10131039 }
10141040 }
10151041
1042+ @ Nullable // Return null if we ignore the given policy.
1043+ private static StructOrError <RetryPolicy > parseRetryPolicy (
1044+ io .envoyproxy .envoy .config .route .v3 .RetryPolicy retryPolicyProto ) {
1045+ int maxAttempts = 2 ;
1046+ if (retryPolicyProto .hasNumRetries ()) {
1047+ maxAttempts = retryPolicyProto .getNumRetries ().getValue () + 1 ;
1048+ }
1049+ Duration initialBackoff = Durations .fromMillis (25 );
1050+ Duration maxBackoff = Durations .fromMillis (250 );
1051+ if (retryPolicyProto .hasRetryBackOff ()) {
1052+ RetryBackOff retryBackOff = retryPolicyProto .getRetryBackOff ();
1053+ if (!retryBackOff .hasBaseInterval ()) {
1054+ return StructOrError .fromError ("No base_interval specified in retry_backoff" );
1055+ }
1056+ Duration originalInitialBackoff = initialBackoff = retryBackOff .getBaseInterval ();
1057+ if (Durations .compare (initialBackoff , Durations .ZERO ) <= 0 ) {
1058+ return StructOrError .fromError ("base_interval in retry_backoff must be positive" );
1059+ }
1060+ if (Durations .compare (initialBackoff , Durations .fromMillis (1 )) < 0 ) {
1061+ initialBackoff = Durations .fromMillis (1 );
1062+ }
1063+ if (retryBackOff .hasMaxInterval ()) {
1064+ maxBackoff = retryPolicyProto .getRetryBackOff ().getMaxInterval ();
1065+ if (Durations .compare (maxBackoff , originalInitialBackoff ) < 0 ) {
1066+ return StructOrError .fromError (
1067+ "max_interval in retry_backoff cannot be less than base_interval" );
1068+ }
1069+ if (Durations .compare (maxBackoff , Durations .fromMillis (1 )) < 0 ) {
1070+ maxBackoff = Durations .fromMillis (1 );
1071+ }
1072+ } else {
1073+ maxBackoff = Durations .fromNanos (Durations .toNanos (initialBackoff ) * 10 );
1074+ }
1075+ }
1076+ Iterable <String > retryOns = Splitter .on (',' ).split (retryPolicyProto .getRetryOn ());
1077+ ImmutableList .Builder <Code > retryableStatusCodesBuilder = ImmutableList .builder ();
1078+ for (String retryOn : retryOns ) {
1079+ Code code ;
1080+ try {
1081+ code = Code .valueOf (retryOn .toUpperCase (Locale .US ).replace ('-' , '_' ));
1082+ } catch (IllegalArgumentException e ) {
1083+ // TODO(zdapeng): TBD
1084+ // unsupported value, such as "5xx"
1085+ return null ;
1086+ }
1087+ if (!SUPPORTED_RETRYABLE_CODES .contains (code )) {
1088+ // TODO(zdapeng): TBD
1089+ // unsupported value
1090+ return null ;
1091+ }
1092+ retryableStatusCodesBuilder .add (code );
1093+ }
1094+ List <Code > retryableStatusCodes = retryableStatusCodesBuilder .build ();
1095+ if (!retryableStatusCodes .isEmpty ()) {
1096+ return StructOrError .fromStruct (
1097+ RetryPolicy .create (
1098+ maxAttempts , retryableStatusCodes , initialBackoff , maxBackoff ,
1099+ /* perAttemptRecvTimeout= */ null ));
1100+ }
1101+ return null ;
1102+ }
1103+
10161104 @ VisibleForTesting
10171105 static StructOrError <ClusterWeight > parseClusterWeight (
10181106 io .envoyproxy .envoy .config .route .v3 .WeightedCluster .ClusterWeight proto ,
0 commit comments