diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index fd7302658c5..2a66ef6770a 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -412,6 +413,7 @@ public static final class Args { private final ProxyDetector proxyDetector; private final SynchronizationContext syncContext; private final ServiceConfigParser serviceConfigParser; + @Nullable private final ScheduledExecutorService scheduledExecutorService; @Nullable private final ChannelLogger channelLogger; @Nullable private final Executor executor; @@ -420,12 +422,14 @@ private Args( ProxyDetector proxyDetector, SynchronizationContext syncContext, ServiceConfigParser serviceConfigParser, + @Nullable ScheduledExecutorService scheduledExecutorService, @Nullable ChannelLogger channelLogger, @Nullable Executor executor) { this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set"); + this.scheduledExecutorService = scheduledExecutorService; this.channelLogger = channelLogger; this.executor = executor; } @@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() { return syncContext; } + /** + * Returns a {@link ScheduledExecutorService} for scheduling delayed tasks. + * + *

This service is a shared resource and is only meant for quick tasks. DO NOT block or run + * time-consuming tasks. + * + *

The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()} + * and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called. + * + * @since 1.26.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454") + public ScheduledExecutorService getScheduledExecutorService() { + if (scheduledExecutorService == null) { + throw new IllegalStateException("ScheduledExecutorService not set in Builder"); + } + return scheduledExecutorService; + } + /** * Returns the {@link ServiceConfigParser}. * @@ -501,6 +524,7 @@ public String toString() { .add("proxyDetector", proxyDetector) .add("syncContext", syncContext) .add("serviceConfigParser", serviceConfigParser) + .add("scheduledExecutorService", scheduledExecutorService) .add("channelLogger", channelLogger) .add("executor", executor) .toString(); @@ -517,6 +541,7 @@ public Builder toBuilder() { builder.setProxyDetector(proxyDetector); builder.setSynchronizationContext(syncContext); builder.setServiceConfigParser(serviceConfigParser); + builder.setScheduledExecutorService(scheduledExecutorService); builder.setChannelLogger(channelLogger); builder.setOffloadExecutor(executor); return builder; @@ -541,6 +566,7 @@ public static final class Builder { private ProxyDetector proxyDetector; private SynchronizationContext syncContext; private ServiceConfigParser serviceConfigParser; + private ScheduledExecutorService scheduledExecutorService; private ChannelLogger channelLogger; private Executor executor; @@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) { return this; } + /** + * See {@link Args#getScheduledExecutorService}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454") + public Builder setScheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = checkNotNull(scheduledExecutorService); + return this; + } + /** * See {@link Args#getServiceConfigParser}. This is a required field. * @@ -618,7 +654,7 @@ public Args build() { return new Args( defaultPort, proxyDetector, syncContext, serviceConfigParser, - channelLogger, executor); + scheduledExecutorService, channelLogger, executor); } } } diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index 828047443a4..0924002fd28 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -45,6 +46,8 @@ public class NameResolverTest { private final SynchronizationContext syncContext = new SynchronizationContext(mock(UncaughtExceptionHandler.class)); private final ServiceConfigParser parser = mock(ServiceConfigParser.class); + private final ScheduledExecutorService scheduledExecutorService = + mock(ScheduledExecutorService.class); private final ChannelLogger channelLogger = mock(ChannelLogger.class); private final Executor executor = Executors.newSingleThreadExecutor(); private URI uri; @@ -62,6 +65,7 @@ public void args() { assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService); assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor); @@ -70,6 +74,7 @@ public void args() { assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService); assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor); @@ -254,6 +259,7 @@ private NameResolver.Args createArgs() { .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(parser) + .setScheduledExecutorService(scheduledExecutorService) .setChannelLogger(channelLogger) .setOffloadExecutor(executor) .build(); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index de3a472501c..b21faa322d3 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final NameResolver.Args nameResolverArgs; private final AutoConfiguredLoadBalancerFactory loadBalancerFactory; private final ClientTransportFactory transportFactory; - private final ScheduledExecutorForBalancer scheduledExecutorForBalancer; + private final RestrictedScheduledExecutor scheduledExecutor; private final Executor executor; private final ObjectPool executorPool; private final ObjectPool balancerRpcExecutorPool; @@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new this.target = checkNotNull(builder.target, "target"); this.logId = InternalLogId.allocate("Channel", target); this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + this.executorPool = checkNotNull(builder.executorPool, "executorPool"); + this.executor = checkNotNull(executorPool.getObject(), "executor"); + this.transportFactory = + new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); + this.scheduledExecutor = + new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService()); maxTraceEvents = builder.maxTraceEvents; channelTracer = new ChannelTracer( logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(), @@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new .setDefaultPort(builder.getDefaultPort()) .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) + .setScheduledExecutorService(scheduledExecutor) .setServiceConfigParser( new ScParser( retryEnabled, @@ -598,18 +605,11 @@ public void execute(Runnable command) { }) .build(); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); - this.executorPool = checkNotNull(builder.executorPool, "executorPool"); this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); - this.executor = checkNotNull(executorPool.getObject(), "executor"); this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); this.delayedTransport.start(delayedTransportListener); this.backoffPolicyProvider = backoffPolicyProvider; - this.transportFactory = - new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); - this.scheduledExecutorForBalancer = - new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService()); - serviceConfigInterceptor = new ServiceConfigInterceptor( retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); this.defaultServiceConfig = builder.defaultServiceConfig; @@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() { @Override public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorForBalancer; + return scheduledExecutor; } @Override @@ -1736,10 +1736,10 @@ synchronized void release() { } } - private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService { + private static final class RestrictedScheduledExecutor implements ScheduledExecutorService { final ScheduledExecutorService delegate; - private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) { + private RestrictedScheduledExecutor(ScheduledExecutorService delegate) { this.delegate = checkNotNull(delegate, "delegate"); } diff --git a/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java b/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java index 2bed7d52b12..d5c4fa77fd8 100644 --- a/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java +++ b/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java @@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** * Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(byte[], * int, int)}. */ +@RunWith(JUnit4.class) public class ReadableBuffersArrayTest extends ReadableBufferTestBase { @Test diff --git a/cronet/build.gradle b/cronet/build.gradle index dd0c9174fc1..fcd7e16cfad 100644 --- a/cronet/build.gradle +++ b/cronet/build.gradle @@ -42,8 +42,8 @@ android { buildTypes { debug { minifyEnabled false } release { - minifyEnabled true - proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro' + minifyEnabled false + consumerProguardFiles 'proguard-rules.pro' } } testOptions { unitTests { includeAndroidResources = true } } diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index df8d7a51cc8..6fb6e497795 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -91,7 +91,7 @@ class Utils { @Override public ByteBufAllocator create() { if (Boolean.parseBoolean( - System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) { + System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) { int maxOrder; if (System.getProperty("io.netty.allocator.maxOrder") == null) { // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index cf770a991e1..3d735a7cf40 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -366,7 +366,8 @@ public boolean isReady() { @Override public void setOnReadyHandler(Runnable onReadyHandler) { if (frozen) { - throw new IllegalStateException("Cannot alter onReadyHandler after call started"); + throw new IllegalStateException( + "Cannot alter onReadyHandler after call started. Use ClientResponseObserver"); } this.onReadyHandler = onReadyHandler; } @@ -374,7 +375,8 @@ public void setOnReadyHandler(Runnable onReadyHandler) { @Override public void disableAutoInboundFlowControl() { if (frozen) { - throw new IllegalStateException("Cannot disable auto flow control call started"); + throw new IllegalStateException( + "Cannot disable auto flow control after call started. Use ClientResponseObserver"); } autoFlowControlEnabled = false; } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 078c80d4adb..b37bf18e8fe 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -380,7 +380,9 @@ public boolean isReady() { @Override public void setOnReadyHandler(Runnable r) { - checkState(!frozen, "Cannot alter onReadyHandler after initialization"); + checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called " + + "during the initial call to the application, before the service returns its " + + "StreamObserver"); this.onReadyHandler = r; } @@ -391,7 +393,9 @@ public boolean isCancelled() { @Override public void setOnCancelHandler(Runnable onCancelHandler) { - checkState(!frozen, "Cannot alter onCancelHandler after initialization"); + checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called " + + "during the initial call to the application, before the service returns its " + + "StreamObserver"); this.onCancelHandler = onCancelHandler; } diff --git a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java index f49879939be..dcd5f31b1f7 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java @@ -195,6 +195,14 @@ static final class LbEndpoint { private final int loadBalancingWeight; private final boolean isHealthy; + @VisibleForTesting + LbEndpoint(String address, int port, int loadBalancingWeight, boolean isHealthy) { + this( + new EquivalentAddressGroup( + new InetSocketAddress(address, port)), + loadBalancingWeight, isHealthy); + } + @VisibleForTesting LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight, boolean isHealthy) { this.eag = eag; diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index 8f9d396fcc1..2527c0dc2ce 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -76,15 +76,14 @@ interface LocalityStore { void updateOobMetricsReportInterval(long reportIntervalNano); - LoadStatsStore getLoadStatsStore(); - @VisibleForTesting abstract class LocalityStoreFactory { private static final LocalityStoreFactory DEFAULT_INSTANCE = new LocalityStoreFactory() { @Override - LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) { - return new LocalityStoreImpl(helper, lbRegistry); + LocalityStore newLocalityStore( + Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) { + return new LocalityStoreImpl(helper, lbRegistry, loadStatsStore); } }; @@ -92,7 +91,8 @@ static LocalityStoreFactory getInstance() { return DEFAULT_INSTANCE; } - abstract LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry); + abstract LocalityStore newLocalityStore( + Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore); } final class LocalityStoreImpl implements LocalityStore { @@ -113,9 +113,10 @@ final class LocalityStoreImpl implements LocalityStore { private List dropOverloads = ImmutableList.of(); private long metricsReportIntervalNano = -1; - LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) { + LocalityStoreImpl( + Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) { this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance, - new LoadStatsStoreImpl(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance()); + loadStatsStore, OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance()); } @VisibleForTesting @@ -284,11 +285,6 @@ public String toString() { TimeUnit.MINUTES, helper.getScheduledExecutorService()); } - @Override - public LoadStatsStore getLoadStatsStore() { - return loadStatsStore; - } - @Override public void updateOobMetricsReportInterval(long reportIntervalNano) { metricsReportIntervalNano = reportIntervalNano; diff --git a/xds/src/main/java/io/grpc/xds/LookasideLb.java b/xds/src/main/java/io/grpc/xds/LookasideLb.java index 9fd9a8c8cf3..d32530c6423 100644 --- a/xds/src/main/java/io/grpc/xds/LookasideLb.java +++ b/xds/src/main/java/io/grpc/xds/LookasideLb.java @@ -175,11 +175,13 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { xdsClient = new XdsComms2( channel, helper, new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER, node); - localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry); + LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(); + localityStore = localityStoreFactory.newLocalityStore( + helper, lbRegistry, loadStatsStore); // TODO(zdapeng): Use XdsClient to do Lrs directly. lrsClient = loadReportClientFactory.createLoadReportClient( channel, helper, new ExponentialBackoffPolicy.Provider(), - localityStore.getLoadStatsStore()); + loadStatsStore); final LoadReportCallback lrsCallback = new LoadReportCallback() { @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 39945ab074e..6ed206715bc 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -344,6 +344,8 @@ void watchConfigData(String hostName, int port, ConfigWatcher watcher) { /** * Registers a data watcher for the given cluster. + * + *

Adding the same watcher for the same cluster more than once is a no-op. */ void watchClusterData(String clusterName, ClusterWatcher watcher) { } @@ -351,12 +353,16 @@ void watchClusterData(String clusterName, ClusterWatcher watcher) { /** * Unregisters the given cluster watcher, which was registered to receive updates for the * given cluster. + * + *

Cancelling a watcher that was not registered for the given cluster is a no-op. */ void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { } /** * Registers a data watcher for endpoints in the given cluster. + * + *

Adding the same watcher for the same cluster more than once is a no-op. */ void watchEndpointData(String clusterName, EndpointWatcher watcher) { } @@ -364,6 +370,8 @@ void watchEndpointData(String clusterName, EndpointWatcher watcher) { /** * Unregisters the given endpoints watcher, which was registered to receive updates for * endpoints information in the given cluster. + * + *

Cancelling a watcher that was not registered for the given cluster is a no-op. */ void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index c497f4ad9eb..e2f9c94b98e 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -25,6 +25,11 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.Listener; @@ -44,6 +49,9 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; import io.grpc.xds.Bootstrapper.ChannelCreds; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -66,6 +74,11 @@ final class XdsClientImpl extends XdsClient { @VisibleForTesting static final String ADS_TYPE_URL_RDS = "type.googleapis.com/envoy.api.v2.RouteConfiguration"; + @VisibleForTesting + static final String ADS_TYPE_URL_CDS = "type.googleapis.com/envoy.api.v2.Cluster"; + @VisibleForTesting + static final String ADS_TYPE_URL_EDS = + "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; private final ManagedChannel channel; private final SynchronizationContext syncContext; @@ -84,6 +97,25 @@ final class XdsClientImpl extends XdsClient { // responses. private final Map routeConfigNamesToClusterNames = new HashMap<>(); + // Cached data for CDS responses, keyed by cluster names. + // Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead + // of whole Cluster messages to reduce memory usage. + private final Map clusterNamesToClusterUpdates = new HashMap<>(); + + // Cached data for EDS responses, keyed by cluster names. + // CDS responses indicate absence of clusters and EDS responses indicate presence of clusters. + // Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead + // of whole ClusterLoadAssignment messages to reduce memory usage. + private final Map clusterNamesToEndpointUpdates = new HashMap<>(); + + // Cluster watchers waiting for cluster information updates. Multiple cluster watchers + // can watch on information for the same cluster. + private final Map> clusterWatchers = new HashMap<>(); + + // Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint + // watchers can watch endpoints in the same cluster. + private final Map> endpointWatchers = new HashMap<>(); + @Nullable private AdsStream adsStream; @Nullable @@ -170,6 +202,122 @@ void watchConfigData(String hostName, int port, ConfigWatcher watcher) { adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); } + @Override + void watchClusterData(String clusterName, ClusterWatcher watcher) { + checkNotNull(watcher, "watcher"); + boolean needRequest = false; + if (!clusterWatchers.containsKey(clusterName)) { + logger.log(Level.FINE, "Start watching cluster {0}", clusterName); + needRequest = true; + clusterWatchers.put(clusterName, new HashSet()); + } + Set watchers = clusterWatchers.get(clusterName); + if (watchers.contains(watcher)) { + logger.log(Level.WARNING, "Watcher {0} already registered", watcher); + return; + } + watchers.add(watcher); + // If local cache contains cluster information to be watched, notify the watcher immediately. + if (clusterNamesToClusterUpdates.containsKey(clusterName)) { + watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName)); + } + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + if (needRequest) { + if (adsStream == null) { + startRpcStream(); + } + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } + } + + @Override + void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { + checkNotNull(watcher, "watcher"); + Set watchers = clusterWatchers.get(clusterName); + if (watchers == null || !watchers.contains(watcher)) { + logger.log(Level.FINE, "Watcher {0} was not registered", watcher); + return; + } + watchers.remove(watcher); + if (watchers.isEmpty()) { + logger.log(Level.FINE, "Stop watching cluster {0}", clusterName); + clusterWatchers.remove(clusterName); + // If unsubscribe the last resource, do NOT send a CDS request for an empty resource list. + // This is a workaround for CDS protocol resource unsubscribe. + if (clusterWatchers.isEmpty()) { + return; + } + // No longer interested in this cluster, send an updated CDS request to unsubscribe + // this resource. + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + checkState(adsStream != null, + "Severe bug: ADS stream was not created while an endpoint watcher was registered"); + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } + } + + @Override + void watchEndpointData(String clusterName, EndpointWatcher watcher) { + checkNotNull(watcher, "watcher"); + boolean needRequest = false; + if (!endpointWatchers.containsKey(clusterName)) { + logger.log(Level.FINE, "Start watching endpoints in cluster {0}", clusterName); + needRequest = true; + endpointWatchers.put(clusterName, new HashSet()); + } + Set watchers = endpointWatchers.get(clusterName); + if (watchers.contains(watcher)) { + logger.log(Level.WARNING, "Watcher {0} already registered", watcher); + return; + } + watchers.add(watcher); + // If local cache contains endpoint information for the cluster to be watched, notify + // the watcher immediately. + if (clusterNamesToEndpointUpdates.containsKey(clusterName)) { + watcher.onEndpointChanged(clusterNamesToEndpointUpdates.get(clusterName)); + } + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + if (needRequest) { + if (adsStream == null) { + startRpcStream(); + } + adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet()); + } + } + + @Override + void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { + checkNotNull(watcher, "watcher"); + Set watchers = endpointWatchers.get(clusterName); + if (watchers == null || !watchers.contains(watcher)) { + logger.log(Level.FINE, "Watcher {0} was not registered", watcher); + return; + } + watchers.remove(watcher); + if (watchers.isEmpty()) { + logger.log(Level.FINE, "Stop watching endpoints in cluster {0}", clusterName); + endpointWatchers.remove(clusterName); + // Remove the corresponding EDS cache entry. + clusterNamesToEndpointUpdates.remove(clusterName); + // No longer interested in this cluster, send an updated EDS request to unsubscribe + // this resource. + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet()); + } + } + /** * Builds a channel to the given server URI with the first supported channel creds config. */ @@ -214,7 +362,6 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) { logger.log(Level.FINE, "Received an LDS response: {0}", ldsResponse); checkState(ldsResourceName != null && configWatcher != null, "No LDS request was ever sent. Management server is doing something wrong"); - adsStream.ldsRespNonce = ldsResponse.getNonce(); // Unpack Listener messages. List listeners = new ArrayList<>(ldsResponse.getResourcesCount()); @@ -224,7 +371,7 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) { } } catch (InvalidProtocolBufferException e) { adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), - ldsResponse.getNonce(), "Broken LDS response."); + "Broken LDS response."); return; } @@ -242,7 +389,7 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) { } } catch (InvalidProtocolBufferException e) { adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), - ldsResponse.getNonce(), "Broken LDS response."); + "Broken LDS response."); return; } @@ -289,12 +436,11 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) { } if (errorMessage != null) { - adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), - ldsResponse.getNonce(), errorMessage); + adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), errorMessage); return; } adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), - ldsResponse.getVersionInfo(), ldsResponse.getNonce()); + ldsResponse.getVersionInfo()); // Remove RDS cache entries for RouteConfigurations not referenced by this LDS response. // LDS responses represents the state of the world, RouteConfigurations not referenced @@ -342,7 +488,6 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) { logger.log(Level.FINE, "Received an RDS response: {0}", rdsResponse); checkState(adsStream.rdsResourceName != null, "Never requested for RDS resources, management server is doing something wrong"); - adsStream.rdsRespNonce = rdsResponse.getNonce(); // Unpack RouteConfiguration messages. List routeConfigs = new ArrayList<>(rdsResponse.getResourcesCount()); @@ -352,7 +497,7 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) { } } catch (InvalidProtocolBufferException e) { adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), - rdsResponse.getNonce(), "Broken RDS response."); + "Broken RDS response."); return; } @@ -362,7 +507,6 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) { String clusterName = processRouteConfig(routeConfig); if (clusterName == null) { adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), - rdsResponse.getNonce(), "Cannot find a valid cluster name in VirtualHost inside " + "RouteConfiguration with domains matching: " + hostName + "."); return; @@ -372,7 +516,7 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) { routeConfigNamesToClusterNames.putAll(clusterNames); adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), - rdsResponse.getVersionInfo(), rdsResponse.getNonce()); + rdsResponse.getVersionInfo()); // Notify the ConfigWatcher if this RDS response contains the most recently requested // RDS resource. @@ -426,6 +570,229 @@ private String processRouteConfig(RouteConfiguration config) { return null; } + /** + * Handles CDS response, which contains a list of Cluster messages with information for a logical + * cluster. The response is NACKed if messages for requested resources contain invalid + * information for gRPC's usage. Otherwise, an ACK request is sent to management server. + * Response data for requested clusters is cached locally, in case of new cluster watchers + * interested in the same clusters are added later. + */ + private void handleCdsResponse(DiscoveryResponse cdsResponse) { + logger.log(Level.FINE, "Received an CDS response: {0}", cdsResponse); + checkState(adsStream.cdsResourceNames != null, + "Never requested for CDS resources, management server is doing something wrong"); + adsStream.cdsRespNonce = cdsResponse.getNonce(); + + // Unpack Cluster messages. + List clusters = new ArrayList<>(cdsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) { + clusters.add(res.unpack(Cluster.class)); + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, + "Broken CDS response"); + return; + } + + String errorMessage = null; + // Cluster information update for requested clusters received in this CDS response. + Map clusterUpdates = new HashMap<>(); + // CDS responses represents the state of the world, EDS services not referenced by + // Clusters are those no longer exist. + Set edsServices = new HashSet<>(); + for (Cluster cluster : clusters) { + String clusterName = cluster.getName(); + // Skip information for clusters not requested. + // Management server is required to always send newly requested resources, even if they + // may have been sent previously (proactively). Thus, client does not need to cache + // unrequested resources. + if (!adsStream.cdsResourceNames.contains(clusterName)) { + continue; + } + ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder(); + updateBuilder.setClusterName(clusterName); + // The type field must be set to EDS. + if (!cluster.getType().equals(DiscoveryType.EDS)) { + errorMessage = "Cluster [" + clusterName + "]: only EDS discovery type is supported " + + "in gRPC."; + break; + } + // In the eds_cluster_config field, the eds_config field must be set to indicate to + // use EDS (must be set to use ADS). + EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig(); + if (!edsClusterConfig.getEdsConfig().hasAds()) { + errorMessage = "Cluster [" + clusterName + "]: field eds_cluster_config must be set to " + + "indicate to use EDS over ADS."; + break; + } + // If the service_name field is set, that value will be used for the EDS request + // instead of the cluster name (default). + if (!edsClusterConfig.getServiceName().isEmpty()) { + updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName()); + edsServices.add(edsClusterConfig.getServiceName()); + } else { + edsServices.add(clusterName); + } + // The lb_policy field must be set to ROUND_ROBIN. + if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) { + errorMessage = "Cluster [" + clusterName + "]: only round robin load balancing policy is " + + "supported in gRPC."; + break; + } + updateBuilder.setLbPolicy("round_robin"); + // If the lrs_server field is set, it must have its self field set, in which case the + // client should use LRS for load reporting. Otherwise (the lrs_server field is not set), + // LRS load reporting will be disabled. + if (cluster.hasLrsServer()) { + if (!cluster.getLrsServer().hasSelf()) { + errorMessage = "Cluster [" + clusterName + "]: only support enabling LRS for the same " + + "management server."; + break; + } + updateBuilder.setEnableLrs(true); + updateBuilder.setLrsServerName(""); + } else { + updateBuilder.setEnableLrs(false); + } + clusterUpdates.put(clusterName, updateBuilder.build()); + } + if (errorMessage != null) { + adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, errorMessage); + return; + } + adsStream.sendAckRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, + cdsResponse.getVersionInfo()); + + // Update local CDS cache with data in this response. + clusterNamesToClusterUpdates.clear(); + clusterNamesToClusterUpdates.putAll(clusterUpdates); + + // Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response. + clusterNamesToEndpointUpdates.keySet().retainAll(edsServices); + + // Notify watchers if clusters interested in present. Otherwise, notify with an error. + for (Map.Entry> entry : clusterWatchers.entrySet()) { + if (clusterUpdates.containsKey(entry.getKey())) { + ClusterUpdate clusterUpdate = clusterUpdates.get(entry.getKey()); + for (ClusterWatcher watcher : entry.getValue()) { + watcher.onClusterChanged(clusterUpdate); + } + } else { + for (ClusterWatcher watcher : entry.getValue()) { + watcher.onError( + Status.NOT_FOUND.withDescription( + "Requested cluster [" + entry.getKey() + "] does not exist")); + } + } + } + } + + /** + * Handles EDS response, which contains a list of ClusterLoadAssignment messages with + * endpoint load balancing information for each cluster. The response is NACKed if messages + * for requested resources contain invalid information for gRPC's usage. Otherwise, + * an ACK request is sent to management server. Response data for requested clusters is + * cached locally, in case of new endpoint watchers interested in the same clusters + * are added later. + */ + private void handleEdsResponse(DiscoveryResponse edsResponse) { + logger.log(Level.FINE, "Received an EDS response: {0}", edsResponse); + + // Unpack ClusterLoadAssignment messages. + List clusterLoadAssignments = + new ArrayList<>(edsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : edsResponse.getResourcesList()) { + clusterLoadAssignments.add(res.unpack(ClusterLoadAssignment.class)); + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(), + "Broken EDS response"); + return; + } + + String errorMessage = null; + // Endpoint information updates for requested clusters received in this EDS response. + Map endpointUpdates = new HashMap<>(); + // Walk through each ClusterLoadAssignment message. If any of them for requested clusters + // contain invalid information for gRPC's load balancing usage, the whole response is rejected. + for (ClusterLoadAssignment assignment : clusterLoadAssignments) { + String clusterName = assignment.getClusterName(); + // Skip information for clusters not requested. + // Management server is required to always send newly requested resources, even if they + // may have been sent previously (proactively). Thus, client does not need to cache + // unrequested resources. + if (!endpointWatchers.containsKey(clusterName)) { + continue; + } + EndpointUpdate.Builder updateBuilder = EndpointUpdate.newBuilder(); + updateBuilder.setClusterName(clusterName); + if (assignment.getEndpointsCount() == 0) { + errorMessage = "Cluster without any locality endpoint."; + break; + } + // The policy.disable_overprovisioning field must be set to true. + if (!assignment.getPolicy().getDisableOverprovisioning()) { + errorMessage = "Cluster requires overprovisioning."; + break; + } + for (io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints localityLbEndpoints + : assignment.getEndpointsList()) { + // The lb_endpoints field for LbEndpoint must contain at least one entry. + if (localityLbEndpoints.getLbEndpointsCount() == 0) { + errorMessage = "Locality with no endpoint."; + break; + } + // The endpoint field of each lb_endpoints must be set. + // Inside of it: the address field must be set. + for (io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint lbEndpoint + : localityLbEndpoints.getLbEndpointsList()) { + if (!lbEndpoint.hasEndpoint() || !lbEndpoint.getEndpoint().hasAddress()) { + errorMessage = "Invalid endpoint address information."; + break; + } + } + if (errorMessage != null) { + break; + } + // Note endpoints with health status other than UNHEALTHY and UNKNOWN are still + // handed over to watching parties. It is watching parties' responsibility to + // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy(). + updateBuilder.addLocalityLbEndpoints( + Locality.fromEnvoyProtoLocality(localityLbEndpoints.getLocality()), + LocalityLbEndpoints.fromEnvoyProtoLocalityLbEndpoints(localityLbEndpoints)); + } + if (errorMessage != null) { + break; + } + for (io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload dropOverload + : assignment.getPolicy().getDropOverloadsList()) { + updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload)); + } + EndpointUpdate update = updateBuilder.build(); + endpointUpdates.put(clusterName, update); + } + if (errorMessage != null) { + adsStream.sendNackRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(), + "ClusterLoadAssignment message contains invalid information for gRPC's usage: " + + errorMessage); + return; + } + adsStream.sendAckRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(), + edsResponse.getVersionInfo()); + + // Update local EDS cache by inserting updated endpoint information. + clusterNamesToEndpointUpdates.putAll(endpointUpdates); + + // Notify watchers waiting for updates of endpoint information received in this EDS response. + for (Map.Entry entry : endpointUpdates.entrySet()) { + for (EndpointWatcher watcher : endpointWatchers.get(entry.getKey())) { + watcher.onEndpointChanged(entry.getValue()); + } + } + } + @VisibleForTesting final class RpcRetryTask implements Runnable { @Override @@ -434,7 +801,12 @@ public void run() { if (configWatcher != null) { adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); } - // TODO(chengyuanzhang): send CDS/EDS requests if CDS/EDS watcher presents. + if (!clusterWatchers.isEmpty()) { + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } + if (!endpointWatchers.isEmpty()) { + adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet()); + } } } @@ -450,6 +822,8 @@ private final class AdsStream implements StreamObserver { // resources. private String ldsVersion = ""; private String rdsVersion = ""; + private String cdsVersion = ""; + private String edsVersion = ""; // Response nonce for the most recently received discovery responses of each resource type. // Client initiated requests start response nonce with empty string. @@ -459,11 +833,24 @@ private final class AdsStream implements StreamObserver { // DiscoveryResponse. private String ldsRespNonce = ""; private String rdsRespNonce = ""; + private String cdsRespNonce = ""; + private String edsRespNonce = ""; - // Most recently requested resource name(s) for each resource type. Note the resource_name in - // LDS requests will always be "xds:" URI (including port suffix if present). + // Most recently requested RDS resource name, which is an intermediate resource name for + // resolving service config. + // LDS request always use the same resource name, which is the "xds:" URI. + // Resource names for EDS requests are always represented by the cluster names that + // watchers are interested in. @Nullable private String rdsResourceName; + // Most recently requested CDS resource names. + // Due to CDS protocol limitation, client does not send a CDS request for empty resource + // names when unsubscribing the last resource. Management server assumes it is still + // subscribing to the last resource, client also need to behave so to avoid data lose. + // Therefore, cluster names that watchers interested in cannot always represent resource names + // in most recently sent CDS requests. + @Nullable + private Collection cdsResourceNames; private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) { this.stub = checkNotNull(stub, "stub"); @@ -480,12 +867,26 @@ public void onNext(final DiscoveryResponse response) { public void run() { responseReceived = true; String typeUrl = response.getTypeUrl(); + // Nonce in each response is echoed back in the following ACK/NACK request. It is + // used for management server to identify which response the client is ACKing/NACking. + // To avoid confusion, client-initiated requests will always use the nonce in + // most recently received responses of each resource type. if (typeUrl.equals(ADS_TYPE_URL_LDS)) { + ldsRespNonce = response.getNonce(); handleLdsResponse(response); } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { + rdsRespNonce = response.getNonce(); handleRdsResponse(response); + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + cdsRespNonce = response.getNonce(); + handleCdsResponse(response); + } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { + edsRespNonce = response.getNonce(); + handleEdsResponse(response); + } else { + logger.log(Level.FINE, "Received an unknown type of DiscoveryResponse {0}", + response); } - // TODO(zdapeng): add CDS/EDS response handles. } }); } @@ -571,8 +972,17 @@ private void sendXdsRequest(String typeUrl, Collection resourceNames) { version = rdsVersion; nonce = rdsRespNonce; rdsResourceName = resourceNames.iterator().next(); + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + version = cdsVersion; + nonce = cdsRespNonce; + // For CDS protocol resource unsubscribe workaround, keep the last unsubscribed cluster + // as the requested resource name for ACK requests when all all resources have + // been unsubscribed. + cdsResourceNames = ImmutableList.copyOf(resourceNames); + } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { + version = edsVersion; + nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for CDS/EDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() @@ -590,16 +1000,22 @@ private void sendXdsRequest(String typeUrl, Collection resourceNames) { * version for the corresponding resource type. */ private void sendAckRequest(String typeUrl, Collection resourceNames, - String versionInfo, String nonce) { + String versionInfo) { checkState(requestWriter != null, "ADS stream has not been started"); + String nonce = ""; if (typeUrl.equals(ADS_TYPE_URL_LDS)) { ldsVersion = versionInfo; - ldsRespNonce = nonce; + nonce = ldsRespNonce; } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { rdsVersion = versionInfo; - rdsRespNonce = nonce; + nonce = rdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + cdsVersion = versionInfo; + nonce = cdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { + edsVersion = versionInfo; + nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for CDS/EDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() @@ -616,16 +1032,24 @@ private void sendAckRequest(String typeUrl, Collection resourceNames, * Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous * accepted version. */ - private void sendNackRequest(String typeUrl, Collection resourceNames, String nonce, + private void sendNackRequest(String typeUrl, Collection resourceNames, String message) { checkState(requestWriter != null, "ADS stream has not been started"); String versionInfo = ""; + String nonce = ""; if (typeUrl.equals(ADS_TYPE_URL_LDS)) { versionInfo = ldsVersion; + nonce = ldsRespNonce; } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { versionInfo = rdsVersion; + nonce = rdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + versionInfo = cdsVersion; + nonce = cdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { + versionInfo = edsVersion; + nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for CDS/EDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() diff --git a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java index d32b1dcb8e7..0f8ff762702 100644 --- a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java +++ b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java @@ -194,11 +194,10 @@ public void onCompleted() { localityStoreFactory = new LocalityStoreFactory() { @Override - public LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) { + public LocalityStore newLocalityStore( + Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) { helpers.add(helper); LocalityStore localityStore = mock(LocalityStore.class); - LoadStatsStore loadStatsStore = mock(LoadStatsStore.class); - doReturn(loadStatsStore).when(localityStore).getLoadStatsStore(); localityStores.add(localityStore); return localityStore; } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 1c6ab7a44ae..a38ed0a96b0 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -26,11 +26,19 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.Listener; @@ -38,7 +46,10 @@ import io.envoyproxy.envoy.api.v2.core.Address; import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; import io.envoyproxy.envoy.api.v2.core.ConfigSource; +import io.envoyproxy.envoy.api.v2.core.HealthStatus; import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SelfConfigSource; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.listener.FilterChain; import io.envoyproxy.envoy.api.v2.route.RedirectAction; import io.envoyproxy.envoy.api.v2.route.Route; @@ -48,6 +59,8 @@ import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; import io.envoyproxy.envoy.config.listener.v2.ApiListener; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.ManagedChannel; @@ -60,17 +73,30 @@ import io.grpc.internal.FakeClock; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.LbEndpoint; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.XdsClient.ClusterUpdate; +import io.grpc.xds.XdsClient.ClusterWatcher; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; +import io.grpc.xds.XdsClient.EndpointUpdate; +import io.grpc.xds.XdsClient.EndpointWatcher; import java.io.IOException; import java.util.ArrayDeque; +import java.util.HashSet; import java.util.List; import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.InOrder; @@ -81,6 +107,7 @@ /** * Tests for {@link XdsClientImpl}. */ +@RunWith(JUnit4.class) public class XdsClientImplTest { private static final String HOSTNAME = "foo.googleapis.com"; @@ -121,6 +148,10 @@ public void uncaughtException(Thread t, Throwable e) { private BackoffPolicy backoffPolicy2; @Mock private ConfigWatcher configWatcher; + @Mock + private ClusterWatcher clusterWatcher; + @Mock + private EndpointWatcher endpointWatcher; private ManagedChannel channel; private XdsClientImpl xdsClient; @@ -166,8 +197,9 @@ public void cancelled(Context context) { channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); xdsClient = - new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(), - backoffPolicyProvider, fakeClock.getStopwatchSupplier().get()); + new XdsClientImpl(channel, NODE, syncContext, + fakeClock.getScheduledExecutorService(), backoffPolicyProvider, + fakeClock.getStopwatchSupplier().get()); // Only the connection to management server is established, no RPC request is sent until at // least one watcher is registered. assertThat(responseObservers).isEmpty(); @@ -1081,205 +1113,1483 @@ public void routeConfigurationRemovedNotifiedToWatcher() { } /** - * RPC stream closed and retry during the period of first tiem resolving service config - * (LDS/RDS only). + * Client receives an CDS response that does not contain a Cluster for the requested resource + * while each received Cluster is valid. The CDS response is ACKed. Cluster watchers are notified + * with an error for resource not found. */ @Test - public void streamClosedAndRetryWhenResolvingConfig() { - InOrder inOrder = - Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, - backoffPolicy2); - xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + public void cdsResponseWithoutMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); - ArgumentCaptor> responseObserverCaptor = - ArgumentCaptor.forClass(null); - inOrder.verify(mockedDiscoveryService) - .streamAggregatedResources(responseObserverCaptor.capture()); - StreamObserver responseObserver = - responseObserverCaptor.getValue(); // same as responseObservers.poll() + // Client sends a CDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-foo.googleapis.com] does not exist"); + } + + /** + * Normal workflow of receiving a CDS response containing Cluster message for a requested + * cluster. + */ + @Test + public void cdsResponseWithMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); - // Client sends an LDS request for the host name (with port) to management server. + // Client sends a CDS request for the only cluster being watched to management server. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); - // Management server closes the RPC stream immediately. - responseObserver.onCompleted(); - inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); + ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false); + + // Management server sends back another CDS response updating the requested Cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-foo.googleapis.com", "eds-cluster-foo.googleapis.com", true)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + response = + buildDiscoveryResponse("1", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); - // Retry after backoff. - fakeClock.forwardNanos(9L); - assertThat(requestObservers).isEmpty(); - fakeClock.forwardNanos(1L); - inOrder.verify(mockedDiscoveryService) - .streamAggregatedResources(responseObserverCaptor.capture()); - responseObserver = responseObserverCaptor.getValue(); - requestObserver = requestObservers.poll(); + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture()); + clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("eds-cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + } - // Client retried by sending an LDS request. + @Test + public void multipleClusterWatchers() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); + + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request containing all clusters being watched to management server. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext( + argThat( + new DiscoveryRequestMatcher("", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response contains Cluster for only one of + // requested cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); - // Management server closes the RPC stream with an error. - responseObserver.onError(Status.UNAVAILABLE.asException()); - verifyNoMoreInteractions(backoffPolicyProvider); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + // Two watchers get notification of cluster update for the cluster they are interested in. + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + // The other watcher gets an error notification for cluster not found. + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(watcher3).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-bar.googleapis.com] does not exist"); - // Retry after backoff. - fakeClock.forwardNanos(99L); - assertThat(requestObservers).isEmpty(); - fakeClock.forwardNanos(1L); - inOrder.verify(mockedDiscoveryService) - .streamAggregatedResources(responseObserverCaptor.capture()); - responseObserver = responseObserverCaptor.getValue(); - requestObserver = requestObservers.poll(); + // Management server sends back another CDS response contains Clusters for all + // requested clusters. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); - // Client retried again by sending an LDS. + // Client sent an ACK CDS request. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + // All watchers received notification for cluster update. + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture()); + clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate3.getLrsServerName()).isEqualTo(""); + } - // Management server responses with a listener for the requested resource. - Rds rdsConfig = - Rds.newBuilder() - .setConfigSource( - ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) - .setRouteConfigName("route-foo.googleapis.com") - .build(); + /** + * (CDS response caching behavior) Adding cluster watchers interested in some cluster that + * some other endpoint watcher had already been watching on will result in cluster update + * notified to the newly added watcher immediately, without sending new CDS requests. + */ + @Test + public void watchClusterAlreadyBeingWatched() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); - List listeners = ImmutableList.of( - Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ - Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) - ); - DiscoveryResponse ldsResponse = - buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); - responseObserver.onNext(ldsResponse); + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); - // Client sent back an ACK LDS request. + // Client sends an CDS request to management server. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); - // Client sent an RDS request based on the received listener. + // Management server sends back an CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", - XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Another cluster watcher interested in the same cluster is added. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + + // Since the client has received cluster update for this cluster before, cached result is + // notified to the newly added watcher immediately. + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); - // Management server encounters an error and closes the stream. - responseObserver.onError(Status.UNKNOWN.asException()); + verifyNoMoreInteractions(requestObserver); + } - // Reset backoff and retry immediately. - inOrder.verify(backoffPolicyProvider).get(); - fakeClock.runDueTasks(); - inOrder.verify(mockedDiscoveryService) - .streamAggregatedResources(responseObserverCaptor.capture()); - responseObserver = responseObserverCaptor.getValue(); - requestObserver = requestObservers.poll(); + @Test + public void addRemoveClusterWatchersFreely() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an CDS request to management server. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); - // RPC stream closed immediately - responseObserver.onError(Status.UNKNOWN.asException()); - inOrder.verify(backoffPolicy2).nextBackoffNanos(); - assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + // Management server sends back a CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); - // Retry after backoff. - fakeClock.forwardNanos(19L); - assertThat(requestObservers).isEmpty(); - fakeClock.forwardNanos(1L); - inOrder.verify(mockedDiscoveryService) - .streamAggregatedResources(responseObserverCaptor.capture()); - responseObserver = responseObserverCaptor.getValue(); - requestObserver = requestObservers.poll(); + // Client sent an ACK CDS request. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Add another cluster watcher for a different cluster. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2); + + // Client sent a new CDS request for all interested resources. + verify(requestObserver) + .onNext( + eq(buildDiscoveryRequest("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); - // Management server sends an LDS response. - responseObserver.onNext(ldsResponse); + // Management server sends back a CDS response with Cluster for all requested cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); - // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + // Client sent an ACK CDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + + // Cancel one of the watcher. + xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); + + // Since the cancelled watcher was the last watcher interested in that cluster (but there + // is still interested resource), client sent an new CDS request to unsubscribe from + // that cluster. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); - List routeConfigs = ImmutableList.of( + // Management server has nothing to respond. + + // Cancel the other watcher. All resources have been unsubscribed. + xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2); + + // All endpoint watchers have been cancelled. Due to protocol limitation, we do not send + // a CDS request for updated resource names (empty) when canceling the last resource. + verifyNoMoreInteractions(requestObserver); + + // Management server sends back a new CDS response. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)), Any.pack( - buildRouteConfiguration( - "route-foo.googleapis.com", - ImmutableList.of( - buildVirtualHost(ImmutableList.of("foo.googleapis.com"), - "cluster.googleapis.com"))))); - DiscoveryResponse rdsResponse = - buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); - // Management server sends an RDS response. - responseObserver.onNext(rdsResponse); + buildCluster("cluster-bar.googleapis.com", null, false))); + response = + buildDiscoveryResponse("2", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0002"); + responseObserver.onNext(response); - // Client has resolved the cluster based on the RDS response. - configWatcher - .onConfigChanged( - eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build())); + // Due to protocol limitation, client sent an ACK CDS request, with resource_names containing + // the last unsubscribed resource. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("2", + ImmutableList.of("cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); - // RPC stream closed with an error again. - responseObserver.onError(Status.UNKNOWN.asException()); + // Cancelled watchers do not receive notification. + verifyNoMoreInteractions(watcher1, watcher2); - // Reset backoff and retry immediately. - inOrder.verify(backoffPolicyProvider).get(); - fakeClock.runDueTasks(); - requestObserver = requestObservers.poll(); + // A new cluster watcher is added to watch cluster foo again. + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3); + + // A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", - XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + .onNext(eq(buildDiscoveryRequest("2", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); - verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); - } + // Management server sends back a new CDS response for at least newly requested resources + // (it is required to do so). + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", null, false))); + response = + buildDiscoveryResponse("3", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0003"); + responseObserver.onNext(response); - // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only - // for ClusterWatchers and EndpointWatchers. + // Notified with cached data immediately. + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); - @Test - public void matchHostName_exactlyMatch() { - String pattern = "foo.googleapis.com"; - assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); - } + verifyNoMoreInteractions(watcher1, watcher2); - @Test - public void matchHostName_prefixWildcard() { - String pattern = "*.foo.googleapis.com"; - assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue(); - pattern = "*-bar.foo.googleapis.com"; - assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern)) - .isTrue(); + // A CDS request is sent to re-subscribe the cluster again. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("3", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0003"))); } + /** + * Client receives an EDS response that does not contain a ClusterLoadAssignment for the + * requested resource while each received ClusterLoadAssignment is valid. + * The EDS response is ACKed. + * Endpoint watchers are NOT notified with an error (EDS protocol is incremental, responses + * not containing requested resources does not indicate absence). + */ @Test - public void matchHostName_postfixWildCard() { - String pattern = "foo.*"; - assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); - assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue(); - pattern = "foo-*"; - assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse(); - assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue(); - assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue(); - assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue(); + public void edsResponseWithoutMatchingResource() { + xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an EDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response without ClusterLoadAssignment for the requested + // cluster. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of())), + Any.pack(buildClusterLoadAssignment("cluster-baz.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)), + 6, 1)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + verifyZeroInteractions(endpointWatcher); + } + + /** + * Normal workflow of receiving an EDS response containing ClusterLoadAssignment message for + * a requested cluster. + */ + @Test + public void edsResponseWithMatchingResource() { + xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an EDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response with ClusterLoadAssignment for the requested + // cluster. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + 2, 1)), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000)))), + Any.pack(buildClusterLoadAssignment("cluster-baz.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)), + 6, 1)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null); + verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture()); + EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue(); + assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate.getDropPolicies()) + .containsExactly( + new DropOverload("lb", 200), + new DropOverload("throttle", 1000)); + assertThat(endpointUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0), + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.142.5", 80, + 5, true)), 2, 1)); + } + + @Test + public void multipleEndpointWatchers() { + EndpointWatcher watcher1 = mock(EndpointWatcher.class); + EndpointWatcher watcher2 = mock(EndpointWatcher.class); + EndpointWatcher watcher3 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); + xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an EDS request containing all clusters being watched to management server. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response contains ClusterLoadAssignment for only one of + // requested cluster. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + // Two watchers get notification of endpoint update for the cluster they are interested in. + ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); + EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); + assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0)); + + ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture()); + EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); + assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0)); + + verifyZeroInteractions(watcher3); + + // Management server sends back another EDS response contains ClusterLoadAssignment for the + // other requested cluster. + clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)), + 6, 1)), + ImmutableList.of()))); + + response = buildDiscoveryResponse("1", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); + + // The corresponding watcher gets notified. + ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); + EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); + assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.234.52", 8888, + 5, true)), 6, 1)); + } + + /** + * (EDS response caching behavior) An endpoint watcher is registered for a cluster that already + * has some other endpoint watchers watching on. Endpoint information received previously is + * in local cache and notified to the new watcher immediately. + */ + @Test + public void watchEndpointsForClusterAlreadyBeingWatched() { + EndpointWatcher watcher1 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends first EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response containing ClusterLoadAssignments for + // some cluster not requested. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); + EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); + assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate1.getDropPolicies()).isEmpty(); + assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0)); + + // A second endpoint watcher is registered for endpoints in the same cluster. + EndpointWatcher watcher2 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); + + // Cached endpoint information is notified to the new watcher immediately, without sending + // another EDS request. + ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); + EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); + assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate2.getDropPolicies()).isEmpty(); + assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0)); + + verifyNoMoreInteractions(requestObserver); + } + + @Test + public void addRemoveEndpointWatchersFreely() { + EndpointWatcher watcher1 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an EDS request to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response with ClusterLoadAssignment for the requested + // cluster. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2), + buildLbEndpoint("192.132.53.5", 80, HealthStatus.UNHEALTHY, 5)), + 1, 0)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); + EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); + assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, 2, true), + new LbEndpoint("192.132.53.5", 80,5, false)), + 1, 0)); + + // Add another endpoint watcher for a different cluster. + EndpointWatcher watcher2 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2); + + // Client sent a new EDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + // Management server sends back an EDS response with ClusterLoadAssignment for one of requested + // cluster. + clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("192.168.312.6", 443, HealthStatus.HEALTHY, 1)), + 6, 1)), + ImmutableList.of()))); + + response = buildDiscoveryResponse("1", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); + + ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); + EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); + assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.312.6", 443, 1, true)), + 6, 1)); + + // Cancel one of the watcher. + xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1); + + // Since the cancelled watcher was the last watcher interested in that cluster, client + // sent an new EDS request to unsubscribe from that cluster. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); + + // Management server should not respond as it had previously sent the requested resource. + + // Cancel the other watcher. + xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2); + + // Since the cancelled watcher was the last watcher interested in that cluster, client + // sent an new EDS request to unsubscribe from that cluster. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of(), // empty resources + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); + + // All endpoint watchers have been cancelled. + + // Management server sends back an EDS response for updating previously sent resources. + clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of( + buildLbEndpoint("192.168.432.6", 80, HealthStatus.HEALTHY, 2)), + 3, 0)), + ImmutableList.of())), + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.75.6", 8888, HealthStatus.HEALTHY, 2)), + 3, 0)), + ImmutableList.of()))); + + response = buildDiscoveryResponse("2", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0002"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("2", + ImmutableList.of(), // empty resources + XdsClientImpl.ADS_TYPE_URL_EDS, "0002"))); + + // Cancelled watchers do not receive notification. + verifyNoMoreInteractions(watcher1, watcher2); + + // A new endpoint watcher is added to watch an old but was no longer interested in cluster. + EndpointWatcher watcher3 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + + // Nothing should be notified to the new watcher as we are still waiting management server's + // latest response. + // Cached endpoint data should have been purged. + verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class)); + + // An EDS request is sent to re-subscribe the cluster again. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("2", "cluster-bar.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0002"))); + + // Management server sends back an EDS response for re-subscribed resource. + clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.75.6", 8888, HealthStatus.HEALTHY, 2)), + 3, 0)), + ImmutableList.of()))); + + response = buildDiscoveryResponse("3", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0003"); + responseObserver.onNext(response); + + ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); + EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); + assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region4", "zone4", "subzone4"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.75.6", 8888, 2, true)), + 3, 0)); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("3", + ImmutableList.of("cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0003"))); + } + + /** + * RPC stream closed and retry during the period of first tiem resolving service config + * (LDS/RDS only). + */ + @Test + public void streamClosedAndRetryWhenResolvingConfig() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server closes the RPC stream immediately. + responseObserver.onCompleted(); + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Client retried by sending an LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNAVAILABLE.asException()); + verifyNoMoreInteractions(backoffPolicyProvider); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Client retried again by sending an LDS. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server responses with a listener for the requested resource. + Rds rdsConfig = + Rds.newBuilder() + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse ldsResponse = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(ldsResponse); + + // Client sent back an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sent an RDS request based on the received listener. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server encounters an error and closes the stream. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Reset backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // RPC stream closed immediately + responseObserver.onError(Status.UNKNOWN.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server sends an LDS response. + responseObserver.onNext(ldsResponse); + + // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse rdsResponse = + buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + // Management server sends an RDS response. + responseObserver.onNext(rdsResponse); + + // Client has resolved the cluster based on the RDS response. + configWatcher + .onConfigChanged( + eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build())); + + // RPC stream closed with an error again. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Reset backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + } + + /** + * RPC stream close and retry while there are config/cluster/endpoint watchers registered. + */ + @Test + public void streamClosedAndRetry() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Start watching cluster information. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Client sent first CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Start watching endpoint information. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Client sent first EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Retry resumes requests for all wanted resources. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server is still not reachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + /** + * RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed). + */ + @Test + public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Management server closes RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + StreamObserver requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching cluster information while RPC stream is still in retry backoff. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server is still unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching endpoint information while RPC stream is still in retry backoff. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream again. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // No longer interested in previous cluster and endpoints in that cluster. + xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher); + xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + // Simulates the use case of watching clusters/endpoints based on service config resolved by + // LDS/RDS. + private void waitUntilConfigResolved(StreamObserver responseObserver) { + // Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted). + + // Management server responses with a listener telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse ldsResponse = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(ldsResponse); + + // Client sent an LDS ACK request and an RDS request for resource + // "route-foo.googleapis.com" (Omitted). + + // Management server sends an RDS response. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse rdsResponse = + buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(rdsResponse); + } + + @Test + public void matchHostName_exactlyMatch() { + String pattern = "foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); + } + + @Test + public void matchHostName_prefixWildcard() { + String pattern = "*.foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue(); + pattern = "*-bar.foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern)) + .isTrue(); + } + + @Test + public void matchHostName_postfixWildCard() { + String pattern = "foo.*"; + assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue(); + pattern = "foo-*"; + assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue(); } private static DiscoveryResponse buildDiscoveryResponse(String versionInfo, @@ -1295,12 +2605,17 @@ private static DiscoveryResponse buildDiscoveryResponse(String versionInfo, private static DiscoveryRequest buildDiscoveryRequest(String versionInfo, String resourceName, String typeUrl, String nonce) { + return buildDiscoveryRequest(versionInfo, ImmutableList.of(resourceName), typeUrl, nonce); + } + + private static DiscoveryRequest buildDiscoveryRequest(String versionInfo, + List resourceNames, String typeUrl, String nonce) { return DiscoveryRequest.newBuilder() .setVersionInfo(versionInfo) .setNode(NODE) .setTypeUrl(typeUrl) - .addResourceNames(resourceName) + .addAllResourceNames(resourceNames) .setResponseNonce(nonce) .build(); } @@ -1338,6 +2653,81 @@ private static VirtualHost buildVirtualHost(List domains, String cluster .build(); } + private static Cluster buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs) { + Cluster.Builder clusterBuilder = Cluster.newBuilder(); + clusterBuilder.setName(clusterName); + clusterBuilder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + clusterBuilder.setEdsClusterConfig(edsClusterConfigBuilder); + clusterBuilder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + clusterBuilder.setLrsServer( + ConfigSource.newBuilder().setSelf(SelfConfigSource.getDefaultInstance())); + } + return clusterBuilder.build(); + } + + private static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName, + List localityLbEndpoints, + List dropOverloads) { + return + ClusterLoadAssignment.newBuilder() + .setClusterName(clusterName) + .addAllEndpoints(localityLbEndpoints) + .setPolicy( + Policy.newBuilder() + .setDisableOverprovisioning(true) + .addAllDropOverloads(dropOverloads)) + .build(); + } + + private static Policy.DropOverload buildDropOverload(String category, int dropPerMillion) { + return + Policy.DropOverload.newBuilder() + .setCategory(category) + .setDropPercentage( + FractionalPercent.newBuilder() + .setNumerator(dropPerMillion) + .setDenominator(DenominatorType.MILLION)) + .build(); + } + + private static io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints buildLocalityLbEndpoints( + String region, String zone, String subzone, + List lbEndpoints, + int loadBalancingWeight, int priority) { + return + io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() + .setLocality( + io.envoyproxy.envoy.api.v2.core.Locality.newBuilder() + .setRegion(region) + .setZone(zone) + .setSubZone(subzone)) + .addAllLbEndpoints(lbEndpoints) + .setLoadBalancingWeight(UInt32Value.newBuilder().setValue(loadBalancingWeight)) + .setPriority(priority) + .build(); + } + + private static io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint buildLbEndpoint(String address, + int port, HealthStatus healthStatus, int loadbalancingWeight) { + return + io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint.newBuilder() + .setEndpoint( + io.envoyproxy.envoy.api.v2.endpoint.Endpoint.newBuilder().setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder().setAddress(address).setPortValue(port)))) + .setHealthStatus(healthStatus).setLoadBalancingWeight( + UInt32Value.newBuilder().setValue(loadbalancingWeight)) + .build(); + } + /** * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for * management server debugging purposes. @@ -1349,7 +2739,7 @@ private static VirtualHost buildVirtualHost(List domains, String cluster private static class DiscoveryRequestMatcher implements ArgumentMatcher { private final String versionInfo; private final String typeUrl; - private final List resourceNames; + private final Set resourceNames; private final String responseNonce; private DiscoveryRequestMatcher(String versionInfo, String resourceName, String typeUrl, @@ -1360,7 +2750,7 @@ private DiscoveryRequestMatcher(String versionInfo, String resourceName, String private DiscoveryRequestMatcher(String versionInfo, List resourceNames, String typeUrl, String responseNonce) { this.versionInfo = versionInfo; - this.resourceNames = resourceNames; + this.resourceNames = new HashSet<>(resourceNames); this.typeUrl = typeUrl; this.responseNonce = responseNonce; } @@ -1376,7 +2766,7 @@ public boolean matches(DiscoveryRequest argument) { if (!responseNonce.equals(argument.getResponseNonce())) { return false; } - if (!resourceNames.equals(argument.getResourceNamesList())) { + if (!resourceNames.equals(new HashSet<>(argument.getResourceNamesList()))) { return false; } return NODE.equals(argument.getNode());