diff --git a/src/main/java/io/nats/client/impl/MessageQueue.java b/src/main/java/io/nats/client/impl/MessageQueue.java index 225d70ce4..06691f960 100644 --- a/src/main/java/io/nats/client/impl/MessageQueue.java +++ b/src/main/java/io/nats/client/impl/MessageQueue.java @@ -259,7 +259,7 @@ NatsMessage pop(Duration timeout) throws InterruptedException { // Only works in single reader mode, because we want to maintain order. // accumulate reads off the concurrent queue one at a time, so if multiple // readers are present, you could get out of order message delivery. - NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration timeout) + NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout) throws InterruptedException { if (!this.singleReaderMode) { @@ -278,7 +278,7 @@ NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration time long size = msg.getSizeInBytes(); - if (maxMessagesToAccumulate <= 1 || size >= maxSize) { + if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) { this.sizeInBytes.addAndGet(-size); this.length.decrementAndGet(); return msg; @@ -291,7 +291,7 @@ NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration time NatsMessage next = this.queue.peek(); if (next != null && !isPoison(next)) { long s = next.getSizeInBytes(); - if (maxSize < 0 || (size + s) < maxSize) { // keep going + if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going size += s; count++; diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index 2a3934356..554fb6730 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -186,21 +186,20 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st @Override public void run() { - Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending - Duration reconnectWait = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through + Duration outgoingTimeout = Duration.ofMinutes(2); // This can be long since no one is sending + Duration reconnectTimeout = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through try { dataPort = this.dataPortFuture.get(); // Will wait for the future to complete StatisticsCollector stats = this.connection.getNatsStatistics(); - int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER; while (this.running.get()) { NatsMessage msg; if (this.reconnectMode.get()) { - msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), maxAccumulate, reconnectWait); + msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout); } else { - msg = this.outgoing.accumulate(sendBufferLength.get(), maxAccumulate, waitForMessage); + msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout); } if (msg != null) { sendMessageBatch(msg, dataPort, stats);