Skip to content

Commit

Permalink
chore: single definition of pending commit effect
Browse files Browse the repository at this point in the history
Whether immediately committing offsets, or queueing the request in
`State.pendingCommits`, the necessary effect is built at once.

This avoids the need to repeat similar logic in disparate call sites,
and makes visible in a single place how the request will be processed
and logged.
  • Loading branch information
biochimia committed Jan 3, 2025
1 parent 41be508 commit fd98980
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,16 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
.handleErrorWith(e => F.delay(callback(Left(e))))

private[this] def commit(request: Request.Commit[F]): F[Unit] =
ref
.modify { state =>
if (state.rebalancing) {
val newState = state.withPendingCommit(request)
(newState, Some(StoredPendingCommit(request, newState)))
} else (state, None)
}
.flatMap {
case Some(log) => logging.log(log)
case None => commitAsync(request.offsets, request.callback)
}
ref.flatModify { state =>
val commitF = commitAsync(request.offsets, request.callback)
if (state.rebalancing) {
val newState = state.withPendingCommit(
commitF >> logging.log(CommittedPendingCommit(request))
)
(newState, logging.log(StoredPendingCommit(request, newState)))
} else
(state, commitF)
}

private[this] def manualCommitSync(request: Request.ManualCommitSync[F]): F[Unit] = {
val commit =
Expand Down Expand Up @@ -388,10 +387,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
(
newState,
Some(
HandlePollResult.PendingCommits(
commits = state.pendingCommits,
log = CommittedPendingCommits(state.pendingCommits, newState)
)
HandlePollResult.PendingCommits(commits = state.pendingCommits)
)
)
} else (state, None)
Expand Down Expand Up @@ -448,15 +444,9 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](

private[this] object HandlePollResult {

case class PendingCommits(
commits: Chain[Request.Commit[F]],
log: CommittedPendingCommits[F]
) {
case class PendingCommits(commits: Chain[F[Unit]]) {

def commit: F[Unit] =
commits.traverse { commitRequest =>
commitAsync(commitRequest.offsets, commitRequest.callback)
} >> logging.log(log)
def commit: F[Unit] = commits.sequence_

}

Expand Down Expand Up @@ -506,7 +496,7 @@ private[kafka] object KafkaConsumerActor {
final case class State[F[_], K, V](
fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]],
records: Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]],
pendingCommits: Chain[Request.Commit[F]],
pendingCommits: Chain[F[Unit]],
onRebalances: Chain[OnRebalance[F]],
rebalancing: Boolean,
subscribed: Boolean,
Expand Down Expand Up @@ -562,7 +552,7 @@ private[kafka] object KafkaConsumerActor {
def withoutRecords(partitions: Set[TopicPartition]): State[F, K, V] =
copy(records = records.filterKeysStrict(!partitions.contains(_)))

def withPendingCommit(pendingCommit: Request.Commit[F]): State[F, K, V] =
def withPendingCommit(pendingCommit: F[Unit]): State[F, K, V] =
copy(pendingCommits = pendingCommits.append(pendingCommit))

def withoutPendingCommits: State[F, K, V] =
Expand Down
10 changes: 3 additions & 7 deletions modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.regex.Pattern

import scala.collection.immutable.SortedSet

import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector}
import cats.data.{NonEmptyList, NonEmptySet, NonEmptyVector}
import cats.syntax.all.*
import fs2.kafka.instances.*
import fs2.kafka.internal.syntax.*
Expand Down Expand Up @@ -211,15 +211,11 @@ private[kafka] object LogEntry {

}

final case class CommittedPendingCommits[F[_]](
pendingCommits: Chain[Request.Commit[F]],
state: State[F, ?, ?]
) extends LogEntry {
final case class CommittedPendingCommit[F[_]](pendingCommit: Request.Commit[F]) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Committed pending commits [$pendingCommits]. Current state [$state]."
override def message: String = s"Committed pending commit [$pendingCommit]."

}

Expand Down

0 comments on commit fd98980

Please sign in to comment.