Skip to content

Commit

Permalink
Merge branch 'series/1.x' into series/2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Feb 15, 2021
2 parents c4fb166 + 653eb3e commit cec73ce
Showing 1 changed file with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,61 +55,61 @@ object TransactionalKafkaProducer {
implicit F: ConcurrentEffect[F],
context: ContextShift[F]
): Resource[F, TransactionalKafkaProducer[F, K, V]] =
Resource.liftF(settings.producerSettings.keySerializer).flatMap { keySerializer =>
Resource.liftF(settings.producerSettings.valueSerializer).flatMap { valueSerializer =>
WithProducer(settings).map { withProducer =>
new TransactionalKafkaProducer[F, K, V] {
override def produce[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[ProducerResult[P, K, V]] =
produceTransaction(records)
.map(ProducerResult(_, records.passthrough))
(
Resource.liftF(settings.producerSettings.keySerializer),
Resource.liftF(settings.producerSettings.valueSerializer),
WithProducer(settings)
).mapN { (keySerializer, valueSerializer, withProducer) =>
new TransactionalKafkaProducer[F, K, V] {
override def produce[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[ProducerResult[P, K, V]] =
produceTransaction(records)
.map(ProducerResult(_, records.passthrough))

private[this] def produceTransaction[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
if (records.records.isEmpty) F.pure(Chunk.empty)
else {
val batch =
CommittableOffsetBatch.fromFoldableMap(records.records)(_.offset)
private[this] def produceTransaction[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
if (records.records.isEmpty) F.pure(Chunk.empty)
else {
val batch =
CommittableOffsetBatch.fromFoldableMap(records.records)(_.offset)

val consumerGroupId =
if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1)
F.raiseError(ConsumerGroupException(batch.consumerGroupIds))
else F.pure(batch.consumerGroupIds.head)
val consumerGroupId =
if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1)
F.raiseError(ConsumerGroupException(batch.consumerGroupIds))
else F.pure(batch.consumerGroupIds.head)

consumerGroupId.flatMap { groupId =>
withProducer { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
records.records
.flatMap(_.records)
.traverse(
KafkaProducer.produceRecord(keySerializer, valueSerializer, producer)
consumerGroupId.flatMap { groupId =>
withProducer { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
records.records
.flatMap(_.records)
.traverse(
KafkaProducer.produceRecord(keySerializer, valueSerializer, producer)
)
.map(_.sequence)
.flatTap { _ =>
blocking {
producer.sendOffsetsToTransaction(
batch.offsets.asJava,
groupId
)
.map(_.sequence)
.flatTap { _ =>
blocking {
producer.sendOffsetsToTransaction(
batch.offsets.asJava,
groupId
)
}
}
} {
case (_, ExitCase.Completed) =>
blocking(producer.commitTransaction())
case (_, ExitCase.Canceled | ExitCase.Error(_)) =>
blocking(producer.abortTransaction())
}
}
}.flatten
}
}

override def toString: String =
"TransactionalKafkaProducer$" + System.identityHashCode(this)
} {
case (_, ExitCase.Completed) =>
blocking(producer.commitTransaction())
case (_, ExitCase.Canceled | ExitCase.Error(_)) =>
blocking(producer.abortTransaction())
}
}.flatten
}
}
}

override def toString: String =
"TransactionalKafkaProducer$" + System.identityHashCode(this)
}
}

Expand Down

0 comments on commit cec73ce

Please sign in to comment.