Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/series/2.x' into series/3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Apr 1, 2022
2 parents c834397 + 64eaa1d commit cf9bff2
Show file tree
Hide file tree
Showing 20 changed files with 343 additions and 217 deletions.
34 changes: 26 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,27 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [2.13.8, 3.1.1]
java: [adopt@1.8]
java: [temurin@8, temurin@17]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v13
- name: Setup Java (temurin@8)
if: matrix.java == 'temurin@8'
uses: actions/setup-java@v2
with:
java-version: ${{ matrix.java }}
distribution: temurin
java-version: 8

- name: Setup Java (temurin@17)
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: 17

- name: Cache sbt
uses: actions/cache@v2
Expand Down Expand Up @@ -65,18 +74,27 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [2.13.8]
java: [adopt@1.8]
java: [temurin@8]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v13
- name: Setup Java (temurin@8)
if: matrix.java == 'temurin@8'
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: 8

- name: Setup Java (temurin@17)
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v2
with:
java-version: ${{ matrix.java }}
distribution: temurin
java-version: 17

- name: Cache sbt
uses: actions/cache@v2
Expand Down
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ val fs2Version = "3.2.5"

val kafkaVersion = "3.1.0"

val testcontainersScalaVersion = "0.40.0"
val testcontainersScalaVersion = "0.40.4"

val vulcanVersion = "1.8.0"

Expand Down Expand Up @@ -99,10 +99,8 @@ lazy val docs = project
lazy val dependencySettings = Seq(
resolvers += "confluent" at "https://packages.confluent.io/maven/",
libraryDependencies ++= Seq(
("com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion)
.cross(CrossVersion.for3Use2_13),
("com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion)
.cross(CrossVersion.for3Use2_13),
"com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
"com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
"org.typelevel" %% "discipline-scalatest" % "2.1.5",
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
Expand Down Expand Up @@ -206,6 +204,8 @@ ThisBuild / githubWorkflowBuild := Seq(

ThisBuild / githubWorkflowArtifactUpload := false

ThisBuild / githubWorkflowJavaVersions := Seq(JavaSpec.temurin("8"), JavaSpec.temurin("17"))

ThisBuild / githubWorkflowTargetTags ++= Seq("v*")
ThisBuild / githubWorkflowPublishTargetBranches :=
Seq(RefPredicate.StartsWith(Ref.Tag("v")))
Expand Down Expand Up @@ -280,13 +280,19 @@ lazy val mimaSettings = Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.deleteConsumerGroups"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.produce"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.metrics"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.kafka.KafkaConsumer.committed"),

// package-private
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from"),

// sealed
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers")
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),

// private
ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*")
)
// format: on
}
Expand Down
127 changes: 72 additions & 55 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package fs2.kafka

import cats.{Foldable, Functor, Reducible}
import cats.data.{NonEmptyList, NonEmptySet, OptionT}
import cats.data.{NonEmptySet, OptionT}
import cats.effect._
import cats.effect.std._
import cats.effect.implicits._
Expand All @@ -20,6 +20,7 @@ import fs2.kafka.instances._
import fs2.kafka.internal.KafkaConsumerActor._
import fs2.kafka.internal.syntax._
import fs2.kafka.consumer._
import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch}

import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand Down Expand Up @@ -71,7 +72,7 @@ import scala.util.matching.Regex
sealed abstract class KafkaConsumer[F[_], K, V]
extends KafkaConsume[F, K, V]
with KafkaAssignment[F]
with KafkaOffsets[F]
with KafkaOffsetsV2[F]
with KafkaSubscription[F]
with KafkaTopics[F]
with KafkaCommit[F]
Expand Down Expand Up @@ -134,17 +135,17 @@ object KafkaConsumer {
val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] =
Queue.bounded(settings.maxPrefetchBatches - 1)

type PartitionRequest =
type PartitionResult =
(Chunk[KafkaByteConsumerRecord], FetchCompletedReason)

type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]
type PartitionsMapQueue = Queue[F, Option[PartitionsMap]]

def createPartitionStream(
def partitionStream(
streamId: StreamId,
partition: TopicPartition,
assignmentRevoked: F[Unit]
): F[Stream[F, CommittableConsumerRecord[F, K, V]]] =
): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force {
for {
chunks <- chunkQueue
dequeueDone <- Deferred[F, Unit]
Expand Down Expand Up @@ -179,13 +180,34 @@ object KafkaConsumer {
)
)

def fetchPartition(deferred: Deferred[F, PartitionRequest]): F[Unit] = {
val request = Request.Fetch(
partition,
streamId,
deferred.complete(_: PartitionRequest).void
)
val fetch = requests.offer(request) >> deferred.get
def fetchPartition: F[Unit] = F.deferred[PartitionResult].flatMap { deferred =>
val callback: PartitionResult => F[Unit] =
deferred.complete(_).void

val fetch: F[PartitionResult] = withPermit {
val assigned =
withConsumer.blocking {
_.assignment.contains(partition)
}

def storeFetch: F[Unit] =
actor.ref.modify { state =>
val (newState, oldFetches) =
state.withFetch(partition, streamId, callback)
newState ->
(logging.log(StoredFetch(partition, callback, newState)) >>
oldFetches.traverse_ { fetch =>
fetch.completeRevoked(Chunk.empty) >>
logging.log(RevokedPreviousFetch(partition, streamId))
})
}.flatten

def completeRevoked: F[Unit] =
callback((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked))

assigned.ifM(storeFetch, completeRevoked)
} >> deferred.get

F.race(shutdown, fetch).flatMap {
case Left(()) =>
stopReqs.complete(()).void
Expand All @@ -212,7 +234,7 @@ object KafkaConsumer {
.repeatEval {
stopReqs.tryGet.flatMap {
case None =>
Deferred[F, PartitionRequest] >>= fetchPartition
fetchPartition

case Some(()) =>
// Prevent issuing additional requests after partition is
Expand All @@ -234,34 +256,23 @@ object KafkaConsumer {
.onFinalize(dequeueDone.complete(()).void)
}
}.flatten
}

def enqueueAssignment(
streamId: StreamId,
assigned: Map[TopicPartition, Deferred[F, Unit]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] = {
val assignment: F[PartitionsMap] = if (assigned.isEmpty) {
F.pure(Map.empty)
} else {
assigned.toVector
.traverse {
): F[Unit] =
stopConsumingDeferred.tryGet.flatMap {
case None =>
val assignment: PartitionsMap = assigned.map {
case (partition, finisher) =>
createPartitionStream(streamId, partition, finisher.get).map { stream =>
partition -> stream
}
partition -> partitionStream(streamId, partition, finisher.get)
}
.map(_.toMap)
}

assignment.flatMap { assignment =>
stopConsumingDeferred.tryGet.flatMap {
case None =>
partitionsMapQueue.offer(Some(assignment))
case Some(()) =>
F.unit
}
partitionsMapQueue.offer(Some(assignment))
case Some(()) =>
F.unit
}
}

def onRebalance(
streamId: StreamId,
Expand Down Expand Up @@ -406,7 +417,7 @@ object KafkaConsumer {
private def assignment(
onRebalance: Option[OnRebalance[F]]
): F[SortedSet[TopicPartition]] =
permit.surround {
withPermit {
onRebalance
.fold(actor.ref.updateAndGet(_.asStreaming)) { on =>
actor.ref.updateAndGet(_.withOnRebalance(on).asStreaming).flatTap { newState =>
Expand Down Expand Up @@ -469,17 +480,11 @@ object KafkaConsumer {
override def seek(partition: TopicPartition, offset: Long): F[Unit] =
withConsumer.blocking { _.seek(partition, offset) }

override def seekToBeginning: F[Unit] =
seekToBeginning(List.empty[TopicPartition])

override def seekToBeginning[G[_]](partitions: G[TopicPartition])(
implicit G: Foldable[G]
): F[Unit] =
withConsumer.blocking { _.seekToBeginning(partitions.asJava) }

override def seekToEnd: F[Unit] =
seekToEnd(List.empty[TopicPartition])

override def seekToEnd[G[_]](
partitions: G[TopicPartition]
)(implicit G: Foldable[G]): F[Unit] =
Expand All @@ -502,11 +507,27 @@ object KafkaConsumer {
override def position(partition: TopicPartition, timeout: FiniteDuration): F[Long] =
withConsumer.blocking { _.position(partition, timeout.toJava) }

override def subscribeTo(firstTopic: String, remainingTopics: String*): F[Unit] =
subscribe(NonEmptyList.of(firstTopic, remainingTopics: _*))
override def committed(
partitions: Set[TopicPartition]
): F[Map[TopicPartition, OffsetAndMetadata]] =
withConsumer.blocking {
_.committed(partitions.asJava)
.asInstanceOf[util.Map[TopicPartition, OffsetAndMetadata]]
.toMap
}

override def committed(
partitions: Set[TopicPartition],
timeout: FiniteDuration
): F[Map[TopicPartition, OffsetAndMetadata]] =
withConsumer.blocking {
_.committed(partitions.asJava, timeout.toJava)
.asInstanceOf[util.Map[TopicPartition, OffsetAndMetadata]]
.toMap
}

override def subscribe[G[_]](topics: G[String])(implicit G: Reducible[G]): F[Unit] =
permit.surround {
withPermit {
withConsumer.blocking {
_.subscribe(
topics.toList.asJava,
Expand All @@ -517,15 +538,14 @@ object KafkaConsumer {
.log(LogEntry.SubscribedTopics(topics.toNonEmptyList, _))
}

private def permit: Resource[F, Unit] =
Resource.eval {
Deferred[F, Resource[F, Unit]].flatMap { permitDef =>
requests.offer(Request.Permit(permitDef.complete(_).void)) >> permitDef.get
}
}.flatten
private def withPermit[A](fa: F[A]): F[A] = F.deferred[Either[Throwable, A]].flatMap {
deferred =>
requests
.offer(Request.WithPermit(fa, deferred.complete(_: Either[Throwable, A]).void)) >> deferred.get.rethrow
}

override def subscribe(regex: Regex): F[Unit] =
permit.surround {
withPermit {
withConsumer.blocking {
_.subscribe(
regex.pattern,
Expand All @@ -538,7 +558,7 @@ object KafkaConsumer {
}

override def unsubscribe: F[Unit] =
permit.surround {
withPermit {
withConsumer.blocking { _.unsubscribe() } >> actor.ref
.updateAndGet(_.asUnsubscribed)
.log(LogEntry.Unsubscribed(_))
Expand All @@ -548,7 +568,7 @@ object KafkaConsumer {
stopConsumingDeferred.complete(()).attempt.void

override def assign(partitions: NonEmptySet[TopicPartition]): F[Unit] =
permit.surround {
withPermit {
withConsumer.blocking {
_.assign(
partitions.toList.asJava
Expand All @@ -559,9 +579,6 @@ object KafkaConsumer {

}

override def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit] =
assign(partitions.map(new TopicPartition(topic, _)))

override def assign(topic: String): F[Unit] =
for {
partitions <- partitionsFor(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package fs2.kafka.consumer

import fs2._
import fs2.kafka.instances._
import cats.data.NonEmptySet
import scala.collection.immutable.SortedSet
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -63,7 +64,8 @@ trait KafkaAssignment[F[_]] {
*
* @see org.apache.kafka.clients.consumer.KafkaConsumer#assign
*/
def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit]
def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit] =
assign(partitions.map(new TopicPartition(topic, _)))

/**
* Manually assigns all partitions for the specified topic to the consumer.
Expand Down
Loading

0 comments on commit cf9bff2

Please sign in to comment.