diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index 455356c88..2c42e82f2 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -112,11 +112,11 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di private void checkState() throws IOException { if (lastConsumer != null) { if (ordered) { - if (!lastConsumer.finished) { + if (!lastConsumer.finished.get()) { throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time."); } } - if (lastConsumer.finished && !lastConsumer.stopped) { + if (lastConsumer.finished.get() && !lastConsumer.stopped.get()) { lastConsumer.lenientClose(); // finished, might as well make sure the sub is closed. } } @@ -182,7 +182,6 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); } - //noinspection resource I close it manually down below con = new NatsMessageConsumerBase(cachedConsumerInfo); con.initSub(subscribe(null, null)); con.sub._pull(PullRequestOptions.builder(1) @@ -197,7 +196,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException } finally { try { - con.finished = true; + con.finished.set(true); con.close(); } catch (Exception e) { diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 805049d61..9a0c3d1c3 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -43,7 +43,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer @Override public Message nextMessage() throws InterruptedException, JetStreamStatusCheckedException { try { - if (finished) { + if (finished.get()) { return null; } @@ -55,7 +55,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked if (m == null) { // if there are no messages in the internal cache AND there are no more pending, // they all have been read and we can go ahead and close the subscription. - finished = true; + finished.set(true); lenientClose(); } return m; diff --git a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java index 01d8c7b7a..d51a2c919 100644 --- a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java @@ -32,8 +32,8 @@ class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsum public Message nextMessage(Duration timeout) throws InterruptedException, JetStreamStatusCheckedException { try { Message msg = sub.nextMessage(timeout); - if (msg != null && stopped && pmm.noMorePending()) { - finished = true; + if (msg != null && stopped.get() && pmm.noMorePending()) { + finished.set(true); } return msg; } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index d499c1c5b..e427b5d1f 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -47,8 +47,8 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager MessageHandler mh = userMessageHandler == null ? null : msg -> { userMessageHandler.onMessage(msg); - if (stopped && pmm.noMorePending()) { - finished = true; + if (stopped.get() && pmm.noMorePending()) { + finished.set(true); } }; initSub(subscriptionMaker.subscribe(mh, userDispatcher)); @@ -62,7 +62,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager @Override public void pendingUpdated() { - if (!stopped && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) + if (!stopped.get() && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) { sub._pull(rePullPro, false, this); } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index a5ea48955..d6a95debe 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -18,16 +18,19 @@ import io.nats.client.api.ConsumerInfo; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; class NatsMessageConsumerBase implements MessageConsumer { protected NatsJetStreamPullSubscription sub; protected PullMessageManager pmm; - protected boolean stopped; - protected boolean finished; + protected final AtomicBoolean stopped; + protected final AtomicBoolean finished; protected ConsumerInfo cachedConsumerInfo; NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo) { this.cachedConsumerInfo = cachedConsumerInfo; + this.stopped = new AtomicBoolean(false); + this.finished = new AtomicBoolean(false); } void initSub(NatsJetStreamPullSubscription sub) { @@ -39,14 +42,14 @@ void initSub(NatsJetStreamPullSubscription sub) { * {@inheritDoc} */ public boolean isStopped() { - return stopped; + return stopped.get(); } /** * {@inheritDoc} */ public boolean isFinished() { - return finished; + return finished.get(); } /** @@ -74,7 +77,7 @@ public ConsumerInfo getCachedConsumerInfo() { */ @Override public void stop() { - stopped = true; + stopped.set(true); } @Override @@ -84,8 +87,8 @@ public void close() throws Exception { protected void lenientClose() { try { - if (!stopped || sub.isActive()) { - stopped = true; + if (!stopped.get() || sub.isActive()) { + stopped.set(true); if (sub.getNatsDispatcher() != null) { sub.getDispatcher().unsubscribe(sub); }