Skip to content

Commit

Permalink
Merge branch 'main' into put-locks-back
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Aug 20, 2024
2 parents 7fb3345 + 8806fbf commit 5201214
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ boolean push(NatsMessage msg, boolean internal) {
this.length.incrementAndGet();
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
} finally {
editLock.unlock();
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ public void run() {
this.closeSocket(false, true);
} catch (InterruptedException e) {
processException(e);
Thread.currentThread().interrupt();
}
} finally {
statusLock.lock();
Expand Down Expand Up @@ -1516,7 +1517,11 @@ public Duration RTT() throws IOException {
catch (ExecutionException e) {
throw new IOException(e.getCause());
}
catch (InterruptedException | TimeoutException e) {
catch (TimeoutException e) {
throw new IOException(e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
Expand Down Expand Up @@ -2211,8 +2216,11 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio

this.close(false, false); // close the connection after the last flush
tracker.complete(consumers.isEmpty());
} catch (TimeoutException | InterruptedException e) {
} catch (TimeoutException e) {
this.processException(e);
} catch (InterruptedException e) {
this.processException(e);
Thread.currentThread().interrupt();
} finally {
try {
this.close(false, false);// close the connection after the last flush
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ else if (this.mode == Mode.GATHER_HEADERS) {
if (running.get()) {
this.connection.handleCommunicationIssue(io);
}
} catch (CancellationException | ExecutionException | InterruptedException ex) {
} catch (CancellationException | ExecutionException ex) {
// Exit
} catch (InterruptedException ex) {
// Exit
Thread.currentThread().interrupt();
} finally {
this.running.set(false);
// Clear the buffers, since they are only used inside this try/catch
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,11 @@ public void run() {
if (running.get()) {
this.connection.handleCommunicationIssue(io);
}
} catch (CancellationException | ExecutionException | InterruptedException ex) {
} catch (CancellationException | ExecutionException ex) {
// Exit
} catch (InterruptedException ex) {
// Exit
Thread.currentThread().interrupt();
} finally {
this.running.set(false);
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/NatsConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedExce
this.cleanUpAfterDrain();
} catch (InterruptedException e) {
this.connection.processException(e);
Thread.currentThread().interrupt();
} finally {
tracker.complete(this.isDrained());
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/NatsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void run() {
if (this.running.get()){
this.connection.processException(exp);
} //otherwise we did it
Thread.currentThread().interrupt();
}
finally {
this.running.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void run() {
if (this.running.get()){
this.connection.processException(exp);
} //otherwise we did it
Thread.currentThread().interrupt();
}
finally {
this.running.set(false);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo
try {
return responseRequired(conn.request(prependPrefix(subject), bytes, timeout));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
Expand All @@ -240,6 +241,7 @@ Message makeInternalRequestResponseRequired(String subject, Headers headers, byt
try {
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
Expand Down

0 comments on commit 5201214

Please sign in to comment.