From 07d6b9df0a761c6563631ed132ac12b671cec327 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Fri, 20 Oct 2023 11:44:10 -0400 Subject: [PATCH] reconnect on connect validation and support (#1012) --- README.md | 48 +++++++--- dependencies.md | 11 +++ src/main/java/io/nats/client/Nats.java | 87 +++++++++++++++---- .../java/io/nats/client/ConnectTests.java | 73 ++++++++++++++++ .../impl/SimulateSocketDataPortException.java | 20 +++++ 5 files changed, 209 insertions(+), 30 deletions(-) create mode 100644 src/test/java/io/nats/client/impl/SimulateSocketDataPortException.java diff --git a/README.md b/README.md index fbcce685c..520ee8189 100644 --- a/README.md +++ b/README.md @@ -456,47 +456,71 @@ There are also examples in the [java-nats-examples](https://github.com/nats-io/j ### Connecting -There are four different ways to connect using the Java library: +There are five different ways to connect using the Java library, +each with a parallel method that will allow doing reconnect logic if the initial connect fails. +The ability to reconnect on the initial connection failure is _NOT_ an Options setting. -1. Connect to a local server on the default port: +1. Connect to a local server on the default url. From the Options class: `DEFAULT_URL = "nats://localhost:4222";` ```java + // default options Connection nc = Nats.connect(); + + // default options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect(); ``` -2. Connect to one or more servers using a URL: +1. Connect to one or more servers using a URL: ```java - //single URL + // single URL, all other default options Connection nc = Nats.connect("nats://myhost:4222"); - //comma-separated list of URLs + // comma-separated list of URLs, all other default options Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223"); + + // single URL, all other default options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222"); + + // comma-separated list of URLs, all other default options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223"); ``` -3. Connect to one or more servers with a custom configuration: +1. Connect to one or more servers with a custom configuration: ```java Options o = new Options.Builder().server("nats://serverone:4222").server("nats://servertwo:4222").maxReconnects(-1).build(); + + // custom options Connection nc = Nats.connect(o); - ``` - See the javadoc for a complete list of configuration options. + // custom options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect(o); + ``` -4. Connect asynchronously, this requires a callback to tell the application when the client is connected: +1. Connect asynchronously, this requires a callback to tell the application when the client is connected: ```java Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build(); Nats.connectAsynchronously(options, true); ``` - This feature is experimental, please let us know if you like it. - -5. Connect with authentication handler: +1. Connect with authentication handler: ```java AuthHandler authHandler = Nats.credentials(System.getenv("NATS_CREDS")); + + // single URL, all other default options Connection nc = Nats.connect("nats://myhost:4222", authHandler); + + // comma-separated list of URLs, all other default options + Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223", authHandler); + + // single URL, all other default options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222", authHandler); + + // comma-separated list of URLs, all other default options, reconnect on connect + Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223", authHandler); ``` ### Publishing diff --git a/dependencies.md b/dependencies.md index bb5ac6441..e92fb22b8 100644 --- a/dependencies.md +++ b/dependencies.md @@ -2,6 +2,13 @@ This file lists the dependencies used in this repository. +#### Tool Dependencies + +| Dependency | License | +|------------------|--------------------------------------------| +| Java JDK 8 Adopt | GNU General Public License (GPL) Version 2 | +| Gradle 6.8.3 | Apache 2.0 License | + #### Runtime Dependencies | Dependency | License | @@ -51,3 +58,7 @@ This file lists the dependencies used in this repository. [BSD Licenses](https://en.wikipedia.org/wiki/BSD_licenses) [MIT License](https://en.wikipedia.org/wiki/MIT_License) + +[GNU General Public License, version 2, with the Classpath Exception](https://openjdk.org/legal/gplv2+ce.html) + +[Adoption Adopt JDK](https://adoptium.net/about/) diff --git a/src/main/java/io/nats/client/Nats.java b/src/main/java/io/nats/client/Nats.java index 50ba237e3..236999873 100644 --- a/src/main/java/io/nats/client/Nats.java +++ b/src/main/java/io/nats/client/Nats.java @@ -104,7 +104,7 @@ public abstract class Nats { } /** - * Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all of the + * Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all the * default options. * *

This is a synchronous call, and the connection should be ready for use on return @@ -114,10 +114,10 @@ public abstract class Nats { *

If the connection fails, an IOException is thrown * *

See {@link Nats#connect(Options) connect(Options)} for more information on exceptions. - * + * + * @return the connection * @throws IOException if a networking issue occurs * @throws InterruptedException if the current thread is interrupted - * @return the connection */ public static Connection connect() throws IOException, InterruptedException { Options options = new Options.Builder().server(Options.DEFAULT_URL).build(); @@ -125,27 +125,40 @@ public static Connection connect() throws IOException, InterruptedException { } /** + * Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all the + * default options, allowing re-connect attempts if the initial connection fails + * @return the connection + * @throws IOException if an unrecoverable networking issue occurs + * @throws InterruptedException if the current thread is interrupted + */ + public static Connection connectReconnectOnConnect() throws IOException, InterruptedException { + Options options = new Options.Builder().server(Options.DEFAULT_URL).build(); + return createConnection(options, true); + } + + /** + * Connect to specific url, with all the default options. * The Java client generally expects URLs of the form {@code nats://hostname:port} - * - *

but also allows urls with a user password {@code nats://user:pass@hostname:port}. - * - *

or token in them {@code nats://token@hostname:port}. - * + * + *

but also allows urls with a user password {@code nats://user:pass@hostname:port}.

+ * + *

or token in them {@code nats://token@hostname:port}.

+ * *

Moreover, you can initiate a TLS connection, by using the `tls` * schema, which will use the default SSLContext, or fail if one is not set. For * testing and development, the `opentls` schema is support when the server is * in non-verify mode. In this case, the client will accept any server - * certificate and will not provide one of its own. - * + * certificate and will not provide one of its own.

+ * *

This is a synchronous call, and the connection should be ready for use on return * there are network timing issues that could result in a successful connect call but - * the connection is invalid soon after return, where soon is in the network/thread world. - * - *

If the connection fails, an IOException is thrown + * the connection is invalid soon after return, where soon is in the network/thread world.

* - *

See {@link Nats#connect(Options) connect(Options)} for more information on exceptions. + *

If the connection fails, an IOException is thrown

* - * @param url the url of the server, ie. nats://localhost:4222 + *

See {@link Nats#connect(Options) connect(Options)} for more information on exceptions.

+ * + * @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223 * @throws IOException if a networking issue occurs * @throws InterruptedException if the current thread is interrupted * @return the connection @@ -156,7 +169,20 @@ public static Connection connect(String url) throws IOException, InterruptedExce } /** - * Connect to the specified URL with the specified username, password and default options. + * Connect to specific url, with all the default options, + * allowing re-connect attempts if the initial connection fails + * @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223 + * @return the connection + * @throws IOException if an unrecoverable networking issue occurs + * @throws InterruptedException if the current thread is interrupted + */ + public static Connection connectReconnectOnConnect(String url) throws IOException, InterruptedException { + Options options = new Options.Builder().server(url).build(); + return createConnection(options, true); + } + + /** + * Connect to the specified URL with the specified auth handler. * *

This is a synchronous call, and the connection should be ready for use on return * there are network timing issues that could result in a successful connect call but @@ -166,7 +192,7 @@ public static Connection connect(String url) throws IOException, InterruptedExce * *

See {@link Nats#connect(Options) connect(Options)} for more information on exceptions. * - * @param url comma separated list of the URLs of the server, ie. nats://localhost:4222,nats://localhost:4223 + * @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223 * @param handler the authentication handler implementation * @return the connection * @throws IOException if a networking issue occurs @@ -177,6 +203,20 @@ public static Connection connect(String url, AuthHandler handler) throws IOExcep return createConnection(options, false); } + /** + * Connect to the specified URL with the specified auth handler, + * allowing re-connect attempts if the initial connection fails + * @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223 + * @param handler the authentication handler implementation + * @return the connection + * @throws IOException if an unrecoverable networking issue occurs + * @throws InterruptedException if the current thread is interrupted + */ + public static Connection connectReconnectOnConnect(String url, AuthHandler handler) throws IOException, InterruptedException { + Options options = new Options.Builder().server(url).authHandler(handler).build(); + return createConnection(options, true); + } + /** * Options can be used to set the server URL, or multiple URLS, callback * handlers for various errors, and connection events. @@ -202,14 +242,25 @@ public static Connection connect(String url, AuthHandler handler) throws IOExcep * again in the future. * * @param options the options object to use to create the connection + * @return the connection * @throws IOException if a networking issue occurs * @throws InterruptedException if the current thread is interrupted - * @return the connection */ public static Connection connect(Options options) throws IOException, InterruptedException { return createConnection(options, false); } + /** + * Connect, allowing re-connect attempts if the initial connection fails + * @param options the options object to use to create the connection + * @return the connection + * @throws IOException if an unrecoverable networking issue occurs + * @throws InterruptedException if the current thread is interrupted + */ + public static Connection connectReconnectOnConnect(Options options) throws IOException, InterruptedException { + return createConnection(options, true); + } + /** * Try to connect in another thread, a connection listener is required to get * the connection. diff --git a/src/test/java/io/nats/client/ConnectTests.java b/src/test/java/io/nats/client/ConnectTests.java index 8e44890b1..dfae6bf08 100644 --- a/src/test/java/io/nats/client/ConnectTests.java +++ b/src/test/java/io/nats/client/ConnectTests.java @@ -15,6 +15,7 @@ import io.nats.client.ConnectionListener.Events; import io.nats.client.NatsServerProtocolMock.ExitAt; +import io.nats.client.impl.SimulateSocketDataPortException; import io.nats.client.impl.TestHandler; import org.junit.jupiter.api.Test; @@ -24,6 +25,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; @@ -428,4 +430,75 @@ public void run() { standardCloseConnection(nc); } } + + @SuppressWarnings({"unused", "UnusedAssignment", "resource"}) + @Test + public void testSocketLevelException() throws Exception { + int port = NatsTestServer.nextPort(); + + AtomicBoolean simExReceived = new AtomicBoolean(); + TestHandler th = new TestHandler(); + ErrorListener el = new ErrorListener() { + @Override + public void exceptionOccurred(Connection conn, Exception exp) { + if (exp.getMessage().contains("Simulated Exception")) { + simExReceived.set(true); + } + } + }; + + Options options = new Options.Builder() + .server(NatsTestServer.getNatsLocalhostUri(port)) + .dataPortType("io.nats.client.impl.SimulateSocketDataPortException") + .connectionListener(th) + .errorListener(el) + .reconnectDelayHandler(l -> Duration.ofSeconds(1)) + .build(); + + Connection connection = null; + + // 1. DO NOT RECONNECT ON CONNECT + try (NatsTestServer ts = new NatsTestServer(port, false)) { + try { + SimulateSocketDataPortException.THROW_ON_CONNECT.set(true); + connection = Nats.connect(options); + fail(); + } + catch (Exception ignore) {} + } + + Thread.sleep(200); // just making sure messages get through + assertNull(connection); + assertTrue(simExReceived.get()); + simExReceived.set(false); + + // 2. RECONNECT ON CONNECT + try (NatsTestServer ts = new NatsTestServer(port, false)) { + try { + SimulateSocketDataPortException.THROW_ON_CONNECT.set(true); + th.prepForStatusChange(Events.RECONNECTED); + connection = Nats.connectReconnectOnConnect(options); + assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS)); + th.prepForStatusChange(Events.DISCONNECTED); + } + catch (Exception e) { + fail("should have connected " + e); + } + } + assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS)); + assertTrue(simExReceived.get()); + simExReceived.set(false); + + // 2. NORMAL RECONNECT + th.prepForStatusChange(Events.RECONNECTED); + try (NatsTestServer ts = new NatsTestServer(port, false)) { + SimulateSocketDataPortException.THROW_ON_CONNECT.set(true); + try { + assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS)); + } + catch (Exception e) { + fail("should have reconnected " + e); + } + } + } } \ No newline at end of file diff --git a/src/test/java/io/nats/client/impl/SimulateSocketDataPortException.java b/src/test/java/io/nats/client/impl/SimulateSocketDataPortException.java new file mode 100644 index 000000000..990b41e65 --- /dev/null +++ b/src/test/java/io/nats/client/impl/SimulateSocketDataPortException.java @@ -0,0 +1,20 @@ +package io.nats.client.impl; + +import io.nats.client.support.NatsUri; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SimulateSocketDataPortException extends SocketDataPort { + public static AtomicBoolean THROW_ON_CONNECT = new AtomicBoolean(); + + @Override + public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws IOException { + if (THROW_ON_CONNECT.get()) { + SimulateSocketDataPortException.THROW_ON_CONNECT.set(false); + throw new ConnectException("Simulated Exception"); + } + super.connect(conn, nuri, timeoutNanos); + } +}