Skip to content

Commit

Permalink
[greyhound] parallel consumer - add logs (#36082)
Browse files Browse the repository at this point in the history
[greyhound] parllel consumer - add logs

GitOrigin-RevId: 2d3421c1d110e20b68222e195cd82227b772dc93
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 23, 2023
1 parent e882a4b commit e973c59
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.wixpress.dst.greyhound.core.consumer

import java.time.Clock
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.{CommittedMissingOffsets, CommittedMissingOffsetsFailed, SkippedGapsOnInitialization}
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.{CommittedMissingOffsets, CommittedMissingOffsetsFailed, FoundGapsOnInitialization, SkippedGapsOnInitialization}
import com.wixpress.dst.greyhound.core.{ClientId, Group, Offset, OffsetAndMetadata, TopicPartition}
import com.wixpress.dst.greyhound.core.metrics.{GreyhoundMetric, GreyhoundMetrics}
import zio.{URIO, ZIO}
Expand Down Expand Up @@ -82,8 +82,11 @@ class OffsetsInitializer(
val toPause = seekTo.collect { case (k, SeekTo.Pause) => k }
val seekToEndOffsets = fetchEndOffsets(seekToEndPartitions, timeout).mapValues(OffsetAndMetadata.apply)
val gapsSmallestOffsets = OffsetsAndGaps.gapsSmallestOffsets(currentCommittedOffsets)
val seekToGapsOffsets = if (parallelConsumer) gapsSmallestOffsets else Map.empty
val toOffsets = seekToOffsets ++ seekToEndOffsets ++ seekToGapsOffsets

if (gapsSmallestOffsets.nonEmpty) reporter(FoundGapsOnInitialization(clientId, group, gapsSmallestOffsets))

val seekToGapsOffsets = if (parallelConsumer) gapsSmallestOffsets else Map.empty
val toOffsets = seekToOffsets ++ seekToEndOffsets ++ seekToGapsOffsets

if (!parallelConsumer && gapsSmallestOffsets.nonEmpty) reportSkippedGaps(currentCommittedOffsets)
PartitionActions(offsetSeeks = toOffsets, partitionsToPause = toPause.toSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,10 @@ object ConsumerMetric {
currentCommittedOffsetsAndGaps: Map[TopicPartition, OffsetAndGaps]
) extends ConsumerMetric

case class FoundGapsOnInitialization(
clientId: ClientId,
group: Group,
gapsSmallestOffsets: Map[TopicPartition, OffsetAndMetadata]
) extends ConsumerMetric

}

0 comments on commit e973c59

Please sign in to comment.