diff --git a/README.md b/README.md index a49c3b0b5..3a8b41eb4 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,11 @@ is an example of one api being added to the Connection interface. It should have Going forward, when a release contains only bug fixes, it's appropriate to simply bump the patch. But if an api is added, even one, then the minor version will be bumped. +##### Force Reconnect + +There is a new `Connection` interface api: `void forceReconnect() throws IOException, InterruptedException;` +If you call this, your connection will be immediately close and the reconnect logic will be executed. + #### Version 2.17.4 Core Improvements This release was full of core improvements which improve use of more asynchronous behaviors including diff --git a/src/main/java/io/nats/client/impl/MessageQueue.java b/src/main/java/io/nats/client/impl/MessageQueue.java index a3c6c5e78..b7920eb38 100644 --- a/src/main/java/io/nats/client/impl/MessageQueue.java +++ b/src/main/java/io/nats/client/impl/MessageQueue.java @@ -37,7 +37,7 @@ class MessageQueue { protected final boolean singleReaderMode; protected final LinkedBlockingQueue queue; protected final Lock editLock; - protected final int publishHighwaterMark; + protected final int maxMessagesInOutgoingQueue; protected final boolean discardWhenFull; protected final long offerLockMillis; protected final long offerTimeoutMillis; @@ -62,17 +62,17 @@ class MessageQueue { * If set to a number of messages, the publish command will block, which provides * backpressure on a publisher if the writer is slow to push things onto the network. Publishers use the value of Options.getMaxMessagesInOutgoingQueue(). * @param singleReaderMode allows the use of "accumulate" - * @param publishHighwaterMark sets a limit on the size of the underlying queue + * @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue * @param discardWhenFull allows to discard messages when the underlying queue is full * @param requestCleanupInterval is used to figure the offerTimeoutMillis */ - MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval) { - this(singleReaderMode, publishHighwaterMark, discardWhenFull, requestCleanupInterval, null); + MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) { + this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null); } - MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) { - this.publishHighwaterMark = publishHighwaterMark; - this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<>(publishHighwaterMark) : new LinkedBlockingQueue<>(); + MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) { + this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue; + this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>(); this.discardWhenFull = discardWhenFull; this.running = new AtomicInteger(RUNNING); this.sizeInBytes = new AtomicLong(0); @@ -87,7 +87,7 @@ class MessageQueue { this.singleReaderMode = singleReaderMode; this.requestCleanupInterval = requestCleanupInterval; - + if (source != null) { source.drainTo(this); } @@ -141,9 +141,26 @@ boolean push(NatsMessage msg) { boolean push(NatsMessage msg, boolean internal) { long start = System.currentTimeMillis(); try { - // try to get the lock, but don't wait forever - // assuming that if we are waiting for the lock - // another push likely has the lock and + /* + This was essentially a Head-Of-Line blocking problem. + + So the crux of the problem was that many threads were waiting to push a message to the queue. + They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually) + only to find out the queue was full. They released the lock, so then another thread acquired the lock, + and waited 5 seconds. So instead of being parallel, all these threads had to wait in line + 200 * 4750 = 15.8 minutes + + So what I did was try to acquire the lock but only wait 5 seconds. + If I could not acquire the lock, then I assumed that this means that we are in this exact situation, + another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception. + + If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add. + I took the max of that or 100 millis to try to add to the queue. + This ensures that the max total time each thread can take is 5100 millis in parallel. + + Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified. + The 4750 is 95% of that time. The 100 ms minimum is arbitrary. + */ if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) { throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size()); }