Skip to content

Commit

Permalink
Added test covering caching behaviors for EDS.
Browse files Browse the repository at this point in the history
  • Loading branch information
voidzcy committed Nov 19, 2019
1 parent 055920f commit 6e8159e
Showing 1 changed file with 127 additions and 2 deletions.
129 changes: 127 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,133 @@ public void receivedEndpointUpdateNotifiedToWatcherImmediately() {
verifyNoMoreInteractions(requestObserver);
}

/**
* Client receives EDS responses containing ClusterLoadAssignments for resources that were
* not requested (management server sends them proactively). Later client receives a CDS
* response with the requested Cluster referencing one of the previously received
* ClusterLoadAssignments. No EDS request needs to be sent for that
* ClusterLoadAssignment as it can be found in local cache (management server will not send
* EDS responses for that ClusterLoadAssignment again). A future EDS response update for
* that ClusterLoadAssignment should be notified to corresponding endpoint watchers.
*
* <p>Tests for caching EDS response data behavior.
*/
@Test
public void receiveEdsResponsesForClusterLoadAssignmentToBeUsedLater() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

// Client sends first EDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));

// Management server sends back an EDS response containing ClusterLoadAssignments for
// some cluster not requested.
List<Any> clusterLoadAssignments = ImmutableList.of(
Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com",
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)),
1, 0)),
ImmutableList.<Policy.DropOverload>of())),
Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
ImmutableList.of(
buildLocalityLbEndpoints("region2", "zone2", "subzone2",
ImmutableList.of(
buildLbEndpoint("192.168.234.52", 8888, HealthStatus.UNKNOWN, 5)),
6, 1)),
ImmutableList.<ClusterLoadAssignment.Policy.DropOverload>of())));

DiscoveryResponse response =
buildDiscoveryResponse("0", clusterLoadAssignments,
XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
responseObserver.onNext(response);

// Client sent an ACK EDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));

ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getDropPolicies()).isEmpty();
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
ImmutableList.of(
new LbEndpoint("192.168.0.1", 8080,
2, true)), 1, 0));

// Start watching endpoints in cluster that server had already sent to us.
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2);

// Cached result is notified to watcher immediately.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate2.getDropPolicies()).isEmpty();
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region2", "zone2", "subzone2"),
new LocalityLbEndpoints(
ImmutableList.of(
new LbEndpoint("192.168.234.52", 8888,
5, true)), 6, 1));

// Client sent an EDS request to subscribe to resources newly being watched.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("0",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));

// Management server sends back an EDS response updating endpoints in a previously known
// cluster.
clusterLoadAssignments = ImmutableList.of(
Any.pack(buildClusterLoadAssignment("cluster-bar.googleapis.com",
ImmutableList.of(
buildLocalityLbEndpoints("region3", "zone3", "subzone3",
ImmutableList.of(
buildLbEndpoint("192.168.41.54", 443, HealthStatus.UNKNOWN, 4)),
2, 0)),
ImmutableList.<ClusterLoadAssignment.Policy.DropOverload>of())));

response = buildDiscoveryResponse("1", clusterLoadAssignments,
XdsClientImpl.ADS_TYPE_URL_EDS, "0001");
responseObserver.onNext(response);

// Client sent an ACK EDS request.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("1",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));

// Updated result is notified to the corresponding watcher.
verify(watcher2, times(2)).onEndpointChanged(endpointUpdateCaptor2.capture());
endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate2.getDropPolicies()).isEmpty();
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region3", "zone3", "subzone3"),
new LocalityLbEndpoints(
ImmutableList.of(
new LbEndpoint("192.168.41.54", 443,
4, true)), 2, 0));
}

@Test
public void addRemoveEndpointWatchersFreely() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
Expand Down Expand Up @@ -1569,8 +1696,6 @@ public void addRemoveEndpointWatchersFreely() {
XdsClientImpl.ADS_TYPE_URL_EDS, "0002")));
}

// TODO(chengyuanzhang): tests for caching EDS responses proactively sent by management server.

// TODO(chengyuanzhang): tests for LDS/RDS/CDS/EDS sharing the same RPC stream.

// TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers
Expand Down

0 comments on commit 6e8159e

Please sign in to comment.