Skip to content

Commit

Permalink
subscription before consumer (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored May 4, 2022
1 parent d0c13fe commit ddad68d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
49 changes: 33 additions & 16 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,10 @@ else if (so.isBind()) {
// 4. If no deliver subject (inbox) provided or found, make an inbox.
final String fnlInboxDeliver = inboxDeliver == null ? conn.createInbox() : inboxDeliver;

// 5. If consumer does not exist, create
final String fnlConsumerName;
final ConsumerConfiguration fnlServerCC;
// 5. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
final String settledConsumerName;
final ConsumerConfiguration settledServerCC;
if (serverCC == null) {
ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);

Expand All @@ -368,33 +369,31 @@ else if (so.isBind()) {

ccBuilder.deliverGroup(qgroup);

// createOrUpdateConsumer can fail for security reasons, maybe other reasons?
ConsumerInfo ci = _createConsumer(fnlStream, ccBuilder.build());
fnlConsumerName = ci.getName();
fnlServerCC = ci.getConsumerConfiguration();
settledServerCC = ccBuilder.build();
settledConsumerName = null;
}
else {
fnlConsumerName = consumerName;
fnlServerCC = serverCC;
settledServerCC = serverCC;
settledConsumerName = consumerName;
}

// 6. create the subscription. lambda needs final or effectively final vars
NatsJetStreamSubscription sub;
if (isPullMode) {
final MessageManager[] managers = new MessageManager[] { new PullStatusMessageManager() };
final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, fnlConsumerName, managers);
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, settledConsumerName, managers);
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
final MessageManager statusManager =
PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, fnlServerCC, qgroup != null, dispatcher == null);
PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, settledServerCC, qgroup != null, dispatcher == null);
final MessageManager[] managers;
if (so.isOrdered()) {
managers = new MessageManager[3];
managers[0] = new SidCheckManager();
managers[1] = statusManager;
managers[2] = new OrderedManager(this, dispatcher, fnlStream, fnlServerCC);
managers[2] = new OrderedManager(this, dispatcher, fnlStream, settledServerCC);
}
else {
managers = new MessageManager[1];
Expand All @@ -403,17 +402,35 @@ else if (so.isBind()) {

final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher,
this, fnlStream, fnlConsumerName, managers);
this, fnlStream, settledConsumerName, managers);

if (dispatcher == null) {
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, fnlServerCC, managers);
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, settledServerCC, managers);
sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, factory);
}
}

// 7. The consumer might need to be created, do it here
if (settledConsumerName == null) {
try {
ConsumerInfo ci = _createConsumer(fnlStream, settledServerCC);
sub.setConsumerName(ci.getName());
}
catch (IOException | JetStreamApiException e) {
// create consumer can fail, unsubscribe and then throw the exception to the user
if (dispatcher == null) {
sub.unsubscribe();
}
else {
dispatcher.unsubscribe(sub);
}
throw e;
}
}

return sub;
}

Expand Down Expand Up @@ -472,10 +489,10 @@ static class AsyncMessageHandler implements MessageHandler {
List<MessageManager> managers;
List<MessageHandler> handlers;

public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration fnlServerCC, MessageManager ... managers) {
public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC, MessageManager ... managers) {
handlers = new ArrayList<>();
handlers.add(userHandler);
if (isAutoAck && fnlServerCC.getAckPolicy() != AckPolicy.None) {
if (isAutoAck && settledServerCC.getAckPolicy() != AckPolicy.None) {
handlers.add(Message::ack);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ public class NatsJetStreamSubscription extends NatsSubscription implements JetSt
super(sid, subject, queueName, connection, dispatcher);
this.js = js;
this.stream = stream;
this.consumerName = consumer;
this.consumerName = consumer; // might be null, someone will call setConsumerName

managers = inManagers;
for (MessageManager mm : managers) {
mm.setSub(this);
}
}

void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}

String getConsumerName() {
return consumerName;
}
Expand Down

0 comments on commit ddad68d

Please sign in to comment.