Skip to content

Commit

Permalink
new pull apis for nowait + expire, fetch and iterate re-implement (#596)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Feb 23, 2022
1 parent 1677399 commit cb2c585
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 153 deletions.
34 changes: 24 additions & 10 deletions src/main/java/io/nats/client/JetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public interface JetStreamSubscription extends Subscription {
/**
* Do a pull in noWait mode with the specified batch size.
*
* When when no messages are available a response with a 404 status header will be returned
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
*
Expand All @@ -49,19 +47,39 @@ public interface JetStreamSubscription extends Subscription {
*/
void pullNoWait(int batchSize);

/**
* Do a pull in noWait mode with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
*
* @param batchSize the size of the batch
* @param expiresIn how long from now this request should be expired from the server wait list
* @throws IllegalStateException if not a pull subscription.
*/
void pullNoWait(int batchSize, Duration expiresIn);

/**
* Do a pull in noWait mode with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
*
* @param batchSize the size of the batch
* @param expiresInMillis how long from now this request should be expired from the server wait list, in milliseconds
* @throws IllegalStateException if not a pull subscription.
*/
void pullNoWait(int batchSize, long expiresInMillis);

/**
* Initiate pull for all messages available before expiration.
* <p>
* <code>sub.nextMessage(timeout)</code> can return a:
* <ul>
* <li>regular message
* <li>null
* <li>408 status message
* </ul>
* <p>
* Multiple 408 status messages may come. Each one indicates a
* missing item from the previous batch and can be discarded.
* <p>
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
Expand All @@ -80,12 +98,8 @@ public interface JetStreamSubscription extends Subscription {
* <ul>
* <li>regular message
* <li>null
* <li>408 status message
* </ul>
* <p>
* Multiple 408 status messages may come. Each one indicates a
* missing item from the previous batch and can be discarded.
* <p>
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
Expand Down
29 changes: 17 additions & 12 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,32 +372,37 @@ else if (so.isBind()) {
}

// 6. create the subscription. lambda needs final or effectively final vars
final MessageManager statusManager = isPullMode
? new PullStatusMessageManager()
: PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, fnlServerCC, qgroup != null, dispatcher == null);

NatsJetStreamSubscription sub;
if (isPullMode) {
final MessageManager[] managers = new MessageManager[] { new PullStatusMessageManager() };
final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, fnlConsumerName, statusManager);
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, fnlConsumerName, managers);
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
final MessageManager sidCheckManager = so.isOrdered() ? new SidCheckManager() : null;

final MessageManager orderedManager = so.isOrdered()
? new OrderedManager(this, dispatcher, fnlStream, fnlServerCC)
: null;
final MessageManager statusManager =
PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, fnlServerCC, qgroup != null, dispatcher == null);
final MessageManager[] managers;
if (so.isOrdered()) {
managers = new MessageManager[3];
managers[0] = new SidCheckManager();
managers[1] = statusManager;
managers[2] = new OrderedManager(this, dispatcher, fnlStream, fnlServerCC);
}
else {
managers = new MessageManager[1];
managers[0] = statusManager;
}

final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher,
this, fnlStream, fnlConsumerName, sidCheckManager, statusManager, orderedManager);
this, fnlStream, fnlConsumerName, managers);

if (dispatcher == null) {
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, fnlServerCC, sidCheckManager, statusManager, orderedManager);
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, fnlServerCC, managers);
sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, factory);
}
}
Expand Down
Loading

0 comments on commit cb2c585

Please sign in to comment.