NATS consumers not pulling messages from Jetstream even though consumer status shows active. Restart of client application required as workaround to resolve the issue. [v2.10.18] #1272
Comments
Could you share additional information?
Push or pull consumer? Pull consumer.
Can you share the consumer info? We want to see whether there are pull requests waiting, etc.
When we encounter the issue again, I will provide the consumer info details.
Hi @derekcollison, we encountered the same issue.
After restarting the consumer:
@mohamedsaleem18 @burl21 I'll probably move this issue to the Java repo, but in the meantime I could use some help reproducing this problem. I have tested the simplification consumers pretty thoroughly with a steady flow of messages, where the server is shut down, then restarted, and the consumer resumes. That testing did not necessarily include a long time between messages, so that's where I will focus now.

When you say pull consumer, are you using the original pull API or simplification? Maybe you have a snippet of code that can help reproduce it?

In thinking about reproducing it, I figured I'd set up a consumer. The stream can have messages to start, maybe just 10, and the consumer should be able to work through those quickly. Then how long should I wait before publishing another message: 1, 10, or 60 minutes? Are there any disconnects in that time that might contribute? I'm assuming the consumer could be live for an hour without getting messages, then one finally comes in and the client should pick it up.
@mohamedsaleem18 @burl21 Please review the code in this gist: https://gist.github.com/scottf/845fa51d8b78c299bb8f67c2ba34924f. The application is designed to recreate the stalling. It does the following:
While analyzing the consumer logs, we observed intermittent disconnections and reconnections to the server. These events …

Pod logs:
We have created a Spring Boot Starter library to simplify the use of NATS. The following code snippet highlights the issue within the dispatcher registration logic. Specifically, when an …

```java
// Methods of the starter's registrar class; connection, natsProps, log and the
// executor services are fields of that class.

// Create Dispatcher
void register(RegisterArgs args) {
    try {
        var jsm = connection.jetStreamManagement();
        createStreamIfNotExists(jsm, args);
        createConsumerOrUpdate(jsm, args); // <- 👀 Error occurs here
        // Bind to the durable consumer created above (stream and durable come from args)
        var pullOptions = PullSubscribeOptions.bind(args.stream(), args.consumer());
        var subscription = connection.jetStream().subscribe(args.subject(), pullOptions);
        var dispatcher = new NatsSubscriptionDispatcher(subscription, args.handler(), tasksSemaphore, tasksExecutorService);
        dispatchersExecutorService.execute(dispatcher);
        log.info("Subject: {} registered and dispatcher started.", args.subject());
    } catch (Exception e) {
        // ❌😭 Bug: Exception is ignored! As a result, the dispatcher is not started and does not fetch messages!
        log.error("Nats register error", e);
    }
}

private void createStreamIfNotExists(JetStreamManagement jsm, RegisterArgs args)
        throws IOException, JetStreamApiException {
    // Only create the stream when no existing stream already captures this subject
    if (jsm.getStreamNames(args.subject()).isEmpty()) {
        var configuration =
            StreamConfiguration.builder()
                .name(args.stream())
                .retentionPolicy(RetentionPolicy.WorkQueue)
                .storageType(natsProps.getStorageType())
                .discardPolicy(DiscardPolicy.Old)
                .subjects(args.subject())
                .denyDelete(true)
                .build();
        jsm.addStream(configuration);
        log.info("Stream with name {} created.", args.stream());
    }
}

private void createConsumerOrUpdate(JetStreamManagement jsm, RegisterArgs args)
        throws IOException, JetStreamApiException {
    var consumer =
        ConsumerConfiguration.builder()
            .deliverPolicy(DeliverPolicy.All)
            .ackPolicy(AckPolicy.Explicit)
            .durable(args.consumer())
            .filterSubject(args.subject())
            .maxDeliver(natsProps.getMaxDeliver())
            .ackWait(natsProps.getAckTimeout())
            .build();
    jsm.addOrUpdateConsumer(args.stream(), consumer); // <- 👀 Error occurs here
}
```

Shouldn’t …

@scottf Thanks for your help, and apologies for any inconvenience caused!
The JetStream management object is just a functional wrapper: it doesn't do any I/O on creation, it only uses the connection it was given. If register fails, try pausing and retrying in a few seconds; that should give the connection time to reset. Once the [simplified] consumer is running, it is very resilient. If there is a disconnection, it goes into a failure loop where it periodically retries (based on the idleHeartbeat inside the simplified consumer). getStreamNames throws a timeout, correct? That may also be something to retry. Using the client requires a lot of error handling for all the disconnection edge cases. The client does a lot for you with core subscriptions, some old-style subscriptions and all simplification subscriptions, but it does not do everything.
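To illustrate the "pause and retry" advice for a failed registration, here is a minimal stdlib-only sketch. `RetryingRegistrar` and `retry` are hypothetical names, not part of the NATS Java client, and the sketch assumes `register` is changed to rethrow its exception instead of only logging it.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper (not a NATS API): retries an action with a fixed pause.
final class RetryingRegistrar {
    private RetryingRegistrar() {}

    // Runs `action` up to maxAttempts times, sleeping `pause` between failed attempts,
    // and rethrows the last failure if every attempt fails.
    static <T> T retry(Callable<T> action, int maxAttempts, Duration pause) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry through an interrupt; restore the flag and stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(pause.toMillis()); // give the connection time to reset
                }
            }
        }
        throw last;
    }
}
```

For example, `RetryingRegistrar.retry(() -> { register(args); return null; }, 5, Duration.ofSeconds(3))` would make up to five attempts three seconds apart and then rethrow the last failure, so a persistent problem surfaces instead of leaving a dispatcher that silently never starts.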
Observed behavior
NATS cluster:
There are pending messages in the stream. Although the consumers are connected to NATS, they become stale. The workaround consists of purging the stream, deleting the affected consumers and restarting the service they originate from, which recreates them in a healthy state. The issue occurs at least once a day.
Client-side NATS configuration:
Expected behavior
The consumer should not go stale. If a consumer does go stale, NATS should automatically mark it as inactive or drop it.
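Until the server behaves that way, a client could approximate stale-consumer detection on its own side. The sketch below is a hypothetical `StallWatchdog`, not a NATS API: it assumes idle heartbeats are enabled on the consumer's pull requests, that the application calls `recordActivity` whenever a message or heartbeat arrives, and that `missedAllowed` heartbeat intervals of silence mean the consumer is stalled. The caller would then recreate the subscription rather than restart the whole service.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical client-side stall detector; not part of the NATS Java client.
final class StallWatchdog {
    private final Duration idleHeartbeat;
    private final int missedAllowed;
    private volatile Instant lastActivity;

    StallWatchdog(Duration idleHeartbeat, int missedAllowed, Instant now) {
        this.idleHeartbeat = idleHeartbeat;
        this.missedAllowed = missedAllowed;
        this.lastActivity = now;
    }

    // Call whenever a message or idle heartbeat arrives for the consumer.
    void recordActivity(Instant now) {
        lastActivity = now;
    }

    // True once the silence exceeds missedAllowed heartbeat intervals.
    boolean isStalled(Instant now) {
        Duration silence = Duration.between(lastActivity, now);
        return silence.compareTo(idleHeartbeat.multipliedBy(missedAllowed)) > 0;
    }
}
```

Passing `Instant` values in explicitly, rather than reading the clock inside the class, keeps the check deterministic and easy to test; a scheduled task could call `isStalled(Instant.now())` once per heartbeat interval.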
Server and client version
NATS cluster:
Host environment
VM - OS details
NAME="Oracle Linux Server"
VERSION="8.10"
Steps to reproduce
Please refer to the description section for the workaround to resolve this issue.