Skip to content

Commit

Permalink
[greyhound] parallel consumer - improve logging (#36009)
Browse files Browse the repository at this point in the history
* [greyhound] parallel consumer - improve logging

* dummy commit

GitOrigin-RevId: 60e02e82d457076a06b0c0f2a575130051db8c69
  • Loading branch information
ben-wattelman authored and wix-oss committed Oct 5, 2023
1 parent 33e278e commit 76203d5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ class OffsetsInitializer(
}

private def reportSkippedGaps(currentCommittedOffsets: Map[TopicPartition, Option[OffsetAndMetadata]]) = {
val skippedGaps = currentCommittedOffsets
val committedOffsetsAndGaps = currentCommittedOffsets
.collect { case (tp, Some(om)) => tp -> om }
.map(tpom => tpom._1 -> OffsetsAndGaps.parseGapsString(tpom._2.metadata))
.collect { case (tp, Some(gaps)) => tp -> gaps }
reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps))
.collect { case (tp, Some(offsetAndGaps)) => tp -> offsetAndGaps }
val skippedGaps = committedOffsetsAndGaps.collect { case (tp, offsetAndGaps) if offsetAndGaps.gaps.nonEmpty => tp -> offsetAndGaps }

reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps, committedOffsetsAndGaps))
}

private def fetchEndOffsets(seekToEndPartitions: Set[TopicPartition], timeout: Duration) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ object ConsumerMetric {

case class ClosedConsumer(group: Group, clientId: ClientId, result: MetricResult[Throwable, Unit]) extends ConsumerMetric

case class SkippedGapsOnInitialization(clientId: ClientId, group: Group, gaps: Map[TopicPartition, OffsetAndGaps]) extends ConsumerMetric
case class SkippedGapsOnInitialization(
clientId: ClientId,
group: Group,
skippedGaps: Map[TopicPartition, OffsetAndGaps],
currentCommittedOffsetsAndGaps: Map[TopicPartition, OffsetAndGaps]
) extends ConsumerMetric

}

0 comments on commit 76203d5

Please sign in to comment.