From 9f84a73550afb9f837885f195b339684cd4b3249 Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 29 Dec 2024 10:09:44 +0100
Subject: [PATCH 1/2] Simplify transactional producing with rebalanceSafeCommits

This simplifies the setup for transactional producing and committing, no longer
requiring a custom rebalance listener or the use of `partitionedAssignmentStream`.
See `ConsumerSpec` for an example.

Note that since KIP-447
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
the transaction ID is no longer used for fencing; the mechanism relies on the
consumer group metadata instead.

NOTE: This change is not binary compatible and is therefore targeted at zio-kafka 3.0.0.
---
 .../zio/kafka/consumer/ConsumerSpec.scala     | 132 +++++++-----------
 .../zio/kafka/testkit/KafkaTestUtils.scala    |   4 +-
 .../scala/zio/kafka/consumer/Consumer.scala   |   3 +
 .../zio/kafka/consumer/ConsumerSettings.scala |   2 +
 .../producer/TransactionalProducer.scala      |  20 +--
 5 files changed, 73 insertions(+), 88 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 29e060a8a..ebadac01f 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -1406,87 +1406,65 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         val allMessages = (1 to messageCount).map(i => s"$i" -> f"msg$i%06d")
         val (messagesBeforeRebalance, messagesAfterRebalance) = allMessages.splitAt(messageCount / 2)
 
-        def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) =
-          RebalanceListener(
-            onAssigned = _ => ZIO.unit,
-            onRevoked = _ =>
-              streamCompleteOnRebalanceRef.get.flatMap {
-                case Some(p) =>
-                  ZIO.logDebug("onRevoked, awaiting stream completion") *>
-                    p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute)
-                case None => ZIO.unit
-              },
-            onLost = _ => ZIO.logDebug("Lost some partitions")
-          )
-
         def makeCopyingTransactionalConsumer(
           name: String,
           consumerGroupId: String,
           clientId: String,
           fromTopic: String,
           toTopic: String,
-          tProducer: TransactionalProducer,
-          consumerCreated: Promise[Nothing, Unit]
-        ): ZIO[Scope & Kafka, Throwable, Unit] =
+          consumerCreated: Promise[Throwable, Unit]
+        ): ZIO[Kafka, Throwable, Unit] =
           ZIO.logAnnotate("consumer", name) {
-            for {
-              consumedMessagesCounter <- Ref.make(0)
-              _ <- consumedMessagesCounter.get
-                     .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
-                     .repeat(Schedule.fixed(1.second))
-                     .fork
-              streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None)
-              consumer <- KafkaTestUtils.makeTransactionalConsumer(
-                            clientId,
-                            consumerGroupId,
-                            restartStreamOnRebalancing = true,
-                            properties = Map(
-                              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
-                                implicitly[ClassTag[T]].runtimeClass.getName,
-                              ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
-                            ),
-                            rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
-                          )
-              tConsumer <-
-                consumer
-                  .partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
-                  .mapZIO { assignedPartitions =>
-                    for {
-                      p <- Promise.make[Nothing, Unit]
-                      _ <- streamCompleteOnRebalanceRef.set(Some(p))
-                      _ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned")
-                      _ <- consumerCreated.succeed(())
-                      partitionStreams = 
assignedPartitions.map(_._2)
-                      s <- ZStream
-                             .mergeAllUnbounded(64)(partitionStreams: _*)
-                             .mapChunksZIO { records =>
-                               ZIO.scoped {
-                                 for {
-                                   t <- tProducer.createTransaction
-                                   _ <- t.produceChunkBatch(
-                                          records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
-                                          Serde.string,
-                                          Serde.string,
-                                          OffsetBatch(records.map(_.offset))
-                                        )
-                                   _ <- consumedMessagesCounter.update(_ + records.size)
-                                 } yield Chunk.empty
-                               }.uninterruptible
-                             }
-                             .runDrain
-                             .ensuring {
-                               for {
-                                 _ <- streamCompleteOnRebalanceRef.set(None)
-                                 _ <- p.succeed(())
-                                 c <- consumedMessagesCounter.get
-                                 _ <- ZIO.logDebug(s"Consumed $c messages")
-                               } yield ()
-                             }
-                    } yield s
-                  }
-                  .runDrain
-                  .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
-          } yield tConsumer
+            ZIO.scoped {
+              (for {
+                consumer                <- ZIO.service[Consumer]
+                consumedMessagesCounter <- Ref.make(0)
+                _ <- consumedMessagesCounter.get
+                       .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
+                       .repeat(Schedule.fixed(1.second))
+                       .fork
+
+                transactionalId   <- randomThing("transactional")
+                tProducerSettings <- transactionalProducerSettings(transactionalId)
+                tProducer <-
+                  TransactionalProducer.make(tProducerSettings, consumer)
+
+                tConsumer <-
+                  consumer
+                    .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
+                    .flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
+                      ZStream.fromZIO(consumerCreated.succeed(())) *>
+                        partitionStream.mapChunksZIO { records =>
+                          ZIO.scoped {
+                            for {
+                              t <- tProducer.createTransaction
+                              _ <- t.produceChunkBatch(
+                                     records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
+                                     Serde.string,
+                                     Serde.string,
+                                     OffsetBatch(records.map(_.offset))
+                                   )
+                              _ <- consumedMessagesCounter.update(_ + records.size)
+                            } yield Chunk.empty
+                          }
+                        }
+                    }
+                    .runDrain
+                    .tapError(e => ZIO.logError(s"Error: $e") *> consumerCreated.fail(e)) <* ZIO.logDebug("Done")
+              } yield tConsumer)
+                .provideSome[Kafka & Scope](
+                  transactionalConsumer(
+                    clientId,
+                    consumerGroupId,
+                    rebalanceSafeCommits = true,
+                    properties = Map(
+                      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
+                        implicitly[ClassTag[T]].runtimeClass.getName,
+                      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
+                    )
+                  )
+                )
+            }
           }
 
         for {
@@ -1506,28 +1484,26 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
 
           _ <- ZIO.logDebug("Starting copier 1")
          copier1ClientId = copyingGroup + "-1"
-          copier1Created <- Promise.make[Nothing, Unit]
+          copier1Created <- Promise.make[Throwable, Unit]
          copier1 <- makeCopyingTransactionalConsumer(
                       "1",
                       copyingGroup,
                       copier1ClientId,
                       topicA,
                       topicB,
-                       tProducer,
                       copier1Created
                     ).fork
          _ <- copier1Created.await
 
          _ <- ZIO.logDebug("Starting copier 2")
          copier2ClientId = copyingGroup + "-2"
-          copier2Created <- Promise.make[Nothing, Unit]
+          copier2Created <- Promise.make[Throwable, Unit]
          copier2 <- makeCopyingTransactionalConsumer(
                       "2",
                       copyingGroup,
                       copier2ClientId,
                       topicA,
                       topicB,
-                       tProducer,
                       copier2Created
                     ).fork
          _ <- ZIO.logDebug("Waiting for copier 2 to start")
diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
index 6f96d852b..8eddf9500 100644
--- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
+++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
@@ -87,7 +87,7 @@ object KafkaTestUtils {
    * ℹ️ Instead of using a layer, consider using [[KafkaTestUtils.makeTransactionalProducer]] to directly get a
    * producer.
    */
-  val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] =
+  val transactionalProducer: ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
     transactionalProducer("test-transaction")
 
   /**
@@ -96,7 +96,7 @@ object KafkaTestUtils {
    * ℹ️ Instead of using a layer, consider using [[KafkaTestUtils.makeTransactionalProducer]] to directly get a
    * producer.
    */
-  def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] =
+  def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
     ZLayer.scoped(makeTransactionalProducer(transactionalId))
 
 // -----------------------------------------------------------------------------------------
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 2926839e5..fc9201807 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -164,6 +164,9 @@ trait Consumer {
    * This method is useful when you want to use rebalance-safe-commits, but you are not committing to the Kafka brokers,
    * but to some external system, for example a relational database.
    *
+   * When this consumer is used in combination with a [[zio.kafka.producer.TransactionalProducer]], the transactional
+   * producer calls this method when the transaction is committed.
+   *
    * See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]].
    */
   def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit]
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index 092c17ff8..621999075 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -240,6 +240,8 @@ final case class ConsumerSettings(
    * External commits (that is, commits to an external system, e.g. a relational database) must be registered to the
    * consumer with [[Consumer.registerExternalCommits]].
    *
+   * When this consumer is coupled to a TransactionalProducer, `rebalanceSafeCommits` must be enabled.
+   *
    * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
    * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
    * of a rebalance). 
The consumer that takes over the partition will likely not see these delayed commits and will
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index dcc91636b..910630b81 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -7,7 +7,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import zio.Cause.Fail
 import zio._
-import zio.kafka.consumer.OffsetBatch
+import zio.kafka.consumer.{ Consumer, OffsetBatch }
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -25,7 +25,8 @@ object TransactionalProducer {
 
   private final class LiveTransactionalProducer(
     live: ProducerLive,
-    semaphore: Semaphore
+    semaphore: Semaphore,
+    consumer: Consumer
   ) extends TransactionalProducer {
     private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction())
 
@@ -51,10 +52,12 @@ object TransactionalProducer {
         }
       }
 
-      sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
+      sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *>
+        ZIO.attemptBlocking(live.p.commitTransaction()) *>
+        consumer.registerExternalCommits(offsetBatch).unit
     }
 
-    private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
+    private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Any, Nothing, Unit] =
       exit match {
         case Exit.Success(_) =>
           transaction.offsetBatchRef.get
@@ -84,15 +87,16 @@ object TransactionalProducer {
   def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] =
     ZIO.service[TransactionalProducer].flatMap(_.createTransaction)
 
-  val live: RLayer[TransactionalProducerSettings, TransactionalProducer] =
+  val live: RLayer[TransactionalProducerSettings with Consumer, TransactionalProducer] =
     ZLayer.scoped {
       for {
         settings <- ZIO.service[TransactionalProducerSettings]
-        producer <- make(settings)
+        consumer <- ZIO.service[Consumer]
+        producer <- make(settings, consumer)
       } yield producer
     }
 
-  def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] =
+  def make(settings: TransactionalProducerSettings, consumer: Consumer): ZIO[Scope, Throwable, TransactionalProducer] =
     for {
       rawProducer <- ZIO.acquireRelease(
                        ZIO.attempt(
@@ -112,5 +116,5 @@ object TransactionalProducer {
                      )
       live = new ProducerLive(settings.producerSettings, rawProducer, runtime, sendQueue)
       _ <- ZIO.blocking(live.sendFromQueue).forkScoped
-    } yield new LiveTransactionalProducer(live, semaphore)
+    } yield new LiveTransactionalProducer(live, semaphore, consumer)
 }

From 030c73b638cb863e46d426499bf2bfab08046bfc Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Thu, 13 Feb 2025 17:00:19 +0100
Subject: [PATCH 2/2] Fix merge

---
 .../test/scala/zio/kafka/ProducerSpec.scala   | 177 +++++++++---------
 .../zio/kafka/consumer/ConsumerSpec.scala     |   8 +-
 .../zio/kafka/testkit/KafkaTestUtils.scala    |  11 +-
 3 files changed, 102 insertions(+), 94 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
index a22ba6635..83344c162 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -289,16 +289,16 @@ object 
ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }, suite("transactions")( test("a simple transaction") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { for { @@ -308,9 +308,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield () } - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer => for { messages <- consumer.take .flatMap(_.exit) @@ -322,18 +322,18 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertTrue(recordChunk.map(_.value).last == 0) }, test("an aborted transaction should not be read") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) aliceGives20 = new ProducerRecord(topic, "alice", 0) bobReceives20 = new ProducerRecord(topic, "bob", 20) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { for { @@ -352,9 +352,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ZIO.unit // silences the abort } - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer => for { messages <- consumer.take .flatMap(_.exit) @@ -366,16 +366,16 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertTrue(recordChunk.map(_.value).last == 0) }, test("serialize concurrent transactions") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) transaction1 = ZIO.scoped { for { @@ -391,9 +391,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } _ <- transaction1 <&> transaction2 - settings <- 
KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer => for { messages <- consumer.take .flatMap(_.exit) @@ -405,10 +405,12 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }, test("exception management") { for { - topic <- randomTopic + topic <- randomTopic + client1 <- randomClient initialBobAccount = new ProducerRecord(topic, "bob", 0) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) result <- ZIO.scoped { transactionalProducer.createTransaction.flatMap { t => @@ -423,19 +425,19 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assert(result)(dies(hasMessage(equalTo("test")))) }, test("interleaving transaction with non-transactional consumer") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) nonTransactional = new ProducerRecord(topic, "no one", -1) aliceGives20 = new ProducerRecord(topic, "alice", 0) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { for { @@ -444,41 +446,42 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- tx.produce(initialAliceAccount, Serde.string, Serde.int, None) } yield () } - assertion <- ZIO.scoped { - for { - tx <- transactionalProducer.createTransaction - _ <- tx.produce(aliceGives20, Serde.string, Serde.int, None) - producer <- KafkaTestUtils.makeProducer - _ <- producer.produce(nonTransactional, Serde.string, Serde.int) - settings <- KafkaTestUtils.consumerSettings(client, Some(group)) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.exit) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") - } yield record + recordChunk <- ZIO.scoped { + for { + tx <- transactionalProducer.createTransaction + _ <- tx.produce(aliceGives20, Serde.string, Serde.int, None) + producer <- KafkaTestUtils.makeProducer + _ <- producer.produce(nonTransactional, Serde.string, Serde.int) + settings <- KafkaTestUtils.consumerSettings(client2, Some(group)) + recordChunk <- ZIO.scoped { + withConsumerInt(Subscription.topics(topic), settings).flatMap { + consumer => + for { + messages <- consumer.take + .flatMap(_.exit) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "no one") + } yield record + } } - } - } yield assertTrue(recordChunk.nonEmpty) - } - } yield assertion + } yield recordChunk + } + } yield assertTrue(recordChunk.nonEmpty) }, test("interleaving transaction with transactional consumer should not be read during transaction") { - import Subscription._ - 
for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) nonTransactional = new ProducerRecord(topic, "no one", -1) aliceGives20 = new ProducerRecord(topic, "alice", 0) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { for { @@ -493,9 +496,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- tx.produce(aliceGives20, Serde.string, Serde.int, None) producer <- KafkaTestUtils.makeProducer _ <- producer.produce(nonTransactional, Serde.string, Serde.int) - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer => for { messages <- consumer.take .flatMap(_.exit) @@ -509,12 +512,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertion }, test("interleaving transaction with transactional consumer when aborted") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) initialBobAccount = new ProducerRecord(topic, "bob", 0) @@ -522,7 +524,8 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { nonTransactional = new ProducerRecord(topic, "no one", -1) bobReceives20 = new ProducerRecord(topic, "bob", 20) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) producer <- KafkaTestUtils.makeProducer _ <- ZIO.scoped { @@ -543,9 +546,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ZIO.unit // silences the abort } - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer => for { messages <- consumer.take .flatMap(_.exit) @@ -557,12 +560,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertTrue(recordChunk.nonEmpty) }, test("committing offsets after a successful transaction") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) @@ -570,15 +572,16 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { producer <- KafkaTestUtils.makeProducer _ <- producer.produce(initialAliceAccount, Serde.string, Serde.int) - 
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) committedOffset <- ZIO.scoped { Consumer.make(settings).flatMap { c => ZIO.scoped { c - .plainStream(Topics(Set(topic)), Serde.string, Serde.int) + .plainStream(Subscription.topics(topic), Serde.string, Serde.int) .toQueue() .flatMap { q => val readAliceAccount = for { @@ -611,12 +614,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertTrue(committedOffset.get.offset() == 1L) }, test("not committing offsets after a failed transaction") { - import Subscription._ - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient initialAliceAccount = new ProducerRecord(topic, "alice", 20) aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) @@ -624,13 +626,14 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { producer <- KafkaTestUtils.makeProducer _ <- producer.produce(initialAliceAccount, Serde.string, Serde.int) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) - settings <- KafkaTestUtils.transactionalConsumerSettings(group, client) + settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2) committedOffset <- ZIO.scoped { Consumer.make(settings).flatMap { c => c - .plainStream(Topics(Set(topic)), Serde.string, Serde.int) + .plainStream(Subscription.topics(topic), Serde.string, Serde.int) .toQueue() .flatMap { q => val readAliceAccount = for { @@ -665,9 +668,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test("fails if transaction leaks") { val test = for { topic <- randomTopic + client1 <- randomClient transactionThief <- Ref.make(Option.empty[Transaction]) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { transactionalProducer.createTransaction.flatMap { tx => transactionThief.set(Some(tx)) @@ -681,9 +686,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test("fails if transaction leaks in an open transaction") { val test = for { topic <- randomTopic + client1 <- randomClient transactionThief <- Ref.make(Option.empty[Transaction]) - transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString) + consumer1 <- KafkaTestUtils.makeConsumer(client1) + transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1) _ <- ZIO.scoped { transactionalProducer.createTransaction.flatMap { tx => diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ebadac01f..76a51a87e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ 
b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -1425,7 +1425,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                        .fork
 
                transactionalId <- randomThing("transactional")
-                tProducerSettings <- transactionalProducerSettings(transactionalId)
+                tProducerSettings <- KafkaTestUtils.transactionalProducerSettings(transactionalId)
                tProducer <-
                  TransactionalProducer.make(tProducerSettings, consumer)
 
@@ -1453,7 +1453,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                    .tapError(e => ZIO.logError(s"Error: $e") *> consumerCreated.fail(e)) <* ZIO.logDebug("Done")
              } yield tConsumer)
                .provideSome[Kafka & Scope](
-                  transactionalConsumer(
+                  KafkaTestUtils.transactionalConsumer(
                    clientId,
                    consumerGroupId,
                    rebalanceSafeCommits = true,
@@ -1468,10 +1468,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       }
 
     for {
-      transactionalId <- randomThing("transactional")
-      tProducerSettings <- KafkaTestUtils.transactionalProducerSettings(transactionalId)
-      tProducer <- TransactionalProducer.make(tProducerSettings)
-
       topicA <- randomTopic
       topicB <- randomTopic
       _ <- KafkaTestUtils.createCustomTopic(topicA, partitionCount)
diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
index 8eddf9500..266847b2a 100644
--- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
+++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
@@ -75,8 +75,11 @@ object KafkaTestUtils {
    *
    * Note: to run multiple tests in parallel, every test needs a different transactional id.
    */
-  def makeTransactionalProducer(transactionalId: String): ZIO[Scope & Kafka, Throwable, TransactionalProducer] =
-    transactionalProducerSettings(transactionalId).flatMap(TransactionalProducer.make)
+  def makeTransactionalProducer(
+    transactionalId: String,
+    consumer: Consumer
+  ): ZIO[Scope & Kafka, Throwable, TransactionalProducer] =
+    transactionalProducerSettings(transactionalId).flatMap(TransactionalProducer.make(_, consumer))
 
 /**
  * `TransactionalProducer` layer for use in tests.
@@ -97,7 +100,9 @@ object KafkaTestUtils {
    * producer.
    */
   def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
-    ZLayer.scoped(makeTransactionalProducer(transactionalId))
+    ZLayer.scoped {
+      ZIO.serviceWithZIO[Consumer](makeTransactionalProducer(transactionalId, _))
+    }
 
 // -----------------------------------------------------------------------------------------
 //
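
For reference, after these two patches a transactional consume-transform-produce pipeline can be wired as shown below. This is a minimal sketch only, modeled on the ConsumerSpec test above; the broker address, topic names, group id and "example-transactional-id" are illustrative assumptions, not part of the patch.

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer._
import zio.kafka.producer.{ TransactionalProducer, TransactionalProducerSettings }
import zio.kafka.serde.Serde

object TransactionalCopyExample extends ZIOAppDefault {

  // Assumption: a broker on localhost with existing topics "source" and "target".
  private val bootstrapServers = List("localhost:9092")

  override def run: ZIO[Scope, Throwable, Unit] =
    for {
      // rebalanceSafeCommits must be enabled when the consumer is coupled
      // to a TransactionalProducer (see the ConsumerSettings docs above).
      consumer <- Consumer.make(
                    ConsumerSettings(bootstrapServers)
                      .withGroupId("example-group")
                      .withRebalanceSafeCommits(true)
                  )
      // New in this patch: the producer takes the consumer directly; no
      // custom rebalance listener or partitionedAssignmentStream is needed.
      producer <- TransactionalProducer.make(
                    TransactionalProducerSettings(bootstrapServers, "example-transactional-id"),
                    consumer
                  )
      _ <- consumer
             .plainStream(Subscription.topics("source"), Serde.string, Serde.string)
             .mapChunksZIO { records =>
               // One transaction per chunk: produce the copied records and hand the
               // consumed offsets to the transaction; commit happens when the scope closes.
               ZIO.scoped {
                 for {
                   tx <- producer.createTransaction
                   _  <- tx.produceChunkBatch(
                           records.map(r => new ProducerRecord("target", r.key, r.value)),
                           Serde.string,
                           Serde.string,
                           OffsetBatch(records.map(_.offset))
                         )
                 } yield Chunk.empty
               }
             }
             .runDrain
    } yield ()
}

Because the producer now holds a reference to the consumer, it can register the offsets it committed via registerExternalCommits, which is what allows rebalanceSafeCommits to hold up a rebalance until in-flight transactions have completed.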