From 4398806a664e14ce3c3ca98406311cc9bde75540 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 13 Nov 2019 17:25:59 -0800 Subject: [PATCH] Complete retry tests for incorperating all xds protocols. --- .../java/io/grpc/xds/XdsClientImplTest.java | 344 +++++++++++++++++- 1 file changed, 339 insertions(+), 5 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 233c2b0a4f37..5970df22880a 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -2130,9 +2131,6 @@ public void addRemoveEndpointWatchersFreely() { XdsClientImpl.ADS_TYPE_URL_EDS, "0002"))); } - // TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers - // during retry. - /** * RPC stream closed and retry during the period of first tiem resolving service config * (LDS/RDS only). @@ -2288,8 +2286,344 @@ public void streamClosedAndRetryWhenResolvingConfig() { verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); } - // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only - // for ClusterWatchers and EndpointWatchers. + /** + * RPC stream close and retry while there are config/cluster/endpoint watchers registered. + */ + @Test + public void streamClosedAndRetry() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Start watching cluster information. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Client sent first CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Start watching endpoint information. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Client sent first EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Retry resumes requests for all wanted resources. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server is still not reachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + /** + * RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed). + */ + @Test + public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Management server closes RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + StreamObserver requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching cluster information while RPC stream is still in retry backoff. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server is still unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching endpoint information while RPC stream is still in retry backoff. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream again. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // No longer interested in previous cluster and endpoints in that cluster. + xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher); + xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + // Simulates the use case of watching clusters/endpoints based on service config resolved by + // LDS/RDS. + private void waitUntilConfigResolved(StreamObserver responseObserver) { + // Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted). + + // Management server responses with a listener telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse ldsResponse = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(ldsResponse); + + // Client sent an LDS ACK request and an RDS request for resource + // "route-foo.googleapis.com" (Omitted). + + // Management server sends an RDS response. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse rdsResponse = + buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(rdsResponse); + } @Test public void matchHostName_exactlyMatch() {