Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify transactional producing #1434

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 92 additions & 85 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,16 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
},
suite("transactions")(
test("a simple transaction") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient
initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

_ <- ZIO.scoped {
for {
Expand All @@ -308,9 +308,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield ()
}

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
Expand All @@ -322,18 +322,18 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertTrue(recordChunk.map(_.value).last == 0)
},
test("an aborted transaction should not be read") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient
initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)
aliceGives20 = new ProducerRecord(topic, "alice", 0)
bobReceives20 = new ProducerRecord(topic, "bob", 20)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

_ <- ZIO.scoped {
for {
Expand All @@ -352,9 +352,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
ZIO.unit // silences the abort
}

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
Expand All @@ -366,16 +366,16 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertTrue(recordChunk.map(_.value).last == 0)
},
test("serialize concurrent transactions") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient
initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

transaction1 = ZIO.scoped {
for {
Expand All @@ -391,9 +391,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
}
_ <- transaction1 <&> transaction2

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
Expand All @@ -405,10 +405,12 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
},
test("exception management") {
for {
topic <- randomTopic
topic <- randomTopic
client1 <- randomClient
initialBobAccount = new ProducerRecord(topic, "bob", 0)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

result <- ZIO.scoped {
transactionalProducer.createTransaction.flatMap { t =>
Expand All @@ -423,19 +425,19 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(result)(dies(hasMessage(equalTo("test"))))
},
test("interleaving transaction with non-transactional consumer") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)
nonTransactional = new ProducerRecord(topic, "no one", -1)
aliceGives20 = new ProducerRecord(topic, "alice", 0)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

_ <- ZIO.scoped {
for {
Expand All @@ -444,41 +446,42 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- tx.produce(initialAliceAccount, Serde.string, Serde.int, None)
} yield ()
}
assertion <- ZIO.scoped {
for {
tx <- transactionalProducer.createTransaction
_ <- tx.produce(aliceGives20, Serde.string, Serde.int, None)
producer <- KafkaTestUtils.makeProducer
_ <- producer.produce(nonTransactional, Serde.string, Serde.int)
settings <- KafkaTestUtils.consumerSettings(client, Some(group))
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
} yield record
recordChunk <- ZIO.scoped {
for {
tx <- transactionalProducer.createTransaction
_ <- tx.produce(aliceGives20, Serde.string, Serde.int, None)
producer <- KafkaTestUtils.makeProducer
_ <- producer.produce(nonTransactional, Serde.string, Serde.int)
settings <- KafkaTestUtils.consumerSettings(client2, Some(group))
recordChunk <- ZIO.scoped {
withConsumerInt(Subscription.topics(topic), settings).flatMap {
consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
} yield record
}
}
}
} yield assertTrue(recordChunk.nonEmpty)
}
} yield assertion
} yield recordChunk
}
} yield assertTrue(recordChunk.nonEmpty)
},
test("interleaving transaction with transactional consumer should not be read during transaction") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)
nonTransactional = new ProducerRecord(topic, "no one", -1)
aliceGives20 = new ProducerRecord(topic, "alice", 0)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

_ <- ZIO.scoped {
for {
Expand All @@ -493,9 +496,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- tx.produce(aliceGives20, Serde.string, Serde.int, None)
producer <- KafkaTestUtils.makeProducer
_ <- producer.produce(nonTransactional, Serde.string, Serde.int)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
Expand All @@ -509,20 +512,20 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertion
},
test("interleaving transaction with transactional consumer when aborted") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

initialAliceAccount = new ProducerRecord(topic, "alice", 20)
initialBobAccount = new ProducerRecord(topic, "bob", 0)
aliceGives20 = new ProducerRecord(topic, "alice", 0)
nonTransactional = new ProducerRecord(topic, "no one", -1)
bobReceives20 = new ProducerRecord(topic, "bob", 20)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)
producer <- KafkaTestUtils.makeProducer

_ <- ZIO.scoped {
Expand All @@ -543,9 +546,9 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
ZIO.unit // silences the abort
}

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
recordChunk <- ZIO.scoped {
withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer =>
withConsumerInt(Subscription.topics(topic), settings).flatMap { consumer =>
for {
messages <- consumer.take
.flatMap(_.exit)
Expand All @@ -557,28 +560,28 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertTrue(recordChunk.nonEmpty)
},
test("committing offsets after a successful transaction") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

initialAliceAccount = new ProducerRecord(topic, "alice", 20)
aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0)

producer <- KafkaTestUtils.makeProducer
_ <- producer.produce(initialAliceAccount, Serde.string, Serde.int)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
committedOffset <-
ZIO.scoped {
Consumer.make(settings).flatMap { c =>
ZIO.scoped {
c
.plainStream(Topics(Set(topic)), Serde.string, Serde.int)
.plainStream(Subscription.topics(topic), Serde.string, Serde.int)
.toQueue()
.flatMap { q =>
val readAliceAccount = for {
Expand Down Expand Up @@ -611,26 +614,26 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertTrue(committedOffset.get.offset() == 1L)
},
test("not committing offsets after a failed transaction") {
import Subscription._

for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

initialAliceAccount = new ProducerRecord(topic, "alice", 20)
aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0)

producer <- KafkaTestUtils.makeProducer
_ <- producer.produce(initialAliceAccount, Serde.string, Serde.int)

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

settings <- KafkaTestUtils.transactionalConsumerSettings(group, client)
settings <- KafkaTestUtils.transactionalConsumerSettings(group, client2)
committedOffset <- ZIO.scoped {
Consumer.make(settings).flatMap { c =>
c
.plainStream(Topics(Set(topic)), Serde.string, Serde.int)
.plainStream(Subscription.topics(topic), Serde.string, Serde.int)
.toQueue()
.flatMap { q =>
val readAliceAccount = for {
Expand Down Expand Up @@ -665,9 +668,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test("fails if transaction leaks") {
val test = for {
topic <- randomTopic
client1 <- randomClient
transactionThief <- Ref.make(Option.empty[Transaction])

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)
_ <- ZIO.scoped {
transactionalProducer.createTransaction.flatMap { tx =>
transactionThief.set(Some(tx))
Expand All @@ -681,9 +686,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test("fails if transaction leaks in an open transaction") {
val test = for {
topic <- randomTopic
client1 <- randomClient
transactionThief <- Ref.make(Option.empty[Transaction])

transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString)
consumer1 <- KafkaTestUtils.makeConsumer(client1)
transactionalProducer <- KafkaTestUtils.makeTransactionalProducer(UUID.randomUUID().toString, consumer1)

_ <- ZIO.scoped {
transactionalProducer.createTransaction.flatMap { tx =>
Expand Down
Loading
Loading