Skip to content

Commit

Permalink
remove startTimestamp from offset store state
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Jan 13, 2025
1 parent 463032f commit 332befa
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,6 @@ class R2dbcTimestampOffsetStoreSpec
val p5 = "p-08192" // slice 101 (same as p1)
val p6 = "p-08076" // slice 106

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestamp shouldBe Instant.EPOCH
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestamp shouldBe clock.instant()

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map(p1 -> 3L, p2 -> 1L, p3 -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue
Expand Down Expand Up @@ -1589,7 +1584,6 @@ class R2dbcTimestampOffsetStoreSpec
// scaled up to 4 projections, testing 512-767
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(2)
offsetStore2.getState().startTimestamp shouldBe time(2)
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ private[projection] object R2dbcOffsetStore {
final case class RecordWithProjectionKey(record: Record, projectionKey: String)

object State {
val empty: State = State(Map.empty, Map.empty, Instant.EPOCH)
val empty: State = State(Map.empty, Map.empty)

def apply(records: immutable.IndexedSeq[Record]): State = {
if (records.isEmpty) empty
else empty.add(records)
}
}

final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) {
final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -440,13 +440,7 @@ private[projection] class R2dbcOffsetStore(
newState.latestTimestamp,
startOffset)

val startTimestamp = startOffset match {
case None => clock.instant()
case Some(offset) => offset.timestamp
}
val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp)

if (!state.compareAndSet(oldState, newStateWithStartOffset))
if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")

startOffset
Expand Down Expand Up @@ -861,12 +855,13 @@ private[projection] class R2dbcOffsetStore(
if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) {
logger.debug(
"{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before deletion window timestamp [{}].",
"is before deletion window timestamp [{}] for slice [{}].",
logPrefix,
pid,
seqNr,
previousTimestamp,
acceptBefore.get)
acceptBefore.fold("none")(_.toString),
slice)
Accepted
} else if (recordWithOffset.fromPubSub) {
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
Expand All @@ -883,14 +878,14 @@ private[projection] class R2dbcOffsetStore(
// and SourceProvider supports it.
logger.warn(
"{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
"is after deletion window timestamp [{}] for slice [{}].",
logPrefix,
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString),
slice)
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
Expand Down

0 comments on commit 332befa

Please sign in to comment.