@@ -396,7 +396,7 @@ public void run() {
396396 }
397397 routeDiscoveryStates .keySet ().retainAll (allRds );
398398 if (pendingRds .isEmpty ()) {
399- updateSelector (true );
399+ updateSelector ();
400400 }
401401 }
402402 });
@@ -450,14 +450,7 @@ private void shutdown() {
450450 releaseSuppliersInFlight ();
451451 }
452452
453- /**
454- * Use firstTimeNoPendingRds to indicate that the previous SslContextProviderSuppliers in
455- * filterChainSelectorRef should be released. Call updateSelector(true) when all routing are
456- * just complete and the newest filter chain is ready to be applied to the
457- * filterChainSelectorRef. Call updateSelector(false) for subsequent routing update
458- * corresponding to the same filter chain list.
459- */
460- private void updateSelector (boolean firstTimeNoPendingRds ) {
453+ private void updateSelector () {
461454 Map <FilterChain , ServerRoutingConfig > filterChainRouting = new HashMap <>();
462455 for (FilterChain filterChain : filterChains ) {
463456 filterChainRouting .put (filterChain , generateRoutingConfig (filterChain ));
@@ -466,10 +459,7 @@ private void updateSelector(boolean firstTimeNoPendingRds) {
466459 Collections .unmodifiableMap (filterChainRouting ),
467460 defaultFilterChain == null ? null : defaultFilterChain .getSslContextProviderSupplier (),
468461 defaultFilterChain == null ? null : generateRoutingConfig (defaultFilterChain ));
469- List <SslContextProviderSupplier > toRelease = Collections .emptyList ();
470- if (firstTimeNoPendingRds ) {
471- toRelease = getSuppliersInUse ();
472- }
462+ List <SslContextProviderSupplier > toRelease = getSuppliersInUse ();
473463 filterChainSelectorRef .set (selector );
474464 for (SslContextProviderSupplier e : toRelease ) {
475465 e .close ();
@@ -480,14 +470,12 @@ private void updateSelector(boolean firstTimeNoPendingRds) {
480470 private ServerRoutingConfig generateRoutingConfig (FilterChain filterChain ) {
481471 HttpConnectionManager hcm = filterChain .getHttpConnectionManager ();
482472 if (hcm .virtualHosts () != null ) {
483- return ServerRoutingConfig .create (hcm .httpFilterConfigs (), hcm .virtualHosts ());
473+ return ServerRoutingConfig .create (hcm .httpFilterConfigs (),
474+ new AtomicReference <>(hcm .virtualHosts ()));
484475 } else {
485476 RouteDiscoveryState rds = routeDiscoveryStates .get (hcm .rdsName ());
486- if (rds != null && rds .savedVirtualHosts != null ) {
487- return ServerRoutingConfig .create (hcm .httpFilterConfigs (), rds .savedVirtualHosts );
488- } else {
489- return ServerRoutingConfig .FAILING_ROUTING_CONFIG ;
490- }
477+ checkNotNull (rds , "rds" );
478+ return ServerRoutingConfig .create (hcm .httpFilterConfigs (), rds .savedVirtualHosts );
491479 }
492480 }
493481
@@ -555,8 +543,8 @@ private void releaseSuppliersInFlight() {
555543
556544 private final class RouteDiscoveryState implements RdsResourceWatcher {
557545 private final String resourceName ;
558- @ Nullable
559- private List < VirtualHost > savedVirtualHosts ;
546+ private AtomicReference < ImmutableList < VirtualHost >> savedVirtualHosts =
547+ new AtomicReference <>() ;
560548 private boolean isPending = true ;
561549
562550 private RouteDiscoveryState (String resourceName ) {
@@ -571,7 +559,7 @@ public void run() {
571559 if (!routeDiscoveryStates .containsKey (resourceName )) {
572560 return ;
573561 }
574- savedVirtualHosts = update .virtualHosts ;
562+ savedVirtualHosts . set ( ImmutableList . copyOf ( update .virtualHosts )) ;
575563 maybeUpdateSelector ();
576564 }
577565 });
@@ -586,7 +574,7 @@ public void run() {
586574 return ;
587575 }
588576 logger .log (Level .WARNING , "Rds {0} unavailable" , resourceName );
589- savedVirtualHosts = null ;
577+ savedVirtualHosts . set ( null ) ;
590578 maybeUpdateSelector ();
591579 }
592580 });
@@ -608,13 +596,13 @@ public void run() {
608596 }
609597
610598 // Update the selector to use the most recently updated configs only after all rds have been
611- // discovered, i.e. pendingRds is empty. Do the updateSelector even after rds are already
612- // fully discovered and new change comes .
599+ // discovered for the first time. Later changes on rds will be applied through virtual host
600+ // list atomic ref .
613601 private void maybeUpdateSelector () {
614602 isPending = false ;
615- boolean isLastPending = pendingRds .remove (resourceName );
616- if (pendingRds . isEmpty () ) {
617- updateSelector (isLastPending );
603+ boolean isLastPending = pendingRds .remove (resourceName ) && pendingRds . isEmpty () ;
604+ if (isLastPending ) {
605+ updateSelector ();
618606 }
619607 }
620608 }
@@ -644,15 +632,19 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
644632 public <ReqT , RespT > Listener <ReqT > interceptCall (ServerCall <ReqT , RespT > call ,
645633 Metadata headers , ServerCallHandler <ReqT , RespT > next ) {
646634 ServerRoutingConfig routingConfig = call .getAttributes ().get (ATTR_SERVER_ROUTING_CONFIG );
647- if (routingConfig == null
648- || routingConfig .equals (ServerRoutingConfig .FAILING_ROUTING_CONFIG )) {
649- String errorMsg = "Missing xDS routing config. " + (routingConfig == null ? "" :
650- "RDS config unavailable." );
635+ if (routingConfig == null ) {
636+ String errorMsg = "Missing xDS routing config." ;
637+ call .close (Status .UNAVAILABLE .withDescription (errorMsg ), new Metadata ());
638+ return new Listener <ReqT >() {};
639+ }
640+ List <VirtualHost > virtualHosts = routingConfig .virtualHosts ().get ();
641+ if (virtualHosts == null ) {
642+ String errorMsg = "Missing xDS routing config VirtualHosts due to RDS config unavailable." ;
651643 call .close (Status .UNAVAILABLE .withDescription (errorMsg ), new Metadata ());
652644 return new Listener <ReqT >() {};
653645 }
654646 VirtualHost virtualHost = RoutingUtils .findVirtualHostForHostName (
655- routingConfig . virtualHosts () , call .getAuthority ());
647+ virtualHosts , call .getAuthority ());
656648 if (virtualHost == null ) {
657649 call .close (
658650 Status .UNAVAILABLE .withDescription ("Could not find xDS virtual host matching RPC" ),
@@ -727,24 +719,20 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
727719 */
728720 @ AutoValue
729721 abstract static class ServerRoutingConfig {
730- private static final ServerRoutingConfig FAILING_ROUTING_CONFIG =
731- new AutoValue_XdsServerWrapper_ServerRoutingConfig (
732- ImmutableList .<NamedFilterConfig >of (), ImmutableList .<VirtualHost >of ());
733-
734722 // Top level http filter configs.
735723 abstract ImmutableList <NamedFilterConfig > httpFilterConfigs ();
736724
737- abstract ImmutableList <VirtualHost > virtualHosts ();
725+ abstract AtomicReference < ImmutableList <VirtualHost > > virtualHosts ();
738726
739727 /**
740728 * Server routing configuration.
741729 * */
742730 public static ServerRoutingConfig create (List <NamedFilterConfig > httpFilterConfigs ,
743- List < VirtualHost > virtualHosts ) {
731+ AtomicReference < ImmutableList < VirtualHost > > virtualHosts ) {
744732 checkNotNull (httpFilterConfigs , "httpFilterConfigs" );
745733 checkNotNull (virtualHosts , "virtualHosts" );
746734 return new AutoValue_XdsServerWrapper_ServerRoutingConfig (
747- ImmutableList .copyOf (httpFilterConfigs ), ImmutableList . copyOf ( virtualHosts ) );
735+ ImmutableList .copyOf (httpFilterConfigs ), virtualHosts );
748736 }
749737 }
750738}
0 commit comments