From 66311ec81c134453441ec0a53c172e7630acb9bf Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Mon, 4 Mar 2024 10:58:07 -0500 Subject: [PATCH] Allow simplification fetch to have noWait with or without expires. (#1089) --- .../io/nats/client/BaseConsumeOptions.java | 24 ++++++++++++++--- .../io/nats/client/FetchConsumeOptions.java | 25 ++++++++++++++++++ .../nats/client/impl/NatsFetchConsumer.java | 26 ++++++++++++++++--- .../client/impl/NatsJetStreamMetaData.java | 22 ++++++++-------- 4 files changed, 79 insertions(+), 18 deletions(-) diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index be3937f45..12d8c2128 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -13,6 +13,8 @@ package io.nats.client; +import io.nats.client.api.ConsumerConfiguration; + /** * Base Consume Options are provided to customize the way the consume and * fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions. @@ -23,6 +25,7 @@ public class BaseConsumeOptions { public static final int DEFAULT_THRESHOLD_PERCENT = 25; public static final long DEFAULT_EXPIRES_IN_MILLIS = 30000; public static final long MIN_EXPIRES_MILLS = 1000; + public static final long MIN_NOWAIT_EXPIRES_MILLS = 100; public static final long MAX_HEARTBEAT_MILLIS = 30000; public static final int MAX_IDLE_HEARTBEAT_PERCENT = 50; @@ -31,6 +34,7 @@ public class BaseConsumeOptions { protected final long expiresIn; protected final long idleHeartbeat; protected final int thresholdPercent; + protected final boolean noWait; @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars protected BaseConsumeOptions(Builder b) { @@ -45,6 +49,7 @@ protected BaseConsumeOptions(Builder b) { // validation handled in builder thresholdPercent = b.thresholdPercent; expiresIn = b.expiresIn; + noWait = b.noWait; // calculated idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100); @@ -67,6 +72,7 @@ protected static abstract class Builder { protected long bytes = 0; protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT; protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS; + protected boolean noWait = false; protected abstract B getThis(); @@ -90,11 +96,23 @@ protected B bytes(long bytes) { * @return the builder */ public B expiresIn(long expiresInMillis) { - if (expiresInMillis < 1) { - expiresIn = DEFAULT_EXPIRES_IN_MILLIS; + if (expiresInMillis < 1) { // this is way to clear or reset, just a code guard really + if (noWait) { + expiresIn = ConsumerConfiguration.LONG_UNSET; + } + else { + expiresIn = DEFAULT_EXPIRES_IN_MILLIS; + } } else if (expiresInMillis < MIN_EXPIRES_MILLS) { - throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS); + if (noWait) { + if (expiresInMillis < MIN_NOWAIT_EXPIRES_MILLS) { + throw new IllegalArgumentException("Expires when No Wait must be greater than or equal to " + MIN_NOWAIT_EXPIRES_MILLS); + } + } + else { + throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS); + } } else { expiresIn = expiresInMillis; diff --git a/src/main/java/io/nats/client/FetchConsumeOptions.java b/src/main/java/io/nats/client/FetchConsumeOptions.java index f3f8b8d52..9094237a6 100644 --- a/src/main/java/io/nats/client/FetchConsumeOptions.java +++ b/src/main/java/io/nats/client/FetchConsumeOptions.java @@ -13,6 +13,8 @@ package io.nats.client; +import io.nats.client.api.ConsumerConfiguration; + /** * Fetch Consume Options are provided to customize the fetch operation. */ @@ -39,6 +41,8 @@ public long getMaxBytes() { return bytes; } + public boolean isNoWait() { return noWait; } + public static Builder builder() { return new Builder(); } @@ -92,6 +96,27 @@ public Builder max(int maxBytes, int maxMessages) { return bytes(maxBytes); } + /** + * Set no wait to true + * @return the builder + */ + public Builder noWait() { + this.noWait = true; + expiresIn = ConsumerConfiguration.LONG_UNSET; + return this; + } + + /** + * Set no wait to true with an expiration, special behavior. + * @param expiresInMillis the expiration time in milliseconds + * @return the builder + */ + public Builder noWaitExpiresIn(long expiresInMillis) { + this.noWait = true; + expiresIn(expiresInMillis); + return this; + } + /** * Build the FetchConsumeOptions. * @return a FetchConsumeOptions instance diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 61374b056..7e18fa5c7 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -18,7 +18,11 @@ import java.io.IOException; +import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS; +import static io.nats.client.BaseConsumeOptions.MIN_NOWAIT_EXPIRES_MILLS; + class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver { + private final boolean isNoWait; private final long maxWaitNanos; private final String pullSubject; private long startNanos; @@ -29,13 +33,22 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer { super(cachedConsumerInfo); + isNoWait = fetchConsumeOptions.isNoWait(); long expiresInMillis = fetchConsumeOptions.getExpiresInMillis(); - maxWaitNanos = expiresInMillis * 1_000_000; - long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait + long inactiveThreshold; + if (expiresInMillis <= MIN_NOWAIT_EXPIRES_MILLS ) { // can be for noWait + maxWaitNanos = MIN_NOWAIT_EXPIRES_MILLS * 1_000_000; + inactiveThreshold = MIN_EXPIRES_MILLS; // no need to do the 10% longer + } + else { + maxWaitNanos = expiresInMillis * 1_000_000; + inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait + } PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) .maxBytes(fetchConsumeOptions.getMaxBytes()) - .expiresIn(fetchConsumeOptions.getExpiresInMillis()) + .expiresIn(expiresInMillis) .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) + .noWait(isNoWait) .build(); initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold)); pullSubject = sub._pull(pro, false, this); @@ -82,7 +95,12 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked // if the timer has run out, don't allow waiting // this might happen once, but it should already be noMorePending if (timeLeftMillis < 1) { - return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait + Message m = sub._nextUnmanagedNoWait(pullSubject); // null means don't wait + if (m == null && isNoWait) { + finished.set(true); + lenientClose(); + } + return m; } return sub._nextUnmanaged(timeLeftMillis, pullSubject); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java b/src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java index 92a5dd0a7..fd2b47c66 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java @@ -35,22 +35,22 @@ public class NatsJetStreamMetaData { private final long consumerSeq; private final ZonedDateTime timestamp; private final long pending; - @Override public String toString() { return "NatsJetStreamMetaData{" + - "prefix='" + prefix + '\'' + - ", domain='" + domain + '\'' + - ", stream='" + stream + '\'' + - ", consumer='" + consumer + '\'' + - ", delivered=" + delivered + - ", streamSeq=" + streamSeq + - ", consumerSeq=" + consumerSeq + - ", timestamp=" + timestamp + - ", pending=" + pending + - '}'; + "prefix='" + prefix + '\'' + + ", domain='" + domain + '\'' + + ", stream='" + stream + '\'' + + ", consumer='" + consumer + '\'' + + ", delivered=" + delivered + + ", streamSeq=" + streamSeq + + ", consumerSeq=" + consumerSeq + + ", timestamp=" + timestamp + + ", pending=" + pending + + '}'; } + /* v0 .ACK...... v1 .ACK.......