diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index aa9276fd2..5c8671cea 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -351,9 +351,10 @@ else if (so.isBind()) { // 4. If no deliver subject (inbox) provided or found, make an inbox. final String fnlInboxDeliver = inboxDeliver == null ? conn.createInbox() : inboxDeliver; - // 5. If consumer does not exist, create - final String fnlConsumerName; - final ConsumerConfiguration fnlServerCC; + // 5. If consumer does not exist, create and settle on the config. Name will have to wait + // If the consumer exists, I know what the settled info is + final String settledConsumerName; + final ConsumerConfiguration settledServerCC; if (serverCC == null) { ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC); @@ -368,14 +369,12 @@ else if (so.isBind()) { ccBuilder.deliverGroup(qgroup); - // createOrUpdateConsumer can fail for security reasons, maybe other reasons? - ConsumerInfo ci = _createConsumer(fnlStream, ccBuilder.build()); - fnlConsumerName = ci.getName(); - fnlServerCC = ci.getConsumerConfiguration(); + settledServerCC = ccBuilder.build(); + settledConsumerName = null; } else { - fnlConsumerName = consumerName; - fnlServerCC = serverCC; + settledServerCC = serverCC; + settledConsumerName = consumerName; } // 6. create the subscription. lambda needs final or effectively final vars @@ -383,18 +382,18 @@ else if (so.isBind()) { if (isPullMode) { final MessageManager[] managers = new MessageManager[] { new PullStatusMessageManager() }; final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher) - -> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, fnlConsumerName, managers); + -> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, settledConsumerName, managers); sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory); } else { final MessageManager statusManager = - PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, fnlServerCC, qgroup != null, dispatcher == null); + PUSH_STATUS_MANAGER_FACTORY.createPushStatusMessageManager(conn, so, settledServerCC, qgroup != null, dispatcher == null); final MessageManager[] managers; if (so.isOrdered()) { managers = new MessageManager[3]; managers[0] = new SidCheckManager(); managers[1] = statusManager; - managers[2] = new OrderedManager(this, dispatcher, fnlStream, fnlServerCC); + managers[2] = new OrderedManager(this, dispatcher, fnlStream, settledServerCC); } else { managers = new MessageManager[1]; @@ -403,17 +402,35 @@ else if (so.isBind()) { final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher) -> new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher, - this, fnlStream, fnlConsumerName, managers); + this, fnlStream, settledConsumerName, managers); if (dispatcher == null) { sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory); } else { - AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, fnlServerCC, managers); + AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, settledServerCC, managers); sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, factory); } } + // 7. The consumer might need to be created, do it here + if (settledConsumerName == null) { + try { + ConsumerInfo ci = _createConsumer(fnlStream, settledServerCC); + sub.setConsumerName(ci.getName()); + } + catch (IOException | JetStreamApiException e) { + // create consumer can fail, unsubscribe and then throw the exception to the user + if (dispatcher == null) { + sub.unsubscribe(); + } + else { + dispatcher.unsubscribe(sub); + } + throw e; + } + } + return sub; } @@ -472,10 +489,10 @@ static class AsyncMessageHandler implements MessageHandler { List managers; List handlers; - public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration fnlServerCC, MessageManager ... managers) { + public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC, MessageManager ... managers) { handlers = new ArrayList<>(); handlers.add(userHandler); - if (isAutoAck && fnlServerCC.getAckPolicy() != AckPolicy.None) { + if (isAutoAck && settledServerCC.getAckPolicy() != AckPolicy.None) { handlers.add(Message::ack); }; diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java b/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java index b2dafcdbd..c64dd5d59 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java @@ -46,7 +46,7 @@ public class NatsJetStreamSubscription extends NatsSubscription implements JetSt super(sid, subject, queueName, connection, dispatcher); this.js = js; this.stream = stream; - this.consumerName = consumer; + this.consumerName = consumer; // might be null, someone will call setConsumerName managers = inManagers; for (MessageManager mm : managers) { @@ -54,6 +54,10 @@ public class NatsJetStreamSubscription extends NatsSubscription implements JetSt } } + void setConsumerName(String consumerName) { + this.consumerName = consumerName; + } + String getConsumerName() { return consumerName; }