diff --git a/CHANGELOG.md b/CHANGELOG.md index f1866ea07a352..9fd5efdc986d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391) - Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707)) +- Sliced search only fans out to shards matched by the selected slice, reducing open search contexts ([#16771](https://github.com/opensearch-project/OpenSearch/pull/16771)) - Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909)) - Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881)) - Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json index 74b7055b4c4b0..9d3d420e8945c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json @@ -62,6 +62,9 @@ "default":"open", "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both." } + }, + "body":{ + "description":"The search source (in order to specify slice parameters)" } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search_shards/20_slice.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search_shards/20_slice.yml new file mode 100644 index 0000000000000..bf1a5429213df --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search_shards/20_slice.yml @@ -0,0 +1,88 @@ +--- +"Search shards with slice specified in body": + - skip: + version: " - 2.99.99" + reason: "Added slice body to search_shards in 2.19" + - do: + indices.create: + index: test_index + body: + settings: + index: + number_of_shards: 7 + number_of_replicas: 0 + + - do: + search_shards: + index: test_index + body: + slice: + id: 0 + max: 3 + - length: { shards: 3 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 0 } + - match: { shards.1.0.shard: 3 } + - match: { shards.2.0.shard: 6 } + + - do: + search_shards: + index: test_index + body: + slice: + id: 1 + max: 3 + - length: { shards: 2 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 1 } + - match: { shards.1.0.shard: 4 } + + - do: + search_shards: + index: test_index + body: + slice: + id: 2 + max: 3 + - length: { shards: 2 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 2 } + - match: { shards.1.0.shard: 5 } + + + - do: + search_shards: + index: test_index + preference: "_shards:0,2,4,6" + body: + slice: + id: 0 + max: 3 + - length: { shards: 2 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 0 } + - match: { shards.1.0.shard: 6 } + + - do: + search_shards: + index: test_index + preference: "_shards:0,2,4,6" + body: + slice: + id: 1 + max: 3 + - length: { shards: 1 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 2 } + + - do: + search_shards: + index: test_index + preference: "_shards:0,2,4,6" + body: + slice: + id: 2 + max: 3 + - length: { shards: 1 } + - match: { shards.0.0.index: "test_index" } + - match: { shards.0.0.shard: 4 } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java index 62e05ebb37e28..d4bf0efbd3eb5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.shards; +import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.IndicesRequest; import org.opensearch.action.support.IndicesOptions; @@ -41,6 +42,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.search.slice.SliceBuilder; import java.io.IOException; import java.util.Objects; @@ -61,6 +63,8 @@ public class ClusterSearchShardsRequest extends ClusterManagerNodeReadRequest nodeIds = new HashSet<>(); GroupShardsIterator groupShardsIterator = clusterService.operationRouting() - .searchShards(clusterState, concreteIndices, routingMap, request.preference()); + .searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice()); ShardRouting shard; ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()]; int currentGroup = 0; diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 10bf4975311d6..52937182e6a63 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -247,8 +247,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener< throw blockException; } - shardsIt = clusterService.operationRouting() - .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null); + shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null); } public void start() { diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 8c4927afa9a14..dfec2e1fda738 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -85,6 +85,7 @@ import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; +import org.opensearch.search.slice.SliceBuilder; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; @@ -551,6 +552,7 @@ private ActionListener buildRewriteListener( ); } else { AtomicInteger skippedClusters = new AtomicInteger(0); + SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice(); collectSearchShards( searchRequest.indicesOptions(), searchRequest.preference(), @@ -559,6 +561,7 @@ private ActionListener buildRewriteListener( remoteClusterIndices, remoteClusterService, threadPool, + slice, ActionListener.wrap(searchShardsResponses -> { final BiFunction clusterNodeLookup = getRemoteClusterNodeLookup( searchShardsResponses @@ -787,6 +790,7 @@ static void collectSearchShards( Map remoteIndicesByCluster, RemoteClusterService remoteClusterService, ThreadPool threadPool, + SliceBuilder slice, ActionListener> listener ) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); @@ -800,7 +804,8 @@ static void collectSearchShards( ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions) .local(true) .preference(preference) - .routing(routing); + .routing(routing) + .slice(slice); clusterClient.admin() .cluster() .searchShards( @@ -1042,6 +1047,7 @@ private void executeSearch( concreteLocalIndices[i] = indices[i].getName(); } Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); + SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice(); GroupShardsIterator localShardRoutings = clusterService.operationRouting() .searchShards( clusterState, @@ -1049,7 +1055,8 @@ private void executeSearch( routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), - nodeSearchCounts + nodeSearchCounts, + slice ); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index fe9e00b250e70..eac6f41acde4c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.routing; +import org.apache.lucene.util.CollectionUtil; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; @@ -44,14 +45,17 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; +import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.search.slice.SliceBuilder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -230,7 +234,7 @@ public GroupShardsIterator searchShards( @Nullable Map> routing, @Nullable String preference ) { - return searchShards(clusterState, concreteIndices, routing, preference, null, null); + return searchShards(clusterState, concreteIndices, routing, preference, null, null, null); } public GroupShardsIterator searchShards( @@ -239,11 +243,14 @@ public GroupShardsIterator searchShards( @Nullable Map> routing, @Nullable String preference, @Nullable ResponseCollectorService collectorService, - @Nullable Map nodeCounts + @Nullable Map nodeCounts, + @Nullable SliceBuilder slice ) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); - final Set set = new HashSet<>(shards.size()); + + Map> shardIterators = new HashMap<>(); for (IndexShardRoutingTable shard : shards) { + IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName()); if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) { preference = Preference.PRIMARY.type(); @@ -274,10 +281,31 @@ public GroupShardsIterator searchShards( clusterState.metadata().weightedRoutingMetadata() ); if (iterator != null) { - set.add(iterator); + shardIterators.computeIfAbsent(iterator.shardId().getIndex(), k -> new ArrayList<>()).add(iterator); + } + } + List allShardIterators = new ArrayList<>(); + if (slice != null) { + for (List indexIterators : shardIterators.values()) { + // Filter the returned shards for the given slice + CollectionUtil.timSort(indexIterators); + // We use the ordinal of the iterator in the group (after sorting) rather than the shard id, because + // computeTargetedShards may return a subset of shards for an index, if a routing parameter was + // specified. In that case, the set of routable shards is considered the full universe of available + // shards for each index, when mapping shards to slices. If no routing parameter was specified, + // then ordinals and shard IDs are the same. This mimics the logic in + // org.opensearch.search.slice.SliceBuilder.toFilter. + for (int i = 0; i < indexIterators.size(); i++) { + if (slice.shardMatches(i, indexIterators.size())) { + allShardIterators.add(indexIterators.get(i)); + } + } } + } else { + shardIterators.values().forEach(allShardIterators::addAll); } - return GroupShardsIterator.sortAndCreate(new ArrayList<>(set)); + + return GroupShardsIterator.sortAndCreate(allShardIterators); } public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) { @@ -311,6 +339,7 @@ private Set computeTargetedShards( set.add(indexShard); } } + } return set; } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java index 3555576433104..304d1cabefd35 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java @@ -40,6 +40,7 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.List; @@ -81,6 +82,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC clusterSearchShardsRequest.routing(request.param("routing")); clusterSearchShardsRequest.preference(request.param("preference")); clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions())); + if (request.hasContentOrSourceParam()) { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.parseXContent(request.contentOrSourceParamParser()); + if (sourceBuilder.slice() != null) { + clusterSearchShardsRequest.slice(sourceBuilder.slice()); + } + } return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel)); } } diff --git a/server/src/main/java/org/opensearch/search/slice/SliceBuilder.java b/server/src/main/java/org/opensearch/search/slice/SliceBuilder.java index c9b8a896ed525..691b829578e1f 100644 --- a/server/src/main/java/org/opensearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/opensearch/search/slice/SliceBuilder.java @@ -214,6 +214,15 @@ public int hashCode() { return Objects.hash(this.field, this.id, this.max); } + public boolean shardMatches(int shardOrdinal, int numShards) { + if (max >= numShards) { + // Slices are distributed over shards + return id % numShards == shardOrdinal; + } + // Shards are distributed over slices + return shardOrdinal % max == id; + } + /** * Converts this QueryBuilder to a lucene {@link Query}. * @@ -225,7 +234,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, throw new IllegalArgumentException("field " + field + " not found"); } - int shardId = request.shardId().id(); + int shardOrdinal = request.shardId().id(); int numShards = context.getIndexSettings().getNumberOfShards(); if ((request.preference() != null || request.indexRoutings().length > 0)) { GroupShardsIterator group = buildShardIterator(clusterService, request); @@ -241,21 +250,26 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, */ numShards = group.size(); int ord = 0; - shardId = -1; + shardOrdinal = -1; // remap the original shard id with its index (position) in the sorted shard iterator. for (ShardIterator it : group) { assert it.shardId().getIndex().equals(request.shardId().getIndex()); if (request.shardId().equals(it.shardId())) { - shardId = ord; + shardOrdinal = ord; break; } ++ord; } - assert shardId != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing"; + assert shardOrdinal != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing"; } } - String field = this.field; + if (shardMatches(shardOrdinal, numShards) == false) { + // We should have already excluded this shard before routing to it. + // If we somehow land here, then we match nothing. + return new MatchNoDocsQuery("this shard is not part of the slice"); + } + boolean useTermQuery = false; if ("_uid".equals(field)) { throw new IllegalArgumentException("Computing slices on the [_uid] field is illegal for 7.x indices, use [_id] instead"); @@ -277,12 +291,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, // the number of slices is greater than the number of shards // in such case we can reduce the number of requested shards by slice - // first we check if the slice is responsible of this shard int targetShard = id % numShards; - if (targetShard != shardId) { - // the shard is not part of this slice, we can skip it. - return new MatchNoDocsQuery("this shard is not part of the slice"); - } // compute the number of slices where this shard appears int numSlicesInShard = max / numShards; int rest = max % numShards; @@ -301,14 +310,8 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, ? new TermsSliceQuery(field, shardSlice, numSlicesInShard) : new DocValuesSliceQuery(field, shardSlice, numSlicesInShard); } - // the number of shards is greater than the number of slices + // the number of shards is greater than the number of slices. If we target this shard, we target all of it. - // check if the shard is assigned to the slice - int targetSlice = shardId % max; - if (id != targetSlice) { - // the shard is not part of this slice, we can skip it. - return new MatchNoDocsQuery("this shard is not part of the slice"); - } return new MatchAllDocsQuery(); } @@ -321,6 +324,8 @@ private GroupShardsIterator buildShardIterator(ClusterService clu Map> routingMap = request.indexRoutings().length > 0 ? Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings())) : null; + // Note that we do *not* want to filter this set of shard IDs based on the slice, since we want the + // full set of shards matched by the routing parameters. return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference()); } diff --git a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java index 84955d01a59ce..0a0015ae8cbf6 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java @@ -809,6 +809,7 @@ public void testCollectSearchShards() throws Exception { remoteIndicesByCluster, remoteClusterService, threadPool, + null, new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch) ); awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -835,6 +836,7 @@ public void testCollectSearchShards() throws Exception { remoteIndicesByCluster, remoteClusterService, threadPool, + null, new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) ); awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -880,6 +882,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteIndicesByCluster, remoteClusterService, threadPool, + null, new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) ); awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -907,6 +910,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteIndicesByCluster, remoteClusterService, threadPool, + null, new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch) ); awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -949,6 +953,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteIndicesByCluster, remoteClusterService, threadPool, + null, new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch) ); awaitLatch(latch, 5, TimeUnit.SECONDS); diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index aaeeb52ab5709..4263e1aa347dc 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -604,7 +604,8 @@ public void testAdaptiveReplicaSelection() throws Exception { null, null, collector, - outstandingRequests + outstandingRequests, + null ); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -616,7 +617,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(firstChoice); selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); @@ -624,7 +625,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(secondChoice); selectedNodes.add(secondChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); @@ -643,26 +644,26 @@ public void testAdaptiveReplicaSelection() throws Exception { outstandingRequests.put("node_1", 1L); outstandingRequests.put("node_2", 1L); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); // node 1 should be the lowest ranked node to start assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // node 1 starts getting more loaded... collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and more loaded... collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and even more collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); shardChoice = groupIterator.get(0).nextOrNull(); // finally, node 2 is chosen instead assertThat(shardChoice.currentNodeId(), equalTo("node_2")); @@ -709,7 +710,8 @@ public void testAdaptiveReplicaSelectionWithZoneAwarenessIgnored() throws Except null, null, collector, - outstandingRequests + outstandingRequests, + null ); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -722,7 +724,7 @@ public void testAdaptiveReplicaSelectionWithZoneAwarenessIgnored() throws Except searchedShards.add(firstChoice); selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1)); @@ -745,18 +747,18 @@ public void testAdaptiveReplicaSelectionWithZoneAwarenessIgnored() throws Except outstandingRequests.put("node_a1", 1L); outstandingRequests.put("node_b2", 1L); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); // node_a0 or node_a1 should be the lowest ranked node to start groupIterator.forEach(shardRoutings -> assertThat(shardRoutings.nextOrNull().currentNodeId(), containsString("node_a"))); // Adding more load to node_a0 collector.addNodeStatistics("node_a0", 10, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); // Adding more load to node_a0 and node_a1 from zone-a collector.addNodeStatistics("node_a1", 100, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); collector.addNodeStatistics("node_a0", 100, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); // ARS should pick node_b2 from zone-b since both node_a0 and node_a1 are overloaded groupIterator.forEach(shardRoutings -> assertThat(shardRoutings.nextOrNull().currentNodeId(), containsString("node_b"))); @@ -842,8 +844,8 @@ public void testWeightedOperationRouting() throws Exception { null, null, collector, - outstandingRequests - + outstandingRequests, + null ); for (ShardIterator it : groupIterator) { @@ -871,7 +873,7 @@ public void testWeightedOperationRouting() throws Exception { opRouting = new OperationRouting(setting, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); // search shards call - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); for (ShardIterator it : groupIterator) { List shardRoutings = Collections.singletonList(it.nextOrNull()); @@ -935,8 +937,8 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep null, null, collector, - outstandingRequests - + outstandingRequests, + null ); for (ShardIterator it : groupIterator) { @@ -969,7 +971,7 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep opRouting = new OperationRouting(setting, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); // search shards call - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, null); for (ShardIterator it : groupIterator) { while (it.remaining() > 0) {