From 76fac05eb49c2bb31a7373d1256915ec211f7220 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Thu, 6 Jun 2024 11:36:08 -0400 Subject: [PATCH] Improve socket close behavior (#1155) --- build.gradle | 2 +- src/main/java/io/nats/client/Options.java | 34 +++++++++++ .../java/io/nats/client/impl/DataPort.java | 4 ++ .../io/nats/client/impl/NatsConnection.java | 57 +++++++++++-------- .../client/impl/NatsConnectionReader.java | 8 ++- .../io/nats/client/impl/SocketDataPort.java | 22 +++++++ .../impl/SocketDataPortWithWriteTimeout.java | 16 +++--- 7 files changed, 108 insertions(+), 35 deletions(-) diff --git a/build.gradle b/build.gradle index 744ed69da..a951474f4 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { id 'signing' } -def jarVersion = "2.18.2" +def jarVersion = "2.19.0" def isRelease = System.getenv("BUILD_EVENT") == "release" def brn = System.getenv("BRANCH_REF_NAME") diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 3fe9c5d49..6c77ff126 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -276,6 +276,11 @@ public class Options { * {@link Builder#socketWriteTimeout(long) socketWriteTimeout}. */ public static final String PROP_SOCKET_WRITE_TIMEOUT = PFX + "socket.write.timeout"; + /** + * Property used to configure a builder from a Properties object. {@value}, see + * {@link Builder#socketSoLinger(int) socketSoLinger}. + */ + public static final String PROP_SOCKET_SO_LINGER = PFX + "socket.so.linger"; /** * Property used to configure a builder from a Properties object. {@value}, see * {@link Builder#reconnectBufferSize(long) reconnectBufferSize}. @@ -589,6 +594,7 @@ public class Options { private final Duration reconnectJitterTls; private final Duration connectionTimeout; private final Duration socketWriteTimeout; + private final int socketSoLinger; private final Duration pingInterval; private final Duration requestCleanupInterval; private final int maxPingsOut; @@ -700,6 +706,7 @@ public static class Builder { private Duration reconnectJitterTls = DEFAULT_RECONNECT_JITTER_TLS; private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT; + private int socketSoLinger = -1; private Duration pingInterval = DEFAULT_PING_INTERVAL; private Duration requestCleanupInterval = DEFAULT_REQUEST_CLEANUP_INTERVAL; private int maxPingsOut = DEFAULT_MAX_PINGS_OUT; @@ -833,6 +840,7 @@ public Builder properties(Properties props) { longProperty(props, PROP_RECONNECT_BUF_SIZE, DEFAULT_RECONNECT_BUF_SIZE, l -> this.reconnectBufferSize = l); durationProperty(props, PROP_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, d -> this.connectionTimeout = d); durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d); + intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i); intGtEqZeroProperty(props, PROP_MAX_CONTROL_LINE, DEFAULT_MAX_CONTROL_LINE, i -> this.maxControlLine = i); durationProperty(props, PROP_PING_INTERVAL, DEFAULT_PING_INTERVAL, d -> this.pingInterval = d); @@ -1285,6 +1293,19 @@ public Builder socketWriteTimeout(Duration socketWriteTimeout) { return this; } + /** + * Set the value of the socket SO LINGER property in seconds. + * This feature is built in to library data port implementations. + * A value greater than or equal to 0 will call + * socket.setSoLinger with true and the timeout value + * @param socketSoLinger the number of seconds to linger + * @return the Builder for chaining + */ + public Builder socketSoLinger(int socketSoLinger) { + this.socketSoLinger = socketSoLinger; + return this; + } + /** * Set the interval between attempts to pings the server. These pings are automated, * and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library @@ -1757,6 +1778,10 @@ else if (useDefaultTls) { } } + if (socketSoLinger < 0) { + socketSoLinger = -1; + } + if (errorListener == null) { errorListener = new ErrorListenerLoggerImpl(); } @@ -1803,6 +1828,7 @@ public Builder(Options o) { this.reconnectJitterTls = o.reconnectJitterTls; this.connectionTimeout = o.connectionTimeout; this.socketWriteTimeout = o.socketWriteTimeout; + this.socketSoLinger = o.socketSoLinger; this.pingInterval = o.pingInterval; this.requestCleanupInterval = o.requestCleanupInterval; this.maxPingsOut = o.maxPingsOut; @@ -1864,6 +1890,7 @@ private Options(Builder b) { this.reconnectJitterTls = b.reconnectJitterTls; this.connectionTimeout = b.connectionTimeout; this.socketWriteTimeout = b.socketWriteTimeout; + this.socketSoLinger = b.socketSoLinger; this.pingInterval = b.pingInterval; this.requestCleanupInterval = b.requestCleanupInterval; this.maxPingsOut = b.maxPingsOut; @@ -2186,6 +2213,13 @@ public Duration getSocketWriteTimeout() { return socketWriteTimeout; } + /** + * @return the socket so linger number of seconds, see {@link Builder#socketSoLinger(int) socketSoLinger()} in the builder doc + */ + public int getSocketSoLinger() { + return socketSoLinger; + } + /** * @return the pingInterval, see {@link Builder#pingInterval(Duration) pingInterval()} in the builder doc */ diff --git a/src/main/java/io/nats/client/impl/DataPort.java b/src/main/java/io/nats/client/impl/DataPort.java index d3af3e9a2..908cfd225 100644 --- a/src/main/java/io/nats/client/impl/DataPort.java +++ b/src/main/java/io/nats/client/impl/DataPort.java @@ -52,5 +52,9 @@ default void afterConstruct(Options options) {} void close() throws IOException; + default void forceClose() throws IOException { + close(); + } + void flush() throws IOException; } \ No newline at end of file diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 18d0f677e..c0abdb4f6 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -284,8 +284,6 @@ public void forceReconnect() throws IOException, InterruptedException { } void forceReconnectImpl() throws IOException, InterruptedException { - NatsConnectionWriter oldWriter = writer; - closeSocketLock.lock(); try { updateStatus(Status.DISCONNECTED); @@ -295,19 +293,21 @@ void forceReconnectImpl() throws IOException, InterruptedException { dataPortFuture.cancel(true); dataPortFuture = null; } + + // close the data port as a task so as not to block reconnect if (dataPort != null) { - try { - dataPort.close(); - } - catch (IOException ignore) { - } - finally { - dataPort = null; - } + final DataPort closeMe = dataPort; + dataPort = null; + executor.submit(() -> { + try { + closeMe.forceClose(); + } + catch (IOException ignore) {} + }); } // stop i/o - reader.stop(); + reader.stop(false); writer.stop(); // new reader/writer @@ -628,7 +628,9 @@ public void run() { } catch (Exception exp) { processException(exp); try { - this.closeSocket(false); + // allow force reconnect since this is pretty exceptional, + // a connection failure while trying to connect + this.closeSocket(false, true); } catch (InterruptedException e) { processException(e); } @@ -691,7 +693,9 @@ void handleCommunicationIssue(Exception io) { // waiting on read/write threads executor.submit(() -> { try { - this.closeSocket(true); + // any issue that brings us here is pretty serious + // so we are comfortable forcing the close + this.closeSocket(true, true); } catch (InterruptedException e) { processException(e); Thread.currentThread().interrupt(); @@ -701,7 +705,7 @@ void handleCommunicationIssue(Exception io) { // Close socket is called when another connect attempt is possible // Close is called when the connection should shut down, period - void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException { + void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException { // Ensure we close the socket exclusively within one thread. closeSocketLock.lock(); try { @@ -720,7 +724,7 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException { statusLock.unlock(); } - closeSocketImpl(); + closeSocketImpl(forceClose); statusLock.lock(); try { @@ -749,10 +753,10 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException { */ @Override public void close() throws InterruptedException { - this.close(true); + this.close(true, false); } - void close(boolean checkDrainStatus) throws InterruptedException { + void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException { statusLock.lock(); try { if (checkDrainStatus && this.isDraining()) { @@ -779,7 +783,7 @@ void close(boolean checkDrainStatus) throws InterruptedException { this.reconnectWaiter.cancel(true); } - closeSocketImpl(); + closeSocketImpl(forceClose); this.dispatchers.forEach((nuid, d) -> d.stop(false)); @@ -831,7 +835,7 @@ void close(boolean checkDrainStatus) throws InterruptedException { } // Should only be called from closeSocket or close - void closeSocketImpl() { + void closeSocketImpl(boolean forceClose) { this.currentServer = null; // Signal both to stop. @@ -854,8 +858,13 @@ void closeSocketImpl() { // Close the current socket and cancel anyone waiting for it try { - if (this.dataPort != null) { - this.dataPort.close(); + if (dataPort != null) { + if (forceClose) { + dataPort.forceClose(); + } + else { + dataPort.close(); + } } } catch (IOException ex) { @@ -2121,7 +2130,7 @@ public CompletableFuture drain(Duration timeout) throws TimeoutExceptio try { this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know } catch (Exception e) { - this.close(false); + this.close(false, false); throw e; } @@ -2163,13 +2172,13 @@ public CompletableFuture drain(Duration timeout) throws TimeoutExceptio } } - this.close(false); // close the connection after the last flush + this.close(false, false); // close the connection after the last flush tracker.complete(consumers.isEmpty()); } catch (TimeoutException | InterruptedException e) { this.processException(e); } finally { try { - this.close(false);// close the connection after the last flush + this.close(false, false);// close the connection after the last flush } catch (InterruptedException e) { processException(e); Thread.currentThread().interrupt(); diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index ba7ad60db..53f22bf76 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -94,13 +94,17 @@ void start(Future dataPortFuture) { this.stopped = connection.getExecutor().submit(this, Boolean.TRUE); } + Future stop() { + return stop(true); + } + // May be called several times on an error. // Returns a future that is completed when the thread completes, not when this // method does. - Future stop() { + Future stop(boolean shutdownDataPort) { if (running.get()) { running.set(false); - if (dataPort != null) { + if (shutdownDataPort && dataPort != null) { try { dataPort.shutdownInput(); } diff --git a/src/main/java/io/nats/client/impl/SocketDataPort.java b/src/main/java/io/nats/client/impl/SocketDataPort.java index 7c1fc6cc9..f6d472a14 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPort.java +++ b/src/main/java/io/nats/client/impl/SocketDataPort.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketException; import java.net.URISyntaxException; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -42,10 +43,16 @@ public class SocketDataPort implements DataPort { protected int port; protected Socket socket; protected boolean isSecure = false; + protected int soLinger; protected InputStream in; protected OutputStream out; + @Override + public void afterConstruct(Options options) { + soLinger = options.getSocketSoLinger(); + } + @Override public void connect(String serverURI, NatsConnection conn, long timeoutNanos) throws IOException { try { @@ -75,6 +82,9 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws socket.setReceiveBufferSize(2 * 1024 * 1024); socket.setSendBufferSize(2 * 1024 * 1024); socket.connect(new InetSocketAddress(host, port), (int) timeout); + if (soLinger > -1) { + socket.setSoLinger(true, soLinger); + } if (isWebsocketScheme(nuri.getScheme())) { if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) { @@ -154,6 +164,18 @@ public void close() throws IOException { socket.close(); } + @Override + public void forceClose() throws IOException { + try { + // If we are being asked to force close, there is no need to linger. + socket.setSoLinger(true, 0); + } + catch (SocketException e) { + // don't want to fail if I couldn't set linger + } + close(); + } + public void flush() throws IOException { out.flush(); } diff --git a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java index 855123af9..c7ae9b492 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java +++ b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java @@ -39,21 +39,21 @@ public void run() { writeWatcherTimer.cancel(); // we don't need to repeat this connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); try { - out.close(); + connection.forceReconnect(); + } + catch (IOException e) { + // retry maybe? forceReconnect + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - catch (IOException ignore) {} - connection.getExecutor().submit(() -> { - try { - connection.forceReconnect(); - } - catch (IOException | InterruptedException ignore) {} - }); } } } @Override public void afterConstruct(Options options) { + super.afterConstruct(options); long writeTimeoutMillis; if (options.getSocketWriteTimeout() == null) { writeTimeoutMillis = Options.DEFAULT_SOCKET_WRITE_TIMEOUT.toMillis();