Skip to content

Commit

Permalink
Allow simplification fetch to have noWait with or without expires. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Mar 4, 2024
1 parent 39d651b commit 66311ec
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 18 deletions.
24 changes: 21 additions & 3 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.nats.client;

import io.nats.client.api.ConsumerConfiguration;

/**
* Base Consume Options are provided to customize the way the consume and
* fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions.
Expand All @@ -23,6 +25,7 @@ public class BaseConsumeOptions {
public static final int DEFAULT_THRESHOLD_PERCENT = 25;
public static final long DEFAULT_EXPIRES_IN_MILLIS = 30000;
public static final long MIN_EXPIRES_MILLS = 1000;
public static final long MIN_NOWAIT_EXPIRES_MILLS = 100;
public static final long MAX_HEARTBEAT_MILLIS = 30000;
public static final int MAX_IDLE_HEARTBEAT_PERCENT = 50;

Expand All @@ -31,6 +34,7 @@ public class BaseConsumeOptions {
protected final long expiresIn;
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected BaseConsumeOptions(Builder b) {
Expand All @@ -45,6 +49,7 @@ protected BaseConsumeOptions(Builder b) {
// validation handled in builder
thresholdPercent = b.thresholdPercent;
expiresIn = b.expiresIn;
noWait = b.noWait;

// calculated
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
Expand All @@ -67,6 +72,7 @@ protected static abstract class Builder<B, CO> {
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;

protected abstract B getThis();

Expand All @@ -90,11 +96,23 @@ protected B bytes(long bytes) {
* @return the builder
*/
public B expiresIn(long expiresInMillis) {
if (expiresInMillis < 1) {
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
if (expiresInMillis < 1) { // this is way to clear or reset, just a code guard really
if (noWait) {
expiresIn = ConsumerConfiguration.LONG_UNSET;
}
else {
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
}
}
else if (expiresInMillis < MIN_EXPIRES_MILLS) {
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
if (noWait) {
if (expiresInMillis < MIN_NOWAIT_EXPIRES_MILLS) {
throw new IllegalArgumentException("Expires when No Wait must be greater than or equal to " + MIN_NOWAIT_EXPIRES_MILLS);
}
}
else {
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
}
}
else {
expiresIn = expiresInMillis;
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/nats/client/FetchConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.nats.client;

import io.nats.client.api.ConsumerConfiguration;

/**
* Fetch Consume Options are provided to customize the fetch operation.
*/
Expand All @@ -39,6 +41,8 @@ public long getMaxBytes() {
return bytes;
}

public boolean isNoWait() { return noWait; }

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -92,6 +96,27 @@ public Builder max(int maxBytes, int maxMessages) {
return bytes(maxBytes);
}

/**
* Set no wait to true
* @return the builder
*/
public Builder noWait() {
this.noWait = true;
expiresIn = ConsumerConfiguration.LONG_UNSET;
return this;
}

/**
* Set no wait to true with an expiration, special behavior.
* @param expiresInMillis the expiration time in milliseconds
* @return the builder
*/
public Builder noWaitExpiresIn(long expiresInMillis) {
this.noWait = true;
expiresIn(expiresInMillis);
return this;
}

/**
* Build the FetchConsumeOptions.
* @return a FetchConsumeOptions instance
Expand Down
26 changes: 22 additions & 4 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import java.io.IOException;

import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;
import static io.nats.client.BaseConsumeOptions.MIN_NOWAIT_EXPIRES_MILLS;

class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver {
private final boolean isNoWait;
private final long maxWaitNanos;
private final String pullSubject;
private long startNanos;
Expand All @@ -29,13 +33,22 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
{
super(cachedConsumerInfo);

isNoWait = fetchConsumeOptions.isNoWait();
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();
maxWaitNanos = expiresInMillis * 1_000_000;
long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait
long inactiveThreshold;
if (expiresInMillis <= MIN_NOWAIT_EXPIRES_MILLS ) { // can be for noWait
maxWaitNanos = MIN_NOWAIT_EXPIRES_MILLS * 1_000_000;
inactiveThreshold = MIN_EXPIRES_MILLS; // no need to do the 10% longer
}
else {
maxWaitNanos = expiresInMillis * 1_000_000;
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
}
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
.maxBytes(fetchConsumeOptions.getMaxBytes())
.expiresIn(fetchConsumeOptions.getExpiresInMillis())
.expiresIn(expiresInMillis)
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
.noWait(isNoWait)
.build();
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
pullSubject = sub._pull(pro, false, this);
Expand Down Expand Up @@ -82,7 +95,12 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
// if the timer has run out, don't allow waiting
// this might happen once, but it should already be noMorePending
if (timeLeftMillis < 1) {
return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
Message m = sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
if (m == null && isNoWait) {
finished.set(true);
lenientClose();
}
return m;
}

return sub._nextUnmanaged(timeLeftMillis, pullSubject);
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ public class NatsJetStreamMetaData {
private final long consumerSeq;
private final ZonedDateTime timestamp;
private final long pending;

@Override
public String toString() {
return "NatsJetStreamMetaData{" +
"prefix='" + prefix + '\'' +
", domain='" + domain + '\'' +
", stream='" + stream + '\'' +
", consumer='" + consumer + '\'' +
", delivered=" + delivered +
", streamSeq=" + streamSeq +
", consumerSeq=" + consumerSeq +
", timestamp=" + timestamp +
", pending=" + pending +
'}';
"prefix='" + prefix + '\'' +
", domain='" + domain + '\'' +
", stream='" + stream + '\'' +
", consumer='" + consumer + '\'' +
", delivered=" + delivered +
", streamSeq=" + streamSeq +
", consumerSeq=" + consumerSeq +
", timestamp=" + timestamp +
", pending=" + pending +
'}';
}


/*
v0 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>
v1 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>.<num pending>
Expand Down

0 comments on commit 66311ec

Please sign in to comment.