Skip to content

Commit

Permalink
Complete retry tests for incorperating all xds protocols.
Browse files Browse the repository at this point in the history
  • Loading branch information
voidzcy committed Nov 15, 2019
1 parent 1f8d6c8 commit cfe5bdd
Showing 1 changed file with 339 additions and 5 deletions.
344 changes: 339 additions & 5 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -2130,9 +2131,6 @@ public void addRemoveEndpointWatchersFreely() {
XdsClientImpl.ADS_TYPE_URL_EDS, "0002")));
}

// TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers
// during retry.

/**
* RPC stream closed and retry during the period of first tiem resolving service config
* (LDS/RDS only).
Expand Down Expand Up @@ -2288,8 +2286,344 @@ public void streamClosedAndRetryWhenResolvingConfig() {
verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
}

// TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only
// for ClusterWatchers and EndpointWatchers.
/**
* RPC stream close and retry while there are config/cluster/endpoint watchers registered.
*/
@Test
public void streamClosedAndRetry() {
InOrder inOrder =
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);

ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
ArgumentCaptor.forClass(null);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
StreamObserver<DiscoveryResponse> responseObserver =
responseObserverCaptor.getValue(); // same as responseObservers.poll()
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

waitUntilConfigResolved(responseObserver);

// Start watching cluster information.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);

// Client sent first CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));

// Start watching endpoint information.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);

// Client sent first EDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server closes the RPC stream with an error.
responseObserver.onError(Status.UNKNOWN.asException());

// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();

// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// Retry after backoff.
fakeClock.forwardNanos(9L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// Retry after backoff.
fakeClock.forwardNanos(99L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server sends back a CDS response.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster.googleapis.com", null, false)));
DiscoveryResponse cdsResponse =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(cdsResponse);

// Client sent an CDS ACK request (Omitted).

// Management server closes the RPC stream.
responseObserver.onCompleted();

// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();

verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// Retry after backoff.
fakeClock.forwardNanos(19L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
}

/**
* RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed).
*/
@Test
public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() {
InOrder inOrder =
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);

ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
ArgumentCaptor.forClass(null);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
StreamObserver<DiscoveryResponse> responseObserver =
responseObserverCaptor.getValue(); // same as responseObservers.poll()
requestObservers.poll();

waitUntilConfigResolved(responseObserver);

// Management server closes RPC stream.
responseObserver.onCompleted();

// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));

// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// Start watching cluster information while RPC stream is still in retry backoff.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);

// Retry after backoff.
fakeClock.forwardNanos(9L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();

verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));

// Management server is still unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// Start watching endpoint information while RPC stream is still in retry backoff.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);

// Retry after backoff.
fakeClock.forwardNanos(99L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();

verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server sends back a CDS response.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster.googleapis.com", null, false)));
DiscoveryResponse cdsResponse =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(cdsResponse);

// Client sent an CDS ACK request (Omitted).

// Management server closes the RPC stream again.
responseObserver.onCompleted();

// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);

// No longer interested in previous cluster and endpoints in that cluster.
xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher);
xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher);

// Retry after backoff.
fakeClock.forwardNanos(19L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();

verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
}

// Simulates the use case of watching clusters/endpoints based on service config resolved by
// LDS/RDS.
private void waitUntilConfigResolved(StreamObserver<DiscoveryResponse> responseObserver) {
// Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted).

// Management server responses with a listener telling client to do RDS.
Rds rdsConfig =
Rds.newBuilder()
.setConfigSource(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
.setRouteConfigName("route-foo.googleapis.com")
.build();

List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
);
DiscoveryResponse ldsResponse =
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
responseObserver.onNext(ldsResponse);

// Client sent an LDS ACK request and an RDS request for resource
// "route-foo.googleapis.com" (Omitted).

// Management server sends an RDS response.
List<Any> routeConfigs = ImmutableList.of(
Any.pack(
buildRouteConfiguration(
"route-foo.googleapis.com",
ImmutableList.of(
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
"cluster.googleapis.com")))));
DiscoveryResponse rdsResponse =
buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
responseObserver.onNext(rdsResponse);
}

@Test
public void matchHostName_exactlyMatch() {
Expand Down

0 comments on commit cfe5bdd

Please sign in to comment.