
[BUG] Cosmos - ChangeFeedProcessor - Records skipped when RUs are exhausted #43701

Open · 3 tasks done
egroSK opened this issue Jan 3, 2025 · 4 comments
Labels: Client, Cosmos, customer-reported, needs-team-attention, question, Service Attention

Comments

egroSK commented Jan 3, 2025

Describe the bug

We are experiencing an issue with the Cosmos change feed. When the database has exhausted its RUs and many requests are throttled (error 429), the change feed skips processing some records. According to the documentation, the change feed guarantees “at-least-once” delivery, so this should not happen.

I was able to reproduce the issue in the application running locally in debug mode by simulating a thrown CosmosException (error 429). I have also identified a possible cause of the issue.

Observed behavior

  • The main class handling change feed processing is com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl.
  • After a batch of records from the change feed is processed and the lease cannot be checkpointed due to insufficient RUs (429 error), a CosmosException is thrown from the DocumentServiceLeaseUpdaterImpl#tryReplaceLease call.
    • This exception bubbles back up to PartitionProcessorImpl, where it is handled by onErrorResume as a TRANSIENT_ERROR.
    • The processor waits for a short period (x-ms-retry-after-ms) and then continues with another round of processing.
  • The problem is that it does not continue with the next records in the change feed (or retry the same batch of records). Instead, it skips to the end of the change feed.
    • This is caused by an incorrect ContinuationToken in PartitionProcessorImpl#options when PartitionProcessorImpl requests a new batch of records via the createDocumentChangeFeedQuery method call.
    • In the happy path, the options (CosmosChangeFeedRequestOptions) are overwritten with the correct ContinuationToken after checkpointing.
    • In this error case, however, the options object from the previous processing round is reused.
      • Due to what appears to be another bug, the ContinuationToken in the options points to the end of the change feed.
      • Something inside the createDocumentChangeFeedQuery method does not seem to respect limitRequest: 1, which should limit the read to a single page of data. Instead, after the first page is read and returned to PartitionProcessorImpl, it keeps reading the rest of the change feed in a background thread. As a side effect, the ContinuationToken in the options is updated as well.
  • After the next batch of data is processed successfully (possibly an empty batch if no new data is available), the processor checkpoints the new ContinuationToken, which is too far in the future.

The strange part for me is why createDocumentChangeFeedQuery continues reading data after the first page is loaded, and whether it is correct that ChangeFeedFetcher updates the same ContinuationToken instance referenced from the CosmosChangeFeedRequestOptions "owned" by PartitionProcessorImpl.

I am not sure what the expected behavior or implementation is here: whether the change feed processing task should exit after a 429 error (and then start again from the last known ContinuationToken stored in the lease container), or whether it should continue processing as currently implemented, but with the correct ContinuationToken in the options.
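To illustrate the failure mode described above, here is a minimal, self-contained sketch of the shared-mutable-continuation hazard. This is a toy model, not SDK code; all class and field names are hypothetical, and the numbers are illustrative only.

import java.util.concurrent.atomic.AtomicLong;

public final class StaleContinuationSketch {

    // Stands in for the continuation state inside the shared CosmosChangeFeedRequestOptions:
    // one mutable object referenced by both the background fetcher and the processor.
    static final class SharedContinuation {
        final AtomicLong lsn = new AtomicLong(0);
    }

    public static void main(String[] args) {
        SharedContinuation shared = new SharedContinuation();
        long lastCheckpointedLsn = 0;   // what the lease document actually stores
        long endOfFeedLsn = 15_015;     // pretend the change feed currently ends here

        // Round 1: one page (64 records) is handed to the observer, but the background
        // prefetch keeps reading and advances the SHARED continuation to the end of the feed.
        long firstPageEndLsn = 64;
        shared.lsn.set(endOfFeedLsn);

        // The checkpoint of round 1 fails with 429, so lastCheckpointedLsn stays at 0 and the
        // error is swallowed as a transient error; the processing loop simply continues.
        boolean checkpointSucceeded = false;

        // Round 2: the same options object (i.e. the shared continuation) is reused instead of
        // re-reading the last checkpointed continuation from the lease.
        long nextReadStartsAt = checkpointSucceeded ? lastCheckpointedLsn : shared.lsn.get();

        System.out.println("Next read starts at LSN " + nextReadStartsAt + "; records between LSN "
            + firstPageEndLsn + " and " + endOfFeedLsn + " are never delivered to the observer.");
    }
}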

Exception or Stack Trace

Change feed processing logs from production (with com.azure.cosmos.implementation.changefeed on DEBUG level)

Logs show one successful processing round before the issue appears, the round in which the issue (429 error) occurs, and one successful round after the issue. In the round after the issue, it is visible that the ContinuationToken jumps by 15 015, effectively skipping records to the end of the change feed.

Processing logs
==Processing before the issue==

16.12.2024 17:31:32.0238457 PartitionProcessorImpl      [boundedElastic-43] Partition 5: processing 64 feeds with owner svc--0000010-84c478f9b9-qjwj6.
16.12.2024 17:31:32.0238457 DefaultObserver             [boundedElastic-43] Start processing from thread 192
16.12.2024 17:31:45.9146013 LeaseRenewerImpl            [boundedElastic-37] Partition 5: renewed lease with result true
16.12.2024 17:32:08.7723609 LeaseRenewerImpl            [boundedElastic-37] Partition 5: renewed lease with result true
16.12.2024 17:32:14.9638382 DefaultObserver             [boundedElastic-43] Done processing from thread 192
16.12.2024 17:32:14.9638382 PartitionCheckpointerImpl   [boundedElastic-43] Checkpoint: partition 5, new continuation "61503372"

==Processing with the issue (429 error)==

16.12.2024 17:32:14.9638382 PartitionProcessorImpl      [boundedElastic-37] Partition 5: processing 64 feeds with owner svc--0000010-84c478f9b9-qjwj6.
16.12.2024 17:32:14.9638382 DefaultObserver             [boundedElastic-37] Start processing from thread 172
16.12.2024 17:32:31.7882184 LeaseRenewerImpl            [boundedElastic-44] Partition 5: renewed lease with result true
16.12.2024 17:32:47.6390288 LeaseRenewerImpl            [boundedElastic-45] Partition 5: renewed lease with result true
16.12.2024 17:33:06.9327005 LeaseRenewerImpl            [boundedElastic-43] Partition 5: renewed lease with result true
16.12.2024 17:33:15.9546289 DefaultObserver             [boundedElastic-37] Done processing from thread 172

16.12.2024 17:33:25.0242651 ResourceThrottleRetryPolicy [cosmos-rntbd-epoll-2-1] Operation will NOT be retried. Current attempt 9                                   // RequestRateTooLargeException
16.12.2024 17:33:25.0242651 LeaseStoreManagerImpl       [boundedElastic-37] Partition 5 lease with token '"ff059d49-0000-0d00-0000-6760560e0000"' failed to checkpoint for owner 'svc--0000010-84c478f9b9-qjwj6' with continuation token '"61503372"'
16.12.2024 17:33:25.0242651 AutoCheckpointer            [boundedElastic-37] Checkpoint failed; this worker will be killed                                           // RequestRateTooLargeException
16.12.2024 17:33:25.0242651 PartitionProcessorImpl      [boundedElastic-37] Exception was thrown from thread 172                                                    // RequestRateTooLargeException
16.12.2024 17:33:25.0242651 PartitionProcessorImpl      [boundedElastic-37] CosmosException: Partition 5 from thread 172 with owner svc--0000010-84c478f9b9-qjwj6   // RequestRateTooLargeException

==Processing after the issue==

16.12.2024 17:33:25.9511190 PartitionProcessorImpl      [boundedElastic-41] Partition 5: processing 64 feeds with owner svc--0000010-84c478f9b9-qjwj6.
16.12.2024 17:33:25.9511190 DefaultObserver             [boundedElastic-41] Start processing from thread 179
16.12.2024 17:33:25.9511190 LeaseRenewerImpl            [boundedElastic-46] Partition 5: renewed lease with result true
16.12.2024 17:33:45.0589867 LeaseRenewerImpl            [boundedElastic-43] Partition 5: renewed lease with result true
16.12.2024 17:34:08.3759440 LeaseRenewerImpl            [boundedElastic-47] Partition 5: renewed lease with result true
16.12.2024 17:34:26.0445986 LeaseRenewerImpl            [boundedElastic-25] Partition 5: renewed lease with result true
16.12.2024 17:34:39.9009106 DefaultObserver             [boundedElastic-41] Done processing from thread 179
16.12.2024 17:34:39.9009106 PartitionCheckpointerImpl   [boundedElastic-41] Checkpoint: partition 5, new continuation "61518387"    // 15 015 difference in ContinuationToken!
Exception thrown when Checkpoint failed
RequestRateTooLargeException: {
    "innerErrorMessage": "[\"Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: http://aka.ms/cosmosdb-error-429\"]",
    "cosmosDiagnostics": {
        "userAgent": "azsdk-java-cosmos/4.65.0 Linux/5.15.164.1-1.cm2 JRE/21.0.5",
        "activityId": "ad1ff8ed-ab4b-4dbd-9ef4-c30f9a26e3cd",
        "requestLatencyInMs": 9494,
        "requestStartTimeUTC": "2024-12-16T16:33:15.101835633Z",
        "requestEndTimeUTC": "2024-12-16T16:33:24.596701301Z",
        "responseStatisticsList": [
            {
                "storeResult": {
                    "storePhysicalAddress": "rntbd://<DELETED>/",
                    "lsn": 23446153,
                    "quorumAckedLSN": 23446153,
                    "currentReplicaSetSize": 3,
                    "globalCommittedLsn": 23446153,
                    "partitionKeyRangeId": null,
                    "isValid": true,
                    "statusCode": 429,
                    "subStatusCode": 3200,
                    "isGone": false,
                    "isNotFound": false,
                    "isInvalidPartition": false,
                    "isThroughputControlRequestRateTooLarge": false,
                    "requestCharge": 0.38,
                    "itemLSN": -1,
                    "sessionToken": null,
                    "backendLatencyInMs": null,
                    "retryAfterInMs": 500.0,
                    "exceptionMessage": "[\"Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: http://aka.ms/cosmosdb-error-429\"]",
                    "exceptionResponseHeaders": "{x-ms-current-replica-set-size=4, lsn=23446153, x-ms-request-charge=0.38, x-ms-schemaversion=1.18, x-ms-transport-request-id=337114, x-ms-number-of-read-regions=0, x-ms-current-write-quorum=3, x-ms-cosmos-quorum-acked-llsn=23446153, x-ms-quorum-acked-lsn=23446153, x-ms-activity-id=2a7d8144-42b5-4ae7-8b37-298c8d3b7657, x-ms-xp-role=1, x-ms-global-Committed-lsn=23446153, x-ms-retry-after-ms=500, x-ms-cosmos-llsn=23446153, x-ms-serviceversion= version=2.14.0.0, x-ms-substatus=3200}",
                    "replicaStatusList": {
                        "Ignoring": [
                            "18974:S:Connected",
                            "19027:S:Connected",
                            "19072:S:Connected"
                        ],
                        "Attempting": [
                            "19230:P:Connected"
                        ]
                    },
                    "transportRequestTimeline": [
                        {
                            "eventName": "created",
                            "startTimeUTC": "2024-12-16T16:33:15.101907868Z",
                            "durationInMilliSecs": 0.001537
                        },
                        {
                            "eventName": "queued",
                            "startTimeUTC": "2024-12-16T16:33:15.101909405Z",
                            "durationInMilliSecs": 4.8e-05
                        },
                        {
                            "eventName": "channelAcquisitionStarted",
                            "startTimeUTC": "2024-12-16T16:33:15.101909453Z",
                            "durationInMilliSecs": 0.112946
                        },
                        {
                            "eventName": "pipelined",
                            "startTimeUTC": "2024-12-16T16:33:15.102022399Z",
                            "durationInMilliSecs": 0.064768
                        },
                        {
                            "eventName": "transitTime",
                            "startTimeUTC": "2024-12-16T16:33:15.102087167Z",
                            "durationInMilliSecs": 0.641727
                        },
                        {
                            "eventName": "decodeTime",
                            "startTimeUTC": "2024-12-16T16:33:15.102728894Z",
                            "durationInMilliSecs": 0.005497
                        },
                        {
                            "eventName": "received",
                            "startTimeUTC": "2024-12-16T16:33:15.102734391Z",
                            "durationInMilliSecs": 0.042665
                        },
                        {
                            "eventName": "completed",
                            "startTimeUTC": "2024-12-16T16:33:15.102777056Z",
                            "durationInMilliSecs": 0.000505
                        }
                    ],
                    "rntbdRequestLengthInBytes": 890,
                    "rntbdResponseLengthInBytes": 370,
                    "requestPayloadLengthInBytes": 276,
                    "responsePayloadLengthInBytes": 370,
                    "channelStatistics": {
                        "channelId": "6a0bb634",
                        "channelTaskQueueSize": 1,
                        "pendingRequestsCount": 0,
                        "lastReadTime": "2024-12-16T16:33:15.096850780Z",
                        "waitForConnectionInit": false
                    },
                    "serviceEndpointStatistics": {
                        "availableChannels": 5,
                        "acquiredChannels": 0,
                        "executorTaskQueueSize": 0,
                        "inflightRequests": 1,
                        "lastSuccessfulRequestTime": "2024-12-16T16:33:15.097Z",
                        "lastRequestTime": "2024-12-16T16:33:15.097Z",
                        "createdTime": "2024-12-16T14:45:46.690702580Z",
                        "isClosed": false,
                        "cerMetrics": {}
                    }
                },
                "requestResponseTimeUTC": "2024-12-16T16:33:15.102788695Z",
                "requestStartTimeUTC": "2024-12-16T16:33:15.101907868Z",
                "requestResourceType": "Document",
                "requestOperationType": "Replace",
                "requestSessionToken": null,
                "e2ePolicyCfg": null,
                "excludedRegions": null,
                "sessionTokenEvaluationResults": [],
                "locationToLocationSpecificHealthContext": {
                    "v": null
                }
            },
            {
                "storeResult": {
                    "storePhysicalAddress": "rntbd://<DELETED><EXCEPTION_MESSAGE_CUT_HERE>"
                }
            }
        ]
    }
}
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:1131)
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:226)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1503)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1366)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1415)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)

Logs from local debugging showing that the createDocumentChangeFeedQuery method reads all of the change feed data (with com.azure.cosmos.implementation.query on DEBUG level)

Processing logs - first processing
03.01.2025 13:30:06:738 [cosmos-rntbd-nio-2-16] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3369\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:06:813 [cosmos-rntbd-nio-2-16] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3433\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:127 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3497\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:188 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3561\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:250 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3625\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:343 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3689\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:408 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3753\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:476 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3817\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:561 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3881\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:629 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3905\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:07:695 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3905\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = false, Context: n/a
03.01.2025 13:30:07:695 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Paginator - No more results, Context: n/a

03.01.2025 13:30:22:121 [boundedElastic-7] INFO com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl - Partition 0: processing 64 feeds with owner local.
03.01.2025 13:30:22:121 [boundedElastic-7] INFO com.azure.cosmos.implementation.changefeed.common.DefaultObserver - Start processing from thread 80
03.01.2025 13:30:22:121 [boundedElastic-7] INFO com.reproducer.EventProcessor - RECEIVED 64 RECORDS
03.01.2025 13:30:22:121 [boundedElastic-7] INFO com.azure.cosmos.implementation.changefeed.common.DefaultObserver - Done processing from thread 80
03.01.2025 13:30:27:677 [boundedElastic-7] INFO com.azure.cosmos.implementation.changefeed.pkversion.PartitionCheckpointerImpl - Checkpoint: partition 0, new continuation "3369"
Processing logs - second processing
03.01.2025 13:30:40:292 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3433\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:376 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3497\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:442 [cosmos-rntbd-nio-2-16] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3561\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:503 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3625\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:564 [cosmos-rntbd-nio-2-13] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3689\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:619 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3753\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:681 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3817\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:740 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3881\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:797 [cosmos-rntbd-nio-2-16] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3905\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = true, Context: n/a
03.01.2025 13:30:40:859 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Fetcher - Fetcher state updated: isChangeFeed = true, continuation token = {"V":1,"Rid":"events","Mode":"INCREMENTAL","StartFrom":{"Type":"LEASE","Etag":"\"3304\"","PKRangeId":"0"},"Continuation":{"V":1,"Rid":"events","Continuation":[{"token":"\"3905\"","range":{"min":"","max":"FF"}}],"PKRangeId":"0"}}, max item count = 64, should fetch more = false, Context: n/a
03.01.2025 13:30:40:859 [cosmos-rntbd-nio-2-6] DEBUG com.azure.cosmos.implementation.query.Paginator - No more results, Context: n/a

03.01.2025 13:30:49:338 [boundedElastic-4] INFO com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl - Partition 0: processing 64 feeds with owner local.
03.01.2025 13:30:49:338 [boundedElastic-4] INFO com.azure.cosmos.implementation.changefeed.common.DefaultObserver - Start processing from thread 70
03.01.2025 13:30:49:338 [boundedElastic-4] INFO com.reproducer.EventProcessor - RECEIVED 64 RECORDS
03.01.2025 13:30:49:338 [boundedElastic-4] INFO com.azure.cosmos.implementation.changefeed.common.DefaultObserver - Done processing from thread 70
03.01.2025 13:30:54:258 [boundedElastic-7] INFO com.azure.cosmos.implementation.changefeed.pkversion.PartitionCheckpointerImpl - Checkpoint: partition 0, new continuation "3433"

To Reproduce

The reproducer project can be used to simulate the issue:

  • CosmosConfig - Configure ENDPOINT and KEY.
  • EventCreator#main - Use this method to create sample records in the feed container.
  • EventProcessor#main - Use this method to start the ChangeFeedProcessor (in debug mode); a minimal sketch of such a starter follows this list.
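For context, a minimal starter along the lines of EventProcessor#main could look as follows. It uses the public ChangeFeedProcessorBuilder API; the environment variable names, database/container names, and host name are assumptions, not values taken from the reproducer project.

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;

public final class EventProcessorSketch {
    public static void main(String[] args) throws InterruptedException {
        CosmosAsyncClient client = new CosmosClientBuilder()
            .endpoint(System.getenv("COSMOS_ENDPOINT"))   // assumed configuration source
            .key(System.getenv("COSMOS_KEY"))
            .buildAsyncClient();

        // Database and container names are illustrative.
        CosmosAsyncContainer feedContainer = client.getDatabase("db").getContainer("events");
        CosmosAsyncContainer leaseContainer = client.getDatabase("db").getContainer("leases");

        ChangeFeedProcessor processor = new ChangeFeedProcessorBuilder()
            .hostName("local")
            .feedContainer(feedContainer)
            .leaseContainer(leaseContainer)
            .handleChanges(docs ->
                System.out.println("RECEIVED " + docs.size() + " RECORDS"))
            .buildChangeFeedProcessor();

        processor.start().block();        // start() returns a Mono<Void>
        Thread.sleep(Long.MAX_VALUE);     // keep the JVM alive while the processor runs
    }
}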

Steps to reproduce the behavior:

  1. Insert data into the feed container (for example, 300 records).
  2. Add breakpoints on these lines:
    • com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessorImpl#run
      • return this.documentClient.createDocumentChangeFeedQuery
      • if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());
    • com.azure.cosmos.implementation.changefeed.common.ChangeFeedContextClientImpl#replaceItem
      • return cosmosContainer.replaceItem(document, itemId, partitionKey, options)
    • ⚠️ It is important to configure the breakpoints (as described here) to suspend only the thread that hits the breakpoint, not all threads, so that processing in the background threads can continue.
  3. Start the application with the change feed processor in debug mode.
  4. After the breakpoint stops on the line return this.documentClient.createDocumentChangeFeedQuery, check this.options.continuationState.startFromSettings - it points to the correct position as saved in the lease container.
  5. Resume the program. After the breakpoint stops on the line if (cancellationToken.isCancellationRequested())..., check this.options.continuationState.continuation - it points to the end of the change feed (LSN).
  6. Resume the program. After the first batch of messages is processed and execution stops at the breakpoint in ChangeFeedContextClientImpl#replaceItem (called from DocumentServiceLeaseUpdaterImpl#tryReplaceLease), simulate throwing an exception (as described here):
    • new com.azure.cosmos.CosmosException(429, new com.azure.cosmos.implementation.CosmosError(), java.util.Map.of("x-ms-retry-after-ms", "1000"))
  7. Resume the program, wait for the next round of processing to start and for the breakpoint to stop at return this.documentClient.createDocumentChangeFeedQuery. Then check this.options.continuationState.continuation - it points to the end of the change feed (LSN) from the previous processing round.
  8. Resume the program. After the breakpoint stops on the line if (cancellationToken.isCancellationRequested())..., check documentFeedResponse - it contains an empty result, and the etag in the headers points to the end of the change feed.
  9. Resume the program. The lease is checkpointed (saved to the DB), and records have been skipped.

Expected behavior

I would expect the ChangeFeedProcessor to continue processing the next unprocessed records (or the same batch of records for which the checkpoint failed) instead of skipping records.

Setup

  • OS: macOS 15.2 (local development environment) / Linux (production environment)
  • IDE: IntelliJ
  • Library/Libraries: com.azure:azure-cosmos:4.65.0
  • Java version: 21.0.5
  • App Server/Environment: - (not important for the issue to happen)
  • Frameworks: - (not important for the issue to happen)

Additional context

  • We have already increased RUs for the database to try to prevent the issue, and it seems to be helping so far (a minimal sketch of such a throughput increase follows this list). However, I believe this issue should not occur given the guaranteed at-least-once delivery.
  • The issue also occurs with the epkversion variant of PartitionProcessorImpl.
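A minimal sketch of the mitigation mentioned above, raising database-level manual throughput via the SDK; the RU/s value, environment variable names, and database name are illustrative, not our actual configuration.

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.models.ThroughputProperties;

public final class IncreaseThroughputSketch {
    public static void main(String[] args) {
        CosmosAsyncClient client = new CosmosClientBuilder()
            .endpoint(System.getenv("COSMOS_ENDPOINT"))   // assumed configuration source
            .key(System.getenv("COSMOS_KEY"))
            .buildAsyncClient();

        // Raise database-level manual throughput; 10_000 RU/s is an illustrative value.
        client.getDatabase("db")
            .replaceThroughput(ThroughputProperties.createManualThroughput(10_000))
            .block();

        client.close();
    }
}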

Information Checklist

Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

github-actions bot commented Jan 3, 2025

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar @pjohari-ms @TheovanKraay.

kushagraThapar (Member) commented

@tvaron3 can you please take a look at this issue, thanks!

tvaron3 (Member) commented Jan 21, 2025

@egroSK Thanks for the amazing repro and analysis! I have a PR with a fix out for this issue.

egroSK (Author) commented Jan 23, 2025

Nice! Thank you for fixing the issue. I am looking forward to the release with the fix. 🙂
