Skip to content

Commit be7aa50

Browse files
authored
xds: referenciate server routing config (grpc#8491)
* routing config ref * atomic ref virtual host list * Revert "routing config ref" This reverts commit cbcad57. * test: noop config non-static, better validation
1 parent 9870db1 commit be7aa50

3 files changed

Lines changed: 150 additions & 153 deletions

File tree

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public void run() {
396396
}
397397
routeDiscoveryStates.keySet().retainAll(allRds);
398398
if (pendingRds.isEmpty()) {
399-
updateSelector(true);
399+
updateSelector();
400400
}
401401
}
402402
});
@@ -450,14 +450,7 @@ private void shutdown() {
450450
releaseSuppliersInFlight();
451451
}
452452

453-
/**
454-
* Use firstTimeNoPendingRds to indicate that the previous SslContextProviderSuppliers in
455-
* filterChainSelectorRef should be released. Call updateSelector(true) when all routing are
456-
* just complete and the newest filter chain is ready to be applied to the
457-
* filterChainSelectorRef. Call updateSelector(false) for subsequent routing update
458-
* corresponding to the same filter chain list.
459-
*/
460-
private void updateSelector(boolean firstTimeNoPendingRds) {
453+
private void updateSelector() {
461454
Map<FilterChain, ServerRoutingConfig> filterChainRouting = new HashMap<>();
462455
for (FilterChain filterChain: filterChains) {
463456
filterChainRouting.put(filterChain, generateRoutingConfig(filterChain));
@@ -466,10 +459,7 @@ private void updateSelector(boolean firstTimeNoPendingRds) {
466459
Collections.unmodifiableMap(filterChainRouting),
467460
defaultFilterChain == null ? null : defaultFilterChain.getSslContextProviderSupplier(),
468461
defaultFilterChain == null ? null : generateRoutingConfig(defaultFilterChain));
469-
List<SslContextProviderSupplier> toRelease = Collections.emptyList();
470-
if (firstTimeNoPendingRds) {
471-
toRelease = getSuppliersInUse();
472-
}
462+
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
473463
filterChainSelectorRef.set(selector);
474464
for (SslContextProviderSupplier e: toRelease) {
475465
e.close();
@@ -480,14 +470,12 @@ private void updateSelector(boolean firstTimeNoPendingRds) {
480470
private ServerRoutingConfig generateRoutingConfig(FilterChain filterChain) {
481471
HttpConnectionManager hcm = filterChain.getHttpConnectionManager();
482472
if (hcm.virtualHosts() != null) {
483-
return ServerRoutingConfig.create(hcm.httpFilterConfigs(), hcm.virtualHosts());
473+
return ServerRoutingConfig.create(hcm.httpFilterConfigs(),
474+
new AtomicReference<>(hcm.virtualHosts()));
484475
} else {
485476
RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
486-
if (rds != null && rds.savedVirtualHosts != null) {
487-
return ServerRoutingConfig.create(hcm.httpFilterConfigs(), rds.savedVirtualHosts);
488-
} else {
489-
return ServerRoutingConfig.FAILING_ROUTING_CONFIG;
490-
}
477+
checkNotNull(rds, "rds");
478+
return ServerRoutingConfig.create(hcm.httpFilterConfigs(), rds.savedVirtualHosts);
491479
}
492480
}
493481

@@ -555,8 +543,8 @@ private void releaseSuppliersInFlight() {
555543

556544
private final class RouteDiscoveryState implements RdsResourceWatcher {
557545
private final String resourceName;
558-
@Nullable
559-
private List<VirtualHost> savedVirtualHosts;
546+
private AtomicReference<ImmutableList<VirtualHost>> savedVirtualHosts =
547+
new AtomicReference<>();
560548
private boolean isPending = true;
561549

562550
private RouteDiscoveryState(String resourceName) {
@@ -571,7 +559,7 @@ public void run() {
571559
if (!routeDiscoveryStates.containsKey(resourceName)) {
572560
return;
573561
}
574-
savedVirtualHosts = update.virtualHosts;
562+
savedVirtualHosts.set(ImmutableList.copyOf(update.virtualHosts));
575563
maybeUpdateSelector();
576564
}
577565
});
@@ -586,7 +574,7 @@ public void run() {
586574
return;
587575
}
588576
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
589-
savedVirtualHosts = null;
577+
savedVirtualHosts.set(null);
590578
maybeUpdateSelector();
591579
}
592580
});
@@ -608,13 +596,13 @@ public void run() {
608596
}
609597

610598
// Update the selector to use the most recently updated configs only after all rds have been
611-
// discovered, i.e. pendingRds is empty. Do the updateSelector even after rds are already
612-
// fully discovered and new change comes.
599+
// discovered for the first time. Later changes on rds will be applied through virtual host
600+
// list atomic ref.
613601
private void maybeUpdateSelector() {
614602
isPending = false;
615-
boolean isLastPending = pendingRds.remove(resourceName);
616-
if (pendingRds.isEmpty()) {
617-
updateSelector(isLastPending);
603+
boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
604+
if (isLastPending) {
605+
updateSelector();
618606
}
619607
}
620608
}
@@ -644,15 +632,19 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
644632
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
645633
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
646634
ServerRoutingConfig routingConfig = call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
647-
if (routingConfig == null
648-
|| routingConfig.equals(ServerRoutingConfig.FAILING_ROUTING_CONFIG)) {
649-
String errorMsg = "Missing xDS routing config. " + (routingConfig == null ? "" :
650-
"RDS config unavailable.");
635+
if (routingConfig == null) {
636+
String errorMsg = "Missing xDS routing config.";
637+
call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
638+
return new Listener<ReqT>() {};
639+
}
640+
List<VirtualHost> virtualHosts = routingConfig.virtualHosts().get();
641+
if (virtualHosts == null) {
642+
String errorMsg = "Missing xDS routing config VirtualHosts due to RDS config unavailable.";
651643
call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
652644
return new Listener<ReqT>() {};
653645
}
654646
VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
655-
routingConfig.virtualHosts(), call.getAuthority());
647+
virtualHosts, call.getAuthority());
656648
if (virtualHost == null) {
657649
call.close(
658650
Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
@@ -727,24 +719,20 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
727719
*/
728720
@AutoValue
729721
abstract static class ServerRoutingConfig {
730-
private static final ServerRoutingConfig FAILING_ROUTING_CONFIG =
731-
new AutoValue_XdsServerWrapper_ServerRoutingConfig(
732-
ImmutableList.<NamedFilterConfig>of(), ImmutableList.<VirtualHost>of());
733-
734722
// Top level http filter configs.
735723
abstract ImmutableList<NamedFilterConfig> httpFilterConfigs();
736724

737-
abstract ImmutableList<VirtualHost> virtualHosts();
725+
abstract AtomicReference<ImmutableList<VirtualHost>> virtualHosts();
738726

739727
/**
740728
* Server routing configuration.
741729
* */
742730
public static ServerRoutingConfig create(List<NamedFilterConfig> httpFilterConfigs,
743-
List<VirtualHost> virtualHosts) {
731+
AtomicReference<ImmutableList<VirtualHost>> virtualHosts) {
744732
checkNotNull(httpFilterConfigs, "httpFilterConfigs");
745733
checkNotNull(virtualHosts, "virtualHosts");
746734
return new AutoValue_XdsServerWrapper_ServerRoutingConfig(
747-
ImmutableList.copyOf(httpFilterConfigs), ImmutableList.copyOf(virtualHosts));
735+
ImmutableList.copyOf(httpFilterConfigs), virtualHosts);
748736
}
749737
}
750738
}

0 commit comments

Comments
 (0)