From 17eda5a2a1f7d89488fae0e809529ea7def96361 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 13 May 2020 16:26:31 -0600 Subject: [PATCH 1/2] Avoid retrying the last server on reconnect Signed-off-by: Colin Sullivan --- .../io/nats/client/impl/NatsConnection.java | 20 ++++++++++++++----- .../io/nats/client/impl/ReconnectTests.java | 3 +-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index a91ef57a3..eafe93b0d 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -192,7 +192,7 @@ void connect(boolean reconnectOnConnect) throws InterruptedException, IOExceptio timeTrace(trace, "starting connect loop"); - Collection serversToTry = buildServerList(); + Collection serversToTry = buildServerList(false); for (String serverURI : serversToTry) { if (isClosed()) { break; @@ -263,7 +263,7 @@ void reconnect() throws InterruptedException { boolean doubleAuthError = false; while (!isConnected() && !isClosed() && !this.isClosing()) { - Collection serversToTry = buildServerList(); + Collection serversToTry = buildServerList(true); for (String server : serversToTry) { if (isClosed()) { @@ -1643,7 +1643,7 @@ void waitForReconnectTimeout() { this.reconnectWaiter.complete(Boolean.TRUE); } - Collection buildServerList() { + Collection buildServerList(boolean isReconnecting) { ArrayList reconnectList = new ArrayList<>(); reconnectList.addAll(getServers()); @@ -1652,8 +1652,18 @@ Collection buildServerList() { return reconnectList; } - Collections.shuffle(reconnectList); - + if (!isReconnecting) { + Collections.shuffle(reconnectList); + } else { + // Remove the current (first) server from the list, shuffle if it makes sense, + // and then add it to the end of the list. This prevents the client + // from immediately reconnecting to a server it just lost connection with. + String s = reconnectList.remove(0); + if (reconnectList.size() > 1) { + Collections.shuffle(reconnectList); + } + reconnectList.add(s); + } return reconnectList; } diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 850f2eb93..fe39fff17 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -341,7 +341,6 @@ public void testReconnectToSecondServer() throws Exception { } } - @Test public void testNoRandomizeReconnectToSecondServer() throws Exception { NatsConnection nc = null; @@ -615,7 +614,7 @@ public void testReconnectNoIPTLSConnection() throws Exception { server(ts.getURI()). secure(). connectionListener(handler). - maxReconnects(10). // we get multiples for some, so need enough + maxReconnects(20). // we get multiples for some, so need enough reconnectWait(Duration.ofMillis(100)). connectionTimeout(Duration.ofSeconds(5)). noRandomize(). From 2393b9e28adfdf25cf80886540f01dfebc079388 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 13 May 2020 16:49:09 -0600 Subject: [PATCH 2/2] Track current server Signed-off-by: Colin Sullivan --- .../io/nats/client/impl/NatsConnection.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index eafe93b0d..cf42d2680 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -133,6 +133,8 @@ class NatsConnection implements Connection { private ExecutorService executor; private ExecutorService connectExecutor; + private String currentServer = null; + NatsConnection(Options options) { boolean trace = options.isTraceConnection(); timeTrace(trace, "creating connection object"); @@ -192,7 +194,7 @@ void connect(boolean reconnectOnConnect) throws InterruptedException, IOExceptio timeTrace(trace, "starting connect loop"); - Collection serversToTry = buildServerList(false); + Collection serversToTry = buildServerList(); for (String serverURI : serversToTry) { if (isClosed()) { break; @@ -206,6 +208,7 @@ void connect(boolean reconnectOnConnect) throws InterruptedException, IOExceptio tryToConnect(serverURI, System.nanoTime()); if (isConnected()) { + this.currentServer = serverURI; break; } else { timeTrace(trace, "setting status to disconnected"); @@ -263,7 +266,7 @@ void reconnect() throws InterruptedException { boolean doubleAuthError = false; while (!isConnected() && !isClosed() && !this.isClosing()) { - Collection serversToTry = buildServerList(true); + Collection serversToTry = buildServerList(); for (String server : serversToTry) { if (isClosed()) { @@ -291,6 +294,7 @@ void reconnect() throws InterruptedException { break; } else if (isConnected()) { this.statistics.incrementReconnects(); + this.currentServer = server; break; } else { String err = connectError.get(); @@ -1643,7 +1647,7 @@ void waitForReconnectTimeout() { this.reconnectWaiter.complete(Boolean.TRUE); } - Collection buildServerList(boolean isReconnecting) { + Collection buildServerList() { ArrayList reconnectList = new ArrayList<>(); reconnectList.addAll(getServers()); @@ -1652,17 +1656,17 @@ Collection buildServerList(boolean isReconnecting) { return reconnectList; } - if (!isReconnecting) { + if (currentServer == null) { Collections.shuffle(reconnectList); } else { - // Remove the current (first) server from the list, shuffle if it makes sense, + // Remove the current server from the list, shuffle if it makes sense, // and then add it to the end of the list. This prevents the client // from immediately reconnecting to a server it just lost connection with. - String s = reconnectList.remove(0); + reconnectList.remove(this.currentServer); if (reconnectList.size() > 1) { Collections.shuffle(reconnectList); } - reconnectList.add(s); + reconnectList.add(this.currentServer); } return reconnectList; }