[BUG] Cosmos - ChangeFeedProcessor - Records skipped when RUs are exhausted #43701
Open
Labels: Client, Cosmos, customer-reported, needs-team-attention, question, Service Attention
Describe the bug
We are experiencing an issue with the Cosmos change feed. When the database has exhausted its RUs and many requests are throttled (error 429), the change feed processor skips some records. According to the documentation, the change feed guarantees "at-least-once" delivery, so this should not happen.
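For context, a minimal change feed processor setup (roughly what the reproducer's `EventProcessor#main` does) looks like the sketch below. The endpoint, key, database, and container names are placeholders, not taken from the reproducer:

```java
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;

public final class EventProcessorSketch {
    public static void main(String[] args) throws InterruptedException {
        // Placeholder endpoint/key; the reproducer keeps these in CosmosConfig.
        CosmosAsyncClient client = new CosmosClientBuilder()
            .endpoint("<ENDPOINT>")
            .key("<KEY>")
            .buildAsyncClient();

        CosmosAsyncContainer feedContainer = client.getDatabase("db").getContainer("events");
        CosmosAsyncContainer leaseContainer = client.getDatabase("db").getContainer("leases");

        ChangeFeedProcessor processor = new ChangeFeedProcessorBuilder()
            .hostName("sample-host")
            .feedContainer(feedContainer)
            .leaseContainer(leaseContainer)
            .handleChanges((List<JsonNode> docs) ->
                // At-least-once delivery: duplicates are acceptable, silent skips are not.
                docs.forEach(doc -> System.out.println("processed: " + doc.get("id"))))
            .buildChangeFeedProcessor();

        processor.start().block();
        // Keep the process alive so the processor can pump changes, then shut down.
        Thread.sleep(60_000);
        processor.stop().block();
        client.close();
    }
}
```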
I was able to reproduce the issue in an application running locally in debug mode by simulating a thrown `CosmosException` (with status code 429). I have also identified a possible cause of the issue.
Observed behavior
The issue is in `com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl`:

1. When updating the `lease` is not possible due to insufficient RUs (429 error), a `CosmosException` is thrown from the `DocumentServiceLeaseUpdaterImpl#tryReplaceLease` call.
2. The exception propagates back to `PartitionProcessorImpl`, where it is handled by `onErrorResume` as a `TRANSIENT_ERROR`.
3. The processor waits for the delay indicated by `x-ms-retry-after-ms` and then continues with another round of processing.
4. The problem is the `ContinuationToken` set in `PartitionProcessorImpl#options` when `PartitionProcessorImpl` requests a new batch of records with the `createDocumentChangeFeedQuery` method call.
5. Normally the `options` (`CosmosChangeFeedRequestOptions`) are overwritten with the correct `ContinuationToken` after checkpointing. Because checkpointing failed, the next round reuses the `options` object from the previous processing, and the `ContinuationToken` in `options` is pointing to the end of the change feed.
6. The `createDocumentChangeFeedQuery` method seems not to be respecting `limitRequest: 1`, which should probably limit the read to just one page of data. Instead, after the first page of data is read and returned to `PartitionProcessorImpl`, it continues reading the rest of the change feed data in a background thread. As a side effect, the `ContinuationToken` in the `options` is also updated (a Reactor sketch at the end of this section illustrates the general mechanism).
7. The next round of processing therefore starts from a `ContinuationToken` which is too far in the future.

The strange part for me is why `createDocumentChangeFeedQuery` continues reading data after the first page is loaded, and whether it is correct that `ChangeFeedFetcher` updates the same instance of `ContinuationToken` as referenced from the `CosmosChangeFeedRequestOptions` "owned" by `PartitionProcessorImpl`.

I am not sure what the expected behavior or implementation is here: whether the change feed processing task should exit after a 429 error (and then start again from the last known `ContinuationToken` stored in the `lease` container), or whether it should continue processing as currently implemented, but with the correct `ContinuationToken` in the `options`.
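To illustrate the general Reactor behavior I suspect here (a standalone sketch, not the SDK's actual pipeline): a downstream `limitRequest(1)` caps what the subscriber receives, but an upstream operator with its own concurrency/prefetch, such as `flatMap`, still requests more elements from the source. Applied to the change feed query, this would explain why further pages are fetched even though only one page was asked for:

```java
import reactor.core.publisher.Flux;

public final class PrefetchDemo {
    public static void main(String[] args) {
        Flux.range(1, 100)                                   // pretend each element is a feed page
            .doOnRequest(n -> System.out.println("source asked for " + n + " pages"))
            .flatMap(page -> Flux.just("page-" + page), 8)   // flatMap requests 8 from the source up front
            .limitRequest(1)                                 // downstream only wants a single page
            .subscribe(page -> System.out.println("consumer got " + page));
        // The source is asked for 8 pages even though the consumer takes only one.
    }
}
```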
Exception or Stack Trace
Change feed processing logs from production (with `com.azure.cosmos.implementation.changefeed` on DEBUG level). The logs show one successful processing before the issue appears, the processing during which the issue (429 error) occurs, and one successful processing after the issue. In the processing after the issue it is visible that the `ContinuationToken` is increased by 15 015, practically skipping records to the end of the change feed.

Processing logs
Exception thrown when Checkpoint failed

Logs from local debugging showing that the `createDocumentChangeFeedQuery` method reads all of the change feed data (with `com.azure.cosmos.implementation.query` on DEBUG level).

Processing logs - first processing
Processing logs - second processing
To Reproduce
A reproducer project can be used to simulate the issue:
- `CosmosConfig` - configure `ENDPOINT` and `KEY`.
- `EventCreator#main` - use this method to create sample records in the feed container (a sketch follows this list).
- `EventProcessor#main` - use this method to start the ChangeFeedProcessor (in debug mode).
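A hedged sketch of what `EventCreator#main` might do; the actual reproducer code is not shown here, and the database/container names, document shape, and partition key path are assumptions:

```java
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import java.util.Map;
import java.util.UUID;

public final class EventCreatorSketch {
    public static void main(String[] args) {
        CosmosAsyncClient client = new CosmosClientBuilder()
            .endpoint("<ENDPOINT>")
            .key("<KEY>")
            .buildAsyncClient();
        CosmosAsyncContainer feedContainer = client.getDatabase("db").getContainer("events");

        // Insert a batch of sample records so the change feed has something to deliver.
        // Assumes the container's partition key path matches one of these fields (e.g. /id).
        for (int i = 0; i < 1_000; i++) {
            Map<String, Object> event = Map.of(
                "id", UUID.randomUUID().toString(),
                "type", "sample-event",
                "sequence", i);
            feedContainer.createItem(event).block();
        }
        client.close();
    }
}
```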
Steps to reproduce the behavior:
1. Set breakpoints in `com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl#run`:
   - at `return this.documentClient.createDocumentChangeFeedQuery`
   - at `if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());`
2. Set a breakpoint in `com.azure.cosmos.implementation.changefeed.common.ChangeFeedContextClientImpl#replaceItem`:
   - at `return cosmosContainer.replaceItem(document, itemId, partitionKey, options)`
3. At the breakpoint at `return this.documentClient.createDocumentChangeFeedQuery`, check `this.options.continuationState.startFromSettings` - it points to the correct position as saved in the `lease` container.
4. At the breakpoint at `if (cancellationToken.isCancellationRequested())...`, check `this.options.continuationState.continuation` - it points to the end of the change feed (LSN).
5. At the breakpoint in `ChangeFeedContextClientImpl#replaceItem` (called from `DocumentServiceLeaseUpdaterImpl#tryReplaceLease`), simulate throwing an exception (as described here): `new com.azure.cosmos.CosmosException(429, new com.azure.cosmos.implementation.CosmosError(), java.util.Map.of("x-ms-retry-after-ms", "1000"))`
6. At the next hit of the breakpoint at `return this.documentClient.createDocumentChangeFeedQuery`, check `this.options.continuationState.continuation` - it points to the end of the change feed (LSN) from the previous processing round.
7. At the breakpoint at `if (cancellationToken.isCancellationRequested())...`, check `documentFeedResponse` - it contains an empty `result`, and the `etag` in `headers` points to the end of the change feed.
8. The `lease` is checkpointed (saved to the DB), and records were skipped (the lease contents can be inspected with the sketch below).
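To verify the skipped range, the checkpointed continuation can be read back from the lease container. A sketch; the `ContinuationToken` property name follows the lease documents written by the pkversion processor and should be treated as an assumption:

```java
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.JsonNode;

final class LeaseInspector {
    // Prints every lease document's checkpointed continuation so a jump past
    // unprocessed records becomes visible after the simulated 429.
    static void dumpLeases(CosmosAsyncContainer leaseContainer) {
        leaseContainer
            .queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class)
            .byPage()
            .flatMapIterable(FeedResponse::getResults)
            .doOnNext(lease -> System.out.println(
                lease.get("id") + " -> " + lease.get("ContinuationToken")))
            .blockLast();
    }
}
```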
Expected behavior
I would expect the ChangeFeedProcessor to continue processing the next unprocessed records (or the same batch of records for which checkpointing failed) instead of skipping records.
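As a consumer-side safeguard until the root cause is fixed, gaps can at least be made visible by tracking the `_lsn` system property of delivered documents. This is only a heuristic (LSNs are not contiguous per document, and the single counter below ignores multiple leases), not a fix:

```java
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class GapDetectingHandler {
    private final AtomicLong lastSeenLsn = new AtomicLong(-1);

    // Intended to be wrapped by the consumer passed to ChangeFeedProcessorBuilder#handleChanges.
    void handle(List<JsonNode> docs) {
        for (JsonNode doc : docs) {
            long lsn = doc.path("_lsn").asLong(-1);
            long previous = lastSeenLsn.getAndSet(lsn);
            // A large jump between consecutive batches hints that records may have been skipped.
            if (previous >= 0 && lsn - previous > 1) {
                System.err.println("possible gap in change feed: " + previous + " -> " + lsn);
            }
            // ... actual business processing of `doc` goes here ...
        }
    }
}
```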
Setup
Additional context
[…] `epkversion` version of `PartitionProcessorImpl`.
Information Checklist
Kindly make sure that you have added all of the required information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.