From ad6b554c9bd95f41ae8bfaf129b011ce5dac9295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Abecasis?= Date: Sat, 14 Dec 2024 20:02:16 +0000 Subject: [PATCH] chore: ensure pending commits order Commits would be added to the chain of pending commits based only on the rebalancing state. Given that the rebalancing state is updated (via `ConsumerRebalanceListener.onPartitionsAssigned`) separate from when pending commits are processed (after `poll`), it could happen that commits emitted later would be processed before earlier pending ones. This updates the condition for queueing commits to take into account the prior existence of pending commits. In addition, the condition for processing pending commits in `poll` is also updated to disregard whether a rebalance operation was ongoing at the start of the poll. Instead, the existence of pending commits along with a non-`rebalancing` state are a sufficient trigger. This ensures that rebalance operations that might conclude within a single consumer poll do not leave behind any pending commits. At the moment, these possibilities are theoretical as commit operations are serialized via `KafkaConsumerActor`'s request queue, and don't happen concurrently to polls. That said, the cost of the fixes is trivial and being explicit about the conditions may prevent future bugs, if the surrounding context changes. --- .../fs2/kafka/internal/KafkaConsumerActor.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 41ac8dd94..c67ef462f 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -93,7 +93,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( private[this] def commit(request: Request.Commit[F]): F[Unit] = ref.flatModify { state => val commitF = commitAsync(request.offsets, request.callback) - if (state.rebalancing) { + if (state.rebalancing || state.pendingCommits.nonEmpty) { val newState = state.withPendingCommit( commitF >> logging.log(CommittedPendingCommit(request)) ) @@ -301,7 +301,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( } .flatMap(records) - def handlePoll(newRecords: ConsumerRecords, initialRebalancing: Boolean): F[Unit] = { + def handlePoll(newRecords: ConsumerRecords): F[Unit] = { def handleBatch( state: State[F, K, V], pendingCommits: Option[HandlePollResult.PendingCommits] @@ -380,9 +380,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( } def handlePendingCommits(state: State[F, K, V]) = { - val currentRebalancing = state.rebalancing - - if (initialRebalancing && !currentRebalancing && state.pendingCommits.nonEmpty) { + if (!state.rebalancing && state.pendingCommits.nonEmpty) { val newState = state.withoutPendingCommits ( newState, @@ -414,10 +412,9 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( ref .get .flatMap { state => - if (state.subscribed && state.streaming) { - val initialRebalancing = state.rebalancing - pollConsumer(state).flatMap(handlePoll(_, initialRebalancing)) - } else F.unit + if (state.subscribed && state.streaming) + pollConsumer(state).flatMap(handlePoll(_)) + else F.unit } }