Skip to content

Commit

Permalink
experimental pull reader (#683)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 3, 2022
1 parent 3fe2372 commit c725606
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 5 deletions.
25 changes: 25 additions & 0 deletions src/main/java/io/nats/client/JetStreamReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

import java.time.Duration;

/**
* THIS IS PART OF AN EXPERIMENTAL API AND IS CONSIDERED EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public interface JetStreamReader {
Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException;
Message nextMessage(long timeoutMillis) throws InterruptedException, IllegalStateException;
void stop();
}
17 changes: 17 additions & 0 deletions src/main/java/io/nats/client/JetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,23 @@ public interface JetStreamSubscription extends Subscription {
*/
Iterator<Message> iterate(final int batchSize, long maxWaitMillis);

/**
* Prepares a reader. A reader looks like a push sync subscription,
* meaning it is just an endless stream of messages to ask for by nextMessage,
* but uses pull under the covers.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* THIS API IS CONSIDERED EXPERIMENTAL AND SUBJECT TO CHANGE
*
* @param batchSize the size of the batch
* @param repullAt the point in the current batch to tell the server to start the next batch
*
* @return the message iterator
* @throws IllegalStateException if not a pull subscription.
*/
JetStreamReader reader(final int batchSize, int repullAt);

/**
* Gets information about the consumer behind this subscription.
* @return consumer information
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
Expand All @@ -13,6 +13,7 @@

package io.nats.client.impl;

import io.nats.client.JetStreamReader;
import io.nats.client.Message;
import io.nats.client.PullRequestOptions;

Expand Down Expand Up @@ -277,4 +278,57 @@ public Message next() {
}
};
}

static class JetStreamReaderImpl implements JetStreamReader {
private final NatsJetStreamPullSubscription sub;
private final int batchSize;
private final int repullAt;
private int currentBatchRed;
private boolean keepGoing = true;

public JetStreamReaderImpl(final NatsJetStreamPullSubscription sub, final int batchSize, final int repullAt) {
this.sub = sub;
this.batchSize = batchSize;
this.repullAt = Math.max(1, Math.min(batchSize, repullAt));
currentBatchRed = 0;
sub.pull(batchSize);
}

@Override
public Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException {
return track(sub.nextMessage(timeout));
}

@Override
public Message nextMessage(long timeoutMillis) throws InterruptedException, IllegalStateException {
return track(sub.nextMessage(timeoutMillis));
}

private Message track(Message msg) {
if (msg != null) {
if (++currentBatchRed == repullAt) {
if (keepGoing) {
sub.pull(batchSize);
}
}
if (currentBatchRed == batchSize) {
currentBatchRed = 0;
}
}
return msg;
}

@Override
public void stop() {
keepGoing = false;
}
}

/**
* {@inheritDoc}
*/
@Override
public JetStreamReader reader(final int batchSize, final int repullAt) {
return new JetStreamReaderImpl(this, batchSize, repullAt);
}
}
13 changes: 9 additions & 4 deletions src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@

package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullRequestOptions;
import io.nats.client.*;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.support.NatsJetStreamConstants;

Expand Down Expand Up @@ -233,6 +230,14 @@ public Iterator<Message> iterate(final int batchSize, long maxWaitMillis) {
throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
}

/**
* {@inheritDoc}
*/
@Override
public JetStreamReader reader(int batchSize, int repullAt) {
throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
}

/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit c725606

Please sign in to comment.