
Events skipped in group rebalancing #3703

Open
ejba opened this issue Jan 10, 2025 · 13 comments

@ejba

ejba commented Jan 10, 2025

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Between 3.1.7 and 3.2.5

Describe the bug

The topic has 12 partitions. Consumers are automatically scaled out as soon as consumer lag is detected (typically 1 to 12 replicas). The group rebalance currently takes some time (>10 seconds). The events fetched before the rebalance are processed successfully, but the offset commit fails because the current partition was revoked during the rebalance (a non-fatal failure).

The problem appears after the partition assignment completes: the fetch position has advanced past the uncommitted offset.

Example:
Consumer A assigned to partition P
Consumer A fetches offset 2148209
Consumer A reads 3 events
11 consumers join the group
Consumer A tries to commit 2148212 (2148209 + 3)
Commit 2148212 fails because partition P was revoked
Consumer A is assigned to partition P again
Consumer A fetches offset 2148260
48 events were skipped ( 2148260 - 2148212 = 48)
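The commit arithmetic in the example above can be sketched in plain Java. This is a self-contained simulation, not Spring Kafka code; `OffsetTracker` and its methods are made up for illustration. The key point is that Kafka commits the *next* offset to read, i.e. the fetch offset plus the number of records processed:

```java
// Self-contained sketch of the commit arithmetic from the example above.
// OffsetTracker is a hypothetical helper, not a Spring Kafka class.
public class OffsetTracker {

    private long position; // next offset to fetch

    public OffsetTracker(long fetchOffset) {
        this.position = fetchOffset;
    }

    /** Advance the position past {@code count} processed records. */
    public void recordsProcessed(int count) {
        position += count;
    }

    /** Kafka commits the NEXT offset to read, not the last one read. */
    public long offsetToCommit() {
        return position;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker(2148209L); // Consumer A fetches 2148209
        tracker.recordsProcessed(3);                         // reads 3 events
        System.out.println(tracker.offsetToCommit());        // 2148212
        // After re-assignment the consumer fetched 2148260 instead of 2148212:
        System.out.println(2148260L - tracker.offsetToCommit()); // 48 skipped
    }
}
```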

To Reproduce
AckMode = Batch
Partition Assignment Strategy = CooperativeSticky

A consumer continually reads events from the partition.
11 new consumers suddenly join the group.
The consumer is assigned to the previous partition.

Expected behavior

Once the group rebalance completes, the consumer is expected to resume fetching from the last committed offset, so that no events are skipped.

Sample
I wasn't able to create a sample project yet, but I gathered the debug logs that explain this issue in detail.

logs.txt

Thanks in advance!

@artembilan
Member

There is a Kafka property group.initial.rebalance.delay.ms: https://stackoverflow.com/questions/56561378/how-to-introduce-delay-in-rebalancing-in-case-of-kafka-consumer-group.
You can try playing with that.
Without a sample to reproduce, it is hard to judge what is wrong on the Spring side.
Thanks

@ejba
Author

ejba commented Jan 10, 2025

Hi @artembilan,
Thanks for your quick answer.

To make sure I understand your answer correctly: the suggestion would minimize the frequency of consecutive rebalances. Is that correct?

@artembilan
Member

Right. See more info in the official Apache Kafka docs: https://kafka.apache.org/documentation/

A new config, group.initial.rebalance.delay.ms, was introduced. This config specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. The default value for this is 3 seconds. During development and testing it might be desirable to set this to 0 in order to not delay test execution time.
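For reference, this is a broker-side setting (it goes in the broker's server.properties, not in the consumer configuration); the value below is illustrative:

```properties
# Broker configuration (server.properties) - value is illustrative.
# Delay the initial consumer rebalance so more members can join first.
group.initial.rebalance.delay.ms=3000
```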

@ejba
Author

ejba commented Jan 10, 2025

Ok, unfortunately we had a case today where a single new consumer joined the group (so only one rebalance occurred) and the problem still happened. Therefore, I believe the property will not be useful in this case, sadly.

I hope you don't mind answering a few questions: Is it true that poll() also sends the next fetch request? What happens when the fetch response is received? Can this response update cached subscription data? I understand this may not be this project's responsibility; I just want to make sure and get more information at the same time.

Thanks for your help!

@artembilan
Member

I don't know those details of the Apache Kafka consumer implementation.
That is out of Spring's scope.

We will look into this together with @sobychacko when we have a chance.

Meanwhile some project to reproduce would be great.

I don't deny that we may do something in Spring which could be fixed.

@ejba
Author

ejba commented Jan 10, 2025

I will do my best to find a way to reproduce the problem in a project, thank you!

@sobychacko
Contributor

sobychacko commented Jan 10, 2025

@ejba You might want to double-check these in the application, but here are some explanations for your questions. The poll() implementation in the Kafka consumer is pretty clever: in addition to giving you the current batch of messages, it prepares for what's next by asynchronously sending fetch requests for the next batch of data. This maintains continuous throughput and low latency without waiting for the next poll() invocation.

When the fetch response is received, the Kafka consumer stores it in an internal buffer. On the next poll(), the data is retrieved from this internal buffer, working like an internal queue. If the queue is empty, the poll() invocation may be blocked until it times out.

And finally, regarding subscription data and metadata - the metadata handling in Kafka consumer maintains its own separate cycle from message fetching. The consumer keeps cluster information cached, but fetch responses don't directly update this cache. Instead, when the broker detects stale metadata, it returns specific errors (like NOT_LEADER_FOR_PARTITION) that prompt the consumer to refresh its information. This refresh happens through a separate metadata request, keeping the metadata maintenance process distinct from the regular data fetching cycle.

@ejba
Author

ejba commented Jan 10, 2025

@sobychacko Thank you so much for answering back to the questions.

So I have no clue what could be causing the offset to advance after the commit-failure exception due to the rebalance in progress.

For example, in the attached logs you can see the consumer attempt to commit offset 2148212 without success; after the group rebalance completes and the old partition is re-assigned to the consumer, it fetches offset 2148260, thereby losing 48 records.

1734428414168	{"@timestamp":"2024-12-17T09:40:14.16763284Z","@version":"1","message":"Commit list: {topic-event.v1-5=OffsetAndMetadata{offset=2148212, leaderEpoch=null, metadata=''}}","logger_name":"org.springframework.kafka.listener.KafkaMessageListenerContainer","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}

1734428414181	{"@timestamp":"2024-12-17T09:40:14.172599074Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Failing OffsetCommit request since the consumer is not part of an active group","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"noBeanNameSet-C-1","level":"INFO","level_value":20000}
...
1734428414157	{"@timestamp":"2024-12-17T09:40:14.157240729Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Added READ_UNCOMMITTED fetch request for partition topic-event.v1-5 at position FetchPosition{offset=2148260, offsetEpoch=Optional[212], currentLeader=LeaderAndEpoch{leader=Optional[broker:20001 (id: 0 rack: rack-id-2)], epoch=212}} to node broker:20003 (id: 2 rack: rack-id)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractFetch","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
...
1734428414251	{"@timestamp":"2024-12-17T09:40:14.250666775Z","@version":"1","message":"Consumer exception","logger_name":"org.springframework.kafka.listener.KafkaMessageListenerContainer","thread_name":"noBeanNameSet-C-1","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available\n\tat org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1925)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1348)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1275)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1111)\n\tat org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.commitSync(LegacyKafkaConsumer.java:718)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1057)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3227)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3222)\n\tat 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkRebalanceCommits(KafkaMessageListenerContainer.java:1669)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1617)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)\n\t... 2 common frames omitted\n"}
...
1734428414466	{"@timestamp":"2024-12-17T09:40:14.46649834Z","@version":"1","message":"Received: 3 records","logger_name":"org.springframework.kafka.listener.KafkaMessageListenerContainer","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
...
1734428414616	{"@timestamp":"2024-12-17T09:40:14.615930573Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Fetch READ_UNCOMMITTED at offset 2148263 for partition topic-event.v1-5 returned fetch data PartitionData(partitionIndex=5, errorCode=0, highWatermark=2148265, lastStableOffset=2148265, logStartOffset=2085131, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=1470, buffer=java.nio.HeapByteBuffer[pos=0 lim=1470 cap=1473]))","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractFetch","thread_name":"kafka-coordinator-heartbeat-thread | service-topic-event","level":"DEBUG","level_value":10000}

I will try to create a project to reproduce this problem. Thanks!

@artembilan
Member

I see in your logs:

"thread_name":"noBeanNameSet-C-1"

That means that this KafkaMessageListenerContainer is created manually and is not managed by the Spring ApplicationContext.
Is that expected in your logic?
Usually we don't recommend using these kinds of components outside of Spring dependency injection.

@sobychacko
Contributor

Aside from what @artembilan asked ^^, I have a few questions and suggestions. Who is committing the offset 2148260 in your example? With that said, have you tried switching from batch ack mode to manual? I am not saying that this is the issue, but that way you may have more control over the commit process. Also, since the issue occurs during a rebalance, you could consider implementing a custom ConsumerRebalanceListener and trying to commit there. These are some pointers I could think of to help your debugging, but ultimately, I feel like there is something asynchronous going on on the application side. A consistently reproducible sample would certainly help.
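The rebalance-listener suggestion could be sketched roughly as below. This is an assumption-laden sketch, not a verified fix: the `processed` bookkeeping map and the `recordProcessed` method are hypothetical ways the application might track its last processed offsets, and whether this helps depends on the revocation callback actually firing before the commit fails.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Sketch only: commit whatever the application has processed so far,
// while the consumer still owns the partitions being revoked.
public class CommittingRebalanceListener implements ConsumerAwareRebalanceListener {

    // Hypothetical bookkeeping: updated by the application after each record/batch.
    private final Map<TopicPartition, OffsetAndMetadata> processed = new ConcurrentHashMap<>();

    public void recordProcessed(TopicPartition tp, long offset) {
        // Commit the NEXT offset to read (last processed + 1).
        processed.put(tp, new OffsetAndMetadata(offset + 1));
    }

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata oam = processed.get(tp);
            if (oam != null) {
                toCommit.put(tp, oam);
            }
        }
        if (!toCommit.isEmpty()) {
            // Synchronous commit before the partitions are actually revoked.
            consumer.commitSync(toCommit);
        }
    }
}
```

The listener would be registered via `ContainerProperties.setConsumerRebalanceListener(...)` on the container's properties.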

@ejba
Author

ejba commented Jan 10, 2025

That means that this KafkaMessageListenerContainer is created manually and not managed by Spring ApplicationContext.
Is that expected in your logic?

The consumers are created via a DefaultKafkaConsumerFactory instance and launched dynamically when custom consumer implementations are present. I recently came across the Creating Containers Dynamically page. I was planning to reimplement container creation based on the page's recommendations in the coming weeks, but if you believe this could be the root cause, I will prioritise the work.

With that said, have you tried switching from batch ack mode to manual?

Originally, the acknowledgment mode was manual_immediate, with the acknowledgment invoked at the end of batch processing. I switched to batch as soon as the problem started to appear among consumers. The problem still persisted.

Also, since the issues occur during a rebalance, you can also consider implementing a custom ConsumerRebalanceListener and try to commit there.

Yes, the team was discussing this today with the intention of storing the offsets in external storage (e.g. database).

@ejba
Author

ejba commented Jan 10, 2025

Who is committing the offset 2148260 in your example?

This application contains only one consumer (single thread). I don't see the typical "Commit list:" log showing an intent to commit 2148260, so I believe it's something the kafka-client does internally (not the Spring consumer).

1734428414146	{"@timestamp":"2024-12-17T09:40:14.14477201Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=16, clientId=consumer-event-1, correlationId=15, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic-event.v1', topicId=Q-eo0pGaTUabPfuBjV6Kfg, partitions=[FetchPartition(partition=10, currentLeaderEpoch=189, fetchOffset=2146580, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='rack-id')","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
1734428414157	{"@timestamp":"2024-12-17T09:40:14.157240729Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Added READ_UNCOMMITTED fetch request for partition topic-event.v1-5 at position FetchPosition{offset=2148260, offsetEpoch=Optional[212], currentLeader=LeaderAndEpoch{leader=Optional[broker:20001 (id: 0 rack: rack-id-2)], epoch=212}} to node broker:20003 (id: 2 rack: rack-id)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractFetch","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
1734428414157	{"@timestamp":"2024-12-17T09:40:14.157820971Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Built incremental fetch (sessionId=1109229946, epoch=3) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
1734428414158	{"@timestamp":"2024-12-17T09:40:14.158072045Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-event.v1-5), toForget=(), toReplace=(), implied=(), canUseTopicIds=True) to broker broker:20003 (id: 2 rack: rack-id)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractFetch","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
1734428414158	{"@timestamp":"2024-12-17T09:40:14.158219399Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Adding pending request for node broker:20003 (id: 2 rack: rack-id)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractFetch","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}
1734428414158	{"@timestamp":"2024-12-17T09:40:14.158605929Z","@version":"1","message":"[Consumer clientId=consumer-event-1, groupId=consumer-event] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=16, clientId=consumer-event-1, correlationId=19, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1109229946, sessionEpoch=3, topics=[FetchTopic(topic='topic-event.v1', topicId=Q-eo0pGaTUabPfuBjV6Kfg, partitions=[FetchPartition(partition=5, currentLeaderEpoch=212, fetchOffset=2148260, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='us-east-2c')","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"noBeanNameSet-C-1","level":"DEBUG","level_value":10000}

@sobychacko
Contributor

Thanks for the updates. In your troubleshooting and analysis, let us know if you find any issues within the framework. We will be happy to take a look.
