Skip to content

Commit

Permalink
Track current server
Browse files Browse the repository at this point in the history
Signed-off-by: Colin Sullivan <[email protected]>
  • Loading branch information
ColinSullivan1 committed May 13, 2020
1 parent 17eda5a commit 2393b9e
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class NatsConnection implements Connection {
private ExecutorService executor;
private ExecutorService connectExecutor;

private String currentServer = null;

NatsConnection(Options options) {
boolean trace = options.isTraceConnection();
timeTrace(trace, "creating connection object");
Expand Down Expand Up @@ -192,7 +194,7 @@ void connect(boolean reconnectOnConnect) throws InterruptedException, IOExceptio

timeTrace(trace, "starting connect loop");

Collection<String> serversToTry = buildServerList(false);
Collection<String> serversToTry = buildServerList();
for (String serverURI : serversToTry) {
if (isClosed()) {
break;
Expand All @@ -206,6 +208,7 @@ void connect(boolean reconnectOnConnect) throws InterruptedException, IOExceptio
tryToConnect(serverURI, System.nanoTime());

if (isConnected()) {
this.currentServer = serverURI;
break;
} else {
timeTrace(trace, "setting status to disconnected");
Expand Down Expand Up @@ -263,7 +266,7 @@ void reconnect() throws InterruptedException {
boolean doubleAuthError = false;

while (!isConnected() && !isClosed() && !this.isClosing()) {
Collection<String> serversToTry = buildServerList(true);
Collection<String> serversToTry = buildServerList();

for (String server : serversToTry) {
if (isClosed()) {
Expand Down Expand Up @@ -291,6 +294,7 @@ void reconnect() throws InterruptedException {
break;
} else if (isConnected()) {
this.statistics.incrementReconnects();
this.currentServer = server;
break;
} else {
String err = connectError.get();
Expand Down Expand Up @@ -1643,7 +1647,7 @@ void waitForReconnectTimeout() {
this.reconnectWaiter.complete(Boolean.TRUE);
}

Collection<String> buildServerList(boolean isReconnecting) {
Collection<String> buildServerList() {
ArrayList<String> reconnectList = new ArrayList<>();

reconnectList.addAll(getServers());
Expand All @@ -1652,17 +1656,17 @@ Collection<String> buildServerList(boolean isReconnecting) {
return reconnectList;
}

if (!isReconnecting) {
if (currentServer == null) {
Collections.shuffle(reconnectList);
} else {
// Remove the current (first) server from the list, shuffle if it makes sense,
// Remove the current server from the list, shuffle if it makes sense,
// and then add it to the end of the list. This prevents the client
// from immediately reconnecting to a server it just lost connection with.
String s = reconnectList.remove(0);
reconnectList.remove(this.currentServer);
if (reconnectList.size() > 1) {
Collections.shuffle(reconnectList);
}
reconnectList.add(s);
reconnectList.add(this.currentServer);
}
return reconnectList;
}
Expand Down

0 comments on commit 2393b9e

Please sign in to comment.