diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index fd7302658c5..2a66ef6770a 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -412,6 +413,7 @@ public static final class Args {
private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser;
+ @Nullable private final ScheduledExecutorService scheduledExecutorService;
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@@ -420,12 +422,14 @@ private Args(
ProxyDetector proxyDetector,
SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser,
+ @Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
+ this.scheduledExecutorService = scheduledExecutorService;
this.channelLogger = channelLogger;
this.executor = executor;
}
@@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
+ /**
+ * Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
+ *
+ *
This service is a shared resource and is only meant for quick tasks. DO NOT block or run
+ * time-consuming tasks.
+ *
+ *
The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
+ * and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
+ *
+ * @since 1.26.0
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
+ public ScheduledExecutorService getScheduledExecutorService() {
+ if (scheduledExecutorService == null) {
+ throw new IllegalStateException("ScheduledExecutorService not set in Builder");
+ }
+ return scheduledExecutorService;
+ }
+
/**
* Returns the {@link ServiceConfigParser}.
*
@@ -501,6 +524,7 @@ public String toString() {
.add("proxyDetector", proxyDetector)
.add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser)
+ .add("scheduledExecutorService", scheduledExecutorService)
.add("channelLogger", channelLogger)
.add("executor", executor)
.toString();
@@ -517,6 +541,7 @@ public Builder toBuilder() {
builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser);
+ builder.setScheduledExecutorService(scheduledExecutorService);
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
return builder;
@@ -541,6 +566,7 @@ public static final class Builder {
private ProxyDetector proxyDetector;
private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser;
+ private ScheduledExecutorService scheduledExecutorService;
private ChannelLogger channelLogger;
private Executor executor;
@@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) {
return this;
}
+ /**
+ * See {@link Args#getScheduledExecutorService}.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
+ public Builder setScheduledExecutorService(
+ ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
+ return this;
+ }
+
/**
* See {@link Args#getServiceConfigParser}. This is a required field.
*
@@ -618,7 +654,7 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
- channelLogger, executor);
+ scheduledExecutorService, channelLogger, executor);
}
}
}
diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java
index 828047443a4..0924002fd28 100644
--- a/api/src/test/java/io/grpc/NameResolverTest.java
+++ b/api/src/test/java/io/grpc/NameResolverTest.java
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
@@ -45,6 +46,8 @@ public class NameResolverTest {
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
+ private final ScheduledExecutorService scheduledExecutorService =
+ mock(ScheduledExecutorService.class);
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri;
@@ -62,6 +65,7 @@ public void args() {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
+ assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
@@ -70,6 +74,7 @@ public void args() {
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
+ assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
@@ -254,6 +259,7 @@ private NameResolver.Args createArgs() {
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(parser)
+ .setScheduledExecutorService(scheduledExecutorService)
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.build();
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index de3a472501c..b21faa322d3 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final NameResolver.Args nameResolverArgs;
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
private final ClientTransportFactory transportFactory;
- private final ScheduledExecutorForBalancer scheduledExecutorForBalancer;
+ private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor;
private final ObjectPool extends Executor> executorPool;
private final ObjectPool extends Executor> balancerRpcExecutorPool;
@@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
+ this.executorPool = checkNotNull(builder.executorPool, "executorPool");
+ this.executor = checkNotNull(executorPool.getObject(), "executor");
+ this.transportFactory =
+ new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
+ this.scheduledExecutor =
+ new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
@@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
+ .setScheduledExecutorService(scheduledExecutor)
.setServiceConfigParser(
new ScParser(
retryEnabled,
@@ -598,18 +605,11 @@ public void execute(Runnable command) {
})
.build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
- this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
- this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
- this.transportFactory =
- new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
- this.scheduledExecutorForBalancer =
- new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());
-
serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig;
@@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() {
@Override
public ScheduledExecutorService getScheduledExecutorService() {
- return scheduledExecutorForBalancer;
+ return scheduledExecutor;
}
@Override
@@ -1736,10 +1736,10 @@ synchronized void release() {
}
}
- private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService {
+ private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
final ScheduledExecutorService delegate;
- private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) {
+ private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
diff --git a/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java b/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java
index 2bed7d52b12..d5c4fa77fd8 100644
--- a/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java
+++ b/core/src/test/java/io/grpc/internal/ReadableBuffersArrayTest.java
@@ -23,11 +23,14 @@
import static org.junit.Assert.assertTrue;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
/**
* Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(byte[],
* int, int)}.
*/
+@RunWith(JUnit4.class)
public class ReadableBuffersArrayTest extends ReadableBufferTestBase {
@Test
diff --git a/cronet/build.gradle b/cronet/build.gradle
index dd0c9174fc1..fcd7e16cfad 100644
--- a/cronet/build.gradle
+++ b/cronet/build.gradle
@@ -42,8 +42,8 @@ android {
buildTypes {
debug { minifyEnabled false }
release {
- minifyEnabled true
- proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
+ minifyEnabled false
+ consumerProguardFiles 'proguard-rules.pro'
}
}
testOptions { unitTests { includeAndroidResources = true } }
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index df8d7a51cc8..6fb6e497795 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -91,7 +91,7 @@ class Utils {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
- System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
+ System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index cf770a991e1..3d735a7cf40 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -366,7 +366,8 @@ public boolean isReady() {
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
if (frozen) {
- throw new IllegalStateException("Cannot alter onReadyHandler after call started");
+ throw new IllegalStateException(
+ "Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
}
this.onReadyHandler = onReadyHandler;
}
@@ -374,7 +375,8 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
@Override
public void disableAutoInboundFlowControl() {
if (frozen) {
- throw new IllegalStateException("Cannot disable auto flow control call started");
+ throw new IllegalStateException(
+ "Cannot disable auto flow control after call started. Use ClientResponseObserver");
}
autoFlowControlEnabled = false;
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 078c80d4adb..b37bf18e8fe 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -380,7 +380,9 @@ public boolean isReady() {
@Override
public void setOnReadyHandler(Runnable r) {
- checkState(!frozen, "Cannot alter onReadyHandler after initialization");
+ checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called "
+ + "during the initial call to the application, before the service returns its "
+ + "StreamObserver");
this.onReadyHandler = r;
}
@@ -391,7 +393,9 @@ public boolean isCancelled() {
@Override
public void setOnCancelHandler(Runnable onCancelHandler) {
- checkState(!frozen, "Cannot alter onCancelHandler after initialization");
+ checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called "
+ + "during the initial call to the application, before the service returns its "
+ + "StreamObserver");
this.onCancelHandler = onCancelHandler;
}
diff --git a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
index f49879939be..dcd5f31b1f7 100644
--- a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
+++ b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
@@ -195,6 +195,14 @@ static final class LbEndpoint {
private final int loadBalancingWeight;
private final boolean isHealthy;
+ @VisibleForTesting
+ LbEndpoint(String address, int port, int loadBalancingWeight, boolean isHealthy) {
+ this(
+ new EquivalentAddressGroup(
+ new InetSocketAddress(address, port)),
+ loadBalancingWeight, isHealthy);
+ }
+
@VisibleForTesting
LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight, boolean isHealthy) {
this.eag = eag;
diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java
index 8f9d396fcc1..2527c0dc2ce 100644
--- a/xds/src/main/java/io/grpc/xds/LocalityStore.java
+++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java
@@ -76,15 +76,14 @@ interface LocalityStore {
void updateOobMetricsReportInterval(long reportIntervalNano);
- LoadStatsStore getLoadStatsStore();
-
@VisibleForTesting
abstract class LocalityStoreFactory {
private static final LocalityStoreFactory DEFAULT_INSTANCE =
new LocalityStoreFactory() {
@Override
- LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) {
- return new LocalityStoreImpl(helper, lbRegistry);
+ LocalityStore newLocalityStore(
+ Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
+ return new LocalityStoreImpl(helper, lbRegistry, loadStatsStore);
}
};
@@ -92,7 +91,8 @@ static LocalityStoreFactory getInstance() {
return DEFAULT_INSTANCE;
}
- abstract LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry);
+ abstract LocalityStore newLocalityStore(
+ Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore);
}
final class LocalityStoreImpl implements LocalityStore {
@@ -113,9 +113,10 @@ final class LocalityStoreImpl implements LocalityStore {
private List dropOverloads = ImmutableList.of();
private long metricsReportIntervalNano = -1;
- LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
+ LocalityStoreImpl(
+ Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance,
- new LoadStatsStoreImpl(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
+ loadStatsStore, OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
}
@VisibleForTesting
@@ -284,11 +285,6 @@ public String toString() {
TimeUnit.MINUTES, helper.getScheduledExecutorService());
}
- @Override
- public LoadStatsStore getLoadStatsStore() {
- return loadStatsStore;
- }
-
@Override
public void updateOobMetricsReportInterval(long reportIntervalNano) {
metricsReportIntervalNano = reportIntervalNano;
diff --git a/xds/src/main/java/io/grpc/xds/LookasideLb.java b/xds/src/main/java/io/grpc/xds/LookasideLb.java
index 9fd9a8c8cf3..d32530c6423 100644
--- a/xds/src/main/java/io/grpc/xds/LookasideLb.java
+++ b/xds/src/main/java/io/grpc/xds/LookasideLb.java
@@ -175,11 +175,13 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, node);
- localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry);
+ LoadStatsStore loadStatsStore = new LoadStatsStoreImpl();
+ localityStore = localityStoreFactory.newLocalityStore(
+ helper, lbRegistry, loadStatsStore);
// TODO(zdapeng): Use XdsClient to do Lrs directly.
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(),
- localityStore.getLoadStatsStore());
+ loadStatsStore);
final LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java
index 39945ab074e..6ed206715bc 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClient.java
@@ -344,6 +344,8 @@ void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
/**
* Registers a data watcher for the given cluster.
+ *
+ * Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchClusterData(String clusterName, ClusterWatcher watcher) {
}
@@ -351,12 +353,16 @@ void watchClusterData(String clusterName, ClusterWatcher watcher) {
/**
* Unregisters the given cluster watcher, which was registered to receive updates for the
* given cluster.
+ *
+ *
Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
}
/**
* Registers a data watcher for endpoints in the given cluster.
+ *
+ *
Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
}
@@ -364,6 +370,8 @@ void watchEndpointData(String clusterName, EndpointWatcher watcher) {
/**
* Unregisters the given endpoints watcher, which was registered to receive updates for
* endpoints information in the given cluster.
+ *
+ *
Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
index c497f4ad9eb..e2f9c94b98e 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
@@ -25,6 +25,11 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
+import io.envoyproxy.envoy.api.v2.Cluster;
+import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
+import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
+import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
+import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
@@ -44,6 +49,9 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ChannelCreds;
+import io.grpc.xds.EnvoyProtoData.DropOverload;
+import io.grpc.xds.EnvoyProtoData.Locality;
+import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +74,11 @@ final class XdsClientImpl extends XdsClient {
@VisibleForTesting
static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
+ @VisibleForTesting
+ static final String ADS_TYPE_URL_CDS = "type.googleapis.com/envoy.api.v2.Cluster";
+ @VisibleForTesting
+ static final String ADS_TYPE_URL_EDS =
+ "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
private final ManagedChannel channel;
private final SynchronizationContext syncContext;
@@ -84,6 +97,25 @@ final class XdsClientImpl extends XdsClient {
// responses.
private final Map routeConfigNamesToClusterNames = new HashMap<>();
+ // Cached data for CDS responses, keyed by cluster names.
+ // Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
+ // of whole Cluster messages to reduce memory usage.
+ private final Map clusterNamesToClusterUpdates = new HashMap<>();
+
+ // Cached data for EDS responses, keyed by cluster names.
+ // CDS responses indicate absence of clusters and EDS responses indicate presence of clusters.
+ // Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead
+ // of whole ClusterLoadAssignment messages to reduce memory usage.
+ private final Map clusterNamesToEndpointUpdates = new HashMap<>();
+
+ // Cluster watchers waiting for cluster information updates. Multiple cluster watchers
+ // can watch on information for the same cluster.
+ private final Map> clusterWatchers = new HashMap<>();
+
+ // Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint
+ // watchers can watch endpoints in the same cluster.
+ private final Map> endpointWatchers = new HashMap<>();
+
@Nullable
private AdsStream adsStream;
@Nullable
@@ -170,6 +202,122 @@ void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
}
+ @Override
+ void watchClusterData(String clusterName, ClusterWatcher watcher) {
+ checkNotNull(watcher, "watcher");
+ boolean needRequest = false;
+ if (!clusterWatchers.containsKey(clusterName)) {
+ logger.log(Level.FINE, "Start watching cluster {0}", clusterName);
+ needRequest = true;
+ clusterWatchers.put(clusterName, new HashSet());
+ }
+ Set watchers = clusterWatchers.get(clusterName);
+ if (watchers.contains(watcher)) {
+ logger.log(Level.WARNING, "Watcher {0} already registered", watcher);
+ return;
+ }
+ watchers.add(watcher);
+ // If local cache contains cluster information to be watched, notify the watcher immediately.
+ if (clusterNamesToClusterUpdates.containsKey(clusterName)) {
+ watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName));
+ }
+ if (rpcRetryTimer != null) {
+ // Currently in retry backoff.
+ return;
+ }
+ if (needRequest) {
+ if (adsStream == null) {
+ startRpcStream();
+ }
+ adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
+ }
+ }
+
+ @Override
+ void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
+ checkNotNull(watcher, "watcher");
+ Set watchers = clusterWatchers.get(clusterName);
+ if (watchers == null || !watchers.contains(watcher)) {
+ logger.log(Level.FINE, "Watcher {0} was not registered", watcher);
+ return;
+ }
+ watchers.remove(watcher);
+ if (watchers.isEmpty()) {
+ logger.log(Level.FINE, "Stop watching cluster {0}", clusterName);
+ clusterWatchers.remove(clusterName);
+ // If unsubscribe the last resource, do NOT send a CDS request for an empty resource list.
+ // This is a workaround for CDS protocol resource unsubscribe.
+ if (clusterWatchers.isEmpty()) {
+ return;
+ }
+ // No longer interested in this cluster, send an updated CDS request to unsubscribe
+ // this resource.
+ if (rpcRetryTimer != null) {
+ // Currently in retry backoff.
+ return;
+ }
+ checkState(adsStream != null,
+ "Severe bug: ADS stream was not created while an endpoint watcher was registered");
+ adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
+ }
+ }
+
+ @Override
+ void watchEndpointData(String clusterName, EndpointWatcher watcher) {
+ checkNotNull(watcher, "watcher");
+ boolean needRequest = false;
+ if (!endpointWatchers.containsKey(clusterName)) {
+ logger.log(Level.FINE, "Start watching endpoints in cluster {0}", clusterName);
+ needRequest = true;
+ endpointWatchers.put(clusterName, new HashSet());
+ }
+ Set watchers = endpointWatchers.get(clusterName);
+ if (watchers.contains(watcher)) {
+ logger.log(Level.WARNING, "Watcher {0} already registered", watcher);
+ return;
+ }
+ watchers.add(watcher);
+ // If local cache contains endpoint information for the cluster to be watched, notify
+ // the watcher immediately.
+ if (clusterNamesToEndpointUpdates.containsKey(clusterName)) {
+ watcher.onEndpointChanged(clusterNamesToEndpointUpdates.get(clusterName));
+ }
+ if (rpcRetryTimer != null) {
+ // Currently in retry backoff.
+ return;
+ }
+ if (needRequest) {
+ if (adsStream == null) {
+ startRpcStream();
+ }
+ adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
+ }
+ }
+
+ @Override
+ void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
+ checkNotNull(watcher, "watcher");
+ Set watchers = endpointWatchers.get(clusterName);
+ if (watchers == null || !watchers.contains(watcher)) {
+ logger.log(Level.FINE, "Watcher {0} was not registered", watcher);
+ return;
+ }
+ watchers.remove(watcher);
+ if (watchers.isEmpty()) {
+ logger.log(Level.FINE, "Stop watching endpoints in cluster {0}", clusterName);
+ endpointWatchers.remove(clusterName);
+ // Remove the corresponding EDS cache entry.
+ clusterNamesToEndpointUpdates.remove(clusterName);
+ // No longer interested in this cluster, send an updated EDS request to unsubscribe
+ // this resource.
+ if (rpcRetryTimer != null) {
+ // Currently in retry backoff.
+ return;
+ }
+ adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
+ }
+ }
+
/**
* Builds a channel to the given server URI with the first supported channel creds config.
*/
@@ -214,7 +362,6 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) {
logger.log(Level.FINE, "Received an LDS response: {0}", ldsResponse);
checkState(ldsResourceName != null && configWatcher != null,
"No LDS request was ever sent. Management server is doing something wrong");
- adsStream.ldsRespNonce = ldsResponse.getNonce();
// Unpack Listener messages.
List listeners = new ArrayList<>(ldsResponse.getResourcesCount());
@@ -224,7 +371,7 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) {
}
} catch (InvalidProtocolBufferException e) {
adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
- ldsResponse.getNonce(), "Broken LDS response.");
+ "Broken LDS response.");
return;
}
@@ -242,7 +389,7 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) {
}
} catch (InvalidProtocolBufferException e) {
adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
- ldsResponse.getNonce(), "Broken LDS response.");
+ "Broken LDS response.");
return;
}
@@ -289,12 +436,11 @@ private void handleLdsResponse(DiscoveryResponse ldsResponse) {
}
if (errorMessage != null) {
- adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
- ldsResponse.getNonce(), errorMessage);
+ adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), errorMessage);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
- ldsResponse.getVersionInfo(), ldsResponse.getNonce());
+ ldsResponse.getVersionInfo());
// Remove RDS cache entries for RouteConfigurations not referenced by this LDS response.
// LDS responses represents the state of the world, RouteConfigurations not referenced
@@ -342,7 +488,6 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) {
logger.log(Level.FINE, "Received an RDS response: {0}", rdsResponse);
checkState(adsStream.rdsResourceName != null,
"Never requested for RDS resources, management server is doing something wrong");
- adsStream.rdsRespNonce = rdsResponse.getNonce();
// Unpack RouteConfiguration messages.
List routeConfigs = new ArrayList<>(rdsResponse.getResourcesCount());
@@ -352,7 +497,7 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) {
}
} catch (InvalidProtocolBufferException e) {
adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
- rdsResponse.getNonce(), "Broken RDS response.");
+ "Broken RDS response.");
return;
}
@@ -362,7 +507,6 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) {
String clusterName = processRouteConfig(routeConfig);
if (clusterName == null) {
adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
- rdsResponse.getNonce(),
"Cannot find a valid cluster name in VirtualHost inside "
+ "RouteConfiguration with domains matching: " + hostName + ".");
return;
@@ -372,7 +516,7 @@ private void handleRdsResponse(DiscoveryResponse rdsResponse) {
routeConfigNamesToClusterNames.putAll(clusterNames);
adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
- rdsResponse.getVersionInfo(), rdsResponse.getNonce());
+ rdsResponse.getVersionInfo());
// Notify the ConfigWatcher if this RDS response contains the most recently requested
// RDS resource.
@@ -426,6 +570,229 @@ private String processRouteConfig(RouteConfiguration config) {
return null;
}
+ /**
+ * Handles CDS response, which contains a list of Cluster messages with information for a logical
+ * cluster. The response is NACKed if messages for requested resources contain invalid
+ * information for gRPC's usage. Otherwise, an ACK request is sent to management server.
+ * Response data for requested clusters is cached locally, in case of new cluster watchers
+ * interested in the same clusters are added later.
+ */
+ private void handleCdsResponse(DiscoveryResponse cdsResponse) {
+ logger.log(Level.FINE, "Received an CDS response: {0}", cdsResponse);
+ checkState(adsStream.cdsResourceNames != null,
+ "Never requested for CDS resources, management server is doing something wrong");
+ adsStream.cdsRespNonce = cdsResponse.getNonce();
+
+ // Unpack Cluster messages.
+ List clusters = new ArrayList<>(cdsResponse.getResourcesCount());
+ try {
+ for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) {
+ clusters.add(res.unpack(Cluster.class));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames,
+ "Broken CDS response");
+ return;
+ }
+
+ String errorMessage = null;
+ // Cluster information update for requested clusters received in this CDS response.
+ Map clusterUpdates = new HashMap<>();
+ // CDS responses represents the state of the world, EDS services not referenced by
+ // Clusters are those no longer exist.
+ Set edsServices = new HashSet<>();
+ for (Cluster cluster : clusters) {
+ String clusterName = cluster.getName();
+ // Skip information for clusters not requested.
+ // Management server is required to always send newly requested resources, even if they
+ // may have been sent previously (proactively). Thus, client does not need to cache
+ // unrequested resources.
+ if (!adsStream.cdsResourceNames.contains(clusterName)) {
+ continue;
+ }
+ ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder();
+ updateBuilder.setClusterName(clusterName);
+ // The type field must be set to EDS.
+ if (!cluster.getType().equals(DiscoveryType.EDS)) {
+ errorMessage = "Cluster [" + clusterName + "]: only EDS discovery type is supported "
+ + "in gRPC.";
+ break;
+ }
+ // In the eds_cluster_config field, the eds_config field must be set to indicate to
+ // use EDS (must be set to use ADS).
+ EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig();
+ if (!edsClusterConfig.getEdsConfig().hasAds()) {
+ errorMessage = "Cluster [" + clusterName + "]: field eds_cluster_config must be set to "
+ + "indicate to use EDS over ADS.";
+ break;
+ }
+ // If the service_name field is set, that value will be used for the EDS request
+ // instead of the cluster name (default).
+ if (!edsClusterConfig.getServiceName().isEmpty()) {
+ updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName());
+ edsServices.add(edsClusterConfig.getServiceName());
+ } else {
+ edsServices.add(clusterName);
+ }
+ // The lb_policy field must be set to ROUND_ROBIN.
+ if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
+ errorMessage = "Cluster [" + clusterName + "]: only round robin load balancing policy is "
+ + "supported in gRPC.";
+ break;
+ }
+ updateBuilder.setLbPolicy("round_robin");
+ // If the lrs_server field is set, it must have its self field set, in which case the
+ // client should use LRS for load reporting. Otherwise (the lrs_server field is not set),
+ // LRS load reporting will be disabled.
+ if (cluster.hasLrsServer()) {
+ if (!cluster.getLrsServer().hasSelf()) {
+ errorMessage = "Cluster [" + clusterName + "]: only support enabling LRS for the same "
+ + "management server.";
+ break;
+ }
+ updateBuilder.setEnableLrs(true);
+ updateBuilder.setLrsServerName("");
+ } else {
+ updateBuilder.setEnableLrs(false);
+ }
+ clusterUpdates.put(clusterName, updateBuilder.build());
+ }
+ if (errorMessage != null) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, errorMessage);
+ return;
+ }
+ adsStream.sendAckRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames,
+ cdsResponse.getVersionInfo());
+
+ // Update local CDS cache with data in this response.
+ clusterNamesToClusterUpdates.clear();
+ clusterNamesToClusterUpdates.putAll(clusterUpdates);
+
+ // Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
+ clusterNamesToEndpointUpdates.keySet().retainAll(edsServices);
+
+ // Notify watchers if clusters interested in present. Otherwise, notify with an error.
+ for (Map.Entry> entry : clusterWatchers.entrySet()) {
+ if (clusterUpdates.containsKey(entry.getKey())) {
+ ClusterUpdate clusterUpdate = clusterUpdates.get(entry.getKey());
+ for (ClusterWatcher watcher : entry.getValue()) {
+ watcher.onClusterChanged(clusterUpdate);
+ }
+ } else {
+ for (ClusterWatcher watcher : entry.getValue()) {
+ watcher.onError(
+ Status.NOT_FOUND.withDescription(
+ "Requested cluster [" + entry.getKey() + "] does not exist"));
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles EDS response, which contains a list of ClusterLoadAssignment messages with
+ * endpoint load balancing information for each cluster. The response is NACKed if messages
+ * for requested resources contain invalid information for gRPC's usage. Otherwise,
+ * an ACK request is sent to management server. Response data for requested clusters is
+ * cached locally, in case of new endpoint watchers interested in the same clusters
+ * are added later.
+ */
+ private void handleEdsResponse(DiscoveryResponse edsResponse) {
+ logger.log(Level.FINE, "Received an EDS response: {0}", edsResponse);
+
+ // Unpack ClusterLoadAssignment messages.
+ List clusterLoadAssignments =
+ new ArrayList<>(edsResponse.getResourcesCount());
+ try {
+ for (com.google.protobuf.Any res : edsResponse.getResourcesList()) {
+ clusterLoadAssignments.add(res.unpack(ClusterLoadAssignment.class));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(),
+ "Broken EDS response");
+ return;
+ }
+
+ String errorMessage = null;
+ // Endpoint information updates for requested clusters received in this EDS response.
+ Map endpointUpdates = new HashMap<>();
+ // Walk through each ClusterLoadAssignment message. If any of them for requested clusters
+ // contain invalid information for gRPC's load balancing usage, the whole response is rejected.
+ for (ClusterLoadAssignment assignment : clusterLoadAssignments) {
+ String clusterName = assignment.getClusterName();
+ // Skip information for clusters not requested.
+ // Management server is required to always send newly requested resources, even if they
+ // may have been sent previously (proactively). Thus, client does not need to cache
+ // unrequested resources.
+ if (!endpointWatchers.containsKey(clusterName)) {
+ continue;
+ }
+ EndpointUpdate.Builder updateBuilder = EndpointUpdate.newBuilder();
+ updateBuilder.setClusterName(clusterName);
+ if (assignment.getEndpointsCount() == 0) {
+ errorMessage = "Cluster without any locality endpoint.";
+ break;
+ }
+ // The policy.disable_overprovisioning field must be set to true.
+ if (!assignment.getPolicy().getDisableOverprovisioning()) {
+ errorMessage = "Cluster requires overprovisioning.";
+ break;
+ }
+ for (io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints localityLbEndpoints
+ : assignment.getEndpointsList()) {
+ // The lb_endpoints field for LbEndpoint must contain at least one entry.
+ if (localityLbEndpoints.getLbEndpointsCount() == 0) {
+ errorMessage = "Locality with no endpoint.";
+ break;
+ }
+ // The endpoint field of each lb_endpoints must be set.
+ // Inside of it: the address field must be set.
+ for (io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint lbEndpoint
+ : localityLbEndpoints.getLbEndpointsList()) {
+ if (!lbEndpoint.hasEndpoint() || !lbEndpoint.getEndpoint().hasAddress()) {
+ errorMessage = "Invalid endpoint address information.";
+ break;
+ }
+ }
+ if (errorMessage != null) {
+ break;
+ }
+ // Note endpoints with health status other than UNHEALTHY and UNKNOWN are still
+ // handed over to watching parties. It is watching parties' responsibility to
+ // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
+ updateBuilder.addLocalityLbEndpoints(
+ Locality.fromEnvoyProtoLocality(localityLbEndpoints.getLocality()),
+ LocalityLbEndpoints.fromEnvoyProtoLocalityLbEndpoints(localityLbEndpoints));
+ }
+ if (errorMessage != null) {
+ break;
+ }
+ for (io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload dropOverload
+ : assignment.getPolicy().getDropOverloadsList()) {
+ updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload));
+ }
+ EndpointUpdate update = updateBuilder.build();
+ endpointUpdates.put(clusterName, update);
+ }
+ if (errorMessage != null) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(),
+ "ClusterLoadAssignment message contains invalid information for gRPC's usage: "
+ + errorMessage);
+ return;
+ }
+ adsStream.sendAckRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(),
+ edsResponse.getVersionInfo());
+
+ // Update local EDS cache by inserting updated endpoint information.
+ clusterNamesToEndpointUpdates.putAll(endpointUpdates);
+
+ // Notify watchers waiting for updates of endpoint information received in this EDS response.
+ for (Map.Entry entry : endpointUpdates.entrySet()) {
+ for (EndpointWatcher watcher : endpointWatchers.get(entry.getKey())) {
+ watcher.onEndpointChanged(entry.getValue());
+ }
+ }
+ }
+
@VisibleForTesting
final class RpcRetryTask implements Runnable {
@Override
@@ -434,7 +801,12 @@ public void run() {
if (configWatcher != null) {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
}
- // TODO(chengyuanzhang): send CDS/EDS requests if CDS/EDS watcher presents.
+ if (!clusterWatchers.isEmpty()) {
+ adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
+ }
+ if (!endpointWatchers.isEmpty()) {
+ adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
+ }
}
}
@@ -450,6 +822,8 @@ private final class AdsStream implements StreamObserver {
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
+ private String cdsVersion = "";
+ private String edsVersion = "";
// Response nonce for the most recently received discovery responses of each resource type.
// Client initiated requests start response nonce with empty string.
@@ -459,11 +833,24 @@ private final class AdsStream implements StreamObserver {
// DiscoveryResponse.
private String ldsRespNonce = "";
private String rdsRespNonce = "";
+ private String cdsRespNonce = "";
+ private String edsRespNonce = "";
- // Most recently requested resource name(s) for each resource type. Note the resource_name in
- // LDS requests will always be "xds:" URI (including port suffix if present).
+ // Most recently requested RDS resource name, which is an intermediate resource name for
+ // resolving service config.
+ // LDS request always use the same resource name, which is the "xds:" URI.
+ // Resource names for EDS requests are always represented by the cluster names that
+ // watchers are interested in.
@Nullable
private String rdsResourceName;
+ // Most recently requested CDS resource names.
+ // Due to CDS protocol limitation, client does not send a CDS request for empty resource
+ // names when unsubscribing the last resource. Management server assumes it is still
+ // subscribing to the last resource, client also need to behave so to avoid data lose.
+ // Therefore, cluster names that watchers interested in cannot always represent resource names
+ // in most recently sent CDS requests.
+ @Nullable
+ private Collection cdsResourceNames;
private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) {
this.stub = checkNotNull(stub, "stub");
@@ -480,12 +867,26 @@ public void onNext(final DiscoveryResponse response) {
public void run() {
responseReceived = true;
String typeUrl = response.getTypeUrl();
+ // Nonce in each response is echoed back in the following ACK/NACK request. It is
+ // used for management server to identify which response the client is ACKing/NACking.
+ // To avoid confusion, client-initiated requests will always use the nonce in
+ // most recently received responses of each resource type.
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
+ ldsRespNonce = response.getNonce();
handleLdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
+ rdsRespNonce = response.getNonce();
handleRdsResponse(response);
+ } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
+ cdsRespNonce = response.getNonce();
+ handleCdsResponse(response);
+ } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
+ edsRespNonce = response.getNonce();
+ handleEdsResponse(response);
+ } else {
+ logger.log(Level.FINE, "Received an unknown type of DiscoveryResponse {0}",
+ response);
}
- // TODO(zdapeng): add CDS/EDS response handles.
}
});
}
@@ -571,8 +972,17 @@ private void sendXdsRequest(String typeUrl, Collection resourceNames) {
version = rdsVersion;
nonce = rdsRespNonce;
rdsResourceName = resourceNames.iterator().next();
+ } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
+ version = cdsVersion;
+ nonce = cdsRespNonce;
+ // For CDS protocol resource unsubscribe workaround, keep the last unsubscribed cluster
+ // as the requested resource name for ACK requests when all all resources have
+ // been unsubscribed.
+ cdsResourceNames = ImmutableList.copyOf(resourceNames);
+ } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
+ version = edsVersion;
+ nonce = edsRespNonce;
}
- // TODO(chengyuanzhang): cases for CDS/EDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
@@ -590,16 +1000,22 @@ private void sendXdsRequest(String typeUrl, Collection resourceNames) {
* version for the corresponding resource type.
*/
private void sendAckRequest(String typeUrl, Collection resourceNames,
- String versionInfo, String nonce) {
+ String versionInfo) {
checkState(requestWriter != null, "ADS stream has not been started");
+ String nonce = "";
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
ldsVersion = versionInfo;
- ldsRespNonce = nonce;
+ nonce = ldsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
rdsVersion = versionInfo;
- rdsRespNonce = nonce;
+ nonce = rdsRespNonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
+ cdsVersion = versionInfo;
+ nonce = cdsRespNonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
+ edsVersion = versionInfo;
+ nonce = edsRespNonce;
}
- // TODO(chengyuanzhang): cases for CDS/EDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
@@ -616,16 +1032,24 @@ private void sendAckRequest(String typeUrl, Collection resourceNames,
* Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous
* accepted version.
*/
- private void sendNackRequest(String typeUrl, Collection resourceNames, String nonce,
+ private void sendNackRequest(String typeUrl, Collection resourceNames,
String message) {
checkState(requestWriter != null, "ADS stream has not been started");
String versionInfo = "";
+ String nonce = "";
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
versionInfo = ldsVersion;
+ nonce = ldsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
versionInfo = rdsVersion;
+ nonce = rdsRespNonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
+ versionInfo = cdsVersion;
+ nonce = cdsRespNonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
+ versionInfo = edsVersion;
+ nonce = edsRespNonce;
}
- // TODO(chengyuanzhang): cases for CDS/EDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
diff --git a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java
index d32b1dcb8e7..0f8ff762702 100644
--- a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java
+++ b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java
@@ -194,11 +194,10 @@ public void onCompleted() {
localityStoreFactory = new LocalityStoreFactory() {
@Override
- public LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) {
+ public LocalityStore newLocalityStore(
+ Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
helpers.add(helper);
LocalityStore localityStore = mock(LocalityStore.class);
- LoadStatsStore loadStatsStore = mock(LoadStatsStore.class);
- doReturn(loadStatsStore).when(localityStore).getLoadStatsStore();
localityStores.add(localityStore);
return localityStore;
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
index 1c6ab7a44ae..a38ed0a96b0 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
@@ -26,11 +26,19 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
+import com.google.protobuf.UInt32Value;
+import io.envoyproxy.envoy.api.v2.Cluster;
+import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
+import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
+import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
+import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
+import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
@@ -38,7 +46,10 @@
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
+import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.api.v2.core.SelfConfigSource;
+import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.route.RedirectAction;
import io.envoyproxy.envoy.api.v2.route.Route;
@@ -48,6 +59,8 @@
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.config.listener.v2.ApiListener;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
+import io.envoyproxy.envoy.type.FractionalPercent;
+import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
@@ -60,17 +73,30 @@
import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.xds.EnvoyProtoData.DropOverload;
+import io.grpc.xds.EnvoyProtoData.LbEndpoint;
+import io.grpc.xds.EnvoyProtoData.Locality;
+import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
+import io.grpc.xds.XdsClient.ClusterUpdate;
+import io.grpc.xds.XdsClient.ClusterWatcher;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
+import io.grpc.xds.XdsClient.EndpointUpdate;
+import io.grpc.xds.XdsClient.EndpointWatcher;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.HashSet;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
@@ -81,6 +107,7 @@
/**
* Tests for {@link XdsClientImpl}.
*/
+@RunWith(JUnit4.class)
public class XdsClientImplTest {
private static final String HOSTNAME = "foo.googleapis.com";
@@ -121,6 +148,10 @@ public void uncaughtException(Thread t, Throwable e) {
private BackoffPolicy backoffPolicy2;
@Mock
private ConfigWatcher configWatcher;
+ @Mock
+ private ClusterWatcher clusterWatcher;
+ @Mock
+ private EndpointWatcher endpointWatcher;
private ManagedChannel channel;
private XdsClientImpl xdsClient;
@@ -166,8 +197,9 @@ public void cancelled(Context context) {
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
xdsClient =
- new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(),
- backoffPolicyProvider, fakeClock.getStopwatchSupplier().get());
+ new XdsClientImpl(channel, NODE, syncContext,
+ fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
+ fakeClock.getStopwatchSupplier().get());
// Only the connection to management server is established, no RPC request is sent until at
// least one watcher is registered.
assertThat(responseObservers).isEmpty();
@@ -1081,205 +1113,1483 @@ public void routeConfigurationRemovedNotifiedToWatcher() {
}
/**
- * RPC stream closed and retry during the period of first tiem resolving service config
- * (LDS/RDS only).
+ * Client receives an CDS response that does not contain a Cluster for the requested resource
+ * while each received Cluster is valid. The CDS response is ACKed. Cluster watchers are notified
+ * with an error for resource not found.
*/
@Test
- public void streamClosedAndRetryWhenResolvingConfig() {
- InOrder inOrder =
- Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
- backoffPolicy2);
- xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ public void cdsResponseWithoutMatchingResource() {
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
- ArgumentCaptor> responseObserverCaptor =
- ArgumentCaptor.forClass(null);
- inOrder.verify(mockedDiscoveryService)
- .streamAggregatedResources(responseObserverCaptor.capture());
- StreamObserver responseObserver =
- responseObserverCaptor.getValue(); // same as responseObservers.poll()
+ // Client sends a CDS request for the only cluster being watched to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+
+ // Management server sends back a CDS response without Cluster for the requested resource.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
+ Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK CDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
+
+ ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null);
+ verify(clusterWatcher).onError(errorStatusCaptor.capture());
+ Status error = errorStatusCaptor.getValue();
+ assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
+ assertThat(error.getDescription())
+ .isEqualTo("Requested cluster [cluster-foo.googleapis.com] does not exist");
+ }
+
+ /**
+ * Normal workflow of receiving a CDS response containing Cluster message for a requested
+ * cluster.
+ */
+ @Test
+ public void cdsResponseWithMatchingResource() {
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
StreamObserver requestObserver = requestObservers.poll();
- // Client sends an LDS request for the host name (with port) to management server.
+ // Client sends a CDS request for the only cluster being watched to management server.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+
+ // Management server sends back a CDS response without Cluster for the requested resource.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
+ Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(response);
- // Management server closes the RPC stream immediately.
- responseObserver.onCompleted();
- inOrder.verify(backoffPolicyProvider).get();
- inOrder.verify(backoffPolicy1).nextBackoffNanos();
- assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+ // Client sent an ACK CDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
+
+ ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
+ ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
+ assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false);
+
+ // Management server sends back another CDS response updating the requested Cluster.
+ clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
+ Any.pack(
+ buildCluster("cluster-foo.googleapis.com", "eds-cluster-foo.googleapis.com", true)),
+ Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
+ response =
+ buildDiscoveryResponse("1", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
+ responseObserver.onNext(response);
- // Retry after backoff.
- fakeClock.forwardNanos(9L);
- assertThat(requestObservers).isEmpty();
- fakeClock.forwardNanos(1L);
- inOrder.verify(mockedDiscoveryService)
- .streamAggregatedResources(responseObserverCaptor.capture());
- responseObserver = responseObserverCaptor.getValue();
- requestObserver = requestObservers.poll();
+ // Client sent an ACK CDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
+
+ verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture());
+ clusterUpdate = clusterUpdateCaptor.getValue();
+ assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate.getEdsServiceName())
+ .isEqualTo("eds-cluster-foo.googleapis.com");
+ assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true);
+ assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
+ }
- // Client retried by sending an LDS request.
+ @Test
+ public void multipleClusterWatchers() {
+ ClusterWatcher watcher1 = mock(ClusterWatcher.class);
+ ClusterWatcher watcher2 = mock(ClusterWatcher.class);
+ ClusterWatcher watcher3 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
+ xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
+
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends a CDS request containing all clusters being watched to management server.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+
+ // Management server sends back a CDS response contains Cluster for only one of
+ // requested cluster.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(response);
- // Management server closes the RPC stream with an error.
- responseObserver.onError(Status.UNAVAILABLE.asException());
- verifyNoMoreInteractions(backoffPolicyProvider);
- inOrder.verify(backoffPolicy1).nextBackoffNanos();
- assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+ // Client sent an ACK CDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("0",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
+
+ // Two watchers get notification of cluster update for the cluster they are interested in.
+ ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
+ ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
+
+ ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
+ ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate2.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
+
+ // The other watcher gets an error notification for cluster not found.
+ ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null);
+ verify(watcher3).onError(errorStatusCaptor.capture());
+ Status error = errorStatusCaptor.getValue();
+ assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
+ assertThat(error.getDescription())
+ .isEqualTo("Requested cluster [cluster-bar.googleapis.com] does not exist");
- // Retry after backoff.
- fakeClock.forwardNanos(99L);
- assertThat(requestObservers).isEmpty();
- fakeClock.forwardNanos(1L);
- inOrder.verify(mockedDiscoveryService)
- .streamAggregatedResources(responseObserverCaptor.capture());
- responseObserver = responseObserverCaptor.getValue();
- requestObserver = requestObservers.poll();
+ // Management server sends back another CDS response contains Clusters for all
+ // requested clusters.
+ clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
+ Any.pack(
+ buildCluster("cluster-bar.googleapis.com",
+ "eds-cluster-bar.googleapis.com", true)));
+ response = buildDiscoveryResponse("1", clusters,
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
+ responseObserver.onNext(response);
- // Client retried again by sending an LDS.
+ // Client sent an ACK CDS request.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("1",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
+
+ // All watchers received notification for cluster update.
+ verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
+ clusterUpdate1 = clusterUpdateCaptor1.getValue();
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
+
+ clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture());
+ clusterUpdate2 = clusterUpdateCaptor2.getValue();
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate2.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
+
+ ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
+ verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
+ ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
+ assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
+ assertThat(clusterUpdate3.getEdsServiceName())
+ .isEqualTo("eds-cluster-bar.googleapis.com");
+ assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
+ assertThat(clusterUpdate3.getLrsServerName()).isEqualTo("");
+ }
- // Management server responses with a listener for the requested resource.
- Rds rdsConfig =
- Rds.newBuilder()
- .setConfigSource(
- ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
- .setRouteConfigName("route-foo.googleapis.com")
- .build();
+ /**
+ * (CDS response caching behavior) Adding cluster watchers interested in some cluster that
+ * some other endpoint watcher had already been watching on will result in cluster update
+ * notified to the newly added watcher immediately, without sending new CDS requests.
+ */
+ @Test
+ public void watchClusterAlreadyBeingWatched() {
+ ClusterWatcher watcher1 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
- List listeners = ImmutableList.of(
- Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
- Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
- );
- DiscoveryResponse ldsResponse =
- buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
- responseObserver.onNext(ldsResponse);
+ // Streaming RPC starts after a first watcher is added.
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
- // Client sent back an ACK LDS request.
+ // Client sends an CDS request to management server.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
- // Client sent an RDS request based on the received listener.
+ // Management server sends back an CDS response with Cluster for the requested
+ // cluster.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK CDS request.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
- XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
+
+ ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
+ ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
+
+ // Another cluster watcher interested in the same cluster is added.
+ ClusterWatcher watcher2 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
+
+ // Since the client has received cluster update for this cluster before, cached result is
+ // notified to the newly added watcher immediately.
+ ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
+ ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate2.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
- // Management server encounters an error and closes the stream.
- responseObserver.onError(Status.UNKNOWN.asException());
+ verifyNoMoreInteractions(requestObserver);
+ }
- // Reset backoff and retry immediately.
- inOrder.verify(backoffPolicyProvider).get();
- fakeClock.runDueTasks();
- inOrder.verify(mockedDiscoveryService)
- .streamAggregatedResources(responseObserverCaptor.capture());
- responseObserver = responseObserverCaptor.getValue();
- requestObserver = requestObservers.poll();
+ @Test
+ public void addRemoveClusterWatchersFreely() {
+ ClusterWatcher watcher1 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
+
+ // Streaming RPC starts after a first watcher is added.
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an CDS request to management server.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
- // RPC stream closed immediately
- responseObserver.onError(Status.UNKNOWN.asException());
- inOrder.verify(backoffPolicy2).nextBackoffNanos();
- assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+ // Management server sends back a CDS response with Cluster for the requested
+ // cluster.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(response);
- // Retry after backoff.
- fakeClock.forwardNanos(19L);
- assertThat(requestObservers).isEmpty();
- fakeClock.forwardNanos(1L);
- inOrder.verify(mockedDiscoveryService)
- .streamAggregatedResources(responseObserverCaptor.capture());
- responseObserver = responseObserverCaptor.getValue();
- requestObserver = requestObservers.poll();
+ // Client sent an ACK CDS request.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
+
+ ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
+ ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
+
+ // Add another cluster watcher for a different cluster.
+ ClusterWatcher watcher2 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2);
+
+ // Client sent a new CDS request for all interested resources.
+ verify(requestObserver)
+ .onNext(
+ eq(buildDiscoveryRequest("0",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
- // Management server sends an LDS response.
- responseObserver.onNext(ldsResponse);
+ // Management server sends back a CDS response with Cluster for all requested cluster.
+ clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
+ Any.pack(
+ buildCluster("cluster-bar.googleapis.com",
+ "eds-cluster-bar.googleapis.com", true)));
+ response = buildDiscoveryResponse("1", clusters,
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
+ responseObserver.onNext(response);
- // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+ // Client sent an ACK CDS request for all interested resources.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("1",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
+
+ verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
+ clusterUpdate1 = clusterUpdateCaptor1.getValue();
+ assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate1.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
+
+ ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
+ ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
+ assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
+ assertThat(clusterUpdate2.getEdsServiceName())
+ .isEqualTo("eds-cluster-bar.googleapis.com");
+ assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true);
+ assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
+
+ // Cancel one of the watcher.
+ xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1);
+
+ // Since the cancelled watcher was the last watcher interested in that cluster (but there
+ // is still interested resource), client sent an new CDS request to unsubscribe from
+ // that cluster.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
- List routeConfigs = ImmutableList.of(
+ // Management server has nothing to respond.
+
+ // Cancel the other watcher. All resources have been unsubscribed.
+ xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2);
+
+ // All endpoint watchers have been cancelled. Due to protocol limitation, we do not send
+ // a CDS request for updated resource names (empty) when canceling the last resource.
+ verifyNoMoreInteractions(requestObserver);
+
+ // Management server sends back a new CDS response.
+ clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)),
Any.pack(
- buildRouteConfiguration(
- "route-foo.googleapis.com",
- ImmutableList.of(
- buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
- "cluster.googleapis.com")))));
- DiscoveryResponse rdsResponse =
- buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
- // Management server sends an RDS response.
- responseObserver.onNext(rdsResponse);
+ buildCluster("cluster-bar.googleapis.com", null, false)));
+ response =
+ buildDiscoveryResponse("2", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0002");
+ responseObserver.onNext(response);
- // Client has resolved the cluster based on the RDS response.
- configWatcher
- .onConfigChanged(
- eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build()));
+ // Due to protocol limitation, client sent an ACK CDS request, with resource_names containing
+ // the last unsubscribed resource.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("2",
+ ImmutableList.of("cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0002")));
- // RPC stream closed with an error again.
- responseObserver.onError(Status.UNKNOWN.asException());
+ // Cancelled watchers do not receive notification.
+ verifyNoMoreInteractions(watcher1, watcher2);
- // Reset backoff and retry immediately.
- inOrder.verify(backoffPolicyProvider).get();
- fakeClock.runDueTasks();
- requestObserver = requestObservers.poll();
+ // A new cluster watcher is added to watch cluster foo again.
+ ClusterWatcher watcher3 = mock(ClusterWatcher.class);
+ xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3);
+
+ // A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only.
verify(requestObserver)
- .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
- XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ .onNext(eq(buildDiscoveryRequest("2", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0002")));
- verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
- }
+ // Management server sends back a new CDS response for at least newly requested resources
+ // (it is required to do so).
+ clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)),
+ Any.pack(
+ buildCluster("cluster-bar.googleapis.com", null, false)));
+ response =
+ buildDiscoveryResponse("3", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0003");
+ responseObserver.onNext(response);
- // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only
- // for ClusterWatchers and EndpointWatchers.
+ // Notified with cached data immediately.
+ ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
+ verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
+ ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
+ assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(clusterUpdate3.getEdsServiceName())
+ .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
+ assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
+ assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
+ assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
- @Test
- public void matchHostName_exactlyMatch() {
- String pattern = "foo.googleapis.com";
- assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
- }
+ verifyNoMoreInteractions(watcher1, watcher2);
- @Test
- public void matchHostName_prefixWildcard() {
- String pattern = "*.foo.googleapis.com";
- assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue();
- pattern = "*-bar.foo.googleapis.com";
- assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern))
- .isTrue();
+ // A CDS request is sent to re-subscribe the cluster again.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("3", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "0003")));
}
+ /**
+ * Client receives an EDS response that does not contain a ClusterLoadAssignment for the
+ * requested resource while each received ClusterLoadAssignment is valid.
+ * The EDS response is ACKed.
+ * Endpoint watchers are NOT notified with an error (EDS protocol is incremental, responses
+ * not containing requested resources does not indicate absence).
+ */
@Test
- public void matchHostName_postfixWildCard() {
- String pattern = "foo.*";
- assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
- assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue();
- pattern = "foo-*";
- assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse();
- assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue();
- assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue();
- assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue();
+ public void edsResponseWithoutMatchingResource() {
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an EDS request for the only cluster being watched to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back an EDS response without ClusterLoadAssignment for the requested
+ // cluster.
+ List clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region1", "zone1", "subzone1",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
+ 1, 0)),
+ ImmutableList.of())),
+ Any.pack(buildClusterLoadAssignment("cluster-baz.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region2", "zone2", "subzone2",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)),
+ 6, 1)),
+ ImmutableList.of())));
+
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ verifyZeroInteractions(endpointWatcher);
+ }
+
+ /**
+ * Normal workflow of receiving an EDS response containing ClusterLoadAssignment message for
+ * a requested cluster.
+ */
+ @Test
+ public void edsResponseWithMatchingResource() {
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an EDS request for the only cluster being watched to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back an EDS response with ClusterLoadAssignment for the requested
+ // cluster.
+ List clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region1", "zone1", "subzone1",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
+ 1, 0),
+ buildLocalityLbEndpoints("region3", "zone3", "subzone3",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)),
+ 2, 1)),
+ ImmutableList.of(
+ buildDropOverload("lb", 200),
+ buildDropOverload("throttle", 1000)))),
+ Any.pack(buildClusterLoadAssignment("cluster-baz.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region2", "zone2", "subzone2",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)),
+ 6, 1)),
+ ImmutableList.of())));
+
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture());
+ EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue();
+ assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate.getDropPolicies())
+ .containsExactly(
+ new DropOverload("lb", 200),
+ new DropOverload("throttle", 1000));
+ assertThat(endpointUpdate.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080,
+ 2, true)), 1, 0),
+ new Locality("region3", "zone3", "subzone3"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.142.5", 80,
+ 5, true)), 2, 1));
+ }
+
+ @Test
+ public void multipleEndpointWatchers() {
+ EndpointWatcher watcher1 = mock(EndpointWatcher.class);
+ EndpointWatcher watcher2 = mock(EndpointWatcher.class);
+ EndpointWatcher watcher3 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
+ xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
+
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an EDS request containing all clusters being watched to management server.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back an EDS response contains ClusterLoadAssignment for only one of
+ // requested cluster.
+ List clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region1", "zone1", "subzone1",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
+ 1, 0)),
+ ImmutableList.of())));
+
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("0",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ // Two watchers get notification of endpoint update for the cluster they are interested in.
+ ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
+ EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
+ assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080,
+ 2, true)), 1, 0));
+
+ ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture());
+ EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
+ assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080,
+ 2, true)), 1, 0));
+
+ verifyZeroInteractions(watcher3);
+
+ // Management server sends back another EDS response contains ClusterLoadAssignment for the
+ // other requested cluster.
+ clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region2", "zone2", "subzone2",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)),
+ 6, 1)),
+ ImmutableList.of())));
+
+ response = buildDiscoveryResponse("1", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("1",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
+
+ // The corresponding watcher gets notified.
+ ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
+ verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
+ EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
+ assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
+ assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region2", "zone2", "subzone2"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.234.52", 8888,
+ 5, true)), 6, 1));
+ }
+
+ /**
+ * (EDS response caching behavior) An endpoint watcher is registered for a cluster that already
+ * has some other endpoint watchers watching on. Endpoint information received previously is
+ * in local cache and notified to the new watcher immediately.
+ */
+ @Test
+ public void watchEndpointsForClusterAlreadyBeingWatched() {
+ EndpointWatcher watcher1 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends first EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back an EDS response containing ClusterLoadAssignments for
+ // some cluster not requested.
+ List clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region1", "zone1", "subzone1",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
+ 1, 0)),
+ ImmutableList.of())));
+
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
+ EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
+ assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate1.getDropPolicies()).isEmpty();
+ assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080,
+ 2, true)), 1, 0));
+
+ // A second endpoint watcher is registered for endpoints in the same cluster.
+ EndpointWatcher watcher2 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
+
+ // Cached endpoint information is notified to the new watcher immediately, without sending
+ // another EDS request.
+ ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
+ EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
+ assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate2.getDropPolicies()).isEmpty();
+ assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080,
+ 2, true)), 1, 0));
+
+ verifyNoMoreInteractions(requestObserver);
+ }
+
+ @Test
+ public void addRemoveEndpointWatchersFreely() {
+ EndpointWatcher watcher1 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
+
+ // Streaming RPC starts after a first watcher is added.
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an EDS request to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back an EDS response with ClusterLoadAssignment for the requested
+ // cluster.
+ List clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region1", "zone1", "subzone1",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2),
+ buildLbEndpoint("192.132.53.5", 80, HealthStatus.UNHEALTHY, 5)),
+ 1, 0)),
+ ImmutableList.of())));
+
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
+ verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
+ EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
+ assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
+ assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region1", "zone1", "subzone1"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.0.1", 8080, 2, true),
+ new LbEndpoint("192.132.53.5", 80,5, false)),
+ 1, 0));
+
+ // Add another endpoint watcher for a different cluster.
+ EndpointWatcher watcher2 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2);
+
+ // Client sent a new EDS request for all interested resources.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("0",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
+
+ // Management server sends back an EDS response with ClusterLoadAssignment for one of requested
+ // cluster.
+ clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region2", "zone2", "subzone2",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.312.6", 443, HealthStatus.HEALTHY, 1)),
+ 6, 1)),
+ ImmutableList.of())));
+
+ response = buildDiscoveryResponse("1", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request for all interested resources.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("1",
+ ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
+
+ ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
+ verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
+ EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
+ assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
+ assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region2", "zone2", "subzone2"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.312.6", 443, 1, true)),
+ 6, 1));
+
+ // Cancel one of the watcher.
+ xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1);
+
+ // Since the cancelled watcher was the last watcher interested in that cluster, client
+ // sent an new EDS request to unsubscribe from that cluster.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
+
+ // Management server should not respond as it had previously sent the requested resource.
+
+ // Cancel the other watcher.
+ xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2);
+
+ // Since the cancelled watcher was the last watcher interested in that cluster, client
+ // sent an new EDS request to unsubscribe from that cluster.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("1",
+ ImmutableList.of(), // empty resources
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
+
+ // All endpoint watchers have been cancelled.
+
+ // Management server sends back an EDS response for updating previously sent resources.
+ clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region3", "zone3", "subzone3",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.432.6", 80, HealthStatus.HEALTHY, 2)),
+ 3, 0)),
+ ImmutableList.of())),
+ Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region4", "zone4", "subzone4",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.75.6", 8888, HealthStatus.HEALTHY, 2)),
+ 3, 0)),
+ ImmutableList.of())));
+
+ response = buildDiscoveryResponse("2", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0002");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("2",
+ ImmutableList.of(), // empty resources
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0002")));
+
+ // Cancelled watchers do not receive notification.
+ verifyNoMoreInteractions(watcher1, watcher2);
+
+ // A new endpoint watcher is added to watch an old but was no longer interested in cluster.
+ EndpointWatcher watcher3 = mock(EndpointWatcher.class);
+ xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
+
+ // Nothing should be notified to the new watcher as we are still waiting management server's
+ // latest response.
+ // Cached endpoint data should have been purged.
+ verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class));
+
+ // An EDS request is sent to re-subscribe the cluster again.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("2", "cluster-bar.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0002")));
+
+ // Management server sends back an EDS response for re-subscribed resource.
+ clusterLoadAssignments = ImmutableList.of(
+ Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
+ ImmutableList.of(
+ buildLocalityLbEndpoints("region4", "zone4", "subzone4",
+ ImmutableList.of(
+ buildLbEndpoint("192.168.75.6", 8888, HealthStatus.HEALTHY, 2)),
+ 3, 0)),
+ ImmutableList.of())));
+
+ response = buildDiscoveryResponse("3", clusterLoadAssignments,
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0003");
+ responseObserver.onNext(response);
+
+ ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
+ verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
+ EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
+ assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
+ assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
+ .containsExactly(
+ new Locality("region4", "zone4", "subzone4"),
+ new LocalityLbEndpoints(
+ ImmutableList.of(
+ new LbEndpoint("192.168.75.6", 8888, 2, true)),
+ 3, 0));
+
+ // Client sent an ACK EDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(
+ new DiscoveryRequestMatcher("3",
+ ImmutableList.of("cluster-bar.googleapis.com"),
+ XdsClientImpl.ADS_TYPE_URL_EDS, "0003")));
+ }
+
+ /**
+ * RPC stream closed and retry during the period of first tiem resolving service config
+ * (LDS/RDS only).
+ */
+ @Test
+ public void streamClosedAndRetryWhenResolvingConfig() {
+ InOrder inOrder =
+ Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+
+ ArgumentCaptor> responseObserverCaptor =
+ ArgumentCaptor.forClass(null);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ StreamObserver responseObserver =
+ responseObserverCaptor.getValue(); // same as responseObservers.poll()
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server closes the RPC stream immediately.
+ responseObserver.onCompleted();
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(9L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ // Client retried by sending an LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server closes the RPC stream with an error.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ verifyNoMoreInteractions(backoffPolicyProvider);
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(99L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ // Client retried again by sending an LDS.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server responses with a listener for the requested resource.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse ldsResponse =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(ldsResponse);
+
+ // Client sent back an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sent an RDS request based on the received listener.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server encounters an error and closes the stream.
+ responseObserver.onError(Status.UNKNOWN.asException());
+
+ // Reset backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // RPC stream closed immediately
+ responseObserver.onError(Status.UNKNOWN.asException());
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(19L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server sends an LDS response.
+ responseObserver.onNext(ldsResponse);
+
+ // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster.googleapis.com")))));
+ DiscoveryResponse rdsResponse =
+ buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ // Management server sends an RDS response.
+ responseObserver.onNext(rdsResponse);
+
+ // Client has resolved the cluster based on the RDS response.
+ configWatcher
+ .onConfigChanged(
+ eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build()));
+
+ // RPC stream closed with an error again.
+ responseObserver.onError(Status.UNKNOWN.asException());
+
+ // Reset backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+ }
+
+ /**
+ * RPC stream close and retry while there are config/cluster/endpoint watchers registered.
+ */
+ @Test
+ public void streamClosedAndRetry() {
+ InOrder inOrder =
+ Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+
+ ArgumentCaptor> responseObserverCaptor =
+ ArgumentCaptor.forClass(null);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ StreamObserver responseObserver =
+ responseObserverCaptor.getValue(); // same as responseObservers.poll()
+ StreamObserver requestObserver = requestObservers.poll();
+
+ waitUntilConfigResolved(responseObserver);
+
+ // Start watching cluster information.
+ xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
+
+ // Client sent first CDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+
+ // Start watching endpoint information.
+ xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
+
+ // Client sent first EDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server closes the RPC stream with an error.
+ responseObserver.onError(Status.UNKNOWN.asException());
+
+ // Resets backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ // Retry resumes requests for all wanted resources.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server becomes unreachable.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(9L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server is still not reachable.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(99L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back a CDS response.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster.googleapis.com", null, false)));
+ DiscoveryResponse cdsResponse =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(cdsResponse);
+
+ // Client sent an CDS ACK request (Omitted).
+
+ // Management server closes the RPC stream.
+ responseObserver.onCompleted();
+
+ // Resets backoff and retry immediately
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server becomes unreachable again.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(19L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ }
+
+ /**
+ * RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed).
+ */
+ @Test
+ public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() {
+ InOrder inOrder =
+ Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+
+ ArgumentCaptor> responseObserverCaptor =
+ ArgumentCaptor.forClass(null);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ StreamObserver responseObserver =
+ responseObserverCaptor.getValue(); // same as responseObservers.poll()
+ requestObservers.poll();
+
+ waitUntilConfigResolved(responseObserver);
+
+ // Management server closes RPC stream.
+ responseObserver.onCompleted();
+
+ // Resets backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server becomes unreachable.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Start watching cluster information while RPC stream is still in retry backoff.
+ xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(9L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+
+ // Management server is still unreachable.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Start watching endpoint information while RPC stream is still in retry backoff.
+ xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(99L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server sends back a CDS response.
+ List clusters = ImmutableList.of(
+ Any.pack(buildCluster("cluster.googleapis.com", null, false)));
+ DiscoveryResponse cdsResponse =
+ buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
+ responseObserver.onNext(cdsResponse);
+
+ // Client sent an CDS ACK request (Omitted).
+
+ // Management server closes the RPC stream again.
+ responseObserver.onCompleted();
+
+ // Resets backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ // Management server becomes unreachable again.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // No longer interested in previous cluster and endpoints in that cluster.
+ xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher);
+ xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(19L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ requestObserver = requestObservers.poll();
+
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+ verify(requestObserver, never())
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_CDS, "")));
+ verify(requestObserver, never())
+ .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_EDS, "")));
+
+ verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ }
+
+ // Simulates the use case of watching clusters/endpoints based on service config resolved by
+ // LDS/RDS.
+ private void waitUntilConfigResolved(StreamObserver responseObserver) {
+ // Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted).
+
+ // Management server responses with a listener telling client to do RDS.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse ldsResponse =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(ldsResponse);
+
+ // Client sent an LDS ACK request and an RDS request for resource
+ // "route-foo.googleapis.com" (Omitted).
+
+ // Management server sends an RDS response.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster.googleapis.com")))));
+ DiscoveryResponse rdsResponse =
+ buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(rdsResponse);
+ }
+
+ @Test
+ public void matchHostName_exactlyMatch() {
+ String pattern = "foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
+ }
+
+ @Test
+ public void matchHostName_prefixWildcard() {
+ String pattern = "*.foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue();
+ pattern = "*-bar.foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern))
+ .isTrue();
+ }
+
+ @Test
+ public void matchHostName_postfixWildCard() {
+ String pattern = "foo.*";
+ assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue();
+ pattern = "foo-*";
+ assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue();
}
private static DiscoveryResponse buildDiscoveryResponse(String versionInfo,
@@ -1295,12 +2605,17 @@ private static DiscoveryResponse buildDiscoveryResponse(String versionInfo,
private static DiscoveryRequest buildDiscoveryRequest(String versionInfo,
String resourceName, String typeUrl, String nonce) {
+ return buildDiscoveryRequest(versionInfo, ImmutableList.of(resourceName), typeUrl, nonce);
+ }
+
+ private static DiscoveryRequest buildDiscoveryRequest(String versionInfo,
+ List resourceNames, String typeUrl, String nonce) {
return
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(NODE)
.setTypeUrl(typeUrl)
- .addResourceNames(resourceName)
+ .addAllResourceNames(resourceNames)
.setResponseNonce(nonce)
.build();
}
@@ -1338,6 +2653,81 @@ private static VirtualHost buildVirtualHost(List domains, String cluster
.build();
}
+ private static Cluster buildCluster(String clusterName, @Nullable String edsServiceName,
+ boolean enableLrs) {
+ Cluster.Builder clusterBuilder = Cluster.newBuilder();
+ clusterBuilder.setName(clusterName);
+ clusterBuilder.setType(DiscoveryType.EDS);
+ EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
+ edsClusterConfigBuilder.setEdsConfig(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()));
+ if (edsServiceName != null) {
+ edsClusterConfigBuilder.setServiceName(edsServiceName);
+ }
+ clusterBuilder.setEdsClusterConfig(edsClusterConfigBuilder);
+ clusterBuilder.setLbPolicy(LbPolicy.ROUND_ROBIN);
+ if (enableLrs) {
+ clusterBuilder.setLrsServer(
+ ConfigSource.newBuilder().setSelf(SelfConfigSource.getDefaultInstance()));
+ }
+ return clusterBuilder.build();
+ }
+
+ private static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName,
+ List localityLbEndpoints,
+ List dropOverloads) {
+ return
+ ClusterLoadAssignment.newBuilder()
+ .setClusterName(clusterName)
+ .addAllEndpoints(localityLbEndpoints)
+ .setPolicy(
+ Policy.newBuilder()
+ .setDisableOverprovisioning(true)
+ .addAllDropOverloads(dropOverloads))
+ .build();
+ }
+
+ private static Policy.DropOverload buildDropOverload(String category, int dropPerMillion) {
+ return
+ Policy.DropOverload.newBuilder()
+ .setCategory(category)
+ .setDropPercentage(
+ FractionalPercent.newBuilder()
+ .setNumerator(dropPerMillion)
+ .setDenominator(DenominatorType.MILLION))
+ .build();
+ }
+
+ private static io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints buildLocalityLbEndpoints(
+ String region, String zone, String subzone,
+ List lbEndpoints,
+ int loadBalancingWeight, int priority) {
+ return
+ io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
+ .setLocality(
+ io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
+ .setRegion(region)
+ .setZone(zone)
+ .setSubZone(subzone))
+ .addAllLbEndpoints(lbEndpoints)
+ .setLoadBalancingWeight(UInt32Value.newBuilder().setValue(loadBalancingWeight))
+ .setPriority(priority)
+ .build();
+ }
+
+ private static io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint buildLbEndpoint(String address,
+ int port, HealthStatus healthStatus, int loadbalancingWeight) {
+ return
+ io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint.newBuilder()
+ .setEndpoint(
+ io.envoyproxy.envoy.api.v2.endpoint.Endpoint.newBuilder().setAddress(
+ Address.newBuilder().setSocketAddress(
+ SocketAddress.newBuilder().setAddress(address).setPortValue(port))))
+ .setHealthStatus(healthStatus).setLoadBalancingWeight(
+ UInt32Value.newBuilder().setValue(loadbalancingWeight))
+ .build();
+ }
+
/**
* Matcher for DiscoveryRequest without the comparison of error_details field, which is used for
* management server debugging purposes.
@@ -1349,7 +2739,7 @@ private static VirtualHost buildVirtualHost(List domains, String cluster
private static class DiscoveryRequestMatcher implements ArgumentMatcher {
private final String versionInfo;
private final String typeUrl;
- private final List resourceNames;
+ private final Set resourceNames;
private final String responseNonce;
private DiscoveryRequestMatcher(String versionInfo, String resourceName, String typeUrl,
@@ -1360,7 +2750,7 @@ private DiscoveryRequestMatcher(String versionInfo, String resourceName, String
private DiscoveryRequestMatcher(String versionInfo, List resourceNames, String typeUrl,
String responseNonce) {
this.versionInfo = versionInfo;
- this.resourceNames = resourceNames;
+ this.resourceNames = new HashSet<>(resourceNames);
this.typeUrl = typeUrl;
this.responseNonce = responseNonce;
}
@@ -1376,7 +2766,7 @@ public boolean matches(DiscoveryRequest argument) {
if (!responseNonce.equals(argument.getResponseNonce())) {
return false;
}
- if (!resourceNames.equals(argument.getResourceNamesList())) {
+ if (!resourceNames.equals(new HashSet<>(argument.getResourceNamesList()))) {
return false;
}
return NODE.equals(argument.getNode());