From 221dc73503393c56985bd854957a818f188c43d7 Mon Sep 17 00:00:00 2001
From: Miles Sabin
Date: Fri, 2 Aug 2024 16:49:20 +0100
Subject: [PATCH] Added batching producer ops (#906)

* Added several batched send, pipe and sink convenience methods to ProducerOps.

* Added tests

* Added scaladoc for new methods

* Updated docker-compose to docker compose

---------

Co-authored-by: Zach Cox
---
 .github/workflows/ci.yml                    |   2 +-
 build.sbt                                   |   2 +-
 .../banno/kafka/producer/ProducerOps.scala  | 183 +++++++++++++++
 .../com/banno/kafka/ProducerSendSpec.scala  | 211 ++++++++++++++++++
 4 files changed, 396 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2e6ff16a1..9fbce8b13 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -57,7 +57,7 @@ jobs:
 
       - name: Start docker-compose
         id: start-docker-compose
-        run: docker-compose up -d
+        run: docker compose up -d
 
       - name: Check that workflows are up to date
         run: sbt githubWorkflowCheck
diff --git a/build.sbt b/build.sbt
index 05eceb3bf..87be36a5a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -12,7 +12,7 @@ ThisBuild / githubWorkflowBuildPreamble := Seq(
   WorkflowStep.Run(
     id = Some("start-docker-compose"),
     name = Some("Start docker-compose"),
-    commands = List("docker-compose up -d"),
+    commands = List("docker compose up -d"),
   )
 )
 ThisBuild / tlSonatypeUseLegacyHost := true
diff --git a/core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala b/core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala
index 4fbb887f0..35989889a 100644
--- a/core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala
+++ b/core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala
@@ -17,6 +17,7 @@ package com.banno.kafka.producer
 
 import cats.*
+import cats.data.NonEmptyList
 import cats.syntax.all.*
 import fs2.*
 import org.apache.kafka.common.*
@@ -35,15 +36,197 @@ case class ProducerOps[F[_], K, V](producer: ProducerApi[F, K, V]) {
   )(implicit F: Applicative[F]): F[G[RecordMetadata]] =
     records.traverse(producer.sendAsync)
 
+  /** Sends all of the possibly empty collection of records to the producer
+    * (synchronously), so the producer may batch them. After all records are
+    * sent, asynchronously waits for all acks.
+    *
+    * Returns the write metadata, in order. Fails if any individual send or
+    * ack fails.
+    *
+    * This, and the related batch write operations, allow the producer to
+    * perform its own batching while semantically blocking until all writes
+    * have succeeded. This maximizes concurrency and producer batching while
+    * keeping usage simple.
+    */
+  def sendBatch[G[_]: Traverse](
+      records: G[ProducerRecord[K, V]]
+  )(implicit F: Monad[F]): F[G[RecordMetadata]] = {
+    val sends: G[F[F[RecordMetadata]]] = records.map(producer.send)
+    for {
+      acks <- sends.sequence
+      rms <- acks.sequence
+    } yield rms
+  }
+
+  /** Sends all of the non-empty collection of records to the producer
+    * (synchronously), so the producer may batch them. After all records are
+    * sent, asynchronously waits for all acks.
+    *
+    * Returns the write metadata, in order. Fails if any individual send or
+    * ack fails.
+    *
+    * This, and the related batch write operations, allow the producer to
+    * perform its own batching while semantically blocking until all writes
+    * have succeeded. This maximizes concurrency and producer batching while
+    * keeping usage simple.
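+    *
+    * A minimal usage sketch (topic name and values are hypothetical; uses
+    * the producer syntax the way this PR's tests do):
+    * {{{
+    * val records = NonEmptyList.of(
+    *   new ProducerRecord("events", "a", "1"),
+    *   new ProducerRecord("events", "b", "2"),
+    * )
+    * // both sends are issued before either ack is awaited
+    * val rms: F[NonEmptyList[RecordMetadata]] =
+    *   producer.sendBatchNonEmpty(records)
+    * }}}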
+ */ + def sendBatchNonEmpty[G[_]: NonEmptyTraverse]( + records: G[ProducerRecord[K, V]] + )(implicit F: FlatMap[F]): F[G[RecordMetadata]] = { + val sends: G[F[F[RecordMetadata]]] = records.map(producer.send) + for { + acks <- sends.nonEmptySequence + rms <- acks.nonEmptySequence + } yield rms + } + + /** Sends all of the possibly empty collection of records to the producer + * (synchronously), so the producer may batch them. After all records are + * sent, asynchronously waits for all acks. + * + * Calls the `onSend` callback for each record, after it is sent. + * + * Returns the write metadatas, in order. Fails if any individual send or ack + * fails. + * + * This, and related batch write operations, allow the producer to perform + * its own batching, while semantically blocking until all writes have + * succeeded. It maximizes concurrency and producer batching, and also + * simplicity of usage. + */ + def sendBatchWithCallbacks[G[_]: Traverse]( + records: G[ProducerRecord[K, V]], + onSend: ProducerRecord[K, V] => F[Unit], + )(implicit F: Monad[F]): F[G[RecordMetadata]] = { + val sends: G[F[F[RecordMetadata]]] = + records.map(r => producer.send(r) <* onSend(r)) + for { + acks <- sends.sequence + rms <- acks.sequence + } yield rms + } + + /** Sends all of the non-empty collection of records to the producer + * (synchronously), so the producer may batch them. After all records are + * sent, asynchronously waits for all acks. + * + * Calls the `onSend` callback for each record, after it is sent. + * + * Returns the write metadatas, in order. Fails if any individual send or ack + * fails. + * + * This, and related batch write operations, allow the producer to perform + * its own batching, while semantically blocking until all writes have + * succeeded. It maximizes concurrency and producer batching, and also + * simplicity of usage. + */ + def sendBatchNonEmptyWithCallbacks[G[_]: NonEmptyTraverse]( + records: G[ProducerRecord[K, V]], + onSend: ProducerRecord[K, V] => F[Unit], + )(implicit F: FlatMap[F]): F[G[RecordMetadata]] = { + val sends: G[F[F[RecordMetadata]]] = + records.map(r => producer.send(r) <* onSend(r)) + for { + acks <- sends.nonEmptySequence + rms <- acks.nonEmptySequence + } yield rms + } + + /** Returns a Pipe which transforms a stream of records into a stream of + * record metadatas, by sending each record to the producer and waiting for + * the ack. + */ def pipeAsync: Pipe[F, ProducerRecord[K, V], RecordMetadata] = _.evalMap(producer.sendAsync) + /** Returns a Pipe which transforms a stream of possibly empty collections of + * records into a stream of record metadatas, by sending each collection to + * the producer as a batch and waiting for the ack. + */ + def pipeSendBatch[G[_]: Traverse](implicit + F: Monad[F] + ): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] = + _.evalMap(sendBatch[G]) + + /** Returns a Pipe which transforms a stream of non-empty collections of + * records into a stream of record metadatas, by sending each collection to + * the producer as a batch and waiting for the ack. + */ + def pipeSendBatchNonEmpty[G[_]: NonEmptyTraverse](implicit + F: FlatMap[F] + ): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] = + _.evalMap(sendBatchNonEmpty[G]) + + /** Returns a Pipe which transforms a stream of records into a stream of + * record metadatas, by using the stream's chunks as batches of records to + * send to the producer. 
+ */ + def pipeSendBatchChunks(implicit + F: FlatMap[F] + ): Pipe[F, ProducerRecord[K, V], RecordMetadata] = + s => + pipeSendBatchNonEmpty[NonEmptyList](NonEmptyTraverse[NonEmptyList], F)( + s.chunks.map(_.toNel).unNone + ).flatMap(nel => Stream.emits(nel.toList)) + + /** Returns a Pipe which transforms a stream of records into a stream of + * record metadatas, by calling chunkN on the stream, to create chunks of + * size `n`, and sending those chunks as batches to the producer. + */ + def pipeSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit + F: Monad[F] + ): Pipe[F, ProducerRecord[K, V], RecordMetadata] = + s => + pipeSendBatch[Chunk](Traverse[Chunk], F)(s.chunkN(n, allowFewer)) + .flatMap(Stream.chunk) + + /** Returns a Pipe which transforms a stream of records by sending each record + * to the producer and waiting for the ack. + */ def sink: Pipe[F, ProducerRecord[K, V], Unit] = _.evalMap(producer.sendAndForget) + /** Returns a Pipe which transforms a stream of records by sending each record + * to the producer and waiting for the ack. + */ def sinkAsync: Pipe[F, ProducerRecord[K, V], Unit] = pipeAsync.apply(_).void + /** Returns a Pipe which transforms a stream of possibly empty collections of + * records by sending each collection to the producer as a batch and waiting + * for the ack. + */ + def sinkSendBatch[G[_]: Traverse](implicit + F: Monad[F] + ): Pipe[F, G[ProducerRecord[K, V]], Unit] = + pipeSendBatch.apply(_).void + + /** Returns a Pipe which transforms a stream of non-empty collections of + * records by sending each collection to the producer as a batch and waiting + * for the ack. + */ + def sinkSendBatchNonEmpty[G[_]: NonEmptyTraverse](implicit + F: FlatMap[F] + ): Pipe[F, G[ProducerRecord[K, V]], Unit] = + pipeSendBatchNonEmpty.apply(_).void + + /** Returns a Pipe which transforms a stream of records by using the stream's + * chunks as batches of records to send to the producer. + */ + def sinkSendBatchChunks(implicit + F: FlatMap[F] + ): Pipe[F, ProducerRecord[K, V], Unit] = + pipeSendBatchChunks.apply(_).void + + /** Returns a Pipe which transforms a stream of records by calling chunkN on + * the stream, to create chunks of size `n`, and sending those chunks as + * batches to the producer. 
+ */ + def sinkSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit + F: Monad[F] + ): Pipe[F, ProducerRecord[K, V], Unit] = + pipeSendBatchChunkN(n, allowFewer).apply(_).void + def transaction[G[_]: Foldable]( records: G[ProducerRecord[K, V]] )(implicit F: MonadError[F, Throwable]): F[Unit] = diff --git a/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala b/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala index ffa7cda83..5a65ca11c 100644 --- a/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala +++ b/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala @@ -18,6 +18,8 @@ package com.banno.kafka import cats.syntax.all.* import cats.effect.IO +import cats.effect.kernel.Ref +import fs2.Stream import munit.CatsEffectSuite import org.apache.kafka.clients.producer.* import com.banno.kafka.producer.* @@ -88,6 +90,215 @@ class ProducerSendSpec extends CatsEffectSuite with KafkaSpec { } } + test("send batch") { + ProducerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer) + ) + .use { producer => + ConsumerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer), + GroupId(genGroupId), + AutoOffsetReset.earliest, + ) + .use { consumer => + for { + topic <- createTestTopic[IO]() + values = (0 to 9).toList + records = values.map(v => new ProducerRecord(topic, v, v)) + rms <- producer.sendBatch(records) + () <- consumer.subscribe(topic) + records <- consumer + .recordStream(100.millis) + .take(values.size.toLong) + .compile + .toList + } yield { + assertEquals(rms.size, values.size) + for ((rm, i) <- rms.zipWithIndex) { + assertEquals(rm.topic, topic) + assertEquals(rm.offset, i.toLong) + } + assertEquals(values, records.map(_.value)) + } + } + } + } + + test("send batch with callback") { + ProducerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer) + ) + .use { producer => + ConsumerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer), + GroupId(genGroupId), + AutoOffsetReset.earliest, + ) + .use { consumer => + for { + topic <- createTestTopic[IO]() + values = (0 to 9).toList + records = values.map(v => new ProducerRecord(topic, v, v)) + ref <- Ref[IO].of(0) + rms <- producer.sendBatchWithCallbacks( + records, + r => ref.update(_ + r.value), + ) + () <- consumer.subscribe(topic) + records <- consumer + .recordStream(100.millis) + .take(values.size.toLong) + .compile + .toList + sum <- ref.get + } yield { + assertEquals(rms.size, values.size) + for ((rm, i) <- rms.zipWithIndex) { + assertEquals(rm.topic, topic) + assertEquals(rm.offset, i.toLong) + assertEquals(sum, values.sum) + } + assertEquals(values, records.map(_.value)) + } + } + } + } + + test("pipe send batch") { + ProducerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer) + ) + .use { producer => + ConsumerApi + .resource[IO, Int, Int]( + BootstrapServers(bootstrapServer), + GroupId(genGroupId), + AutoOffsetReset.earliest, + ) + .use { consumer => + for { + topic <- createTestTopic[IO]() + values = (0 to 9).toList + records = values.map(v => new ProducerRecord(topic, v, v)) + pstream = Stream(records.take(5), records.drop(5)).covary[IO] + rms <- pstream + .through(producer.pipeSendBatch) + .compile + .toList + .map(_.flatten) + () <- consumer.subscribe(topic) + records <- consumer + .recordStream(100.millis) + .take(values.size.toLong) + .compile + .toList + } yield { + assertEquals(rms.size, values.size) + for ((rm, i) <- rms.zipWithIndex) { + assertEquals(rm.topic, topic) + assertEquals(rm.offset, i.toLong) + } + 
+              assertEquals(values, records.map(_.value))
+            }
+          }
+      }
+  }
+
+  test("pipe send batch implicitly chunked") {
+    ProducerApi
+      .resource[IO, Int, Int](
+        BootstrapServers(bootstrapServer)
+      )
+      .use { producer =>
+        ConsumerApi
+          .resource[IO, Int, Int](
+            BootstrapServers(bootstrapServer),
+            GroupId(genGroupId),
+            AutoOffsetReset.earliest,
+          )
+          .use { consumer =>
+            for {
+              topic <- createTestTopic[IO]()
+              values = (0 to 9).toList
+              records = values.map(v => new ProducerRecord(topic, v, v))
+              pstream = Stream
+                .emits(records.take(5))
+                .covary[IO] ++ Stream.emits(records.drop(5)).covary[IO]
+              crms <- pstream
+                .through(producer.pipeSendBatchChunks)
+                .compile
+                .foldChunks(List.empty[List[RecordMetadata]]) {
+                  case (acc, chunk) => chunk.toList :: acc
+                }
+              rms = crms.reverse.flatten
+              () <- consumer.subscribe(topic)
+              records <- consumer
+                .recordStream(100.millis)
+                .take(values.size.toLong)
+                .compile
+                .toList
+            } yield {
+              assertEquals(crms.size, 2)
+              assertEquals(rms.size, values.size)
+              for ((rm, i) <- rms.zipWithIndex) {
+                assertEquals(rm.topic, topic)
+                assertEquals(rm.offset, i.toLong)
+              }
+              assertEquals(values, records.map(_.value))
+            }
+          }
+      }
+  }
+
+  test("pipe send batch explicitly chunked") {
+    ProducerApi
+      .resource[IO, Int, Int](
+        BootstrapServers(bootstrapServer)
+      )
+      .use { producer =>
+        ConsumerApi
+          .resource[IO, Int, Int](
+            BootstrapServers(bootstrapServer),
+            GroupId(genGroupId),
+            AutoOffsetReset.earliest,
+          )
+          .use { consumer =>
+            for {
+              topic <- createTestTopic[IO]()
+              values = (0 to 9).toList
+              records = values.map(v => new ProducerRecord(topic, v, v))
+              pstream = Stream.emits(records).covary[IO]
+              crms <- pstream
+                .through(producer.pipeSendBatchChunkN(2))
+                .compile
+                .foldChunks(List.empty[List[RecordMetadata]]) {
+                  case (acc, chunk) => chunk.toList :: acc
+                }
+              rms = crms.reverse.flatten
+              () <- consumer.subscribe(topic)
+              records <- consumer
+                .recordStream(100.millis)
+                .take(values.size.toLong)
+                .compile
+                .toList
+            } yield {
+              assertEquals(crms.size, 5)
+              assertEquals(rms.size, values.size)
+              for ((rm, i) <- rms.zipWithIndex) {
+                assertEquals(rm.topic, topic)
+                assertEquals(rm.offset, i.toLong)
+              }
+              assertEquals(values, records.map(_.value))
+            }
+          }
+      }
+  }
+
   test("outer effect fails on send throw") {
     val producer =
       ProducerImpl[IO, String, String](ThrowOnSendProducer[String, String]())