@@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
190190 protected void handleLdsResponse (String versionInfo , List <Any > resources , String nonce ) {
191191 Map <String , ParsedResource > parsedResources = new HashMap <>(resources .size ());
192192 Set <String > unpackedResources = new HashSet <>(resources .size ());
193+ Set <String > invalidResources = new HashSet <>();
193194 List <String > errors = new ArrayList <>();
194195 Set <String > retainedRdsResources = new HashSet <>();
195196
@@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
222223 } catch (ResourceInvalidException e ) {
223224 errors .add (
224225 "LDS response Listener '" + listenerName + "' validation error: " + e .getMessage ());
226+ invalidResources .add (listenerName );
225227 continue ;
226228 }
227229
@@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
231233 getLogger ().log (XdsLogLevel .INFO ,
232234 "Received LDS Response version {0} nonce {1}. Parsed resources: {2}" ,
233235 versionInfo , nonce , unpackedResources );
234-
235- if (!errors .isEmpty ()) {
236- handleResourcesRejected (ResourceType .LDS , unpackedResources , versionInfo , nonce , errors );
237- return ;
238- }
239-
240- handleResourcesAccepted (ResourceType .LDS , parsedResources , versionInfo , nonce );
241- for (String resource : rdsResourceSubscribers .keySet ()) {
242- if (!retainedRdsResources .contains (resource )) {
243- ResourceSubscriber subscriber = rdsResourceSubscribers .get (resource );
244- subscriber .onAbsent ();
245- }
246- }
236+ handleResourceUpdate (
237+ ResourceType .LDS , parsedResources , invalidResources , retainedRdsResources , versionInfo ,
238+ nonce , errors );
247239 }
248240
249241 private LdsUpdate processClientSideListener (
@@ -1313,6 +1305,7 @@ static StructOrError<ClusterWeight> parseClusterWeight(
13131305 protected void handleRdsResponse (String versionInfo , List <Any > resources , String nonce ) {
13141306 Map <String , ParsedResource > parsedResources = new HashMap <>(resources .size ());
13151307 Set <String > unpackedResources = new HashSet <>(resources .size ());
1308+ Set <String > invalidResources = new HashSet <>();
13161309 List <String > errors = new ArrayList <>();
13171310
13181311 for (int i = 0 ; i < resources .size (); i ++) {
@@ -1340,6 +1333,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
13401333 errors .add (
13411334 "RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
13421335 .getMessage ());
1336+ invalidResources .add (routeConfigName );
13431337 continue ;
13441338 }
13451339
@@ -1348,12 +1342,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
13481342 getLogger ().log (XdsLogLevel .INFO ,
13491343 "Received RDS Response version {0} nonce {1}. Parsed resources: {2}" ,
13501344 versionInfo , nonce , unpackedResources );
1351-
1352- if (!errors .isEmpty ()) {
1353- handleResourcesRejected (ResourceType .RDS , unpackedResources , versionInfo , nonce , errors );
1354- } else {
1355- handleResourcesAccepted (ResourceType .RDS , parsedResources , versionInfo , nonce );
1356- }
1345+ handleResourceUpdate (
1346+ ResourceType .RDS , parsedResources , invalidResources , Collections .<String >emptySet (),
1347+ versionInfo , nonce , errors );
13571348 }
13581349
13591350 private static RdsUpdate processRouteConfiguration (
@@ -1377,6 +1368,7 @@ private static RdsUpdate processRouteConfiguration(
13771368 protected void handleCdsResponse (String versionInfo , List <Any > resources , String nonce ) {
13781369 Map <String , ParsedResource > parsedResources = new HashMap <>(resources .size ());
13791370 Set <String > unpackedResources = new HashSet <>(resources .size ());
1371+ Set <String > invalidResources = new HashSet <>();
13801372 List <String > errors = new ArrayList <>();
13811373 Set <String > retainedEdsResources = new HashSet <>();
13821374
@@ -1413,28 +1405,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
14131405 } catch (ResourceInvalidException e ) {
14141406 errors .add (
14151407 "CDS response Cluster '" + clusterName + "' validation error: " + e .getMessage ());
1408+ invalidResources .add (clusterName );
14161409 continue ;
14171410 }
14181411 parsedResources .put (clusterName , new ParsedResource (cdsUpdate , resource ));
14191412 }
14201413 getLogger ().log (XdsLogLevel .INFO ,
14211414 "Received CDS Response version {0} nonce {1}. Parsed resources: {2}" ,
14221415 versionInfo , nonce , unpackedResources );
1423-
1424- if (!errors .isEmpty ()) {
1425- handleResourcesRejected (ResourceType .CDS , unpackedResources , versionInfo , nonce , errors );
1426- return ;
1427- }
1428-
1429- handleResourcesAccepted (ResourceType .CDS , parsedResources , versionInfo , nonce );
1430- // CDS responses represents the state of the world, EDS resources not referenced in CDS
1431- // resources should be deleted.
1432- for (String resource : edsResourceSubscribers .keySet ()) {
1433- ResourceSubscriber subscriber = edsResourceSubscribers .get (resource );
1434- if (!retainedEdsResources .contains (resource )) {
1435- subscriber .onAbsent ();
1436- }
1437- }
1416+ handleResourceUpdate (
1417+ ResourceType .CDS , parsedResources , invalidResources , retainedEdsResources , versionInfo ,
1418+ nonce , errors );
14381419 }
14391420
14401421 @ VisibleForTesting
@@ -1615,6 +1596,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
16151596 protected void handleEdsResponse (String versionInfo , List <Any > resources , String nonce ) {
16161597 Map <String , ParsedResource > parsedResources = new HashMap <>(resources .size ());
16171598 Set <String > unpackedResources = new HashSet <>(resources .size ());
1599+ Set <String > invalidResources = new HashSet <>();
16181600 List <String > errors = new ArrayList <>();
16191601
16201602 for (int i = 0 ; i < resources .size (); i ++) {
@@ -1649,16 +1631,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String
16491631 } catch (ResourceInvalidException e ) {
16501632 errors .add ("EDS response ClusterLoadAssignment '" + clusterName
16511633 + "' validation error: " + e .getMessage ());
1634+ invalidResources .add (clusterName );
16521635 continue ;
16531636 }
16541637 parsedResources .put (clusterName , new ParsedResource (edsUpdate , resource ));
16551638 }
1656-
1657- if (! errors . isEmpty ()) {
1658- handleResourcesRejected ( ResourceType . EDS , unpackedResources , versionInfo , nonce , errors );
1659- } else {
1660- handleResourcesAccepted ( ResourceType .EDS , parsedResources , versionInfo , nonce );
1661- }
1639+ getLogger (). log (
1640+ XdsLogLevel . INFO , "Received EDS Response version {0} nonce {1}. Parsed resources: {2}" ,
1641+ versionInfo , nonce , unpackedResources );
1642+ handleResourceUpdate (
1643+ ResourceType .EDS , parsedResources , invalidResources , Collections .< String > emptySet (),
1644+ versionInfo , nonce , errors );
16621645 }
16631646
16641647 private static EdsUpdate processClusterLoadAssignment (ClusterLoadAssignment assignment )
@@ -2048,43 +2031,67 @@ private void cleanUpResourceTimers() {
20482031 }
20492032 }
20502033
2051- private void handleResourcesAccepted (
2052- ResourceType type , Map <String , ParsedResource > parsedResources , String version ,
2053- String nonce ) {
2054- ackResponse (type , version , nonce );
2055-
2034+ private void handleResourceUpdate (
2035+ ResourceType type , Map <String , ParsedResource > parsedResources , Set <String > invalidResources ,
2036+ Set <String > retainedResources , String version , String nonce , List <String > errors ) {
2037+ String errorDetail = null ;
2038+ if (errors .isEmpty ()) {
2039+ checkArgument (invalidResources .isEmpty (), "found invalid resources but missing errors" );
2040+ ackResponse (type , version , nonce );
2041+ } else {
2042+ errorDetail = Joiner .on ('\n' ).join (errors );
2043+ getLogger ().log (XdsLogLevel .WARNING ,
2044+ "Failed processing {0} Response version {1} nonce {2}. Errors:\n {3}" ,
2045+ type , version , nonce , errorDetail );
2046+ nackResponse (type , nonce , errorDetail );
2047+ }
20562048 long updateTime = timeProvider .currentTimeNanos ();
20572049 for (Map .Entry <String , ResourceSubscriber > entry : getSubscribedResourcesMap (type ).entrySet ()) {
20582050 String resourceName = entry .getKey ();
20592051 ResourceSubscriber subscriber = entry .getValue ();
2052+ // Attach error details to the subscribed resources that included in the ADS update.
2053+ if (invalidResources .contains (resourceName )) {
2054+ subscriber .onRejected (version , updateTime , errorDetail );
2055+ }
20602056 // Notify the watchers.
20612057 if (parsedResources .containsKey (resourceName )) {
20622058 subscriber .onData (parsedResources .get (resourceName ), version , updateTime );
20632059 } else if (type == ResourceType .LDS || type == ResourceType .CDS ) {
2060+ if (subscriber .data != null && invalidResources .contains (resourceName )) {
2061+ // Update is rejected but keep using the cached data.
2062+ if (type == ResourceType .LDS ) {
2063+ LdsUpdate ldsUpdate = (LdsUpdate ) subscriber .data ;
2064+ io .grpc .xds .HttpConnectionManager hcm = ldsUpdate .httpConnectionManager ();
2065+ if (hcm != null ) {
2066+ String rdsName = hcm .rdsName ();
2067+ if (rdsName != null ) {
2068+ retainedResources .add (rdsName );
2069+ }
2070+ }
2071+ } else {
2072+ CdsUpdate cdsUpdate = (CdsUpdate ) subscriber .data ;
2073+ String edsName = cdsUpdate .edsServiceName ();
2074+ if (edsName == null ) {
2075+ edsName = cdsUpdate .clusterName ();
2076+ }
2077+ retainedResources .add (edsName );
2078+ }
2079+ continue ;
2080+ }
20642081 // For State of the World services, notify watchers when their watched resource is missing
20652082 // from the ADS update.
20662083 subscriber .onAbsent ();
20672084 }
20682085 }
2069- }
2070-
2071- private void handleResourcesRejected (
2072- ResourceType type , Set <String > unpackedResourceNames , String version ,
2073- String nonce , List <String > errors ) {
2074- String errorDetail = Joiner .on ('\n' ).join (errors );
2075- getLogger ().log (XdsLogLevel .WARNING ,
2076- "Failed processing {0} Response version {1} nonce {2}. Errors:\n {3}" ,
2077- type , version , nonce , errorDetail );
2078- nackResponse (type , nonce , errorDetail );
2079-
2080- long updateTime = timeProvider .currentTimeNanos ();
2081- for (Map .Entry <String , ResourceSubscriber > entry : getSubscribedResourcesMap (type ).entrySet ()) {
2082- String resourceName = entry .getKey ();
2083- ResourceSubscriber subscriber = entry .getValue ();
2084-
2085- // Attach error details to the subscribed resources that included in the ADS update.
2086- if (unpackedResourceNames .contains (resourceName )) {
2087- subscriber .onRejected (version , updateTime , errorDetail );
2086+ // LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
2087+ // LDS/CDS resources should be deleted.
2088+ if (type == ResourceType .LDS || type == ResourceType .CDS ) {
2089+ Map <String , ResourceSubscriber > dependentSubscribers =
2090+ type == ResourceType .LDS ? rdsResourceSubscribers : edsResourceSubscribers ;
2091+ for (String resource : dependentSubscribers .keySet ()) {
2092+ if (!retainedResources .contains (resource )) {
2093+ dependentSubscribers .get (resource ).onAbsent ();
2094+ }
20882095 }
20892096 }
20902097 }
0 commit comments