diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index bee22966f9d6..82cc1a9abe6f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -1391,6 +1391,133 @@ public void receivedEndpointUpdateNotifiedToWatcherImmediately() { verifyNoMoreInteractions(requestObserver); } + /** + * Client receives EDS responses containing ClusterLoadAssignments for resources that were + * not requested (management server sends them proactively). Later client receives a CDS + * response with the requested Cluster referencing one of the previously received + * ClusterLoadAssignments. No EDS request needs to be sent for that + * ClusterLoadAssignment as it can be found in local cache (management server will not send + * EDS responses for that ClusterLoadAssignment again). A future EDS response update for + * that ClusterLoadAssignment should be notified to corresponding endpoint watchers. + * + *

Tests for caching EDS response data behavior. + */ + @Test + public void receiveEdsResponsesForClusterLoadAssignmentToBeUsedLater() { + EndpointWatcher watcher1 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends first EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back an EDS response containing ClusterLoadAssignments for + // some cluster not requested. + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of())), + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)), + 6, 1)), + ImmutableList.of()))); + + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); + EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); + assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(endpointUpdate1.getDropPolicies()).isEmpty(); + assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0)); + + // Start watching endpoints in cluster that server had already sent to us. + EndpointWatcher watcher2 = mock(EndpointWatcher.class); + xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2); + + // Cached result is notified to watcher immediately. + ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); + EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); + assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(endpointUpdate2.getDropPolicies()).isEmpty(); + assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.234.52", 8888, + 5, true)), 6, 1)); + + // Client sent an EDS request to subscribe to resources newly being watched. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); + + // Management server sends back an EDS response updating endpoints in a previously known + // cluster. + clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of( + buildLbEndpoint("192.168.41.54", 443, HealthStatus.UNKNOWN, 4)), + 2, 0)), + ImmutableList.of()))); + + response = buildDiscoveryResponse("1", clusterLoadAssignments, + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); + + // Updated result is notified to the corresponding watcher. + verify(watcher2, times(2)).onEndpointChanged(endpointUpdateCaptor2.capture()); + endpointUpdate2 = endpointUpdateCaptor2.getValue(); + assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(endpointUpdate2.getDropPolicies()).isEmpty(); + assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.41.54", 443, + 4, true)), 2, 0)); + } + @Test public void addRemoveEndpointWatchersFreely() { EndpointWatcher watcher1 = mock(EndpointWatcher.class); @@ -1567,8 +1694,6 @@ public void addRemoveEndpointWatchersFreely() { XdsClientImpl.ADS_TYPE_URL_EDS, "0002"))); } - // TODO(chengyuanzhang): tests for caching EDS responses proactively sent by management server. - // TODO(chengyuanzhang): tests for LDS/RDS/CDS/EDS sharing the same RPC stream. // TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers