Document transactions
Document the transactional producer introduced in #1434.
erikvanoosten committed Jan 15, 2025
1 parent 5c579cc commit 6eb76b0
Showing 5 changed files with 383 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/sidebars.js
@@ -14,6 +14,7 @@ const sidebars = {
"preventing-duplicates",
"sharing-consumer",
"serialization-and-deserialization",
"transactions",
"writing-tests"
]
}
262 changes: 262 additions & 0 deletions docs/transactions.md
@@ -0,0 +1,262 @@
---
id: transactions
title: "Transactions"
---

> This document describes Kafka transactions for zio-kafka version 3.0.0 and later.

## What are Kafka Transactions?

[Kafka transactions](https://www.confluent.io/blog/transactions-apache-kafka/) are different from the transactions you
may know from relational databases. In Kafka, a transaction means that your program consumes some records, then
produces some other records (to the same broker), and then commits the _consumed_ records. Committing the consumed
records commits the transaction, and the produced records become available to other consumers.

That may have gone a bit fast; here is a gentler introduction with more background:
[Kafka Transactions Explained (Twice!)](https://www.warpstream.com/blog/kafka-transactions-explained-twice).

## A warning about consuming transactional records

By default, produced records become visible to consumers immediately, even if they were produced in a Kafka transaction
that has not yet been committed! To prevent reading transactional records before they are committed, you need to
configure every consumer with the `isolation.level` property set to `read_committed`, for example as follows:

```scala
val settings = ConsumerSettings(bootstrapServers)
  .withGroupId(groupId)
  .withReadCommitted(true) // Only read committed records
  // etc.
```
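
If you prefer to configure the Kafka client directly, the same effect can be achieved by setting the property itself
(a sketch; `withProperty` passes the setting straight to the underlying Java Kafka consumer):

```scala
val settings = ConsumerSettings(bootstrapServers)
  .withGroupId(groupId)
  .withProperty("isolation.level", "read_committed")
```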

## Producing transactional records

In order to produce records transactionally, we need a `Consumer` and a `TransactionalProducer` that will work together
to safely commit transactions.

First we create layers for the `ConsumerSettings` and the `Consumer`.
Note that rebalance-safe-commits must be enabled (see the background section below for more information).
Since the default `maxRebalanceDuration` is quite high, you may also consider setting it to a lower duration, one in
which you are sure all records of a poll can be processed.

```scala
val bootstrapBrokers = List("localhost:9092")
val consumerSettings = ZLayer.succeed {
  ConsumerSettings(bootstrapBrokers)
    .withGroupId("somegroupid")
    .withRebalanceSafeCommits(true) // required!
    .withMaxRebalanceDuration(30.seconds)
}
val consumer = Consumer.live
```

Now we can create layers for the `TransactionalProducerSettings` and the `TransactionalProducer`.
Note that the producer connects (and must connect) to the same brokers as the consumer.

```scala
val producerSettings = ZLayer.succeed {
  TransactionalProducerSettings(
    bootstrapServers = bootstrapBrokers,
    transactionalId = UUID.randomUUID().toString
  )
}
val producer: ZLayer[TransactionalProducerSettings with Consumer, Throwable, TransactionalProducer] =
  TransactionalProducer.live
```

Notice that the `producer` layer also requires a `Consumer`. The producer coordinates with the consumer to make sure
that transactions are accepted by the brokers. The consumer does this by holding up rebalances until all records
consumed before the rebalance are committed by the producer. (Again, see the background section below for more
information.)

With these in place we can look at the application logic of consuming and producing. In this example we use the
`plainStream` method to consume records with a `Long` value and an `Int` key.

```scala
val consumedRecordsStream = consumer
  .plainStream(Subscription.topics("my-input-topic"), Serde.int, Serde.long)
```

For each consumed record, we then create a `ProducerRecord` with a `String` value containing `Copy of <input value>`.
Note that we also keep the offset of the consumed record so that we can commit it later.

```scala
val producedRecordsStream = consumedRecordsStream
  .mapChunks { (records: Chunk[CommittableRecord[Int, Long]]) =>
    records.map { record =>
      val key: Int         = record.record.key()
      val value: Long      = record.record.value()
      val newValue: String = "Copy of " + value.toString

      val producerRecord: ProducerRecord[Int, String] =
        new ProducerRecord("my-output-topic", key, newValue)
      (producerRecord, record.offset)
    }
  }
```

Typically, to optimize throughput, we want to produce records in batches. The underlying chunking structure of the
consumer stream is ideal for that because zio-kafka guarantees that each chunk in the stream corresponds to the records
that were fetched together. However, we need to be careful to retain the chunking structure. For example, we should
not use `.mapZIO` because it results in a stream where each chunk contains only a single item. Therefore, we use
`.mapChunksZIO` instead.
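
To make the difference concrete, here is a small sketch (with a hypothetical effectful `processRecord` function that is
not part of this example):

```scala
// `mapZIO` re-chunks the stream into single-element chunks:
consumedRecordsStream.mapZIO(record => processRecord(record))

// `mapChunksZIO` keeps the chunks exactly as they were fetched from the broker:
consumedRecordsStream.mapChunksZIO(records => ZIO.foreach(records)(processRecord))
```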

These new records can now be produced. Let's build it up slowly.

```scala
producedRecordsStream
  .mapChunksZIO { (recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)]) =>
    ???
  }
```

Let's work on those question marks. We need to create a transaction.
Each zio-kafka transaction runs in a ZIO `Scope`; the transaction commits when the scope closes.

```scala
ZIO.scoped {
  ??? // transaction stuff
}
```

We also want to try to complete the transaction, even when the program is shutting down.
Therefore, we add `.uninterruptible` and get:

```scala
ZIO
  .scoped {
    ??? // transaction stuff
  }
  .uninterruptible
```

Now we can fill in the 'transaction stuff': we create the transaction and use it to produce some records.

```scala
for {
  tx <- transactionalProducer.createTransaction
  _ <- {
         val (records, offsets) = recordsAndOffsets.unzip
         tx.produceChunkBatch(
           records,
           Serde.int,
           Serde.string,
           OffsetBatch(offsets)
         )
       }
  // Running inside `mapChunksZIO`, we need to return a Chunk.
  // Since we're not using the result, an empty chunk will do.
} yield Chunk.empty
```

Notice how we pass the offsets of the consumed records. Once the transaction completes, these are the offsets that will
be committed.

Putting it all together we get:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde.Serde

import java.util.UUID

object Transactional extends ZIOAppDefault {

  private val topic = "transactional-test-topic"

  val bootstrapBrokers = List("localhost:9092")

  val consumerSettings = ZLayer.succeed {
    ConsumerSettings(bootstrapBrokers)
      .withGroupId("transactional-example-app")
      .withRebalanceSafeCommits(true) // required for transactional producing
      .withMaxRebalanceDuration(30.seconds)
  }

  val producerSettings = ZLayer.succeed {
    TransactionalProducerSettings(
      bootstrapServers = bootstrapBrokers,
      transactionalId = UUID.randomUUID().toString
    )
  }

  private val runConsumerStream: ZIO[Consumer & TransactionalProducer, Throwable, Unit] =
    for {
      consumer              <- ZIO.service[Consumer]
      transactionalProducer <- ZIO.service[TransactionalProducer]
      _                     <- ZIO.logInfo(s"Consuming messages from topic $topic...")
      _                     <- consumer
                                 .plainStream(Subscription.topics(topic), Serde.int, Serde.long)
                                 .mapChunks { (records: Chunk[CommittableRecord[Int, Long]]) =>
                                   records.map { record =>
                                     val key: Int         = record.record.key()
                                     val value: Long      = record.record.value()
                                     val newValue: String = "Copy of " + value.toString

                                     val producerRecord: ProducerRecord[Int, String] =
                                       new ProducerRecord("my-output-topic", key, newValue)
                                     (producerRecord, record.offset)
                                   }
                                 }
                                 .mapChunksZIO { (recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)]) =>
                                   ZIO.scoped {
                                     for {
                                       _  <- ZIO.addFinalizer(ZIO.logInfo("Completing transaction"))
                                       tx <- transactionalProducer.createTransaction
                                       _  <- {
                                               val (records, offsets) = recordsAndOffsets.unzip
                                               tx.produceChunkBatch(
                                                 records,
                                                 Serde.int,
                                                 Serde.string,
                                                 OffsetBatch(offsets)
                                               )
                                             }
                                     } yield Chunk.empty
                                   }.uninterruptible
                                 }
                                 .runDrain
    } yield ()

  override def run: ZIO[Scope, Any, Any] =
    runConsumerStream
      .provide(
        consumerSettings,
        ZLayer.succeed(Diagnostics.NoOp),
        Consumer.live,
        producerSettings,
        TransactionalProducer.live
      )

}
```

## Technical background - How does it all work?

Kafka must ensure that each record is processed only once, even though a partition can be revoked and assigned to
another consumer while the original consumer knows nothing about this. In that situation the original consumer will
happily continue producing records even though another consumer is doing the same thing with the same records.

Kafka does this by validating the group epoch. The group epoch is a number that is increased after every rebalance.
When the transactional producer commits a transaction with the offsets of consumed records, it also includes the group
epoch at which the records were consumed. If the broker detects a commit with a group epoch that is not equal to the
current epoch, it rejects the commit.

In addition, to prevent an old consumer from continuing to poll into the next epoch without participating in the
rebalance, consumers polling within the wrong group epoch are 'fenced'. A fenced consumer dies with a
`FencedInstanceIdException`.

### How zio-kafka helps

With zio-kafka you can use ZIO scopes to define the transaction boundaries. Within the scope you can produce records
one by one, all in one batch, or in multiple batches. Once the scope closes, all collected offsets are committed.
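
For example, here is a minimal sketch (the record and offset chunks are placeholders) of producing two batches within
the same scope. Both batches end up in the same transaction, and the offsets of both batches are committed together
when the scope closes:

```scala
ZIO.scoped {
  for {
    tx <- transactionalProducer.createTransaction
    _  <- tx.produceChunkBatch(firstBatch, Serde.int, Serde.string, OffsetBatch(firstOffsets))
    _  <- tx.produceChunkBatch(secondBatch, Serde.int, Serde.string, OffsetBatch(secondOffsets))
  } yield ()
}
```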

In addition, zio-kafka prevents failed commits. The `TransactionalProducer` informs the `Consumer` which offsets were
committed. When a rebalance starts, the consumer holds it up until all records consumed so far are committed. (This
feature is called rebalance-safe-commits.) Because of this, every transaction commits in the same group epoch as the
epoch in which the records were consumed, and the brokers can accept the transaction.

When a zio-kafka consumer misses a rebalance and continues polling, there is nothing that can be done. It will die with
a `FencedInstanceIdException`. In these cases the consumer (or perhaps simpler, the whole program) should restart.
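
Here is a minimal sketch of such a restart, reusing `runConsumerStream` and the layers from the example above. Because
the layers are provided inside the retried effect, every retry builds a fresh consumer and producer. This assumes the
`FencedInstanceIdException` surfaces in the error channel of the stream.

```scala
import org.apache.kafka.common.errors.FencedInstanceIdException

val resilientApp: ZIO[Any, Throwable, Unit] =
  runConsumerStream
    .provide(
      consumerSettings,
      ZLayer.succeed(Diagnostics.NoOp),
      Consumer.live,
      producerSettings,
      TransactionalProducer.live
    )
    // Restart (with fresh layers) only when the consumer was fenced.
    .retry(Schedule.recurWhile[Throwable](_.isInstanceOf[FencedInstanceIdException]))
```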
25 changes: 0 additions & 25 deletions zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
@@ -1,36 +1,11 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J

trait MyKafka {
def bootstrapServers: List[String]
def stop(): UIO[Unit]
}

object MyKafka {
final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true))
}

val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS"
)
)
ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
}
}

object Main extends ZIOAppDefault {

/**
28 changes: 28 additions & 0 deletions zio-kafka-example/src/main/scala/zio/kafka/example/MyKafka.scala
@@ -0,0 +1,28 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._

trait MyKafka {
def bootstrapServers: List[String]
def stop(): UIO[Unit]
}

object MyKafka {
final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true))
}

val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS"
)
)
ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
}
}