Skip to content

Commit 7a65c74

Browse files
authored
xds: apply valid resources while NACKing update (grpc#8506)
Implementing [gRFC A46](grpc/proposal#260)
1 parent 7ad7876 commit 7a65c74

2 files changed

Lines changed: 248 additions & 96 deletions

File tree

xds/src/main/java/io/grpc/xds/ClientXdsClient.java

Lines changed: 71 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
190190
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
191191
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
192192
Set<String> unpackedResources = new HashSet<>(resources.size());
193+
Set<String> invalidResources = new HashSet<>();
193194
List<String> errors = new ArrayList<>();
194195
Set<String> retainedRdsResources = new HashSet<>();
195196

@@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
222223
} catch (ResourceInvalidException e) {
223224
errors.add(
224225
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage());
226+
invalidResources.add(listenerName);
225227
continue;
226228
}
227229

@@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
231233
getLogger().log(XdsLogLevel.INFO,
232234
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
233235
versionInfo, nonce, unpackedResources);
234-
235-
if (!errors.isEmpty()) {
236-
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors);
237-
return;
238-
}
239-
240-
handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce);
241-
for (String resource : rdsResourceSubscribers.keySet()) {
242-
if (!retainedRdsResources.contains(resource)) {
243-
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
244-
subscriber.onAbsent();
245-
}
246-
}
236+
handleResourceUpdate(
237+
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
238+
nonce, errors);
247239
}
248240

249241
private LdsUpdate processClientSideListener(
@@ -1313,6 +1305,7 @@ static StructOrError<ClusterWeight> parseClusterWeight(
13131305
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
13141306
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
13151307
Set<String> unpackedResources = new HashSet<>(resources.size());
1308+
Set<String> invalidResources = new HashSet<>();
13161309
List<String> errors = new ArrayList<>();
13171310

13181311
for (int i = 0; i < resources.size(); i++) {
@@ -1340,6 +1333,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
13401333
errors.add(
13411334
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
13421335
.getMessage());
1336+
invalidResources.add(routeConfigName);
13431337
continue;
13441338
}
13451339

@@ -1348,12 +1342,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
13481342
getLogger().log(XdsLogLevel.INFO,
13491343
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
13501344
versionInfo, nonce, unpackedResources);
1351-
1352-
if (!errors.isEmpty()) {
1353-
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors);
1354-
} else {
1355-
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce);
1356-
}
1345+
handleResourceUpdate(
1346+
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
1347+
versionInfo, nonce, errors);
13571348
}
13581349

13591350
private static RdsUpdate processRouteConfiguration(
@@ -1377,6 +1368,7 @@ private static RdsUpdate processRouteConfiguration(
13771368
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
13781369
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
13791370
Set<String> unpackedResources = new HashSet<>(resources.size());
1371+
Set<String> invalidResources = new HashSet<>();
13801372
List<String> errors = new ArrayList<>();
13811373
Set<String> retainedEdsResources = new HashSet<>();
13821374

@@ -1413,28 +1405,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
14131405
} catch (ResourceInvalidException e) {
14141406
errors.add(
14151407
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
1408+
invalidResources.add(clusterName);
14161409
continue;
14171410
}
14181411
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
14191412
}
14201413
getLogger().log(XdsLogLevel.INFO,
14211414
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
14221415
versionInfo, nonce, unpackedResources);
1423-
1424-
if (!errors.isEmpty()) {
1425-
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors);
1426-
return;
1427-
}
1428-
1429-
handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce);
1430-
// CDS responses represents the state of the world, EDS resources not referenced in CDS
1431-
// resources should be deleted.
1432-
for (String resource : edsResourceSubscribers.keySet()) {
1433-
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
1434-
if (!retainedEdsResources.contains(resource)) {
1435-
subscriber.onAbsent();
1436-
}
1437-
}
1416+
handleResourceUpdate(
1417+
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
1418+
nonce, errors);
14381419
}
14391420

14401421
@VisibleForTesting
@@ -1615,6 +1596,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
16151596
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
16161597
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
16171598
Set<String> unpackedResources = new HashSet<>(resources.size());
1599+
Set<String> invalidResources = new HashSet<>();
16181600
List<String> errors = new ArrayList<>();
16191601

16201602
for (int i = 0; i < resources.size(); i++) {
@@ -1649,16 +1631,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String
16491631
} catch (ResourceInvalidException e) {
16501632
errors.add("EDS response ClusterLoadAssignment '" + clusterName
16511633
+ "' validation error: " + e.getMessage());
1634+
invalidResources.add(clusterName);
16521635
continue;
16531636
}
16541637
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
16551638
}
1656-
1657-
if (!errors.isEmpty()) {
1658-
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors);
1659-
} else {
1660-
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce);
1661-
}
1639+
getLogger().log(
1640+
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
1641+
versionInfo, nonce, unpackedResources);
1642+
handleResourceUpdate(
1643+
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
1644+
versionInfo, nonce, errors);
16621645
}
16631646

16641647
private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
@@ -2048,43 +2031,67 @@ private void cleanUpResourceTimers() {
20482031
}
20492032
}
20502033

2051-
private void handleResourcesAccepted(
2052-
ResourceType type, Map<String, ParsedResource> parsedResources, String version,
2053-
String nonce) {
2054-
ackResponse(type, version, nonce);
2055-
2034+
private void handleResourceUpdate(
2035+
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
2036+
Set<String> retainedResources, String version, String nonce, List<String> errors) {
2037+
String errorDetail = null;
2038+
if (errors.isEmpty()) {
2039+
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
2040+
ackResponse(type, version, nonce);
2041+
} else {
2042+
errorDetail = Joiner.on('\n').join(errors);
2043+
getLogger().log(XdsLogLevel.WARNING,
2044+
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
2045+
type, version, nonce, errorDetail);
2046+
nackResponse(type, nonce, errorDetail);
2047+
}
20562048
long updateTime = timeProvider.currentTimeNanos();
20572049
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
20582050
String resourceName = entry.getKey();
20592051
ResourceSubscriber subscriber = entry.getValue();
2052+
// Attach error details to the subscribed resources that included in the ADS update.
2053+
if (invalidResources.contains(resourceName)) {
2054+
subscriber.onRejected(version, updateTime, errorDetail);
2055+
}
20602056
// Notify the watchers.
20612057
if (parsedResources.containsKey(resourceName)) {
20622058
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
20632059
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
2060+
if (subscriber.data != null && invalidResources.contains(resourceName)) {
2061+
// Update is rejected but keep using the cached data.
2062+
if (type == ResourceType.LDS) {
2063+
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
2064+
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
2065+
if (hcm != null) {
2066+
String rdsName = hcm.rdsName();
2067+
if (rdsName != null) {
2068+
retainedResources.add(rdsName);
2069+
}
2070+
}
2071+
} else {
2072+
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
2073+
String edsName = cdsUpdate.edsServiceName();
2074+
if (edsName == null) {
2075+
edsName = cdsUpdate.clusterName();
2076+
}
2077+
retainedResources.add(edsName);
2078+
}
2079+
continue;
2080+
}
20642081
// For State of the World services, notify watchers when their watched resource is missing
20652082
// from the ADS update.
20662083
subscriber.onAbsent();
20672084
}
20682085
}
2069-
}
2070-
2071-
private void handleResourcesRejected(
2072-
ResourceType type, Set<String> unpackedResourceNames, String version,
2073-
String nonce, List<String> errors) {
2074-
String errorDetail = Joiner.on('\n').join(errors);
2075-
getLogger().log(XdsLogLevel.WARNING,
2076-
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
2077-
type, version, nonce, errorDetail);
2078-
nackResponse(type, nonce, errorDetail);
2079-
2080-
long updateTime = timeProvider.currentTimeNanos();
2081-
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
2082-
String resourceName = entry.getKey();
2083-
ResourceSubscriber subscriber = entry.getValue();
2084-
2085-
// Attach error details to the subscribed resources that included in the ADS update.
2086-
if (unpackedResourceNames.contains(resourceName)) {
2087-
subscriber.onRejected(version, updateTime, errorDetail);
2086+
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
2087+
// LDS/CDS resources should be deleted.
2088+
if (type == ResourceType.LDS || type == ResourceType.CDS) {
2089+
Map<String, ResourceSubscriber> dependentSubscribers =
2090+
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers;
2091+
for (String resource : dependentSubscribers.keySet()) {
2092+
if (!retainedResources.contains(resource)) {
2093+
dependentSubscribers.get(resource).onAbsent();
2094+
}
20882095
}
20892096
}
20902097
}

0 commit comments

Comments
 (0)