-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix consumer receive individual acknowledged messages from compacted topic after reconnection #23495
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,9 @@ | |
import static com.google.common.base.Preconditions.checkArgument; | ||
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; | ||
import com.google.common.annotations.Beta; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.CompletableFuture; | ||
import javax.annotation.Nullable; | ||
|
@@ -38,6 +40,8 @@ | |
|
||
public class CompactedTopicUtils { | ||
|
||
private static final String COMPACTION_CURSOR_NAME = "__compaction"; | ||
|
||
@Beta | ||
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService, | ||
ManagedCursor cursor, int maxEntries, | ||
|
@@ -88,15 +92,27 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact | |
return; | ||
} | ||
|
||
List<Entry> unAckedEntries = new ArrayList<>(); | ||
long entriesSize = 0; | ||
for (Entry entry : entries) { | ||
entriesSize += entry.getLength(); | ||
if (COMPACTION_CURSOR_NAME.equals(cursor.getName())) { | ||
for (Entry entry : entries) { | ||
entriesSize += entry.getLength(); | ||
} | ||
unAckedEntries = entries; | ||
} else { | ||
for (Entry entry : entries) { | ||
Position position = entry.getPosition(); | ||
if (!cursor.isMessageIndividualDeleted(position)) { | ||
unAckedEntries.add(entry); | ||
entriesSize += entry.getLength(); | ||
} | ||
} | ||
} | ||
cursor.updateReadStats(entries.size(), entriesSize); | ||
cursor.updateReadStats(unAckedEntries.size(), entriesSize); | ||
|
||
Entry lastEntry = entries.get(entries.size() - 1); | ||
cursor.seek(lastEntry.getPosition().getNext(), true); | ||
callback.readEntriesComplete(entries, readEntriesCtx); | ||
callback.readEntriesComplete(unAckedEntries, readEntriesCtx); | ||
Comment on lines
+95
to
+115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will cause these messages to be lost the next time compaction is run, and other consumers will not be able to read these messages. so why use individual ack for Exclusive mode? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
OK, I see
Yes, for Exclusive mode, suggest using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @summeriiii I think for the compacted topic, we cannot use individual acknowledgment, otherwise the entry will not be deleted because the messages deleted by compact will never be acknowledged. so I suggest using |
||
}); | ||
}).exceptionally((exception) -> { | ||
exception = FutureUtil.unwrapCompletionException(exception); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can using
isMessageDeleted
insteadThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use
isMessageDeleted
before, but the markDeletePosition is not correct in some cases, eg: expire messages(CompactionTest#testCompactionWithTTL), so I add a new method isMessageIndividualDeleted only to check individualDeletedMessages