
NATS consumers not pulling messages from Jetstream even though consumer status shows active. Restart of client application required as workaround to resolve the issue. [v2.10.18] #1272

Open
mohamedsaleem18 opened this issue Aug 28, 2024 · 9 comments

mohamedsaleem18 commented Aug 28, 2024

Observed behavior

NATS cluster :

  • 3 Nodes
  • NATS version: 2.10.18

There are pending messages in the stream. Although the consumers are connected to NATS, they become stale. The workaround consists of purging the stream, deleting the affected consumers, and restarting the service they originate from, which recreates them in a healthy state. This issue occurs at least once every day.
Client side NATS configuration:

nats {
  enabled = true
  address = "localhost:4222"
  bufferSize = 8
  timeout = 5000
  errorListener = "io.nats.client.impl.ErrorListenerLoggerImpl"
  streams {
    xxxStream {
      name = "lpsn32"
      subject = "xxx-response-details"
      consumerName = "xxxResponseHandler"
      handler = "xxxResponseHandler"
      batchSize = 1
      # pollingDuration is in ms
      pollingDuration = 100
    }
  }
  protocol = "stream"
}
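
For reference, a minimal sketch of the kind of pull-fetch loop this configuration implies (batchSize = 1, 100 ms polling), written against the classic jnats pull API with the stream/consumer names from the config above; this is an illustrative assumption, not the reporter's actual code:

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.PullSubscribeOptions;
import java.time.Duration;

public class PullLoopSketch {
  public static void main(String[] args) throws Exception {
    try (Connection nc = Nats.connect("nats://localhost:4222")) {
      JetStream js = nc.jetStream();
      // Bind to the existing durable consumer (names taken from the config above).
      PullSubscribeOptions opts = PullSubscribeOptions.bind("lpsn32", "xxxResponseHandler");
      JetStreamSubscription sub = js.subscribe("xxx-response-details", opts);
      while (true) {
        // batchSize = 1, pollingDuration = 100 ms
        for (Message m : sub.fetch(1, Duration.ofMillis(100))) {
          System.out.println(new String(m.getData())); // hand off to the application handler
          m.ack(); // explicit ack
        }
      }
    }
  }
}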

Expected behavior

The consumer should not go stale. If a consumer does go stale, NATS should automatically mark its status as inactive or drop the consumer.

Server and client version

NATS cluster :

  • 3 Nodes.
  • NATS version: 2.10.18

Host environment

VM - OS details
NAME="Oracle Linux Server"
VERSION="8.10"

Steps to reproduce

  1. Publish messages to the stream while the consumer is connected.
  2. The consumer consumes the messages without any issue.
  3. After an idle period or a long running time, the consumer goes stale. Messages remain pending in the stream but the consumer does not consume them, while NATS still reports the consumer as active.

Please refer to the observed-behavior section above for the workaround used to resolve this issue.

mohamedsaleem18 added the defect (Suspected defect such as a bug or regression) label Aug 28, 2024
derekcollison (Member) commented Aug 28, 2024

Could you share additional information?

  1. Push or Pull consumer?
  2. What client lib and version?
  3. What does consumer info (NATS cli) report?
  4. What does stream info report?

mohamedsaleem18 (Author) commented Aug 28, 2024

Push or Pull consumer? - Pull consumer
What client lib and version? - Java NATS client, jnats 2.18.1
What does consumer info (NATS cli) report? Consumer was active
What does stream info report? Messages were pending in the stream but consumers were connected as well.

derekcollison (Member):

Can you share the consumer info? We want to see if there are pull requests waiting or not, etc.

mohamedsaleem18 (Author):

When we encounter the issue again, I will provide the details of consumer info.

wallyqs changed the title to add "[v2.10.18]" Sep 4, 2024
github-actions bot added the stale (This issue has had no activity in a while) label Oct 31, 2024
burl21 commented Jan 16, 2025

Hi @derekcollison, we encountered the same issue.

  1. Pull consumer
  2. Client Java v2.20.5 | Server v2.10.24
Information for Consumer certificate > d-certificate-create created 2024-06-19T11:55:39+02:00

Configuration:

                    Name: d-certificate-create
               Pull Mode: true
          Filter Subject: certificate.create
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 10m0s
           Replay Policy: Instant
      Maximum Deliveries: 5
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

Cluster Information:

                    Name: nats
                  Leader: nats-0

State:

  Last Delivered Message: Consumer sequence: 29 Stream sequence: 29
    Acknowledgment Floor: Consumer sequence: 29 Stream sequence: 29
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 1
           Waiting Pulls: 0 of maximum 512  
Information for Stream certificate created 2024-06-19 11:55:07

              Subjects: certificate.>
              Replicas: 1
               Storage: File

Options:

             Retention: WorkQueue
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
     Allows Msg Delete: false
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Cluster Information:

                  Name: nats
                Leader: nats-0

State:

              Messages: 1
                 Bytes: 8.2 KiB
        First Sequence: 35 @ 2025-01-16 14:07:21
         Last Sequence: 35 @ 2025-01-16 14:07:21
      Active Consumers: 3
    Number of Subjects: 1

After restarting the consumer:

? Select a Consumer d-certificate-create
Information for Consumer certificate > d-certificate-create created 2024-06-19T11:55:39+02:00

Configuration:

                    Name: d-certificate-create
               Pull Mode: true
          Filter Subject: certificate.create
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 10m0s
           Replay Policy: Instant
      Maximum Deliveries: 5
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

Cluster Information:

                    Name: nats
                  Leader: nats-0

State:

  Last Delivered Message: Consumer sequence: 30 Stream sequence: 35 Last delivery: 19.95s ago
    Acknowledgment Floor: Consumer sequence: 30 Stream sequence: 35 Last Ack: 2.22s ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 1 of maximum 512    
Information for Stream certificate created 2024-06-19 11:55:07

              Subjects: certificate.>
              Replicas: 1
               Storage: File

Options:

             Retention: WorkQueue
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
     Allows Msg Delete: false
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Cluster Information:

                  Name: nats
                Leader: nats-0

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 36
         Last Sequence: 35 @ 2025-01-16 14:07:21
      Active Consumers: 3

scottf (Contributor) commented Jan 16, 2025

@mohamedsaleem18 @burl21 I'll probably move this issue to the Java repo, but in the meantime I could use some help reproducing this problem.

I have tested the simplification consumers pretty thoroughly with a steady flow of messages, where the server is shut down and then restarted and the consumer resumes. But I have not necessarily tested a long time between messages, so that's where I will focus now.

When you say pull consumer, are you using original pull or simplification? Maybe you have a snippet of code that can help reproduce it?

In thinking about how to reproduce it, I figured I would set up a consumer. The stream can have messages to start, but the consumer should be able to work through those quickly, so maybe just 10 messages. Then how long should I wait before publishing another message? 1/10/60 minutes? Are there any disconnects in that time that might contribute? I'm assuming that the consumer could be live for an hour without getting messages, then one finally comes in and the client should pick it up.

scottf transferred this issue from nats-io/nats-server Jan 16, 2025
scottf (Contributor) commented Jan 16, 2025

@mohamedsaleem18 @burl21 Please review the code in this gist: https://gist.github.com/scottf/845fa51d8b78c299bb8f67c2ba34924f

The application is designed to recreate the stalling (a rough sketch of its structure follows the list below). It does the following:

  1. Sets up the stream and consumer.
  2. Starts an admin thread to periodically report the state of the consumer.
  3. Starts a publish thread that publishes a batch of messages up front and then loops, periodically publishing a message. The time between publishes is a constant intended to be configured.
  4. Starts a simplified consumer that runs until the program is terminated. It prints every time it gets a message.
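
For readers who cannot open the gist, a rough approximation of that structure using the jnats simplification API is sketched below; it is not the gist itself, and the stream/subject/consumer names and intervals are placeholder assumptions:

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.StreamConfiguration;

public class StallRepro {
  static final String STREAM = "repro-stream";     // placeholder name
  static final String SUBJECT = "repro.subject";   // placeholder name
  static final String CONSUMER = "repro-consumer"; // placeholder name

  public static void main(String[] args) throws Exception {
    Connection nc = Nats.connect("nats://localhost:4222");

    // 1. Set up the stream and durable consumer.
    JetStreamManagement jsm = nc.jetStreamManagement();
    jsm.addStream(StreamConfiguration.builder().name(STREAM).subjects(SUBJECT).build());
    jsm.addOrUpdateConsumer(STREAM,
        ConsumerConfiguration.builder().durable(CONSUMER).ackPolicy(AckPolicy.Explicit).build());

    // 2. Admin thread: periodically report the consumer state.
    new Thread(() -> {
      while (true) {
        try {
          ConsumerInfo ci = nc.jetStreamManagement().getConsumerInfo(STREAM, CONSUMER);
          System.out.println("pending=" + ci.getNumPending() + " waitingPulls=" + ci.getNumWaiting());
          Thread.sleep(10_000);
        } catch (Exception e) { e.printStackTrace(); }
      }
    }).start();

    // 3. Publish thread: a batch up front, then one message per (configurable) idle interval.
    new Thread(() -> {
      try {
        JetStream js = nc.jetStream();
        for (int i = 0; i < 10; i++) js.publish(SUBJECT, ("initial-" + i).getBytes());
        long idleMillis = 60L * 60 * 1000; // e.g. one hour between publishes
        while (true) {
          Thread.sleep(idleMillis);
          js.publish(SUBJECT, ("periodic-" + System.currentTimeMillis()).getBytes());
        }
      } catch (Exception e) { e.printStackTrace(); }
    }).start();

    // 4. Simplified consumer: runs until the program is terminated, printing each message.
    ConsumerContext cc = nc.getStreamContext(STREAM).getConsumerContext(CONSUMER);
    cc.consume(msg -> {
      System.out.println("received: " + new String(msg.getData()));
      msg.ack();
    });
  }
}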

burl21 commented Jan 16, 2025

While analyzing the consumer logs, we observed intermittent disconnection and reconnection to the server. These events occurred due to a cluster upgrade process. When the pod restarted, it successfully established a connection to the server. However, during the dispatcher registration phase, a connection error was encountered. This prevented the dispatcher from being started. Since the error was not rethrown, the thread responsible for fetching messages did not start.

Pod Logs:

2025-01-16T10:02:39.680Z [DEBUG] Nats connection event type: nats: lame duck mode
2025-01-16T10:02:51.113Z [WARN] Exception occurred, Connection: 81, Exception: java.io.IOException: Read channel closed.
2025-01-16T10:02:51.116Z [DEBUG] Nats connection event type: nats: connection disconnected
2025-01-16T10:02:51.137Z [DEBUG] Nats connection event type: nats: connection reconnected
2025-01-16T10:02:51.138Z [DEBUG] Nats connection event type: nats: subscriptions re-established
...

Pod restart...

...
2025-01-16T10:05:44.602Z [ERROR] Nats register error
java.io.IOException: Timeout or no response waiting for NATS JetStream server
...
at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:260)
at io.nats.client.impl.NatsJetStreamImpl.makeRequestResponseRequired(NatsJetStreamImpl.java:242)
at io.nats.client.impl.NatsJetStreamImpl._createConsumer(NatsJetStreamImpl.java:131)
at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateConsumer(NatsJetStreamManagement.java:131)

Code

We have created a Spring Boot Starter library to simplify the use of NATS.

The following code snippet highlights the issue within the dispatcher registration logic. Specifically, when an exception is caught during the register method, the dispatcher is not started, leaving the consumer unable to fetch messages:

// Create Dispatcher
void register(RegisterArgs args) {
  try {
    var jsm = connection.jetStreamManagement();
    createStreamIfNotExists(jsm, args);
    createConsumerOrUpdate(jsm, args); // <- 👀 Error occurs here

    var pullOptions = PullSubscribeOptions.bind(args.stream(), args.consumer());
    var subscription = connection.jetStream().subscribe(args.subject(), pullOptions);
    var dispatcher = new NatsSubscriptionDispatcher(subscription, args.handler(), tasksSemaphore, tasksExecutorService);
    dispatchersExecutorService.execute(dispatcher);

    log.info("Subject: {} registered and dispatcher started.", args.subject());
  } catch (Exception e) {
    // ❌😭 Bug: Exception is ignored! As a result, the dispatcher is not started and does not fetch messages!
    log.error("Nats register error", e);
  }
}

private void createStreamIfNotExists(JetStreamManagement jsm, RegisterArgs args)
  throws IOException, JetStreamApiException {
  if (jsm.getStreamNames(args.subject()).isEmpty()) {
    var configuration =
      StreamConfiguration.builder()
        .name(args.stream())
        .retentionPolicy(RetentionPolicy.WorkQueue)
        .storageType(natsProps.getStorageType())
        .discardPolicy(DiscardPolicy.Old)
        .subjects(args.subject())
        .denyDelete(true)
        .build();
    
    jsm.addStream(configuration);
    log.info("Stream with name {} created.", name);
  }
}

private void createConsumerOrUpdate(JetStreamManagement jsm, RegisterArgs args)
  throws IOException, JetStreamApiException {
  var consumer =
    ConsumerConfiguration.builder()
      .deliverPolicy(DeliverPolicy.All)
      .ackPolicy(AckPolicy.Explicit)
      .durable(args.consumer())
      .filterSubject(args.subject())
      .maxDeliver(natsProps.getMaxDeliver())
      .ackWait(natsProps.getAckTimeout())
      .build();

  jsm.addOrUpdateConsumer(args.stream(), consumer); // <- 👀 Error occurs here
}

Shouldn’t connection.jetStreamManagement() or jsm.getStreamNames(subject) throw some kind of connection error?

@scottf Thanks for your help, and apologies for any inconvenience caused!

scottf (Contributor) commented Jan 17, 2025

The JetStream management object is just a functional wrapper - it doesn't actually do any I/O on creation, but it uses the connection it was given. If register fails, try pausing and retrying a few seconds later. That should give the connection time to reset. Once the [simplified] consumer is running, it is very resilient. If there is a disconnection, it will go into a failure loop where it periodically retries (based on the idleHeartbeat inside the simplified consumer).

getStreamNames throws a timeout, correct? Maybe also something to retry.

Using the client requires a lot of error handling for all the disconnection edge cases. The client does a lot for core subscriptions, some old-style subscriptions, and all simplification subscriptions, but it does not do everything for you.
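
A minimal sketch of that retry suggestion applied to the register method posted earlier; the attempt count and delay are assumptions, and it presumes register is changed to rethrow its exception rather than only logging it:

void registerWithRetry(RegisterArgs args) throws InterruptedException {
  int maxAttempts = 5; // assumed limit
  for (int attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      register(args); // the method from the earlier snippet, rethrowing on failure
      return;         // success: dispatcher is started
    } catch (Exception e) {
      log.warn("Nats register attempt {}/{} failed, retrying in 5s", attempt, maxAttempts, e);
      Thread.sleep(5_000); // give the connection time to reset
    }
  }
  throw new IllegalStateException("Could not register subject " + args.subject());
}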
