-
Notifications
You must be signed in to change notification settings - Fork 407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: handle error in kafka consumer loop to prevent premature return #2100
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Chillax-0v0
approved these changes
Feb 19, 2025
zhouxinyu
reviewed
Feb 19, 2025
zhouxinyu
approved these changes
Feb 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM~
messixukejia
approved these changes
Feb 21, 2025
shalousun
approved these changes
Feb 21, 2025
I've implemented the retry logic with a basic delay approach. Could you help review it? Regarding adding retry parameters, would you recommend handling this in the current PR or creating a new PR? |
yyuuttaaoo
approved these changes
Feb 24, 2025
shalousun
approved these changes
Feb 24, 2025
messixukejia
approved these changes
Feb 24, 2025
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
When there are short-term network jitters or brief service outages in the Kafka cluster, it causes the Logtail program to fail in consumption and exit the consumption process. As a result, even after Kafka resumes normal operation, Logtail cannot continue consuming messages, leading to constant message accumulation. The only solution is to restart Logtail to restore normal operations.
Root Cause:
After reviewing portions of the code for the open-source ilogtail component, we discovered that the error handling logic in the Kafka consumption part of LoongCollector (input_kafka) does not meet the requirements of the Sarama client. Specifically, when the k.consumerGroupClient.Consume() call returns an error (e.g., due to network instability exhausting Sarama's internal retries), the current code simply exits the goroutine by returning. This approach lacks upper-layer fault tolerance and retry mechanisms. Our tests have confirmed this issue.
According to Sarama Issue #2381 , when network jitters or short-term network unavailability occur, and multiple retries still fail, the current ConsumeGroupSession will exit and return an error. It requires the upper layer to handle the exception and implement recovery and retry operations.
I believe that in such logtail scenarios, LoongCollector should continuously retry until Kafka recovers, instead of exiting directly.