Add chunk processing API #133

Draft
wants to merge 1 commit into main
Add chunk processing API
bcarter97 committed Dec 6, 2024
commit 440b185b08ffc5480fd147ead39bd33a54601f91
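
For context, a rough sketch of how the new chunk-based API could be called from application code. This is not part of the PR: the object name, topic, bootstrap servers, and the `processBatch` sink are made up, it assumes the methods are reachable through the usual `TopicLoader` companion object, and the log4cats `Slf4jFactory` is just one way to satisfy the `LoggerFactory` constraint.

    import cats.data.NonEmptyList
    import cats.effect.{IO, IOApp}
    import fs2.Chunk
    import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings}
    import org.typelevel.log4cats.LoggerFactory
    import org.typelevel.log4cats.slf4j.Slf4jFactory
    import uk.sky.fs2.kafka.topicloader.*

    object ChunkLoaderExample extends IOApp.Simple {

      given LoggerFactory[IO] = Slf4jFactory.create[IO]

      val consumerSettings: ConsumerSettings[IO, String, String] =
        ConsumerSettings[IO, String, String]
          .withBootstrapServers("localhost:9092")
          .withGroupId("chunk-loader-example")
          .withAutoOffsetReset(AutoOffsetReset.Earliest)

      // Hypothetical sink: hand a whole batch of records to some downstream store in one call.
      def processBatch(records: Chunk[ConsumerRecord[String, String]]): IO[Unit] =
        IO.println(s"processed ${records.size} records")

      // Load everything currently on the topic in chunks, then stop.
      val loadOnce: IO[Unit] =
        TopicLoader.loadChunks(NonEmptyList.one("example-topic"), LoadAll, consumerSettings)(processBatch)

      // Load, signal that the initial load finished, then keep consuming new records forever.
      val loadAndRun: IO[Nothing] =
        TopicLoader.loadAndRunChunks(NonEmptyList.one("example-topic"), consumerSettings)(exitCase =>
          IO.println(s"initial load finished: $exitCase")
        )(processBatch)

      val run: IO[Unit] = loadOnce
    }

The difference from the existing `load`/`loadAndRun` is that the caller supplies a `process` callback receiving a `Chunk[ConsumerRecord[K, V]]` per batch, rather than consuming a `Stream` of individual records.
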
134 changes: 120 additions & 14 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -1,12 +1,13 @@
 package uk.sky.fs2.kafka.topicloader
 
 import cats.data.{NonEmptyList, NonEmptyMap}
-import cats.effect.{Async, Resource}
+import cats.effect.Resource.ExitCase
+import cats.effect.{Async, Deferred, Ref, Resource}
 import cats.syntax.all.*
 import cats.{Monad, Show}
 import fs2.kafka.instances.*
 import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
-import fs2.{Pipe, Stream}
+import fs2.{Chunk, Pipe, Stream}
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import org.typelevel.log4cats.syntax.*
@@ -91,6 +92,44 @@ trait TopicLoader {
     } yield record
   }
 
+  def loadChunks[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      strategy: LoadTopicStrategy,
+      consumerSettings: ConsumerSettings[F, K, V]
+  )(process: Chunk[ConsumerRecord[K, V]] => F[Unit]): F[Unit] =
+    KafkaConsumer
+      .stream(consumerSettings)
+      .flatMap(loadChunks(topics, strategy, _)(process))
+      .compile
+      .drain
+
+  def loadAndRunChunks[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      consumerSettings: ConsumerSettings[F, K, V]
+  )(onLoad: Resource.ExitCase => F[Unit])(process: Chunk[ConsumerRecord[K, V]] => F[Unit]): F[Nothing] = {
+    def postLoad(
+        logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
+    )(using Logger[F]): Stream[F, Unit] = {
+      for {
+        // The only consistent workaround for re-assigning offsets after the initial load is to re-create the consumer
+        postLoadConsumer <- KafkaConsumer.stream(consumerSettings)
+        _ <- Stream.eval(assignOffsets(logOffsets, postLoadConsumer)(_.highest))
+        record <- postLoadConsumer.partitionedStream.map(_.chunks.map(_.map(_.record)).evalMap(process))
+      } yield record
+    }.parJoinUnbounded
+
+    val stream = for {
+      given Logger[F] <- Stream.eval(LoggerFactory[F].create)
+      preloadConsumer <- Stream.eval(LoggerFactory[F].create).as(()).flatMap(_ => KafkaConsumer.stream(consumerSettings))
+      logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_))
+      _ <- Stream.eval(info"log offsets: ${logOffsets.show}")
+      record <-
+        loadChunks(logOffsets, preloadConsumer)(process).onFinalizeCase(onLoad).drain ++ postLoad(logOffsets).drain
+    } yield record
+
+    stream.compile.lastOrError
+  }
+
 private def load[F[_] : Async : Logger, K, V](
     topics: NonEmptyList[String],
     strategy: LoadTopicStrategy,
@@ -111,6 +150,74 @@ trait TopicLoader {
       record <- consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
     } yield record
 
+  private def loadChunks[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      strategy: LoadTopicStrategy,
+      consumer: KafkaConsumer[F, K, V]
+  )(process: Chunk[ConsumerRecord[K, V]] => F[Unit]): Stream[F, Chunk[ConsumerRecord[K, V]]] =
+    for {
+      given Logger[F] <- Stream.eval(LoggerFactory[F].create)
+      maybeLogOffsets <- Stream.eval(logOffsetsForTopics(topics, strategy, consumer))
+      logOffsets <- Stream.fromOption(maybeLogOffsets)
+      record <- loadChunks(logOffsets, consumer)(process)
+    } yield record
+
+  private def loadChunks[F[_] : Async : Logger, K, V](
+      logOffsets: NonEmptyMap[TopicPartition, LogOffsets],
+      consumer: KafkaConsumer[F, K, V]
+  )(process: Chunk[ConsumerRecord[K, V]] => F[Unit]): Stream[F, Chunk[ConsumerRecord[K, V]]] = {
+    val concurrentStreams: Stream[F, Stream[F, Chunk[ConsumerRecord[K, V]]]] =
+      for {
+        _ <- Stream.eval(assignOffsets(logOffsets, consumer)(_.lowest))
+        killSwitch <- Stream.eval(Deferred[F, Either[Throwable, Unit]])
+        nonEmptyStreams <-
+          consumer.partitionsMapStream
+            .interruptWhen(killSwitch)
+            .map { partitionMap =>
+              val streamWithOffsets =
+                for {
+                  partitionWithOffsets <- logOffsets.toNel.toList
+                  (topicPartition, offsets) = partitionWithOffsets
+                  partitionWithStream <- partitionMap.get(topicPartition).map(topicPartition -> (_, offsets))
+                } yield partitionWithStream
+
+              streamWithOffsets.toMap
+            }
+        completedRef <- Stream.eval(Ref.of[F, Int](nonEmptyStreams.size))
+        recordStream <- Stream.emits {
+          nonEmptyStreams.toList.map { case (tp, (stream, o)) =>
+            val onComplete =
+              for {
+                newState <- completedRef.updateAndGet(_ - 1)
+                _ <- debug"inner stream for ${tp.show} complete"
+                _ <- if (newState == 0)
+                       killSwitch.complete(().asRight) >> debug"all inner streams complete"
+                     else debug"$newState streams remaining"
+              } yield ()
+
+            val onError = (e: Throwable) =>
+              for {
+                _ <- completedRef.set(0)
+                _ <- Logger[F].error(e)(s"${tp.show} failed to load: $e")
+                _ <- killSwitch.complete(e.asLeft).void
+              } yield ()
+
+            stream
+              .mapChunks(_.map(_.record))
+              .through(filterBelowHighestOffset(NonEmptyMap.one(tp, o)))
+              .chunks
+              .evalTap(process)
+              .onFinalizeCase {
+                case ExitCase.Succeeded | ExitCase.Canceled => onComplete
+                case ExitCase.Errored(e) => onError(e)
+              }
+          }
+        }
+      } yield recordStream
+
+    concurrentStreams.parJoinUnbounded
+  }
+
   private def assignOffsets[F[_] : Monad : Logger, K, V](
       logOffsets: NonEmptyMap[TopicPartition, LogOffsets],
       consumer: KafkaConsumer[F, K, V]
@@ -156,10 +263,12 @@ trait TopicLoader {
         case LoadAll => consumer.endOffsets(topicPartitions)
         case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
       }
-      logOffsets = beginningOffsetPerPartition.map { (partition, offset) =>
-        partition -> LogOffsets(offset, endOffsets(partition))
-      }
-    } yield NonEmptyMap.fromMap(SortedMap.from(logOffsets))
+    } yield {
+      val logOffsets = beginningOffsetPerPartition
+        .map((partition, offset) => partition -> LogOffsets(offset, endOffsets(partition)))
+
+      NonEmptyMap.fromMap(SortedMap.from(logOffsets))
+    }
 
   private def earliestOffsets[F[_] : Monad : Logger, K, V](
       consumer: KafkaConsumer[F, K, V],
@@ -168,11 +277,8 @@ trait TopicLoader {
     for {
       committed <- offsetsAndMetadataFor(consumer, beginningOffsets)
       earliestOffsets <- beginningOffsets.toList.traverse { (tp, beginningOffset) =>
-        for {
-          maybeCommitted <- committed.get(tp).pure
-          earliest <- maybeCommitted.fold(beginningOffset)(_.offset).pure
-          _ <- debug"Earliest offset for ${tp.show}: $earliest"
-        } yield tp -> earliest
+        val earliest = committed.get(tp).fold(beginningOffset)(_.offset)
+        debug"Earliest offset for ${tp.show}: $earliest".as(tp -> earliest)
       }
     } yield earliestOffsets.toMap

@@ -200,9 +306,9 @@ trait TopicLoader {
       topics: NonEmptyList[String],
       consumer: KafkaConsumer[F, K, V]
   ): F[Set[TopicPartition]] =
-    for {
-      partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
-    } yield partitionInfo.map(pi => TopicPartition(pi.topic, pi.partition)).toSet
+    topics.toList
+      .flatTraverse(consumer.partitionsFor)
+      .map(_.map(pi => TopicPartition(pi.topic, pi.partition)).toSet)
 
   private def emitRecordRemovingConsumedPartition[K, V](
       t: HighestOffsetsWithRecord[K, V],
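
One note on the second private `loadChunks` overload added above: each per-partition stream decrements a shared `Ref` counter when it finalizes, and once the counter reaches zero (or any stream errors) a `Deferred` "kill switch" interrupts `partitionsMapStream` so the outer `parJoinUnbounded` can terminate. A stripped-down, Kafka-free sketch of that shape, with illustrative names and stand-in streams only:

    import cats.effect.{Deferred, IO, IOApp, Ref}
    import cats.syntax.all.*
    import fs2.Stream

    object KillSwitchSketch extends IOApp.Simple {

      // Stand-ins for the finite per-partition record streams produced during the initial load.
      val partitions: List[Stream[IO, Int]] =
        List(Stream.range(0, 3), Stream.range(10, 13), Stream.range(20, 23))

      val run: IO[Unit] =
        for {
          // Completed with Right(()) once every inner stream finishes, or Left(e) on the first failure.
          killSwitch <- Deferred[IO, Either[Throwable, Unit]]
          remaining  <- Ref.of[IO, Int](partitions.size)
          inner       = partitions.map { p =>
                          p.onFinalize {
                            remaining.updateAndGet(_ - 1).flatMap { n =>
                              if (n == 0) killSwitch.complete(().asRight).void else IO.unit
                            }
                          }
                        }
          // Like partitionsMapStream, the outer stream on its own never terminates,
          // so it has to be interrupted once all the inner streams are done.
          outer       = Stream.emits(inner).covary[IO] ++ Stream.never[IO]
          _          <- outer.interruptWhen(killSwitch).parJoinUnbounded.compile.drain
        } yield ()
    }

Without the interruption, `parJoinUnbounded` would hang on the never-ending outer stream even after every partition had been fully drained, which is exactly the situation the PR's counter-plus-kill-switch handles for the load phase.
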