From a28958b93eb6533208bb7bba24f0558cb0fc6f9a Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 16 Sep 2023 18:08:12 +0000 Subject: [PATCH 1/3] Update Readme with LoadAndRun usage --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index 43757d2..6b31c3d 100644 --- a/README.md +++ b/README.md @@ -22,14 +22,20 @@ Add the following to your `build.sbt`: libraryDependencies += "uk.sky" %% "fs2-kafka-topic-loader" % "" ``` +### Load + ```scala import cats.data.NonEmptyList import cats.effect.{IO, IOApp} import fs2.kafka.ConsumerSettings +import org.typelevel.log4cats.LoggerFactory +import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader} object Main extends IOApp.Simple { val consumerSettings: ConsumerSettings[IO, String, String] = ??? + given LoggerFactory[IO] = ??? + override def run: IO[Unit] = TopicLoader.load(NonEmptyList.one("topicToLoad"), LoadAll, consumerSettings).evalTap(IO.println).compile.drain } @@ -37,6 +43,41 @@ object Main extends IOApp.Simple { See [`LoadExample.scala`](./it/src/main/scala/load/LoadExample.scala) for a more detailed example. +### LoadAndRun + +```scala +import cats.data.NonEmptyList +import cats.effect.kernel.Resource.ExitCase +import cats.effect.{IO, IOApp, Ref} +import fs2.kafka.ConsumerSettings +import org.typelevel.log4cats.LoggerFactory +import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader} + +object Main extends IOApp.Simple { + val consumerSettings: ConsumerSettings[IO, String, String] = ??? + + val healthCheck: IO[Ref[IO, Boolean]] = Ref.of(false) + + given LoggerFactory[IO] = ??? + + val logger = LoggerFactory[IO].getLogger + + override def run: IO[Unit] = + for { + healthCheck <- healthCheck + _ <- TopicLoader + .loadAndRun(NonEmptyList.one("topicToLoad"), consumerSettings) { + case ExitCase.Succeeded => healthCheck.set(true) + case ExitCase.Errored(e) => logger.error(e)(s"Something went wrong: $e") + case ExitCase.Canceled => logger.warn("Stream was cancelled before loading") + } + .compile + .drain + } yield () +} +``` + + ## Configuration Configuration from the Topic Loader is done via the `ConsumerSettings`. The group id of the Topic Loader should match From 0ed5810ee14cbbcc530f07aa26f7b85cd79f407a Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 16 Sep 2023 19:13:36 +0100 Subject: [PATCH 2/3] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6b31c3d..5f48936 100644 --- a/README.md +++ b/README.md @@ -56,10 +56,10 @@ import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader} object Main extends IOApp.Simple { val consumerSettings: ConsumerSettings[IO, String, String] = ??? - val healthCheck: IO[Ref[IO, Boolean]] = Ref.of(false) - given LoggerFactory[IO] = ??? + val healthCheck: IO[Ref[IO, Boolean]] = Ref.of(false) + val logger = LoggerFactory[IO].getLogger override def run: IO[Unit] = From 6c9a572fbf32b9940a3a615d4e120476d602db78 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 16 Sep 2023 18:20:09 +0000 Subject: [PATCH 3/3] Explain LoadAndRun --- README.md | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 5f48936..6d121f1 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,7 @@ # fs2-kafka-topic-loader -Reads the contents of provided Kafka topics determined by the `LoadTopicStrategy`. - -- `LoadAll` - reads the topics in their entirety -- `LoadCommitted` - reads up to the configured consumer-group's last committed Offset +Reads the contents of provided Kafka topics. This library is aimed for usage in applications that want a deterministic stream of Kafka messages that completes once the last message (determined above) has been read. This is useful if an application shouldn't respond to new events @@ -16,6 +13,21 @@ of a topic needs to be reloaded on an application restart. ## Usage +### `Load` + +Determined by the `LoadTopicStrategy`, this stream completes once all messages have been read. + +Load strategies: + +- `LoadAll` - reads the topics in their entirety +- `LoadCommitted` - reads up to the configured consumer-group's last committed Offset + +### `LoadAndRun` + +Read up to the latest offset, fire a callback and continue streaming messages. + +## Example + Add the following to your `build.sbt`: ```scala @@ -77,7 +89,6 @@ object Main extends IOApp.Simple { } ``` - ## Configuration Configuration from the Topic Loader is done via the `ConsumerSettings`. The group id of the Topic Loader should match