Fix hasMessageAvailable might return true after seeking to latest #409

@BewareMyPower BewareMyPower commented Mar 4, 2024

Motivation

See apache/pulsar-client-python#199

There is a race condition when hasMessageAvailable is called after seek if the Reader's start message ID is earliest.

In ConsumerImpl::hasMessageAvailableAsync, if the connection has not been established yet, lastDequedMessageId_ is earliest because no message has been received. Since lastMessageIdInBroker_ is also earliest, getLastMessageIdAsync is called, and its callback then reaches:

callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId));

Before getLastMessageIdAsync was called, messageId was earliest because lastDequedMessageId_ and startMessageId_ were both earliest. By the time the callback runs, however, startMessageId_ has already been updated to latest in connectionOpened, so the comparison should be made against latest.

Modifications

In the callback of getLastMessageIdAsync, retrieve the latest value of startMessageId_ for the comparison rather than reusing the old value.

Refactor the seek flow to reset the seek state and trigger the callback only after startMessageId_ has been updated.

ReaderTest.testHasMessageAvailableAfterSeekToEnd is added to cover the changes.
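
For illustration only, the stale-value problem and the first modification above can be modelled with a small standalone sketch. Everything here (`FakeConsumer`, `FakeMessageId`, `kEarliest`, `kLatest`, `replayRace`) is a hypothetical stand-in and not the client's real code; message IDs are reduced to plain integers and the asynchronous broker call is reduced to invoking a stored callback later.

```cpp
#include <cassert>
#include <functional>
#include <mutex>

// Hypothetical stand-in for a message ID: a larger value means a later position.
using FakeMessageId = long;
constexpr FakeMessageId kEarliest = -1;
constexpr FakeMessageId kLatest = 1000;  // pretend the topic currently ends here

// Hypothetical, heavily simplified model of the consumer state involved.
class FakeConsumer {
   public:
    // Buggy shape: the start position is captured before the asynchronous
    // "get last message ID" step, so the callback compares with a stale value.
    std::function<bool(FakeMessageId)> checkWithStaleSnapshot() {
        FakeMessageId snapshot = startMessageId();
        return [snapshot](FakeMessageId lastInBroker) { return lastInBroker > snapshot; };
    }

    // Fixed shape: the callback re-reads the start position when it runs.
    std::function<bool(FakeMessageId)> checkWithFreshRead() {
        return [this](FakeMessageId lastInBroker) { return lastInBroker > startMessageId(); };
    }

    // Models connectionOpened updating the start position after a seek.
    void setStartMessageId(FakeMessageId id) {
        std::lock_guard<std::mutex> lock(mutex_);
        startMessageId_ = id;
    }

   private:
    FakeMessageId startMessageId() {
        std::lock_guard<std::mutex> lock(mutex_);
        return startMessageId_;
    }

    std::mutex mutex_;
    FakeMessageId startMessageId_ = kEarliest;
};

// Replays the interleaving from the description. Returns true when the
// fresh-read callback correctly reports that no message is available.
bool replayRace() {
    FakeConsumer consumer;
    auto stale = consumer.checkWithStaleSnapshot();  // request issued before reconnect
    auto fresh = consumer.checkWithFreshRead();
    consumer.setStartMessageId(kLatest);  // reconnect after seek-to-latest lands here
    assert(stale(kLatest));               // stale snapshot wrongly says "message available"
    return !fresh(kLatest);
}
```

The point of the sketch is only the capture timing: both callbacks receive the same last message ID, and the result differs solely because one of them reads the start position after it has been updated.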

@BewareMyPower BewareMyPower marked this pull request as draft March 4, 2024 15:08
@BewareMyPower BewareMyPower self-assigned this Mar 4, 2024
@BewareMyPower BewareMyPower added this to the 3.5.0 milestone Mar 4, 2024
@BewareMyPower BewareMyPower changed the title from "Fix race condition of hasMessageAvailable by reading latest startMessageId each time" to "(WIP) Fix race condition of hasMessageAvailable by reading latest startMessageId each time" Mar 4, 2024
@BewareMyPower BewareMyPower force-pushed the bewaremypower/has-msg-available-after-seek-latest branch 2 times, most recently from f59e148 to 886ff1c March 5, 2024 04:37
After a seek operation is done, the `startMessageId` will not be updated
until the reconnection triggered by the seek completes in `connectionOpened`.
Until then, `hasMessageAvailable` could compare against an outdated
`startMessageId` and return a wrong value.

### Modifications

Replace `duringSeek` with a `SeekStatus` field:
- `NOT_STARTED`: The initial status, or the status after a seek operation is done. `seek` can only succeed in this status.
- `IN_PROGRESS`: A seek operation has started but the client has not yet received the response from the broker.
- `COMPLETED`: The client has received the seek response but the seek future is not yet done.

After the status becomes `COMPLETED`, if the connection is not ready,
then the next time the connection is established, the status changes
from `COMPLETED` back to `NOT_STARTED` and the seek future is completed
in the internal executor.
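
As a rough sketch of the status transitions described above (not the actual implementation), the three states can be modelled with an atomic field and compare-and-swap transitions. `SeekTracker`, its method names, `walkThrough`, and the use of `std::atomic` are all assumptions made for this example; only the "connection not ready" path is modelled.

```cpp
#include <atomic>
#include <cassert>

enum class SeekStatus { NOT_STARTED, IN_PROGRESS, COMPLETED };

// Hypothetical tracker for the seek status; each transition succeeds only
// from the single state it is allowed to leave.
class SeekTracker {
   public:
    // A seek may only begin from NOT_STARTED, so a concurrent seek is rejected.
    bool tryStart() {
        auto expected = SeekStatus::NOT_STARTED;
        return status_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS);
    }

    // The broker has answered the seek, but the seek future is still pending.
    bool onSeekResponse() {
        auto expected = SeekStatus::IN_PROGRESS;
        return status_.compare_exchange_strong(expected, SeekStatus::COMPLETED);
    }

    // The connection is established again. Returns true when the caller
    // should now complete the pending seek future.
    bool onConnectionOpened() {
        auto expected = SeekStatus::COMPLETED;
        return status_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED);
    }

    SeekStatus status() const { return status_.load(); }

   private:
    std::atomic<SeekStatus> status_{SeekStatus::NOT_STARTED};
};

// Walks through one full seek cycle and returns true if every step behaved
// as the status descriptions say it should.
bool walkThrough() {
    SeekTracker tracker;
    assert(tracker.status() == SeekStatus::NOT_STARTED);
    bool ok = tracker.tryStart();               // NOT_STARTED -> IN_PROGRESS
    ok = ok && !tracker.tryStart();             // a second seek is rejected
    ok = ok && !tracker.onConnectionOpened();   // nothing to complete yet
    ok = ok && tracker.onSeekResponse();        // IN_PROGRESS -> COMPLETED
    ok = ok && tracker.onConnectionOpened();    // COMPLETED -> NOT_STARTED
    ok = ok && tracker.tryStart();              // a new seek is allowed again
    return ok;
}
```

Using a single status field instead of a boolean makes "response received but future not yet completed" distinguishable from "no seek running", which is the case the original `duringSeek` flag could not express.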

Add `testHasMessageAvailableAfterSeekToEnd` and `testSeekInProgress`.
@BewareMyPower BewareMyPower changed the title from "(WIP) Fix race condition of hasMessageAvailable by reading latest startMessageId each time" to "Fix Reader.hasMessageAvailable might return true after seeking to latest" Mar 5, 2024
@BewareMyPower BewareMyPower force-pushed the bewaremypower/has-msg-available-after-seek-latest branch from 886ff1c to 0e91bbd March 5, 2024 10:49
@BewareMyPower BewareMyPower changed the title from "Fix Reader.hasMessageAvailable might return true after seeking to latest" to "Fix hasMessageAvailable might return true after seeking to latest" Mar 5, 2024
@BewareMyPower BewareMyPower marked this pull request as ready for review March 5, 2024 10:50
@BewareMyPower BewareMyPower marked this pull request as draft March 5, 2024 14:16
@BewareMyPower BewareMyPower marked this pull request as ready for review March 5, 2024 17:26
@BewareMyPower

@merlimat @shibd @RobertIndie @Demogorgon314 Could you take a look?

@@ -236,16 +236,15 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
     // sending the subscribe request.
     cnx->registerConsumer(consumerId_, get_shared_this_ptr());
 
-    if (duringSeek_) {
+    if (duringSeek()) {
         ackGroupingTrackerPtr_->flushAndClean();
     }
 
     Lock lockForMessageId(mutexForMessageId_);
     // Update startMessageId so that we can discard messages after delivery restarts

We can move this comment inside clearReceiveQueue.

@BewareMyPower

@RobertIndie @shibd Comments are addressed, PTAL again.

@BewareMyPower BewareMyPower merged commit ee1d7b9 into apache:main Mar 11, 2024
14 of 15 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/has-msg-available-after-seek-latest branch March 11, 2024 11:09