Replies: 3 comments
-
Hi, I had a similar question in #935. We ended up implementing the pattern from the second snippet:

    from aiokafka.errors import CommitFailedError, IllegalStateError

    # Runs inside a coroutine; `consumer` is a started AIOKafkaConsumer.
    while True:
        records = await consumer.getmany(...)
        # Commit position per partition: last fetched offset + 1.
        offsets = {}
        for tp, messages in records.items():
            offsets[tp] = messages[-1].offset + 1
        await some_processing(records)
        try:
            await consumer.commit(offsets)
        except (CommitFailedError, IllegalStateError) as err:
            logger.warning("Commit failed due to rebalancing, circle back to consume new messages", exc_info=err)
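The offset bookkeeping in that loop can be pulled into a small helper and checked without a broker. This is only a sketch: `next_offsets` is a hypothetical name, and the two namedtuples are stand-ins that model just the fields used here, not aiokafka's real `TopicPartition` and `ConsumerRecord` classes.

```python
from collections import namedtuple

# Stand-ins (assumption): only the fields this example touches are modelled.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def next_offsets(records):
    """Map each partition to the offset to commit: last fetched offset + 1."""
    # Skip empty message lists so messages[-1] can never raise IndexError.
    return {tp: msgs[-1].offset + 1 for tp, msgs in records.items() if msgs}


# A batch shaped like the dict the loop iterates over: partition -> messages.
batch = {
    TopicPartition("events", 0): [Record(41, b"a"), Record(42, b"b")],
    TopicPartition("events", 1): [Record(7, b"c")],
}
offsets = next_offsets(batch)
```

The committed value is the position of the next message to read, which is why it is the last offset plus one rather than the last offset itself.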
-
And around |
-
Hi all. I have created a |
-
Hi there, I'm a bit confused about using a ConsumerRebalanceListener to ensure I commit my offsets appropriately when manually managing my commits. Specifically, I'm looking at the warning here: https://aiokafka.readthedocs.io/en/stable/consumer.html#manual-vs-automatic-committing
If I have a batch consumer that matches the example given, how would I implement this?
Would I use an asyncio lock to prevent `on_partitions_revoked` and `on_partitions_assigned` from returning until I call the next commit?
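To make the lock idea concrete, here is a minimal sketch using only `asyncio`, with no broker involved. `LockingRebalanceListener`, `process_and_commit`, and `demo` are hypothetical names; the listener is a plain class that only mirrors the method names of aiokafka's `ConsumerRebalanceListener`, and the sleep stands in for processing a batch and committing its offsets. With aiokafka itself, the assumption is that you would subclass `ConsumerRebalanceListener` and pass an instance through the `listener` argument of `consumer.subscribe`.

```python
import asyncio


class LockingRebalanceListener:
    """Stand-in (assumption) mirroring the ConsumerRebalanceListener method names."""

    def __init__(self, lock, log):
        self.lock = lock
        self.log = log

    async def on_partitions_revoked(self, revoked):
        # Wait here until the in-flight batch has been processed and committed.
        async with self.lock:
            self.log.append("revoked")

    async def on_partitions_assigned(self, assigned):
        self.log.append("assigned")


async def process_and_commit(lock, log):
    # The consume loop holds the lock for the whole process-then-commit step.
    async with lock:
        log.append("processing")
        await asyncio.sleep(0.01)  # simulated processing and commit
        log.append("committed")


async def demo():
    lock = asyncio.Lock()
    log = []
    listener = LockingRebalanceListener(lock, log)
    batch = asyncio.create_task(process_and_commit(lock, log))
    await asyncio.sleep(0)  # let the batch task take the lock first
    # Simulate a rebalance arriving while the batch is still in flight.
    await listener.on_partitions_revoked(set())
    await listener.on_partitions_assigned(set())
    await batch
    return log
```

In this sketch only `on_partitions_revoked` takes the lock, so the revocation cannot complete while a batch is between processing and commit; `on_partitions_assigned` does not need to wait.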