From d1a9be01bd7311d4d833df87c40bb1ce4966676b Mon Sep 17 00:00:00 2001 From: James Xin Date: Wed, 18 Dec 2024 06:29:54 -0800 Subject: [PATCH] Java: Allow cluster wide scan with not fully covered cluster Signed-off-by: James Xin --- .../api/models/commands/scan/ScanOptions.java | 9 ++ .../java/glide/managers/CommandManager.java | 4 + .../src/test/java/glide/TestUtilities.java | 39 ++++++++ .../test/java/glide/cluster/CommandTests.java | 96 +++++++++++++++++++ 4 files changed, 148 insertions(+) diff --git a/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java index 6fffc46f35..0bdddd5ed4 100644 --- a/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java @@ -60,6 +60,8 @@ public String[] toArgs() { return super.toArgs(); } + private final Boolean allowNonCoveredSlots; + /** * @return the pattern used for the MATCH filter. */ @@ -86,4 +88,11 @@ public Long getCount() { public ObjectType getType() { return type; } + + /** + * @return whether allow non covered slots. + */ + public Boolean getAllowNonCoveredSlots() { + return allowNonCoveredSlots; + } } diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index d069c6bd72..47b0de7d75 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -428,6 +428,10 @@ protected CommandRequest.Builder prepareCursorRequest( clusterScanBuilder.setObjectType(options.getType().getNativeName()); } + if (options.getAllowNonCoveredSlots() != null) { + clusterScanBuilder.setAllowNonCoveredSlots(options.getAllowNonCoveredSlots()); + } + return CommandRequest.newBuilder().setClusterScan(clusterScanBuilder.build()); } diff --git a/java/integTest/src/test/java/glide/TestUtilities.java b/java/integTest/src/test/java/glide/TestUtilities.java index ea03632c39..bcf984b1fe 100644 --- a/java/integTest/src/test/java/glide/TestUtilities.java +++ b/java/integTest/src/test/java/glide/TestUtilities.java @@ -452,4 +452,43 @@ public static String getServerVersion(@NonNull final BaseClient client) { } return null; } + + /** + * Helper function to get a number of nodes, and ask the cluster till we get the number of nodes + * + * @param clusterClient Glide cluster client to be used for executing custom command + * @param count number of nodes expected + * @return true if we get the number of expected nodes + */ + @SneakyThrows + public static boolean waitForClusterReady(GlideClusterClient clusterClient, int count) { + long timeout = 20000; // 20 seconds + long startTime = System.currentTimeMillis(); + + while (true) { + if (System.currentTimeMillis() - startTime > timeout) { + return false; + } + ClusterValue clusterInfo = + clusterClient.customCommand(new String[] {"CLUSTER", "INFO"}).get(); + if (clusterInfo != null && clusterInfo.hasSingleData()) { + String[] clusterInfoLines = ((String) clusterInfo.getSingleValue()).split("\n"); + Map clusterInfoMap = + Arrays.stream(clusterInfoLines) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(s -> s.split(":", 2)) + .collect(Collectors.toMap(parts -> parts[0].trim(), parts -> parts[1].trim())); + if ("ok".equals(clusterInfoMap.get("cluster_state")) + && Integer.parseInt(clusterInfoMap.getOrDefault("cluster_known_nodes", "0")) == count) { + break; + } + } + Thread.sleep(2000); + } + // we need to make sure that the inner core refresh slots so we make sure we accumulate 60 + // seconds + Thread.sleep(60000 - (System.currentTimeMillis() - startTime)); + return true; + } } diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 5e96ecb1d6..fb1fb11dd6 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -1,6 +1,7 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.cluster; +import static glide.TestConfiguration.CLUSTER_HOSTS; import static glide.TestConfiguration.SERVER_VERSION; import static glide.TestUtilities.assertDeepEquals; import static glide.TestUtilities.checkFunctionListResponse; @@ -15,6 +16,7 @@ import static glide.TestUtilities.getFirstEntryFromMultiValue; import static glide.TestUtilities.getValueFromInfo; import static glide.TestUtilities.parseInfoResponseToMap; +import static glide.TestUtilities.waitForClusterReady; import static glide.TestUtilities.waitForNotBusy; import static glide.api.BaseClient.OK; import static glide.api.models.GlideString.gs; @@ -86,6 +88,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -3015,6 +3018,99 @@ public void test_cluster_scan_all_stream() { assertEquals(streamData.keySet(), results); } + @Test + @SneakyThrows + @Timeout(100) + public void test_cluster_scan_non_covered_slots() { + assertEquals(OK, clusterClient.flushall().get()); + String key = UUID.randomUUID().toString(); + for (int i = 0; i < 1000; i++) { + String result = clusterClient.set(key + ":" + i, "value").get(); + assertEquals(OK, result); + } + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Object[] response = clusterClient.scan(cursor).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + assertFalse(cursor.isFinished()); + clusterClient.configSet(Map.of("cluster-require-full-coverage", "no")); + // forget one server + String addressToForget = CLUSTER_HOSTS[0]; + String[] splitAddressToForget = addressToForget.split(":"); + String[] allOtherAddresses = Arrays.copyOfRange(CLUSTER_HOSTS, 1, CLUSTER_HOSTS.length); + var idToForget = + clusterClient + .customCommand( + new String[] {"CLUSTER", "MYID"}, + new ByAddressRoute( + splitAddressToForget[0], Integer.parseInt(splitAddressToForget[1]))) + .get() + .getSingleValue(); + for (String otherAddress : allOtherAddresses) { + String[] splitOtherAddress = otherAddress.split(":"); + clusterClient.customCommand( + new String[] {"CLUSTER", "FORGET", (String) idToForget}, + new ByAddressRoute(splitOtherAddress[0], Integer.parseInt(splitOtherAddress[1]))); + } + // now we let it few seconds gossip to get the new cluster configuration + assertTrue(waitForClusterReady(clusterClient, allOtherAddresses.length)); + // Iterate scan to get missing slots error + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> { + ClusterScanCursor cursor2 = ClusterScanCursor.initalCursor(); + while (!cursor2.isFinished()) { + Object[] scanResponse = clusterClient.scan(cursor2).get(); + cursor2.releaseCursorHandle(); + cursor2 = (ClusterScanCursor) scanResponse[0]; + } + }); + assertTrue( + executionException + .getMessage() + .contains("Could not find an address covering a slot, SCAN operation cannot continue")); + + // Scan with allow_non_covered_slots=true + ClusterScanCursor cursor3 = ClusterScanCursor.initalCursor(); + while (!cursor3.isFinished()) { + Object[] nonCoverScanResult = + clusterClient + .scan(cursor3, ScanOptions.builder().allowNonCoveredSlots(true).build()) + .get(); + cursor3.releaseCursorHandle(); + cursor3 = (ClusterScanCursor) nonCoverScanResult[0]; + } + assertTrue(cursor3.isFinished()); + // Get keys using 'KEYS *' from the remaining nodes + List keys = new ArrayList<>(); + for (String address : allOtherAddresses) { + String[] splitAddress = address.split(":"); + ClusterValue keysResult = + clusterClient + .customCommand( + new String[] {"KEYS", "*"}, + new ByAddressRoute(splitAddress[0], Integer.parseInt(splitAddress[1]))) + .get(); + assertNotNull(keysResult); + assertTrue(keysResult.hasSingleData()); + keys.addAll(List.of((Object[]) keysResult.getSingleValue())); + } + + ClusterScanCursor cursor4 = ClusterScanCursor.initalCursor(); + List results = new ArrayList<>(); + while (!cursor4.isFinished()) { + Object[] result = + clusterClient + .scan(cursor4, ScanOptions.builder().allowNonCoveredSlots(true).build()) + .get(); + results.addAll(List.of((Object[]) result[1])); + cursor4.releaseCursorHandle(); + cursor4 = (ClusterScanCursor) result[0]; + } + assertEquals(new HashSet<>(keys), new HashSet<>(results)); + } + @SneakyThrows @Test public void invokeScript_test() {