-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix consumer receive individual acknowledged messages from compacted topic after reconnection #23495
base: master
Are you sure you want to change the base?
Conversation
…rom compacted topic after reconnection
@coderzc PTAL~ |
List<Entry> unAckedEntries = new ArrayList<>(); | ||
long entriesSize = 0; | ||
for (Entry entry : entries) { | ||
entriesSize += entry.getLength(); | ||
if (COMPACTION_CURSOR_NAME.equals(cursor.getName())) { | ||
for (Entry entry : entries) { | ||
entriesSize += entry.getLength(); | ||
} | ||
unAckedEntries = entries; | ||
} else { | ||
for (Entry entry : entries) { | ||
Position position = entry.getPosition(); | ||
if (!cursor.isMessageIndividualDeleted(position)) { | ||
unAckedEntries.add(entry); | ||
entriesSize += entry.getLength(); | ||
} | ||
} | ||
} | ||
cursor.updateReadStats(entries.size(), entriesSize); | ||
cursor.updateReadStats(unAckedEntries.size(), entriesSize); | ||
|
||
Entry lastEntry = entries.get(entries.size() - 1); | ||
cursor.seek(lastEntry.getPosition().getNext(), true); | ||
callback.readEntriesComplete(entries, readEntriesCtx); | ||
callback.readEntriesComplete(unAckedEntries, readEntriesCtx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will cause these messages to be lost the next time compaction is run, and other consumers will not be able to read these messages. so why use individual ack for Exclusive mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
for the compaction_cursor_name
__compaction
, we don't skip the individual acked position, this will not affect the compactionif (COMPACTION_CURSOR_NAME.equals(cursor.getName())) { for (Entry entry : entries) { entriesSize += entry.getLength(); } unAckedEntries = entries; }
-
The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the compaction_cursor_name __compaction , we don't skip the individual acked position, this will not affect the compaction
OK, I see
The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂
Yes, for Exclusive mode, suggest using acknowledgeCumulative
acknowledge message. actually, if every message is acknowledged, then acknowledgeCumulative and individual acknowledgment have the same effect, for this cause, I think #21187 has fixed it. But if the acknowledged messages are discontinuous then we can receive acknowledged messages, due to the compaction delete some message, using individual acknowledged will make acknowledged messages are discontinuous
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@summeriiii I think for the compacted topic, we cannot use individual acknowledgment, otherwise the entry will not be deleted because the messages deleted by compact will never be acknowledged. so I suggest using acknowledgeCumulative
acknowledge message for compacted topic.
@@ -893,6 +893,8 @@ default ManagedCursorAttributes getManagedCursorAttributes() { | |||
|
|||
boolean isMessageDeleted(Position position); | |||
|
|||
boolean isMessageIndividualDeleted(Position position); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can using isMessageDeleted
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use isMessageDeleted
before, but the markDeletePosition is not correct in some cases, eg: expire messages(CompactionTest#testCompactionWithTTL), so I add a new method isMessageIndividualDeleted only to check individualDeletedMessages
Fixes #23494
Motivation
After consumed the compacted topic and individual acknowledged messages, if the topic unload and consumer reconnect, we will receive the individual acknowledged messages again.
Like the none-compacted topic, I think that consumer should not receive acknowledge message after reconnection.
Modifications
isMessageIndividualDeleted
to check if this message has been individual acknowledgedCompactedTopicUtils#asyncReadCompactedEntries
, use isMessageIndividualDeleted to filter out and skip unnecessary entryDocumentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: