Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix consumer receive individual acknowledged messages from compacted topic after reconnection #23495

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

summeriiii
Copy link
Contributor

Fixes #23494

Motivation

After consumed the compacted topic and individual acknowledged messages, if the topic unload and consumer reconnect, we will receive the individual acknowledged messages again.
Like the none-compacted topic, I think that consumer should not receive acknowledge message after reconnection.

Modifications

  • add isMessageIndividualDeleted to check if this message has been individual acknowledged
  • In CompactedTopicUtils#asyncReadCompactedEntries, use isMessageIndividualDeleted to filter out and skip unnecessary entry

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Oct 21, 2024
@summeriiii
Copy link
Contributor Author

@coderzc PTAL~

Comment on lines +95 to +115
List<Entry> unAckedEntries = new ArrayList<>();
long entriesSize = 0;
for (Entry entry : entries) {
entriesSize += entry.getLength();
if (COMPACTION_CURSOR_NAME.equals(cursor.getName())) {
for (Entry entry : entries) {
entriesSize += entry.getLength();
}
unAckedEntries = entries;
} else {
for (Entry entry : entries) {
Position position = entry.getPosition();
if (!cursor.isMessageIndividualDeleted(position)) {
unAckedEntries.add(entry);
entriesSize += entry.getLength();
}
}
}
cursor.updateReadStats(entries.size(), entriesSize);
cursor.updateReadStats(unAckedEntries.size(), entriesSize);

Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext(), true);
callback.readEntriesComplete(entries, readEntriesCtx);
callback.readEntriesComplete(unAckedEntries, readEntriesCtx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause these messages to be lost the next time compaction is run, and other consumers will not be able to read these messages. so why use individual ack for Exclusive mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. for the compaction_cursor_name __compaction , we don't skip the individual acked position, this will not affect the compaction

    if (COMPACTION_CURSOR_NAME.equals(cursor.getName())) {
        for (Entry entry : entries) {
            entriesSize += entry.getLength();
        }
        unAckedEntries = entries;
    }
  2. The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂

Copy link
Member

@coderzc coderzc Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the compaction_cursor_name __compaction , we don't skip the individual acked position, this will not affect the compaction

OK, I see

The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂

Yes, for Exclusive mode, suggest using acknowledgeCumulative acknowledge message. actually, if every message is acknowledged, then acknowledgeCumulative and individual acknowledgment have the same effect, for this cause, I think #21187 has fixed it. But if the acknowledged messages are discontinuous then we can receive acknowledged messages, due to the compaction delete some message, using individual acknowledged will make acknowledged messages are discontinuous

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@summeriiii I think for the compacted topic, we cannot use individual acknowledgment, otherwise the entry will not be deleted because the messages deleted by compact will never be acknowledged. so I suggest using acknowledgeCumulative acknowledge message for compacted topic.

@@ -893,6 +893,8 @@ default ManagedCursorAttributes getManagedCursorAttributes() {

boolean isMessageDeleted(Position position);

boolean isMessageIndividualDeleted(Position position);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can using isMessageDeleted instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use isMessageDeleted before, but the markDeletePosition is not correct in some cases, eg: expire messages(CompactionTest#testCompactionWithTTL), so I add a new method isMessageIndividualDeleted only to check individualDeletedMessages

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Consumer receive acknowledged messages from compacted topic after reconnection
2 participants