Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Readme with LoadAndRun usage #18

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

# fs2-kafka-topic-loader

Reads the contents of provided Kafka topics determined by the `LoadTopicStrategy`.

- `LoadAll` - reads the topics in their entirety
- `LoadCommitted` - reads up to the configured consumer-group's last committed Offset
Reads the contents of provided Kafka topics.

This library is aimed for usage in applications that want a deterministic stream of Kafka messages that completes once
the last message (determined above) has been read. This is useful if an application shouldn't respond to new events
Expand All @@ -16,27 +13,82 @@ of a topic needs to be reloaded on an application restart.

## Usage

### `Load`

Determined by the `LoadTopicStrategy`, this stream completes once all messages have been read.

Load strategies:

- `LoadAll` - reads the topics in their entirety
- `LoadCommitted` - reads up to the configured consumer-group's last committed Offset

### `LoadAndRun`

Read up to the latest offset, fire a callback and continue streaming messages.

## Example

Add the following to your `build.sbt`:

```scala
libraryDependencies += "uk.sky" %% "fs2-kafka-topic-loader" % "<version>"
```

### Load

```scala
import cats.data.NonEmptyList
import cats.effect.{IO, IOApp}
import fs2.kafka.ConsumerSettings
import org.typelevel.log4cats.LoggerFactory
import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader}

object Main extends IOApp.Simple {
val consumerSettings: ConsumerSettings[IO, String, String] = ???

given LoggerFactory[IO] = ???

override def run: IO[Unit] =
TopicLoader.load(NonEmptyList.one("topicToLoad"), LoadAll, consumerSettings).evalTap(IO.println).compile.drain
}
```

See [`LoadExample.scala`](./it/src/main/scala/load/LoadExample.scala) for a more detailed example.

### LoadAndRun

```scala
import cats.data.NonEmptyList
import cats.effect.kernel.Resource.ExitCase
import cats.effect.{IO, IOApp, Ref}
import fs2.kafka.ConsumerSettings
import org.typelevel.log4cats.LoggerFactory
import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader}

object Main extends IOApp.Simple {
val consumerSettings: ConsumerSettings[IO, String, String] = ???

given LoggerFactory[IO] = ???

val healthCheck: IO[Ref[IO, Boolean]] = Ref.of(false)

val logger = LoggerFactory[IO].getLogger

override def run: IO[Unit] =
for {
healthCheck <- healthCheck
_ <- TopicLoader
.loadAndRun(NonEmptyList.one("topicToLoad"), consumerSettings) {
case ExitCase.Succeeded => healthCheck.set(true)
case ExitCase.Errored(e) => logger.error(e)(s"Something went wrong: $e")
case ExitCase.Canceled => logger.warn("Stream was cancelled before loading")
}
.compile
.drain
} yield ()
}
```

## Configuration

Configuration from the Topic Loader is done via the `ConsumerSettings`. The group id of the Topic Loader should match
Expand Down