Skip to content

Commit

Permalink
Improve socket close behavior (#1155)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 6, 2024
1 parent 8a1111c commit 76fac05
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "2.18.2"
def jarVersion = "2.19.0"

def isRelease = System.getenv("BUILD_EVENT") == "release"
def brn = System.getenv("BRANCH_REF_NAME")
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ public class Options {
* {@link Builder#socketWriteTimeout(long) socketWriteTimeout}.
*/
public static final String PROP_SOCKET_WRITE_TIMEOUT = PFX + "socket.write.timeout";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#socketSoLinger(int) socketSoLinger}.
*/
public static final String PROP_SOCKET_SO_LINGER = PFX + "socket.so.linger";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#reconnectBufferSize(long) reconnectBufferSize}.
Expand Down Expand Up @@ -589,6 +594,7 @@ public class Options {
private final Duration reconnectJitterTls;
private final Duration connectionTimeout;
private final Duration socketWriteTimeout;
private final int socketSoLinger;
private final Duration pingInterval;
private final Duration requestCleanupInterval;
private final int maxPingsOut;
Expand Down Expand Up @@ -700,6 +706,7 @@ public static class Builder {
private Duration reconnectJitterTls = DEFAULT_RECONNECT_JITTER_TLS;
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT;
private int socketSoLinger = -1;
private Duration pingInterval = DEFAULT_PING_INTERVAL;
private Duration requestCleanupInterval = DEFAULT_REQUEST_CLEANUP_INTERVAL;
private int maxPingsOut = DEFAULT_MAX_PINGS_OUT;
Expand Down Expand Up @@ -833,6 +840,7 @@ public Builder properties(Properties props) {
longProperty(props, PROP_RECONNECT_BUF_SIZE, DEFAULT_RECONNECT_BUF_SIZE, l -> this.reconnectBufferSize = l);
durationProperty(props, PROP_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, d -> this.connectionTimeout = d);
durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d);
intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i);

intGtEqZeroProperty(props, PROP_MAX_CONTROL_LINE, DEFAULT_MAX_CONTROL_LINE, i -> this.maxControlLine = i);
durationProperty(props, PROP_PING_INTERVAL, DEFAULT_PING_INTERVAL, d -> this.pingInterval = d);
Expand Down Expand Up @@ -1285,6 +1293,19 @@ public Builder socketWriteTimeout(Duration socketWriteTimeout) {
return this;
}

/**
* Set the value of the socket SO LINGER property in seconds.
* This feature is built in to library data port implementations.
* A value greater than or equal to 0 will call
* socket.setSoLinger with true and the timeout value
* @param socketSoLinger the number of seconds to linger
* @return the Builder for chaining
*/
public Builder socketSoLinger(int socketSoLinger) {
this.socketSoLinger = socketSoLinger;
return this;
}

/**
* Set the interval between attempts to pings the server. These pings are automated,
* and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library
Expand Down Expand Up @@ -1757,6 +1778,10 @@ else if (useDefaultTls) {
}
}

if (socketSoLinger < 0) {
socketSoLinger = -1;
}

if (errorListener == null) {
errorListener = new ErrorListenerLoggerImpl();
}
Expand Down Expand Up @@ -1803,6 +1828,7 @@ public Builder(Options o) {
this.reconnectJitterTls = o.reconnectJitterTls;
this.connectionTimeout = o.connectionTimeout;
this.socketWriteTimeout = o.socketWriteTimeout;
this.socketSoLinger = o.socketSoLinger;
this.pingInterval = o.pingInterval;
this.requestCleanupInterval = o.requestCleanupInterval;
this.maxPingsOut = o.maxPingsOut;
Expand Down Expand Up @@ -1864,6 +1890,7 @@ private Options(Builder b) {
this.reconnectJitterTls = b.reconnectJitterTls;
this.connectionTimeout = b.connectionTimeout;
this.socketWriteTimeout = b.socketWriteTimeout;
this.socketSoLinger = b.socketSoLinger;
this.pingInterval = b.pingInterval;
this.requestCleanupInterval = b.requestCleanupInterval;
this.maxPingsOut = b.maxPingsOut;
Expand Down Expand Up @@ -2186,6 +2213,13 @@ public Duration getSocketWriteTimeout() {
return socketWriteTimeout;
}

/**
* @return the socket so linger number of seconds, see {@link Builder#socketSoLinger(int) socketSoLinger()} in the builder doc
*/
public int getSocketSoLinger() {
return socketSoLinger;
}

/**
* @return the pingInterval, see {@link Builder#pingInterval(Duration) pingInterval()} in the builder doc
*/
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/impl/DataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,9 @@ default void afterConstruct(Options options) {}

void close() throws IOException;

default void forceClose() throws IOException {
close();
}

void flush() throws IOException;
}
57 changes: 33 additions & 24 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ public void forceReconnect() throws IOException, InterruptedException {
}

void forceReconnectImpl() throws IOException, InterruptedException {
NatsConnectionWriter oldWriter = writer;

closeSocketLock.lock();
try {
updateStatus(Status.DISCONNECTED);
Expand All @@ -295,19 +293,21 @@ void forceReconnectImpl() throws IOException, InterruptedException {
dataPortFuture.cancel(true);
dataPortFuture = null;
}

// close the data port as a task so as not to block reconnect
if (dataPort != null) {
try {
dataPort.close();
}
catch (IOException ignore) {
}
finally {
dataPort = null;
}
final DataPort closeMe = dataPort;
dataPort = null;
executor.submit(() -> {
try {
closeMe.forceClose();
}
catch (IOException ignore) {}
});
}

// stop i/o
reader.stop();
reader.stop(false);
writer.stop();

// new reader/writer
Expand Down Expand Up @@ -628,7 +628,9 @@ public void run() {
} catch (Exception exp) {
processException(exp);
try {
this.closeSocket(false);
// allow force reconnect since this is pretty exceptional,
// a connection failure while trying to connect
this.closeSocket(false, true);
} catch (InterruptedException e) {
processException(e);
}
Expand Down Expand Up @@ -691,7 +693,9 @@ void handleCommunicationIssue(Exception io) {
// waiting on read/write threads
executor.submit(() -> {
try {
this.closeSocket(true);
// any issue that brings us here is pretty serious
// so we are comfortable forcing the close
this.closeSocket(true, true);
} catch (InterruptedException e) {
processException(e);
Thread.currentThread().interrupt();
Expand All @@ -701,7 +705,7 @@ void handleCommunicationIssue(Exception io) {

// Close socket is called when another connect attempt is possible
// Close is called when the connection should shut down, period
void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
// Ensure we close the socket exclusively within one thread.
closeSocketLock.lock();
try {
Expand All @@ -720,7 +724,7 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
statusLock.unlock();
}

closeSocketImpl();
closeSocketImpl(forceClose);

statusLock.lock();
try {
Expand Down Expand Up @@ -749,10 +753,10 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
*/
@Override
public void close() throws InterruptedException {
this.close(true);
this.close(true, false);
}

void close(boolean checkDrainStatus) throws InterruptedException {
void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
statusLock.lock();
try {
if (checkDrainStatus && this.isDraining()) {
Expand All @@ -779,7 +783,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
this.reconnectWaiter.cancel(true);
}

closeSocketImpl();
closeSocketImpl(forceClose);

this.dispatchers.forEach((nuid, d) -> d.stop(false));

Expand Down Expand Up @@ -831,7 +835,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
}

// Should only be called from closeSocket or close
void closeSocketImpl() {
void closeSocketImpl(boolean forceClose) {
this.currentServer = null;

// Signal both to stop.
Expand All @@ -854,8 +858,13 @@ void closeSocketImpl() {

// Close the current socket and cancel anyone waiting for it
try {
if (this.dataPort != null) {
this.dataPort.close();
if (dataPort != null) {
if (forceClose) {
dataPort.forceClose();
}
else {
dataPort.close();
}
}

} catch (IOException ex) {
Expand Down Expand Up @@ -2121,7 +2130,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
try {
this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
} catch (Exception e) {
this.close(false);
this.close(false, false);
throw e;
}

Expand Down Expand Up @@ -2163,13 +2172,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
}
}

this.close(false); // close the connection after the last flush
this.close(false, false); // close the connection after the last flush
tracker.complete(consumers.isEmpty());
} catch (TimeoutException | InterruptedException e) {
this.processException(e);
} finally {
try {
this.close(false);// close the connection after the last flush
this.close(false, false);// close the connection after the last flush
} catch (InterruptedException e) {
processException(e);
Thread.currentThread().interrupt();
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,17 @@ void start(Future<DataPort> dataPortFuture) {
this.stopped = connection.getExecutor().submit(this, Boolean.TRUE);
}

Future<Boolean> stop() {
return stop(true);
}

// May be called several times on an error.
// Returns a future that is completed when the thread completes, not when this
// method does.
Future<Boolean> stop() {
Future<Boolean> stop(boolean shutdownDataPort) {
if (running.get()) {
running.set(false);
if (dataPort != null) {
if (shutdownDataPort && dataPort != null) {
try {
dataPort.shutdownInput();
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/io/nats/client/impl/SocketDataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
Expand All @@ -42,10 +43,16 @@ public class SocketDataPort implements DataPort {
protected int port;
protected Socket socket;
protected boolean isSecure = false;
protected int soLinger;

protected InputStream in;
protected OutputStream out;

@Override
public void afterConstruct(Options options) {
soLinger = options.getSocketSoLinger();
}

@Override
public void connect(String serverURI, NatsConnection conn, long timeoutNanos) throws IOException {
try {
Expand Down Expand Up @@ -75,6 +82,9 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws
socket.setReceiveBufferSize(2 * 1024 * 1024);
socket.setSendBufferSize(2 * 1024 * 1024);
socket.connect(new InetSocketAddress(host, port), (int) timeout);
if (soLinger > -1) {
socket.setSoLinger(true, soLinger);
}

if (isWebsocketScheme(nuri.getScheme())) {
if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
Expand Down Expand Up @@ -154,6 +164,18 @@ public void close() throws IOException {
socket.close();
}

@Override
public void forceClose() throws IOException {
try {
// If we are being asked to force close, there is no need to linger.
socket.setSoLinger(true, 0);
}
catch (SocketException e) {
// don't want to fail if I couldn't set linger
}
close();
}

public void flush() throws IOException {
out.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ public void run() {
writeWatcherTimer.cancel(); // we don't need to repeat this
connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
try {
out.close();
connection.forceReconnect();
}
catch (IOException e) {
// retry maybe? forceReconnect
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (IOException ignore) {}
connection.getExecutor().submit(() -> {
try {
connection.forceReconnect();
}
catch (IOException | InterruptedException ignore) {}
});
}
}
}

@Override
public void afterConstruct(Options options) {
super.afterConstruct(options);
long writeTimeoutMillis;
if (options.getSocketWriteTimeout() == null) {
writeTimeoutMillis = Options.DEFAULT_SOCKET_WRITE_TIMEOUT.toMillis();
Expand Down

0 comments on commit 76fac05

Please sign in to comment.