From b27bbf42da18f828ffca02796126074423dde9b9 Mon Sep 17 00:00:00 2001 From: Andrew Mohrland Date: Wed, 8 May 2024 13:18:27 -0700 Subject: [PATCH 1/2] Add SeekTo.fail affordance --- .../scala/com/banno/kafka/RecordStream.scala | 2 +- .../banno/kafka/consumer/ConsumerOps.scala | 6 +++--- .../com/banno/kafka/consumer/SeekTo.scala | 20 +++++++++++++++---- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/com/banno/kafka/RecordStream.scala b/core/src/main/scala/com/banno/kafka/RecordStream.scala index 313cf02f0..62483aa1c 100644 --- a/core/src/main/scala/com/banno/kafka/RecordStream.scala +++ b/core/src/main/scala/com/banno/kafka/RecordStream.scala @@ -611,7 +611,7 @@ object RecordStream { StreamSelector.Impl(hAndU, whetherCommits) } - private def assign[F[_]: Monad: Clock, A, B]( + private def assign[F[_]: MonadThrow: Clock, A, B]( consumer: ConsumerApi[F, GenericRecord, GenericRecord], topical: Topical[A, B], seekToF: Kleisli[F, PartitionQueries[F], SeekTo], diff --git a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala index b18927b2b..6fc4f8e2e 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala @@ -72,11 +72,11 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) { topics: List[String], offsets: Map[TopicPartition, Long], seekTo: SeekTo = SeekTo.beginning, - )(implicit F: Monad[F], C: Clock[F]): F[Unit] = + )(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] = assignAndSeek(topics, SeekTo.offsets(offsets, seekTo)) def assign(topic: String, offsets: Map[TopicPartition, Long])(implicit - F: Monad[F], + F: MonadThrow[F], C: Clock[F], ): F[Unit] = assign(List(topic), offsets) @@ -84,7 +84,7 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) { def assignAndSeek( topics: List[String], seekTo: SeekTo, - )(implicit F: Monad[F], C: Clock[F]): F[Unit] = + )(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] = for { infos <- consumer.partitionsFor(topics) partitions = infos.map(_.toTopicPartition) diff --git a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala index 31738a928..cbbd443e6 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala @@ -104,7 +104,7 @@ object SeekTo { } sealed trait FinalFallback { - private[SeekTo] def seek[F[_]]( + private[SeekTo] def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] @@ -112,22 +112,31 @@ object SeekTo { object FinalFallback { private case object Beginning extends FinalFallback { - override def seek[F[_]]( + override def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] = consumer.seekToBeginning(partitions) } private case object End extends FinalFallback { - override def seek[F[_]]( + override def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] = consumer.seekToEnd(partitions) } + private case class Fail(throwable: Throwable) extends FinalFallback { + override def seek[F[_]: ApplicativeThrow]( + consumer: ConsumerApi[F, _, _], + partitions: Iterable[TopicPartition], + ): F[Unit] = + throwable.raiseError[F, Unit] + } + val beginning: FinalFallback = Beginning val end: FinalFallback = End + def fail(throwable: Throwable): FinalFallback = Fail(throwable) } private case class Impl( @@ -147,6 +156,9 @@ object SeekTo { def end: SeekTo = Impl(List.empty, FinalFallback.end) + def fail(throwable: Throwable): SeekTo = + Impl(List.empty, FinalFallback.fail(throwable)) + def timestamps( timestamps: Map[TopicPartition, Long], default: SeekTo, @@ -168,7 +180,7 @@ object SeekTo { ): SeekTo = firstAttemptThen(Attempt.timestampBeforeNow(duration), default) - def seek[F[_]: Monad: Clock]( + def seek[F[_]: MonadThrow: Clock]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], seekTo: SeekTo, From 052ce50b1927e24dbe917743e8297381964c4540 Mon Sep 17 00:00:00 2001 From: Andrew Mohrland Date: Wed, 8 May 2024 13:49:28 -0700 Subject: [PATCH 2/2] Add default SeekTo failure --- .../main/scala/com/banno/kafka/consumer/SeekTo.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala index cbbd443e6..2b11febc8 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala @@ -21,12 +21,17 @@ import cats.effect.* import cats.syntax.all.* import scala.concurrent.duration.* - import org.apache.kafka.common.* +import scala.util.control.NoStackTrace + sealed trait SeekTo object SeekTo { + object Failure extends NoStackTrace { + override val getMessage: String = + "Consumer offset seeking failed" + } sealed trait Attempt { private[SeekTo] def toOffsets[F[_]: Monad: Clock]( queries: PartitionQueries[F], @@ -156,9 +161,12 @@ object SeekTo { def end: SeekTo = Impl(List.empty, FinalFallback.end) - def fail(throwable: Throwable): SeekTo = + def failWith(throwable: Throwable): SeekTo = Impl(List.empty, FinalFallback.fail(throwable)) + def fail: SeekTo = + failWith(Failure) + def timestamps( timestamps: Map[TopicPartition, Long], default: SeekTo,