diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala index 843fc9c4a..5ff14a0c1 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala @@ -4,6 +4,7 @@ package akka.projection.r2dbc +import java.time.{ Duration => JDuration } import java.time.Instant import akka.projection.r2dbc.internal.R2dbcOffsetStore.Pid @@ -16,9 +17,10 @@ import org.scalatest.wordspec.AnyWordSpec class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers { + def slice(pid: Pid): Int = math.abs(pid.hashCode % 1024) + def createRecord(pid: Pid, seqNr: SeqNr, timestamp: Instant): Record = { - val slice = math.abs(pid.hashCode % 1024) - Record(slice, pid, seqNr, timestamp) + Record(slice(pid), pid, seqNr, timestamp) } "R2dbcOffsetStore.State" should { @@ -31,9 +33,11 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers createRecord("p1", 2, t0.plusMillis(1)), createRecord("p1", 3, t0.plusMillis(2)))) state1.byPid("p1").seqNr shouldBe 3L + state1.bySliceSorted.size shouldBe 1 + state1.bySliceSorted(slice("p1")).size shouldBe 1 + state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 3 state1.latestTimestamp shouldBe t0.plusMillis(2) state1.latestOffset.get.seen shouldBe Map("p1" -> 3L) - state1.oldestTimestamp shouldBe t0 val state2 = state1.add(Vector(createRecord("p2", 2, t0.plusMillis(1)))) state2.byPid("p1").seqNr shouldBe 3L @@ -41,15 +45,37 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers // latest not updated because timestamp of p2 was before latest state2.latestTimestamp shouldBe t0.plusMillis(2) state2.latestOffset.get.seen shouldBe Map("p1" -> 3L) - state2.oldestTimestamp shouldBe t0 val state3 = state2.add(Vector(createRecord("p3", 10, t0.plusMillis(3)))) state3.byPid("p1").seqNr shouldBe 3L state3.byPid("p2").seqNr shouldBe 2L state3.byPid("p3").seqNr shouldBe 10L + slice("p3") should not be slice("p1") + slice("p3") should not be slice("p2") + state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3" + state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10 state3.latestTimestamp shouldBe t0.plusMillis(3) state3.latestOffset.get.seen shouldBe Map("p3" -> 10L) - state3.oldestTimestamp shouldBe t0 + + slice("p863") shouldBe slice("p984") // both slice 645 + slice("p863") should not be slice("p1") + slice("p863") should not be slice("p2") + slice("p863") should not be slice("p3") + val state4 = state3 + .add(Vector(createRecord("p863", 1, t0.plusMillis(10)))) + .add(Vector(createRecord("p863", 2, t0.plusMillis(11)))) + .add(Vector(createRecord("p984", 1, t0.plusMillis(12)), createRecord("p984", 2, t0.plusMillis(13)))) + state4.bySliceSorted(slice("p984")).size shouldBe 2 + state4.bySliceSorted(slice("p984")).last.pid shouldBe "p984" + state4.bySliceSorted(slice("p984")).last.seqNr shouldBe 2 + + val state5 = state3 + .add(Vector(createRecord("p863", 2, t0.plusMillis(13)))) + .add(Vector(createRecord("p863", 1, t0.plusMillis(12)))) + .add(Vector(createRecord("p984", 2, t0.plusMillis(11)), createRecord("p984", 1, t0.plusMillis(10)))) + state5.bySliceSorted(slice("p863")).size shouldBe 2 + state5.bySliceSorted(slice("p863")).last.pid shouldBe "p863" + state5.bySliceSorted(slice("p863")).last.seqNr 
shouldBe 2 } // reproducer of issue #173 @@ -68,41 +94,48 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state.latestOffset.get.seen shouldBe Map("p1" -> 3L, "p2" -> 2L, "p3" -> 5L, "p4" -> 9L) } + val allowAll: Any => Boolean = _ => true + "evict old" in { - // these pids have the same slice 645, otherwise it will keep one for each slice - val p1 = "p500" - val p2 = "p621" - val p3 = "p742" - val p4 = "p863" - val p5 = "p984" + val p1 = "p500" // slice 645 + val p2 = "p621" // slice 645 + val p3 = "p742" // slice 645 + val p4 = "p863" // slice 645 + val p5 = "p984" // slice 645 + val p6 = "p92" // slice 905 + val p7 = "p108" // slice 905 val t0 = TestClock.nowMillis().instant() val state1 = State.empty .add( Vector( - createRecord(p1, 1, t0), - createRecord(p2, 2, t0.plusMillis(1)), - createRecord(p3, 3, t0.plusMillis(2)), - createRecord(p4, 4, t0.plusMillis(3)), - createRecord(p5, 5, t0.plusMillis(4)))) - state1.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state1.oldestTimestamp shouldBe t0 + createRecord(p1, 1, t0.plusMillis(1)), + createRecord(p2, 2, t0.plusMillis(2)), + createRecord(p3, 3, t0.plusMillis(3)), + createRecord(p4, 4, t0.plusMillis(4)), + createRecord(p6, 6, t0.plusMillis(6)))) state1.byPid - .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) - - val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1) - state2.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state2.oldestTimestamp shouldBe t0.plusMillis(2) - state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L) + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) // keep all - state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1 - - // keep 4 - val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4) - state3.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state3.oldestTimestamp shouldBe t0.plusMillis(1) - state3.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) + state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000), allowAll) shouldBe state1 + + // evict older than time window + val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) + state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) + + val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10)))) + val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) + state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L) + + val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2), allowAll) + state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( + p1 -> 1L, + p2 -> 2L, + p3 -> 3L, + p4 -> 4L, + p5 -> 5L, + p7 -> 7L) } "evict old but keep latest for each slice" in { @@ -112,9 +145,9 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers Vector( createRecord("p1", 1, t0), createRecord("p92", 2, t0.plusMillis(1)), - createRecord("p108", 3, t0.plusMillis(2)), - createRecord("p4", 4, t0.plusMillis(3)), - createRecord("p5", 5, t0.plusMillis(4)))) + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p4", 4, t0.plusMillis(30)), + createRecord("p5", 5, t0.plusMillis(40)))) state1.byPid("p1").slice shouldBe 449 state1.byPid("p92").slice 
shouldBe 905 @@ -122,6 +155,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state1.byPid("p4").slice shouldBe 452 state1.byPid("p5").slice shouldBe 453 + val slices = state1.bySliceSorted.keySet + state1.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( "p1" -> 1L, "p92" -> 2L, @@ -129,22 +164,48 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers "p4" -> 4L, "p5" -> 5L) state1.latestOffset.get.seen shouldBe Map("p5" -> 5L) - state1.oldestTimestamp shouldBe t0 - val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1) + val timeWindow = JDuration.ofMillis(1) + + val state2 = slices.foldLeft(state1) { + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) + } // note that p92 is evicted because it has same slice as p108 // p1 is kept because keeping one for each slice state2.byPid .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) state2.latestOffset.get.seen shouldBe Map("p5" -> 5L) - state2.oldestTimestamp shouldBe t0 - val state3 = state1.evict(t0.plusMillis(10), keepNumberOfEntries = 1) + val state3 = slices.foldLeft(state2) { + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) + } // still keeping one for each slice state3.byPid .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) } + "evict old but only those allowed to be evicted" in { + val t0 = TestClock.nowMillis().instant() + val state1 = State.empty.add( + Vector( + createRecord("p92", 2, t0.plusMillis(1)), + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p229", 4, t0.plusMillis(30)))) + + val slices = state1.bySliceSorted.keySet + slices.size shouldBe 1 + + state1 + .evict(slices.head, JDuration.ofMillis(1), allowAll) + .byPid + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p229" -> 4) // p92 and p108 evicted + + state1 + .evict(slices.head, JDuration.ofMillis(1), _.pid == "p108") + .byPid // allow only p108 to be evicted + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p92" -> 2, "p229" -> 4) + } + "find duplicate" in { val t0 = TestClock.nowMillis().instant() val state = diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index b22201013..dd1b6927e 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -29,7 +29,6 @@ import akka.projection.r2dbc.internal.OffsetPidSeqNr import akka.projection.r2dbc.internal.R2dbcOffsetStore import akka.projection.r2dbc.internal.R2dbcOffsetStore.Pid import akka.projection.r2dbc.internal.R2dbcOffsetStore.SeqNr -import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory @@ -48,13 +47,7 @@ object R2dbcTimestampOffsetStoreSpec { } class R2dbcTimestampOffsetStoreSpec - extends ScalaTestWithActorTestKit( - ConfigFactory - .parseString(""" - # to be able to test eviction - akka.projection.r2dbc.offset-store.keep-number-of-entries = 0 - """) - .withFallback(TestConfig.config)) + extends ScalaTestWithActorTestKit(TestConfig.config) with AnyWordSpecLike with TestDbLifecycle with TestData @@ -134,6 +127,9 @@ class R2dbcTimestampOffsetStoreSpec 
TimestampOffset(timestamp, timestamp.plusMillis(1000), Map(pid -> revision)), timestamp.toEpochMilli) + def slice(pid: String): Int = + persistenceExt.sliceForPersistenceId(pid) + s"The R2dbcOffsetStore for TimestampOffset (dialect ${r2dbcSettings.dialectName})" must { "save TimestampOffset with one entry" in { @@ -738,16 +734,16 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L) } - "evict old records" in { + "evict old records from same slice" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(100.seconds) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) val startTime = TestClock.nowMicros().instant() log.debug("Start time [{}]", startTime) - // these pids have the same slice 645, otherwise it will keep one for each slice + // these pids have the same slice 645 val p1 = "p500" val p2 = "p621" val p3 = "p742" @@ -759,34 +755,22 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), - p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -796,81 +780,59 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) 
offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) } - "evict old records but keep latest for each slice" in { + "evict old records from different slices" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(100.seconds) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) val startTime = TestClock.nowMicros().instant() log.debug("Start time [{}]", startTime) - val p1 = "p500" // slice 645 - val p2 = "p92" // slice 905 - val p3 = "p108" // slice 905 - val p4 = "p863" // slice 645 - val p5 = "p984" // slice 645 - val p6 = "p3080" // slice 645 - val p7 = "p4290" // slice 645 - val p8 = "p20180" // slice 645 + // these pids have the same slice 645 + val p1 = "p500" + val p2 = "p621" + val p3 = "p742" + val p4 = "p863" + val p5 = "p984" + val p6 = "p3080" + val p7 = "p4290" + val p8 = "p20180" + val p9 = "p-0960" // slice 576 offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), - p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -880,37 +842,49 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + 
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue - // also keeping p3 ("p108") for slice 905 - offsetStore.getState().byPid.keySet shouldBe Set(p3, p5, p6, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L)) .futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p3, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + + // save same slice, but behind + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1001), Map(p2 -> 2L)), p2, 2L)) + .futureValue + // it's evicted immediately + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + val dao = offsetStore.dao + // but still saved + dao.readTimestampOffset(slice(p2), p2).futureValue.get.seqNr shouldBe 2 + + // save another slice that hasn't been used before + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1002), Map(p9 -> 1L)), p9, 1L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.readTimestampOffset(slice(p9), p9).futureValue.get.seqNr shouldBe 1 + // and one more of that same slice + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1003), Map(p9 -> 2L)), p9, 2L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.readTimestampOffset(slice(p9), p9).futureValue.get.seqNr shouldBe 2 } "delete old records" in { val projectionId = genRandomProjectionId() - val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) + val deleteSettings = settings + .withTimeWindow(100.seconds) + .withDeleteAfter(100.seconds) import deleteSettings._ val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -942,10 +916,10 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 4 offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)), Map(p5 -> 1L)), p5, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.minusSeconds(2)), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p6 -> 1L)), p6, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.minusSeconds(1)), Map(p6 -> 1L)), p6, 1L)) .futureValue // nothing deleted yet offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 @@ -953,10 +927,10 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 6 offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)), Map(p7 -> 1L)), p7, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(1)), Map(p7 -> 1L)), p7, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(3)), Map(p8 -> 1L)), p8, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(3)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 offsetStore.readOffset().futureValue // this will load from database @@ -965,7 +939,9 @@ class R2dbcTimestampOffsetStoreSpec "delete old records but keep latest for 
each slice" in { val projectionId = genRandomProjectionId() - val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) + val deleteSettings = settings + .withTimeWindow(100.seconds) + .withDeleteAfter(100.seconds) import deleteSettings._ val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -996,10 +972,10 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 4 offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)), Map(p5 -> 1L)), p5, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.minusSeconds(2)), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p6 -> 1L)), p6, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.minusSeconds(1)), Map(p6 -> 1L)), p6, 1L)) .futureValue // nothing deleted yet offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 @@ -1007,26 +983,30 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 6 offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)), Map(p7 -> 1L)), p7, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(1)), Map(p7 -> 1L)), p7, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(3)), Map(p8 -> 1L)), p8, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(3)), Map(p8 -> 1L)), p8, 1L)) .futureValue - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(3)), Map(p3 -> 2L)), p3, 2L)) + .futureValue + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 + offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8) offsetStore.readOffset().futureValue // this will load from database // p3 is kept for slice 905 offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8) } "delete many old records" in { - // windowSeconds and totalMillis can be increase for longer/more testing - val windowSeconds = 3 + // deleteAfter and totalMillis can be increase for longer/more testing + val deleteAfter = 3 val totalMillis = 5 * 1000 val projectionId = genRandomProjectionId() val deleteSettings = settings - .withTimeWindow(JDuration.ofSeconds(windowSeconds)) - .withKeepNumberOfEntries(2000) + .withTimeWindow(JDuration.ofSeconds(deleteAfter)) + .withDeleteAfter(JDuration.ofSeconds(deleteAfter)) .withDeleteInterval(JDuration.ofHours(1)) // don't run the scheduled deletes val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -1057,7 +1037,8 @@ class R2dbcTimestampOffsetStoreSpec val projectionId = genRandomProjectionId() val deleteSettings = settings - .withTimeWindow(JDuration.ofSeconds(100)) + .withTimeWindow(10.seconds) + .withDeleteAfter(100.seconds) .withDeleteInterval(JDuration.ofMillis(500)) import deleteSettings._ val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -1072,7 +1053,7 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(deleteAfter.plusSeconds(1)), Map(p2 -> 1L)), 
p2, 1L)) .futureValue eventually { offsetStore.readOffset().futureValue // this will load from database @@ -1082,7 +1063,7 @@ class R2dbcTimestampOffsetStoreSpec offsetStore .saveOffset( OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.multipliedBy(2).plusSeconds(2)), Map(p3 -> 1L)), + TimestampOffset(startTime.plus(deleteAfter.multipliedBy(2).plusSeconds(2)), Map(p3 -> 1L)), p3, 1L)) .futureValue @@ -1092,15 +1073,14 @@ class R2dbcTimestampOffsetStoreSpec } } - "delete old records triggered by time window, while still within entries limit" in { + "delete old records from different slices" in { val projectionId = genRandomProjectionId() - val evictSettings = settings - .withKeepNumberOfEntries(10) - .withTimeWindow(JDuration.ofSeconds(100)) - .withEvictInterval(JDuration.ofSeconds(10)) - val offsetStore = createOffsetStore(projectionId, evictSettings) + val deleteSettings = settings + .withTimeWindow(100.seconds) + .withDeleteAfter(100.seconds) + val offsetStore = createOffsetStore(projectionId, deleteSettings) - import evictSettings.{ evictInterval, timeWindow } + import deleteSettings.deleteAfter val t0 = TestClock.nowMicros().instant() log.debug("Start time [{}]", t0) @@ -1123,49 +1103,48 @@ class R2dbcTimestampOffsetStoreSpec val t3 = t0.plusSeconds(3) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue - val t4 = t0.plus(evictInterval).plusSeconds(1) + val t4 = t0.plusSeconds(11) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue - val t5 = t0.plus(evictInterval).plusSeconds(2) + val t5 = t0.plusSeconds(12) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue - val t6 = t0.plus(evictInterval).plusSeconds(3) + val t6 = t0.plusSeconds(13) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue offsetStore.getState().size shouldBe 6 - val t7 = t0.plus(timeWindow.minusSeconds(10)) + val t7 = t0.plus(deleteAfter.minusSeconds(10)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue offsetStore.getState().size shouldBe 7 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) - val t8 = t0.plus(timeWindow.plus(evictInterval).minusSeconds(3)) + val t8 = t0.plus(deleteAfter.plusSeconds(7)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue - offsetStore.getState().size shouldBe 8 // no eviction - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p1@t1 and p2@t2, kept p3@t3 (latest) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p4, p5, p6, p7, p8) // eviction slice 645 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 1 // deleted p1@t1 - val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + val t9 = t0.plus(deleteAfter.plusSeconds(13)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue - offsetStore.getState().size shouldBe 8 // no eviction (outside eviction window, but within keep-number-of-entries) - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5, kept p3@t3 (latest) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) // eviction slice 645 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5 - offsetStore.getState().byPid.keySet shouldBe Set(p1, p2, p3, p4, p5, p6, p7, 
p8) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) offsetStore.readOffset().futureValue // reload from database - offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) } - "delete old records triggered after eviction" in { + "delete old records for same slice" in { val projectionId = genRandomProjectionId() - val evictSettings = settings - .withKeepNumberOfEntries(5) - .withTimeWindow(JDuration.ofSeconds(100)) - .withEvictInterval(JDuration.ofSeconds(10)) - val offsetStore = createOffsetStore(projectionId, evictSettings) + val deleteSettings = settings + .withTimeWindow(100.seconds) + .withDeleteAfter(100.seconds) + val offsetStore = createOffsetStore(projectionId, deleteSettings) - import evictSettings.{ evictInterval, timeWindow } + import deleteSettings.deleteAfter val t0 = TestClock.nowMicros().instant() log.debug("Start time [{}]", t0) @@ -1193,69 +1172,69 @@ class R2dbcTimestampOffsetStoreSpec val t3 = t0.plusSeconds(3) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue - val t4 = t0.plus(evictInterval).plusSeconds(7) + val t4 = t0.plusSeconds(17) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue - val t5 = t0.plus(evictInterval).plusSeconds(8) + val t5 = t0.plusSeconds(18) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue - val t6 = t0.plus(evictInterval).plusSeconds(9) + val t6 = t0.plusSeconds(19) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue offsetStore.getState().size shouldBe 6 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion - val t7 = t0.plus(timeWindow.minus(evictInterval)) + val t7 = t0.plus(deleteAfter.minusSeconds(10)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue offsetStore.getState().size shouldBe 7 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) - val t8 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + val t8 = t0.plus(deleteAfter.plusSeconds(13)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted p1@t1, p2@t2, and p3@t3 offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction - val t9 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(10)) + val t9 = t0.plus(deleteAfter.plusSeconds(30)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue - offsetStore.getState().size shouldBe 5 // no eviction (outside time window, but still within limit) + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) // evicted offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted p4@t4, p5@t5, p6@t6 (outside window) - val t10 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(11)) + val t10 = t0.plus(deleteAfter.plusSeconds(31)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p9 -> 1L)), p9, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8, p9) // evicted p4@t4 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9) 
// nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t11 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(12)) + val t11 = t0.plus(deleteAfter.plusSeconds(32)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p10 -> 1L)), p10, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p6, p7, p8, p9, p10) // evicted p5@t5 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t12 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(13)) + val t12 = t0.plus(deleteAfter.plusSeconds(33)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t12, Map(p11 -> 1L)), p11, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // evicted p6@t6 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t13 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(14)) + val t13 = t0.plus(deleteAfter.plusSeconds(34)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t13, Map(p12 -> 1L)), p12, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // no eviction (within time window) - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t14 = t0.plus(timeWindow.multipliedBy(2).plus(evictInterval.multipliedBy(3)).plusSeconds(1)) + val t14 = t7.plus(deleteAfter.plusSeconds(1)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t14, Map(p12 -> 2L)), p12, 2L)).futureValue offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) // evicted p7@t7 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // triggered by evict, deleted p7@t7, p8@t8, p8@t9 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 1 // triggered by evict, deleted p7@t7 offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) offsetStore.readOffset().futureValue // reload from database - offsetStore.getState().byPid.keySet shouldBe Set(p9, p10, p11, p12) + offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) } "set offset" in { @@ -1386,7 +1365,7 @@ class R2dbcTimestampOffsetStoreSpec ProjectionId(projectionName, s"$minSlice-$maxSlice"), Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)), system, - settings.withTimeWindow(JDuration.ofSeconds(10)), + settings.withTimeWindow(10.seconds).withDeleteAfter(10.seconds), r2dbcExecutor) // two projections at higher scale @@ -1429,7 +1408,7 @@ class R2dbcTimestampOffsetStoreSpec val state1 = offsetStore3.getState() state1.size shouldBe 4 - state1.latestBySlice.size shouldBe 4 + state1.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 4 // all latest are from other projection keys offsetStore3.getLatestSeen() 
shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1448,7 +1427,7 @@ class R2dbcTimestampOffsetStoreSpec val state2 = offsetStore3.getState() state2.size shouldBe 4 - state2.latestBySlice.size shouldBe 4 + state2.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 2 // latest by slice still from other projection keys (768-1023) offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1486,7 +1465,7 @@ class R2dbcTimestampOffsetStoreSpec val state3 = offsetStore3.getState() state3.size shouldBe 4 - state3.latestBySlice.size shouldBe 4 + state3.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1520,7 +1499,7 @@ class R2dbcTimestampOffsetStoreSpec val state4 = offsetStore3.getState() state4.size shouldBe 4 - state4.latestBySlice.size shouldBe 4 + state4.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1556,14 +1535,17 @@ class R2dbcTimestampOffsetStoreSpec val state5 = offsetStore3.getState() state5.size shouldBe 4 - state5.latestBySlice.size shouldBe 4 + state5.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets() shouldBe empty offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // outdated offsets, included those for 768-1023, will eventually be deleted offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p1 -> 4L)), p1, 4L)).futureValue - offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17 + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p2 -> 8L)), p2, 8L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 8L)), p3, 8L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p4 -> 10L)), p4, 10L)).futureValue + offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 20 } "validate timestamp of previous sequence number" in { @@ -1611,7 +1593,7 @@ class R2dbcTimestampOffsetStoreSpec .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) .futureValue shouldBe RejectedBacktrackingSeqNr // accepted if timestamp of previous seqNr is before start timestamp minus backtracking window - clock.setInstant(startOffset2.timestamp.minus(settings.timeWindow.plusSeconds(1))) + clock.setInstant(startOffset2.timestamp.minus(settings.deleteAfter.plusSeconds(1))) offsetStore2 .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) .futureValue shouldBe Accepted diff --git a/akka-projection-r2dbc/src/main/mima-filters/1.6.5.backwards.excludes/r2dbc-offset-store.excludes b/akka-projection-r2dbc/src/main/mima-filters/1.6.5.backwards.excludes/r2dbc-offset-store.excludes new file mode 100644 index 000000000..d6efaba39 --- /dev/null +++ b/akka-projection-r2dbc/src/main/mima-filters/1.6.5.backwards.excludes/r2dbc-offset-store.excludes @@ -0,0 +1,2 @@ +# internal +ProblemFilters.exclude[Problem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State*") diff --git a/akka-projection-r2dbc/src/main/resources/reference.conf b/akka-projection-r2dbc/src/main/resources/reference.conf index d0a72926e..6b0490f49 100644 --- a/akka-projection-r2dbc/src/main/resources/reference.conf +++ 
b/akka-projection-r2dbc/src/main/resources/reference.conf @@ -27,17 +27,12 @@ akka.projection.r2dbc { # It should not be larger than the akka.projection.r2dbc.offset-store.time-window. backtracking-window = ${akka.persistence.r2dbc.query.backtracking.window} - # Keep this number of entries. Don't evict old entries until this threshold - # has been reached. - keep-number-of-entries = 10000 + # Remove old entries when older than this duration from the offset store database. + delete-after = 1 day - # Remove old entries outside the time-window from the offset store memory - # with this frequency. - evict-interval = 10 seconds - - # Remove old entries outside the time-window from the offset store database + # Remove old entries when older than delete-after from the offset store database # with this frequency. Can be disabled with `off`. - delete-interval = 1 minute + delete-interval = 10 minutes # Adopt latest-by-slice entries from other projection keys with this frequency. # Can be disabled with `off`. @@ -45,6 +40,14 @@ akka.projection.r2dbc { # Trying to batch insert offsets in batches of this size. offset-batch-size = 20 + + # Number of slices (within a given projection's slice range) which will be queried for + # offsets simultaneously when the projection is started. + offset-slice-read-parallelism = 10 + + # Number of offsets to retrieve per slice when the projection is started. + # Other offsets will be loaded on demand. + offset-slice-read-limit = 100 } # By default it shares connection-factory with akka-persistence-r2dbc (write side), diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index d4403af21..8a36355cd 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -7,6 +7,7 @@ package akka.projection.r2dbc import java.time.{ Duration => JDuration } import java.util.Locale +import scala.annotation.nowarn import scala.concurrent.duration._ import scala.jdk.DurationConverters._ @@ -50,22 +51,39 @@ object R2dbcProjectionSettings { case _ => config.getDuration("offset-store.adopt-interval") } + val backtrackingWindow = config.getDuration("offset-store.backtracking-window") + val timeWindow = { + val d = config.getDuration("offset-store.time-window") + // can't be less then backtrackingWindow + if (d.compareTo(backtrackingWindow) < 0) backtrackingWindow + else d + } + val deleteAfter = { + val d = config.getDuration("offset-store.delete-after") + // can't be less then timeWindow + if (d.compareTo(timeWindow) < 0) timeWindow + else d + } + new R2dbcProjectionSettings( schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty), offsetTable = config.getString("offset-store.offset-table"), timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"), managementTable = config.getString("offset-store.management-table"), useConnectionFactory = config.getString("use-connection-factory"), - timeWindow = config.getDuration("offset-store.time-window"), - backtrackingWindow = config.getDuration("offset-store.backtracking-window"), - keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), - evictInterval = config.getDuration("offset-store.evict-interval"), + timeWindow, + backtrackingWindow, + deleteAfter, + keepNumberOfEntries = 0, + evictInterval = 
JDuration.ZERO, deleteInterval, adoptInterval, logDbCallsExceeding, warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"), offsetBatchSize = config.getInt("offset-store.offset-batch-size"), - customConnectionFactory = None) + customConnectionFactory = None, + offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"), + offsetSliceReadLimit = config.getInt("offset-store.offset-slice-read-limit")) } /** @@ -84,14 +102,19 @@ final class R2dbcProjectionSettings private ( val useConnectionFactory: String, val timeWindow: JDuration, val backtrackingWindow: JDuration, + val deleteAfter: JDuration, + @deprecated("Not used, evict is only based on time window", "1.6.6") val keepNumberOfEntries: Int, + @deprecated("Not used, evict is not periodic", "1.6.6") val evictInterval: JDuration, val deleteInterval: JDuration, val adoptInterval: JDuration, val logDbCallsExceeding: FiniteDuration, val warnAboutFilteredEventsInFlow: Boolean, val offsetBatchSize: Int, - val customConnectionFactory: Option[ConnectionFactory]) { + val customConnectionFactory: Option[ConnectionFactory], + val offsetSliceReadParallelism: Int, + val offsetSliceReadLimit: Int) { val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable @@ -124,14 +147,23 @@ final class R2dbcProjectionSettings private ( def withBacktrackingWindow(backtrackingWindow: JDuration): R2dbcProjectionSettings = copy(backtrackingWindow = backtrackingWindow) + def withDeleteAfter(deleteAfter: FiniteDuration): R2dbcProjectionSettings = + copy(deleteAfter = deleteAfter.toJava) + + def withDeleteAfter(deleteAfter: JDuration): R2dbcProjectionSettings = + copy(deleteAfter = deleteAfter) + + @deprecated("Not used, evict is only based on time window", "1.6.6") def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings = - copy(keepNumberOfEntries = keepNumberOfEntries) + this + @deprecated("Not used, evict is not periodic", "1.6.6") def withEvictInterval(evictInterval: FiniteDuration): R2dbcProjectionSettings = - copy(evictInterval = evictInterval.toJava) + this + @deprecated("Not used, evict is not periodic", "1.6.6") def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings = - copy(evictInterval = evictInterval) + this def withDeleteInterval(deleteInterval: FiniteDuration): R2dbcProjectionSettings = copy(deleteInterval = deleteInterval.toJava) @@ -160,6 +192,13 @@ final class R2dbcProjectionSettings private ( def withCustomConnectionFactory(customConnectionFactory: ConnectionFactory): R2dbcProjectionSettings = copy(customConnectionFactory = Some(customConnectionFactory)) + def withOffsetSliceReadParallelism(offsetSliceReadParallelism: Int): R2dbcProjectionSettings = + copy(offsetSliceReadParallelism = offsetSliceReadParallelism) + + def withOffsetSliceReadLimit(offsetSliceReadLimit: Int): R2dbcProjectionSettings = + copy(offsetSliceReadLimit = offsetSliceReadLimit) + + @nowarn("msg=deprecated") private def copy( schema: Option[String] = schema, offsetTable: String = offsetTable, @@ -168,14 +207,15 @@ final class R2dbcProjectionSettings private ( useConnectionFactory: String = useConnectionFactory, timeWindow: JDuration = timeWindow, backtrackingWindow: JDuration = backtrackingWindow, - keepNumberOfEntries: Int = keepNumberOfEntries, - evictInterval: JDuration = evictInterval, + deleteAfter: JDuration = deleteAfter, deleteInterval: JDuration = 
deleteInterval, adoptInterval: JDuration = adoptInterval, logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow, offsetBatchSize: Int = offsetBatchSize, - customConnectionFactory: Option[ConnectionFactory] = customConnectionFactory) = + customConnectionFactory: Option[ConnectionFactory] = customConnectionFactory, + offsetSliceReadParallelism: Int = offsetSliceReadParallelism, + offsetSliceReadLimit: Int = offsetSliceReadLimit) = new R2dbcProjectionSettings( schema, offsetTable, @@ -184,6 +224,7 @@ final class R2dbcProjectionSettings private ( useConnectionFactory, timeWindow, backtrackingWindow, + deleteAfter, keepNumberOfEntries, evictInterval, deleteInterval, @@ -191,8 +232,10 @@ final class R2dbcProjectionSettings private ( logDbCallsExceeding, warnAboutFilteredEventsInFlow, offsetBatchSize, - customConnectionFactory) + customConnectionFactory, + offsetSliceReadParallelism, + offsetSliceReadLimit) override def toString = - s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory)" + s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory)" } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala index e9d0a54b4..e2f0c92b7 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala @@ -22,7 +22,7 @@ import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice @InternalApi private[projection] trait OffsetStoreDao { - def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] + def readTimestampOffset(slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] @@ -37,7 +37,7 @@ private[projection] trait OffsetStoreDao { timestamp: Instant, storageRepresentation: OffsetSerialization.StorageRepresentation): Future[Done] - def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long] + def deleteOldTimestampOffset(slice: Int, until: Instant): Future[Long] def deleteNewTimestampOffsetsInTx(connection: Connection, timestamp: Instant): Future[Long] diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 651a7af52..20e2d3c34 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -6,7 +6,6 @@ package akka.projection.r2dbc.internal import java.time.Instant -import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -65,8 +64,8 @@ private[projection] class 
PostgresOffsetStoreDao( private val selectTimestampOffsetSql: String = sql""" - SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset - FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?""" + SELECT projection_key, persistence_id, seq_nr, timestamp_offset + FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?""" protected def createSelectOneTimestampOffsetSql: String = sql""" @@ -93,27 +92,17 @@ private[projection] class PostgresOffsetStoreDao( } /** - * delete less than a timestamp - * @param notInLatestBySlice not used in postgres, but needed in sql + * delete less than a timestamp for a given slice */ - @nowarn - protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String = + protected def deleteOldTimestampOffsetSql(): String = sql""" - DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ? - AND NOT (persistence_id || '-' || seq_nr) = ANY (?)""" + DELETE FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND timestamp_offset < ?""" - protected def bindDeleteOldTimestampOffsetSql( - stmt: Statement, - minSlice: Int, - maxSlice: Int, - until: Instant, - notInLatestBySlice: Seq[LatestBySlice]): Statement = { + protected def bindDeleteOldTimestampOffsetSql(stmt: Statement, slice: Int, until: Instant): Statement = { stmt - .bind(0, minSlice) - .bind(1, maxSlice) - .bind(2, projectionId.name) - .bindTimestamp(3, until) - .bind(4, notInLatestBySlice.iterator.map(record => s"${record.pid}-${record.seqNr}").toArray[String]) + .bind(0, slice) + .bind(1, projectionId.name) + .bindTimestamp(2, until) } // delete greater than or equal a timestamp @@ -203,25 +192,19 @@ private[projection] class PostgresOffsetStoreDao( s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") } - override def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { - val (minSlice, maxSlice) = { - sourceProvider match { - case Some(provider) => (provider.minSlice, provider.maxSlice) - case None => (0, persistenceExt.numberOfSlices - 1) - } - } + override def readTimestampOffset( + slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { r2dbcExecutor.select("read timestamp offset")( conn => { logger.trace("reading timestamp offset for [{}]", projectionId) conn .createStatement(selectTimestampOffsetSql) - .bind(0, minSlice) - .bind(1, maxSlice) - .bind(2, projectionId.name) + .bind(0, slice) + .bind(1, projectionId.name) + .bind(2, settings.offsetSliceReadLimit) }, row => { val projectionKey = row.get("projection_key", classOf[String]) - val slice = row.get("slice", classOf[java.lang.Integer]) val pid = row.get("persistence_id", classOf[String]) val seqNr = row.get("seq_nr", classOf[java.lang.Long]) val timestamp = row.getTimestamp("timestamp_offset") @@ -357,12 +340,10 @@ private[projection] class PostgresOffsetStoreDao( R2dbcExecutor.updateInTx(statements).map(_ => Done)(ExecutionContext.parasitic) } - override def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long] = { - val minSlice = timestampOffsetBySlicesSourceProvider.minSlice - val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice + override def deleteOldTimestampOffset(slice: Int, until: Instant): Future[Long] = { r2dbcExecutor.updateOne("delete old timestamp offset") { conn => - val stmt = 
conn.createStatement(deleteOldTimestampOffsetSql(notInLatestBySlice)) - bindDeleteOldTimestampOffsetSql(stmt, minSlice, maxSlice, until, notInLatestBySlice) + val stmt = conn.createStatement(deleteOldTimestampOffsetSql()) + bindDeleteOldTimestampOffsetSql(stmt, slice, until) } } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index cb69c93bc..579576093 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -26,18 +26,24 @@ import akka.projection.internal.OffsetSerialization.MultipleOffsets import akka.projection.r2dbc.R2dbcProjectionSettings import io.r2dbc.spi.Connection import org.slf4j.LoggerFactory - import java.time.Clock import java.time.Instant import java.time.{ Duration => JDuration } import java.util.UUID -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicReference +import java.lang.Boolean.FALSE +import java.lang.Boolean.TRUE + import scala.annotation.tailrec import scala.collection.immutable +import scala.collection.immutable.TreeSet import scala.concurrent.ExecutionContext import scala.concurrent.Future +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + /** * INTERNAL API */ @@ -46,7 +52,22 @@ private[projection] object R2dbcOffsetStore { type SeqNr = Long type Pid = String - final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) + final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] { + override def compare(that: Record): Int = + timestamp.compareTo(that.timestamp) match { + case 0 => + Integer.compare(slice, that.slice) match { + case 0 => + pid.compareTo(that.pid) match { + case 0 => java.lang.Long.compare(seqNr, that.seqNr) + case result => result + } + case result => result + } + case result => result + } + } + final case class RecordWithOffset( record: Record, offset: TimestampOffset, @@ -57,7 +78,7 @@ private[projection] object R2dbcOffsetStore { final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { - val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0, Instant.EPOCH) + val empty: State = State(Map.empty, Map.empty, Instant.EPOCH) def apply(records: immutable.IndexedSeq[Record]): State = { if (records.isEmpty) empty @@ -65,69 +86,67 @@ private[projection] object R2dbcOffsetStore { } } - final case class State( - byPid: Map[Pid, Record], - latest: immutable.IndexedSeq[Record], - oldestTimestamp: Instant, - sizeAfterEvict: Int, - startTimestamp: Instant) { + final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) { def size: Int = byPid.size - def latestTimestamp: Instant = - if (latest.isEmpty) Instant.EPOCH - else latest.head.timestamp + def latestTimestamp: Instant = { + if (bySliceSorted.isEmpty) + Instant.EPOCH + else + bySliceSorted.valuesIterator.map(_.last.timestamp).max + } def latestOffset: Option[TimestampOffset] = { - if (latest.isEmpty) + if (bySliceSorted.isEmpty) { None - else - Some(TimestampOffset(latestTimestamp, latest.map(r => r.pid -> r.seqNr).toMap)) + } else { + val t = latestTimestamp + val latest = + bySliceSorted.valuesIterator.flatMap { records => + if 
(records.nonEmpty && records.last.timestamp == t) + records.toVector.reverseIterator.takeWhile(_.timestamp == t).toVector + else + Vector.empty + }.toVector + + val seen = latest.foldLeft(Map.empty[Pid, SeqNr]) { + case (acc, record) => + acc.get(record.pid) match { + case None => acc.updated(record.pid, record.seqNr) + case Some(existing) => + if (record.seqNr > existing) acc.updated(record.pid, record.seqNr) + else acc + } + } + + Some(TimestampOffset(t, seen)) + } } - def add(records: immutable.IndexedSeq[Record]): State = { + def add(records: Iterable[Record]): State = { records.foldLeft(this) { case (acc, r) => - val newByPid = - acc.byPid.get(r.pid) match { - case Some(existingRecord) => - if (r.seqNr > existingRecord.seqNr) - acc.byPid.updated(r.pid, r) - else - acc.byPid // older or same seqNr - case None => - acc.byPid.updated(r.pid, r) - } - - val latestTimestamp = acc.latestTimestamp - val newLatest = - if (r.timestamp.isAfter(latestTimestamp)) { - Vector(r) - } else if (r.timestamp == latestTimestamp) { - acc.latest.find(_.pid == r.pid) match { - case None => acc.latest :+ r - case Some(existingRecord) => - // keep highest seqNr - if (r.seqNr >= existingRecord.seqNr) - acc.latest.filterNot(_.pid == r.pid) :+ r - else - acc.latest - } - } else { - acc.latest // older than existing latest, keep existing latest - } - val newOldestTimestamp = - if (acc.oldestTimestamp == Instant.EPOCH) - r.timestamp // first record - else if (r.timestamp.isBefore(acc.oldestTimestamp)) - r.timestamp - else - acc.oldestTimestamp // this is the normal case - - acc.copy(byPid = newByPid, latest = newLatest, oldestTimestamp = newOldestTimestamp) + val sorted = acc.bySliceSorted.getOrElse(r.slice, TreeSet.empty[Record]) + acc.byPid.get(r.pid) match { + case Some(existingRecord) => + if (r.seqNr > existingRecord.seqNr) + acc.copy( + byPid = acc.byPid.updated(r.pid, r), + bySliceSorted = acc.bySliceSorted.updated(r.slice, sorted - existingRecord + r)) + else + acc // older or same seqNr + case None => + acc.copy( + byPid = acc.byPid.updated(r.pid, r), + bySliceSorted = acc.bySliceSorted.updated(r.slice, sorted + r)) + } } } + def contains(pid: Pid): Boolean = + byPid.contains(pid) + def isDuplicate(record: Record): Boolean = { byPid.get(record.pid) match { case Some(existingRecord) => record.seqNr <= existingRecord.seqNr @@ -135,29 +154,36 @@ private[projection] object R2dbcOffsetStore { } } - def window: JDuration = - JDuration.between(oldestTimestamp, latestTimestamp) - - private lazy val sortedByTimestamp: Vector[Record] = byPid.valuesIterator.toVector.sortBy(_.timestamp) + def evict(slice: Int, timeWindow: JDuration, ableToEvictRecord: Record => Boolean): State = { + val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record]) + if (recordsSortedByTimestamp.isEmpty) { + this + } else { + val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow) + val filtered = { + // Records comparing >= this record by recordOrdering will definitely be kept, + // Records comparing < this record by recordOrdering are subject to eviction + // Slice will be equal, and pid will compare lexicographically less than any valid pid + val untilRecord = Record(slice, "", 0, until) + // this will always keep at least one, latest per slice + val newerRecords = recordsSortedByTimestamp.rangeFrom(untilRecord) // inclusive of until + val olderRecords = recordsSortedByTimestamp.rangeUntil(untilRecord) // exclusive of until + val filteredOlder = olderRecords.filterNot(ableToEvictRecord) + + if 
+          else newerRecords.union(filteredOlder)
+        }

-    lazy val latestBySlice: Vector[Record] = {
-      val builder = scala.collection.mutable.Map[Int, Record]()
-      sortedByTimestamp.reverseIterator.foreach { record =>
-        if (!builder.contains(record.slice))
-          builder.update(record.slice, record)
+        // adding back filtered is linear in the size of filtered, but so is checking if we're able to evict
+        if (filtered eq recordsSortedByTimestamp) {
+          this
+        } else {
+          val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice }
+          val bySliceOtherSlices = bySliceSorted - slice
+          copy(byPid = byPidOtherSlices, bySliceSorted = bySliceOtherSlices)
+            .add(filtered)
+        }
       }
-      builder.values.toVector
-    }
-
-    def evict(until: Instant, keepNumberOfEntries: Int): State = {
-      if (oldestTimestamp.isBefore(until) && size > keepNumberOfEntries) {
-        val newState = State(
-          sortedByTimestamp.take(size - keepNumberOfEntries).filterNot(_.timestamp.isBefore(until))
-          ++ sortedByTimestamp.takeRight(keepNumberOfEntries)
-          ++ latestBySlice)
-        newState.copy(sizeAfterEvict = newState.size)
-      } else
-        this
     }
   }

@@ -197,12 +223,17 @@ private[projection] class R2dbcOffsetStore(

   import R2dbcOffsetStore._

-  // FIXME include projectionId in all log messages
-  private val logger = LoggerFactory.getLogger(this.getClass)
-
   private val persistenceExt = Persistence(system)

-  private val evictWindow = settings.timeWindow.plus(settings.evictInterval)
+  private val (minSlice, maxSlice) = {
+    sourceProvider match {
+      case Some(provider) => (provider.minSlice, provider.maxSlice)
+      case None           => (0, persistenceExt.numberOfSlices - 1)
+    }
+  }
+
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  private val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:"

   private val offsetSerialization = new OffsetSerialization(system)
   import offsetSerialization.fromStorageRepresentation
@@ -220,7 +251,7 @@ private[projection] class R2dbcOffsetStore(
         s"[$unknown] is not a dialect supported by this version of Akka Projection R2DBC")
   }
   val dao: OffsetStoreDao = {
-    logger.debug("Offset store [{}] created, with dialect [{}]", projectionId, dialectName)
+    logger.debug("{} Offset store created, with dialect [{}]", logPrefix, dialectName)
     dialect.createOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId)
   }
@@ -237,17 +268,7 @@ private[projection] class R2dbcOffsetStore(
   private val inflight = new AtomicReference(Map.empty[Pid, SeqNr])

   // To avoid delete requests when no new offsets have been stored since previous delete
-  private val idle = new AtomicBoolean(false)
-
-  // To trigger next deletion after in-memory eviction
-  private val triggerDeletion = new AtomicBoolean(false)
-
-  if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative)
-    system.scheduler.scheduleWithFixedDelay(
-      settings.deleteInterval,
-      settings.deleteInterval,
-      () => deleteOldTimestampOffsets(),
-      system.executionContext)
+  private val triggerDeletionPerSlice = new ConcurrentHashMap[Int, java.lang.Boolean]

   // Foreign offsets (latest by slice offsets from other projection keys) that should be adopted when passed in time.
   // Contains remaining offsets to adopt. Sorted by timestamp. Can be updated concurrently with CAS retries.
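A note between hunks: the TreeSet-based eviction above works because of the total order defined on Record (timestamp, then slice, pid, seqNr), so a synthetic boundary record with an empty pid and seqNr 0 sorts before every real record that shares its timestamp and slice, and rangeFrom/rangeUntil split the sorted set exactly at the time-window boundary. A minimal standalone sketch of that mechanism (Rec and EvictRangeSketch are names local to this sketch, not part of the patch):

import java.time.Instant
import scala.collection.immutable.TreeSet

object EvictRangeSketch extends App {
  // same total order as Record above: timestamp, then slice, pid, seqNr
  final case class Rec(slice: Int, pid: String, seqNr: Long, timestamp: Instant) extends Ordered[Rec] {
    override def compare(that: Rec): Int = {
      val byTime = timestamp.compareTo(that.timestamp)
      if (byTime != 0) byTime
      else {
        val bySlice = Integer.compare(slice, that.slice)
        if (bySlice != 0) bySlice
        else {
          val byPid = pid.compareTo(that.pid)
          if (byPid != 0) byPid
          else java.lang.Long.compare(seqNr, that.seqNr)
        }
      }
    }
  }

  val t0 = Instant.EPOCH
  val sorted = TreeSet(
    Rec(645, "p500", 1, t0.plusMillis(1)),
    Rec(645, "p621", 2, t0.plusMillis(5)),
    Rec(645, "p742", 3, t0.plusMillis(9)))

  // evict records older than latest minus a 3 ms window => boundary at t0+6ms
  val until = sorted.last.timestamp.minusMillis(3)
  // empty pid and seqNr 0 sort before any real record at (until, slice 645),
  // so the boundary record splits the set exactly at the window boundary
  val boundary = Rec(645, "", 0, until)
  val kept = sorted.rangeFrom(boundary)    // timestamp >= until, always includes the latest
  val older = sorted.rangeUntil(boundary)  // timestamp < until, candidates for eviction

  println(kept.map(_.pid))  // TreeSet(p742)
  println(older.map(_.pid)) // TreeSet(p500, p621)
}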
@@ -269,6 +290,12 @@ private[projection] class R2dbcOffsetStore(
           system.executionContext))
     else None

+  private def scheduleNextDelete(): Unit = {
+    if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative)
+      system.scheduler.scheduleOnce(settings.deleteInterval, () => deleteOldTimestampOffsets(), system.executionContext)
+  }
+
+  scheduleNextDelete()
+
   private def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
     sourceProvider match {
       case Some(timestampQuery: EventTimestampQuery) =>
@@ -314,15 +341,30 @@
   }

   private def readTimestampOffset(): Future[Option[TimestampOffset]] = {
-    idle.set(false)
+    implicit val sys = system // for implicit stream materializer
+    triggerDeletionPerSlice.clear()
     val oldState = state.get()
-    dao.readTimestampOffset().map { recordsWithKey =>
+    val recordsWithKeyFut =
+      Source(minSlice to maxSlice)
+        .mapAsyncUnordered(settings.offsetSliceReadParallelism) { slice =>
+          dao.readTimestampOffset(slice)
+        }
+        .mapConcat(identity)
+        .runWith(Sink.seq)
+        .map(_.toVector)(ExecutionContext.parasitic)
+
+    recordsWithKeyFut.map { recordsWithKey =>
       clearInflight()
       clearForeignOffsets()
       clearLatestSeen()
-      val newState = State(recordsWithKey.map(_.record))
+      val newState = {
+        val s = State(recordsWithKey.map(_.record))
+        (minSlice to maxSlice).foldLeft(s) {
+          case (acc, slice) => acc.evict(slice, settings.timeWindow, _ => true)
+        }
+      }

       val startOffset =
         if (newState == State.empty) {
@@ -363,9 +405,9 @@
       }

       logger.debug(
-        "readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}], start offset [{}]",
+        "{} readTimestampOffset state with [{}] persistenceIds, latest [{}], start offset [{}]",
+        logPrefix,
         newState.byPid.size,
-        newState.oldestTimestamp,
         newState.latestTimestamp,
         startOffset)
@@ -405,7 +447,7 @@
         offsets.find(_.id == projectionId).map(fromStorageRepresentation[Offset, Offset])
       }

-    logger.trace("found offset [{}] for [{}]", result, projectionId)
+    logger.trace("{} found offset [{}]", logPrefix, result)

     result
   }
@@ -414,6 +456,46 @@
     }
   }

+  def load(pid: Pid): Future[State] = {
+    val oldState = state.get()
+    if (oldState.contains(pid))
+      Future.successful(oldState)
+    else {
+      val slice = persistenceExt.sliceForPersistenceId(pid)
+      logger.trace("{} load [{}]", logPrefix, pid)
+      dao.readTimestampOffset(slice, pid).flatMap {
+        case Some(record) =>
+          val newState = oldState.add(Vector(record))
+          if (state.compareAndSet(oldState, newState))
+            Future.successful(newState)
+          else
+            load(pid) // CAS retry, concurrent update
+        case None => Future.successful(oldState)
+      }
+    }
+  }
+
+  def load(pids: IndexedSeq[Pid]): Future[State] = {
+    val oldState = state.get()
+    val pidsToLoad = pids.filterNot(oldState.contains)
+    if (pidsToLoad.isEmpty)
+      Future.successful(oldState)
+    else {
+      val loadedRecords = pidsToLoad.map { pid =>
+        val slice = persistenceExt.sliceForPersistenceId(pid)
+        logger.trace("{} load [{}]", logPrefix, pid)
+        dao.readTimestampOffset(slice, pid)
+      }
+      Future.sequence(loadedRecords).flatMap { records =>
+        val newState = oldState.add(records.flatten)
+        if (state.compareAndSet(oldState, newState))
+          Future.successful(newState)
+        else
+          load(pids) // CAS retry, concurrent update
+      }
+    }
+  }
+
   /**
    * Like saveOffsetInTx, but in own transaction. Used by atLeastOnce.
   */
@@ -433,7 +515,7 @@ private[projection] class R2dbcOffsetStore(
       case OffsetPidSeqNr(t: TimestampOffset, Some((pid, seqNr))) =>
         val slice = persistenceExt.sliceForPersistenceId(pid)
         val record = Record(slice, pid, seqNr, t.timestamp)
-        saveTimestampOffsetInTx(conn, Vector(record))
+        saveTimestampOffsetInTx(conn, Vector(record), canBeConcurrent = true)
       case OffsetPidSeqNr(_: TimestampOffset, None) =>
         throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
       case _ =>
@@ -444,12 +526,18 @@
   def saveOffsets(offsets: immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] = {
     r2dbcExecutor
       .withConnection("save offsets") { conn =>
-        saveOffsetsInTx(conn, offsets)
+        saveOffsetsInTx(conn, offsets, canBeConcurrent = true)
       }
       .map(_ => Done)(ExecutionContext.parasitic)
   }

-  def saveOffsetsInTx(conn: Connection, offsets: immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] = {
+  def saveOffsetsInTx(conn: Connection, offsets: immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] =
+    saveOffsetsInTx(conn, offsets, canBeConcurrent = false)
+
+  private def saveOffsetsInTx(
+      conn: Connection,
+      offsets: immutable.IndexedSeq[OffsetPidSeqNr],
+      canBeConcurrent: Boolean): Future[Done] = {
     if (offsets.isEmpty)
       FutureDone
     else if (offsets.head.offset.isInstanceOf[TimestampOffset]) {
@@ -461,73 +549,78 @@
           throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
         case _ =>
           throw new IllegalArgumentException(
-            "Mix of TimestampOffset and other offset type in same transaction isnot supported")
+            "Mix of TimestampOffset and other offset type in same transaction is not supported")
       }
-      saveTimestampOffsetInTx(conn, records)
+      saveTimestampOffsetInTx(conn, records, canBeConcurrent)
     } else {
       savePrimitiveOffsetInTx(conn, offsets.last.offset)
     }
   }

-  private def saveTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Done] = {
-    idle.set(false)
-    val oldState = state.get()
-    val filteredRecords = {
-      if (records.size <= 1)
-        records.filterNot(oldState.isDuplicate)
-      else {
-        // use last record for each pid
-        records
-          .groupBy(_.pid)
-          .valuesIterator
-          .collect {
-            case recordsByPid if !oldState.isDuplicate(recordsByPid.last) => recordsByPid.last
-          }
-          .toVector
+  private def saveTimestampOffsetInTx(
+      conn: Connection,
+      records: immutable.IndexedSeq[Record],
+      canBeConcurrent: Boolean): Future[Done] = {
+    load(records.map(_.pid)).flatMap { oldState =>
+      val filteredRecords = {
+        if (records.size <= 1)
+          records.filterNot(oldState.isDuplicate)
+        else {
+          // use last record for each pid
+          records
+            .groupBy(_.pid)
+            .valuesIterator
+            .collect {
+              case recordsByPid if !oldState.isDuplicate(recordsByPid.last) => recordsByPid.last
            }
            .toVector
+        }
       }
-    }
-    if (hasForeignOffsets() && records.nonEmpty) {
-      val latestTimestamp =
-        if (records.size == 1) records.head.timestamp
-        else records.maxBy(_.timestamp).timestamp
-      updateLatestSeen(latestTimestamp)
-    }
-    if (filteredRecords.isEmpty) {
-      FutureDone
-    } else {
-      val newState = oldState.add(filteredRecords)
-
-      // accumulate some more than the timeWindow before evicting, and at least 10% increase of size
-      // for testing keepNumberOfEntries = 0 is used
-      val evictThresholdReached =
-        if (settings.keepNumberOfEntries == 0) true else newState.size > (newState.sizeAfterEvict * 1.1).toInt
-      val evictedNewState =
-        if (newState.size > settings.keepNumberOfEntries && evictThresholdReached && newState.window
-            .compareTo(evictWindow) > 0) {
-          val evictUntil = newState.latestTimestamp.minus(settings.timeWindow)
-          val s = newState.evict(evictUntil, settings.keepNumberOfEntries)
-          triggerDeletion.set(true)
-          logger.debug(
-            "Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].",
-            newState.size - s.size,
-            evictUntil,
-            s.size,
-            newState.latestTimestamp)
-          s
-        } else
-          newState
-
-      val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)
-
-      offsetInserts.map { _ =>
-        if (state.compareAndSet(oldState, evictedNewState))
-          cleanupInflight(evictedNewState)
-        else
-          throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.")
-        Done
+      if (hasForeignOffsets() && records.nonEmpty) {
+        val latestTimestamp =
+          if (records.size == 1) records.head.timestamp
+          else records.maxBy(_.timestamp).timestamp
+        updateLatestSeen(latestTimestamp)
+      }
+      if (filteredRecords.isEmpty) {
+        FutureDone
+      } else {
+        val newState = oldState.add(filteredRecords)
+
+        val slices =
+          if (filteredRecords.size == 1) Set(filteredRecords.head.slice)
+          else filteredRecords.iterator.map(_.slice).toSet
+
+        val currentInflight = getInflight()
+        val evictedNewState = slices.foldLeft(newState) {
+          case (s, slice) =>
+            s.evict(
+              slice,
+              settings.timeWindow,
+              // Only persistence IDs that aren't in-flight are evictable, so that
+              // in-flight persistence IDs can still be removed from the in-flight
+              // tracking. In the absence of further records for a persistence ID,
+              // the next store will evict it (further records would make it recent
+              // enough not to be evicted).
+              record => !currentInflight.contains(record.pid))
+        }
+
+        val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)
+
+        offsetInserts.map { _ =>
+          if (state.compareAndSet(oldState, evictedNewState)) {
+            slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE))
+            cleanupInflight(evictedNewState)
+          } else { // concurrent update
+            if (canBeConcurrent) saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry
+            else throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.")
+          }
+          Done
+        }
       }
     }
   }
+
   @tailrec private def cleanupInflight(newState: State): Unit = {
     val currentInflight = getInflight()
     val newInflight =
@@ -541,7 +634,7 @@
     if (newInflight.size >= 10000) {
       throw new IllegalStateException(
         s"Too many envelopes in-flight [${newInflight.size}]. " +
-        "Please report this issue at https://github.com/akka/akka-persistence-r2dbc")
+        "Please report this issue at https://github.com/akka/akka-projection")
     }
     if (!inflight.compareAndSet(currentInflight, newInflight))
       cleanupInflight(newState) // CAS retry, concurrent update of inflight
@@ -554,7 +647,7 @@
   }

   private def savePrimitiveOffsetInTx[Offset](conn: Connection, offset: Offset): Future[Done] = {
-    logger.trace("saving offset [{}]", offset)
+    logger.trace("{} saving offset [{}]", logPrefix, offset)

     if (!settings.isOffsetTableDefined)
       Future.failed(
@@ -573,11 +666,17 @@

   /**
    * The stored sequence number for a persistenceId, or 0 if unknown persistenceId.
   */
-  def storedSeqNr(pid: Pid): SeqNr =
+  def storedSeqNr(pid: Pid): Future[SeqNr] = {
     getState().byPid.get(pid) match {
-      case Some(record) => record.seqNr
-      case None         => 0L
+      case Some(record) => Future.successful(record.seqNr)
+      case None =>
+        val slice = persistenceExt.sliceForPersistenceId(pid)
+        dao.readTimestampOffset(slice, pid).map {
+          case Some(record) => record.seqNr
+          case None         => 0L
+        }
     }
+  }

   def validateAll[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[(Envelope, Validation)]] = {
     import Validation._
@@ -622,92 +721,91 @@
     import Validation._
     val pid = recordWithOffset.record.pid
     val seqNr = recordWithOffset.record.seqNr
-    val currentState = getState()

-    val duplicate = currentState.isDuplicate(recordWithOffset.record)
+    load(pid).flatMap { currentState =>
+      val duplicate = currentState.isDuplicate(recordWithOffset.record)

-    if (duplicate) {
-      logger.trace("Filtering out duplicate sequence number [{}] for pid [{}]", seqNr, pid)
-      // also move latest seen forward, for adopting foreign offsets on replay of duplicates
-      if (hasForeignOffsets()) updateLatestSeen(recordWithOffset.offset.timestamp)
-      FutureDuplicate
-    } else if (recordWithOffset.strictSeqNr) {
-      // strictSeqNr == true is for event sourced
-      val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
+      if (duplicate) {
+        logger.trace("{} Filtering out duplicate sequence number [{}] for pid [{}]", logPrefix, seqNr, pid)
+        // also move latest seen forward, for adopting foreign offsets on replay of duplicates
+        if (hasForeignOffsets()) updateLatestSeen(recordWithOffset.offset.timestamp)
+        FutureDuplicate
+      } else if (recordWithOffset.strictSeqNr) {
+        // strictSeqNr == true is for event sourced
+        val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))

-      def logUnexpected(): Unit = {
-        if (recordWithOffset.fromPubSub)
-          logger.debug(
-            "Rejecting pub-sub envelope, unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
-            seqNr,
-            pid,
-            prevSeqNr,
-            recordWithOffset.offset)
-        else if (!recordWithOffset.fromBacktracking)
-          logger.debug(
-            "Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
-            seqNr,
-            pid,
-            prevSeqNr,
-            recordWithOffset.offset)
-        else
-          logger.warn(
-            "Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
-            seqNr,
-            pid,
-            prevSeqNr,
-            recordWithOffset.offset)
-      }
+        def logUnexpected(): Unit = {
+          if (recordWithOffset.fromPubSub)
+            logger.debug(
+              "{} Rejecting pub-sub envelope, unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
+              logPrefix,
+              seqNr,
+              pid,
+              prevSeqNr,
+              recordWithOffset.offset)
+          else if (!recordWithOffset.fromBacktracking)
+            logger.debug(
+              "{} Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
+              logPrefix,
+              seqNr,
+              pid,
+              prevSeqNr,
+              recordWithOffset.offset)
+          else
+            logger.warn(
+              "{} Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
+              logPrefix,
+              seqNr,
+              pid,
+              prevSeqNr,
+              recordWithOffset.offset)
+        }

-      if (prevSeqNr > 0) {
-        // expecting seqNr to be +1 of previously known
-        val ok = seqNr == prevSeqNr + 1
-        if (ok) {
+        if (prevSeqNr > 0) {
+          // expecting seqNr to be +1 of previously known
+          val ok = seqNr == prevSeqNr + 1
+          if (ok) {
+            FutureAccepted
+          } else if (seqNr <= currentInflight.getOrElse(pid, 0L)) {
+            // currentInFlight contains those that have been processed or about to be processed in Flow,
+            // but offset not saved yet => ok to handle as duplicate
+            FutureDuplicate
+          } else if (recordWithOffset.fromSnapshot) {
+            // snapshots will mean we are starting from some arbitrary offset after last seen offset
+            FutureAccepted
+          } else if (!recordWithOffset.fromBacktracking) {
+            logUnexpected()
+            FutureRejectedSeqNr
+          } else {
+            logUnexpected()
+            // This will result in projection restart (with normal configuration)
+            FutureRejectedBacktrackingSeqNr
+          }
+        } else if (seqNr == 1) {
+          // always accept first event if no other event for that pid has been seen
           FutureAccepted
-        } else if (seqNr <= currentInflight.getOrElse(pid, 0L)) {
-          // currentInFlight contains those that have been processed or about to be processed in Flow,
-          // but offset not saved yet => ok to handle as duplicate
-          FutureDuplicate
         } else if (recordWithOffset.fromSnapshot) {
-          // snapshots will mean we are starting from some arbitrary offset after last seen offset
+          // always accept starting from snapshots when there was no previous event seen
           FutureAccepted
-        } else if (!recordWithOffset.fromBacktracking) {
-          logUnexpected()
-          FutureRejectedSeqNr
         } else {
-          logUnexpected()
-          // This will result in projection restart (with normal configuration)
-          FutureRejectedBacktrackingSeqNr
+          validateEventTimestamp(currentState, recordWithOffset)
         }
-      } else if (seqNr == 1) {
-        // always accept first event if no other event for that pid has been seen
-        FutureAccepted
-      } else if (recordWithOffset.fromSnapshot) {
-        // always accept starting from snapshots when there was no previous event seen
-        FutureAccepted
       } else {
-        dao.readTimestampOffset(recordWithOffset.record.slice, pid).flatMap {
-          case Some(loadedRecord) =>
-            if (seqNr == loadedRecord.seqNr + 1)
-              FutureAccepted
-            else if (seqNr <= loadedRecord.seqNr)
-              FutureDuplicate
-            else
-              validateEventTimestamp(currentState, recordWithOffset)
-          case None =>
-            validateEventTimestamp(currentState, recordWithOffset)
-        }
-      }
-    } else {
-      // strictSeqNr == false is for durable state where each revision might not be visible
-      val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
-      val ok = seqNr > prevSeqNr
+        // strictSeqNr == false is for durable state where each revision might not be visible
+        val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
+        val ok = seqNr > prevSeqNr

-      if (ok) {
-        FutureAccepted
-      } else {
-        logger.trace("Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", seqNr, pid, prevSeqNr)
-        FutureDuplicate
+        if (ok) {
+          FutureAccepted
+        } else {
+          logger.trace(
+            "{} Filtering out earlier revision [{}] for pid [{}], previous revision [{}]",
+            logPrefix,
+            seqNr,
+            pid,
+            prevSeqNr)
+          FutureDuplicate
+        }
       }
     }
   }
@@ -726,8 +824,9 @@

           if (previousTimestamp.isBefore(acceptBefore)) {
             logger.debug(
-              "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
+              "{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
               "is before start timestamp [{}] minus backtracking window [{}].",
+              logPrefix,
               pid,
               seqNr,
               previousTimestamp,
@@ -736,7 +835,8 @@
             Accepted
           } else if (recordWithOffset.fromPubSub) {
             logger.debug(
-              "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
+              "{} Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
+              logPrefix,
               seqNr,
               pid,
               recordWithOffset.offset)
@@ -744,8 +844,9 @@
           } else if (recordWithOffset.fromBacktracking) {
             // This will result in projection restart (with normal configuration)
             logger.warn(
-              "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
+              "{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
               "is after start timestamp [{}] minus backtracking window [{}].",
+              logPrefix,
               seqNr,
               pid,
               recordWithOffset.offset,
@@ -756,7 +857,8 @@
           } else {
             // This may happen rather frequently when using `publish-events`, after reconnecting and such.
             logger.debug(
-              "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
+              "{} Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
+              logPrefix,
               seqNr,
               pid,
               recordWithOffset.offset)
@@ -765,7 +867,11 @@
           }
         case None =>
           // previous not found, could have been deleted
-          logger.debug("Accepting envelope with pid [{}], seqNr [{}], where previous event not found.", pid, seqNr)
+          logger.debug(
+            "{} Accepting envelope with pid [{}], seqNr [{}], where previous event not found.",
+            logPrefix,
+            pid,
+            seqNr)
           Accepted
       }
   }
@@ -806,43 +912,54 @@
   }

   def deleteOldTimestampOffsets(): Future[Long] = {
-    if (idle.getAndSet(true)) {
-      // no new offsets stored since previous delete
-      Future.successful(0)
-    } else {
-      val currentState = getState()
-      if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) {
-        // it hasn't filled up the window yet
-        Future.successful(0)
-      } else {
-        val until = currentState.latestTimestamp.minus(settings.timeWindow)
-
-        val notInLatestBySlice = currentState.latestBySlice.collect {
-          case record if record.timestamp.isBefore(until) =>
-            // note that deleteOldTimestampOffsetSql already has `AND timestamp_offset < ?`
-            // and that's why timestamp >= until don't have to be included here
-            LatestBySlice(record.slice, record.pid, record.seqNr)
-        }
-        val result = dao.deleteOldTimestampOffset(until, notInLatestBySlice)
-        result.failed.foreach { exc =>
-          idle.set(false) // try again next tick
-          logger.warn(
-            "Failed to delete timestamp offset until [{}] for projection [{}]: {}",
-            until,
-            projectionId.id,
-            exc.toString)
+    // This is running in the background, so fine to progress slowly one slice at a time
+    def loop(slice: Int, count: Long): Future[Long] = {
+      if (slice > maxSlice)
+        Future.successful(count)
+      else
+        deleteOldTimestampOffsets(slice).flatMap { c =>
+          loop(slice + 1, count + c)
         }
-        if (logger.isDebugEnabled)
-          result.foreach { rows =>
-            logger.debug(
-              "Deleted [{}] timestamp offset rows until [{}] for projection [{}].",
-              rows,
-              until,
-              projectionId.id)
-          }
+    }

-        result
+    val result = loop(minSlice, 0L)
+
+    if (logger.isDebugEnabled)
+      result.foreach { rows =>
+        logger.debug("{} Deleted [{}] timestamp offset rows", logPrefix, rows)
       }
+
+    result.andThen(_ => scheduleNextDelete())
+  }
+
+  def deleteOldTimestampOffsets(slice: Int): Future[Long] = {
+    val triggerDeletion = triggerDeletionPerSlice.put(slice, FALSE)
+    val currentState = getState()
+    if ((triggerDeletion == null || triggerDeletion == TRUE) && currentState.bySliceSorted.contains(slice)) {
+      val latest = currentState.bySliceSorted(slice).last
+      val until = latest.timestamp.minus(settings.deleteAfter)
+
+      // note that deleteOldTimestampOffsetSql already has `AND timestamp_offset < ?`,
+      // which means that the latest for this slice will not be deleted
+      val result = dao.deleteOldTimestampOffset(slice, until)
+      result.failed.foreach { exc =>
+        triggerDeletionPerSlice.put(slice, TRUE) // try again next tick
+        logger.warn(
+          "{} Failed to delete timestamp offset, slice [{}], until [{}]: {}",
+          logPrefix,
+          slice,
+          until,
+          exc.toString)
+      }
+      if (logger.isDebugEnabled)
+        result.foreach { rows =>
+          logger.debug("{} Deleted [{}] timestamp offset rows, slice [{}], until [{}]", logPrefix, rows, slice, until)
+        }
+
+      result
+    } else {
+      // no new offsets stored since previous delete
+      Future.successful(0L)
     }
   }
@@ -949,11 +1066,7 @@
       val result = dao.deleteNewTimestampOffsetsInTx(conn, timestamp)
       if (logger.isDebugEnabled)
         result.foreach { rows =>
-          logger.debug(
-            "Deleted [{}] timestamp offset rows >= [{}] for projection [{}].",
-            rows,
-            timestamp,
-            projectionId.id)
+          logger.debug("{} Deleted [{}] timestamp offset rows >= [{}]", logPrefix, rows, timestamp)
         }

       result
@@ -971,11 +1084,11 @@
   private def clearTimestampOffset(): Future[Done] = {
     sourceProvider match {
       case Some(_) =>
-        idle.set(false)
+        triggerDeletionPerSlice.clear()
         dao
           .clearTimestampOffset()
           .map { n =>
-            logger.debug(s"clearing timestamp offset for [{}] - executed statement returned [{}]", projectionId, n)
+            logger.debug("{} clearing timestamp offsets - executed statement returned [{}]", logPrefix, n)
             Done
           }
       case None =>
@@ -986,7 +1099,7 @@
   private def clearPrimitiveOffset(): Future[Done] = {
     if (settings.isOffsetTableDefined) {
       dao.clearPrimitiveOffset().map { n =>
-        logger.debug(s"clearing offset for [{}] - executed statement returned [{}]", projectionId, n)
+        logger.debug("{} clearing offsets - executed statement returned [{}]", logPrefix, n)
         Done
       }
     } else {
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index e0ceeb71f..cadf8a1a1 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -77,6 +77,7 @@ private[projection] object R2dbcProjectionImpl {
   val log: Logger = LoggerFactory.getLogger(classOf[R2dbcProjectionImpl[_, _]])

   private val FutureDone: Future[Done] = Future.successful(Done)
+  private val FutureFalse: Future[Boolean] = Future.successful(false)

   private[projection] def createOffsetStore(
       projectionId: ProjectionId,
@@ -205,13 +206,13 @@
             case Duplicate =>
               FutureDone
             case RejectedSeqNr =>
-              triggerReplayIfPossible(sourceProvider, offsetStore, envelope)
-              FutureDone
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(
+                ExecutionContext.parasitic)
             case RejectedBacktrackingSeqNr =>
-              if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope))
-                FutureDone
-              else
-                throwRejectedEnvelope(sourceProvider, envelope)
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map {
+                case true  => Done
+                case false => throwRejectedEnvelope(sourceProvider, envelope)
+              }
           }
         }
       }
@@ -227,46 +228,52 @@
       system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () =>
     new AdaptedR2dbcHandler(handlerFactory()) {
-      override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
+      override def process(envelopes: Seq[Envelope]): Future[Done] = {
         import R2dbcOffsetStore.Validation._
         offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes =>
-          isAcceptedEnvelopes.foreach {
-            case (env, RejectedSeqNr) =>
-              triggerReplayIfPossible(sourceProvider, offsetStore, env)
-            case (env, RejectedBacktrackingSeqNr) =>
-              if (triggerReplayIfPossible(sourceProvider, offsetStore, env))
+          val replayDone =
+            Future.sequence(isAcceptedEnvelopes.map {
+              case (env, RejectedSeqNr) =>
+                triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic)
+              case (env, RejectedBacktrackingSeqNr) =>
+                triggerReplayIfPossible(sourceProvider, offsetStore, env).map {
+                  case true  => Done
+                  case false => throwRejectedEnvelope(sourceProvider, env)
+                }
+              case _ =>
                 FutureDone
-              else
-                throwRejectedEnvelope(sourceProvider, env)
-            case _ =>
-          }
+            })

-          val acceptedEnvelopes = isAcceptedEnvelopes.collect {
-            case (env, Accepted) => env
-          }
+          replayDone.flatMap { _ =>
+            val acceptedEnvelopes = isAcceptedEnvelopes.collect {
+              case (env, Accepted) =>
+                env
+            }

-          if (acceptedEnvelopes.isEmpty) {
-            FutureDone
-          } else {
-            Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap {
-              loadedEnvelopes =>
-                val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
-                val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent)
-                if (filteredEnvelopes.isEmpty) {
-                  offsetStore.saveOffsets(offsets)
-                } else {
-                  r2dbcExecutor.withConnection("grouped handler") { conn =>
-                    // run users handler
-                    val session = new R2dbcSession(conn)
-                    delegate.process(session, filteredEnvelopes).flatMap { _ =>
-                      offsetStore.saveOffsetsInTx(conn, offsets)
+            if (acceptedEnvelopes.isEmpty) {
+              FutureDone
+            } else {
+              Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap {
+                loadedEnvelopes =>
+                  val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
+                  val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent)
+                  if (filteredEnvelopes.isEmpty) {
+                    offsetStore.saveOffsets(offsets)
+                  } else {
+                    r2dbcExecutor.withConnection("grouped handler") { conn =>
+                      // run user's handler
+                      val session = new R2dbcSession(conn)
+                      delegate.process(session, filteredEnvelopes).flatMap { _ =>
+                        offsetStore.saveOffsetsInTx(conn, offsets)
+                      }
                     }
                   }
-                }
+              }
             }
           }
         }
       }
+    }
   }
@@ -303,13 +310,13 @@
             case Duplicate =>
               FutureDone
             case RejectedSeqNr =>
-              triggerReplayIfPossible(sourceProvider, offsetStore, envelope)
-              FutureDone
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(
+                ExecutionContext.parasitic)
             case RejectedBacktrackingSeqNr =>
-              if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope))
-                FutureDone
-              else
-                throwRejectedEnvelope(sourceProvider, envelope)
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map {
+                case true  => Done
+                case false => throwRejectedEnvelope(sourceProvider, envelope)
+              }
           }
         }
       }
@@ -343,13 +350,13 @@
             case Duplicate =>
               FutureDone
             case RejectedSeqNr =>
-              triggerReplayIfPossible(sourceProvider, offsetStore, envelope)
-              FutureDone
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(
+                ExecutionContext.parasitic)
             case RejectedBacktrackingSeqNr =>
-              if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope))
-                FutureDone
-              else
-                throwRejectedEnvelope(sourceProvider, envelope)
+              triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map {
+                case true  => Done
+                case false => throwRejectedEnvelope(sourceProvider, envelope)
+              }
           }
         }
       }
@@ -369,12 +376,12 @@
         offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes =>
           isAcceptedEnvelopes.foreach {
             case (env, RejectedSeqNr) =>
-              triggerReplayIfPossible(sourceProvider, offsetStore, env)
+              triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic)
             case (env, RejectedBacktrackingSeqNr) =>
-              if (triggerReplayIfPossible(sourceProvider, offsetStore, env))
-                FutureDone
-              else
-                throwRejectedEnvelope(sourceProvider, env)
+              triggerReplayIfPossible(sourceProvider, offsetStore, env).map {
+                case true  => Done
+                case false => throwRejectedEnvelope(sourceProvider, env)
+              }
             case _ =>
           }
@@ -427,13 +434,12 @@
               case Duplicate =>
                 Future.successful(None)
               case RejectedSeqNr =>
-                triggerReplayIfPossible(sourceProvider, offsetStore, env)
-                Future.successful(None)
+                triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => None)(ExecutionContext.parasitic)
               case RejectedBacktrackingSeqNr =>
-                if (triggerReplayIfPossible(sourceProvider, offsetStore, env))
-                  Future.successful(None)
-                else
-                  throwRejectedEnvelope(sourceProvider, env)
+                triggerReplayIfPossible(sourceProvider, offsetStore, env).map {
+                  case true  => None
+                  case false => throwRejectedEnvelope(sourceProvider, env)
+                }
             }
           }
           .collect {
@@ -443,22 +449,27 @@
       .via(handler)
   }

+  /**
+   * This replay mechanism is used by GrpcReadJournal
+   */
   private def triggerReplayIfPossible[Offset, Envelope](
       sourceProvider: SourceProvider[Offset, Envelope],
       offsetStore: R2dbcOffsetStore,
-      envelope: Envelope): Boolean = {
+      envelope: Envelope)(implicit ec: ExecutionContext): Future[Boolean] = {
     envelope match {
       case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 =>
         sourceProvider match {
           case provider: CanTriggerReplay =>
-            val fromSeqNr = offsetStore.storedSeqNr(env.persistenceId) + 1
-            provider.triggerReplay(env.persistenceId, fromSeqNr, env.sequenceNr)
-            true
+            offsetStore.storedSeqNr(env.persistenceId).map { storedSeqNr =>
+              val fromSeqNr = storedSeqNr + 1
+              provider.triggerReplay(env.persistenceId, fromSeqNr, env.sequenceNr)
+              true
+            }
           case _ =>
-            false // no replay support for other source providers
+            FutureFalse // no replay support for other source providers
         }
       case _ =>
-        false // no replay support for non typed envelopes
+        FutureFalse // no replay support for non typed envelopes
     }
   }
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala
index 9fd356dcf..9c9f74b78 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala
@@ -21,7 +21,6 @@ import akka.projection.r2dbc.R2dbcProjectionSettings
 import akka.projection.BySlicesSourceProvider
 import akka.projection.ProjectionId
 import akka.projection.internal.OffsetSerialization.SingleOffset
-import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice

 /**
  * INTERNAL API
@@ -97,58 +96,17 @@ private[projection] class SqlServerOffsetStoreDao(
       .bind("@projectionKey", projectionId.key)
   }

-  /**
-   * The r2dbc-sqlserver driver seems to not support binding of array[T].
-   * So have to bake the param into the statement instead of binding it.
-   *
-   * @param notInLatestBySlice not used in postgres, but needed in sql
-   * @return
-   */
-  override protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String = {
-    val base =
-      s"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN @from AND @to AND projection_name = @projectionName AND timestamp_offset < @timestampOffset"
-    if (notInLatestBySlice.isEmpty) {
-      sql"$base"
-    } else {
-
-      val values = (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice)
-        .map { i =>
-          s"@s$i"
-        }
-        .mkString(", ")
-      sql"""
-        $base
-        AND CONCAT(persistence_id, '-', seq_nr) NOT IN ($values)"""
-    }
+  override protected def deleteOldTimestampOffsetSql(): String = {
+    s"DELETE FROM $timestampOffsetTable WHERE slice = @slice AND projection_name = @projectionName AND timestamp_offset < @timestampOffset"
   }

-  override protected def bindDeleteOldTimestampOffsetSql(
-      stmt: Statement,
-      minSlice: Int,
-      maxSlice: Int,
-      until: Instant,
-      notInLatestBySlice: Seq[LatestBySlice]): Statement = {
+  override protected def bindDeleteOldTimestampOffsetSql(stmt: Statement, slice: Int, until: Instant): Statement = {
     stmt
-      .bind("@from", minSlice)
-      .bind("@to", maxSlice)
+      .bind("@slice", slice)
       .bind("@projectionName", projectionId.name)
       .bindTimestamp("@timestampOffset", until)

-    if (notInLatestBySlice.nonEmpty) {
-      val sliceLookup = notInLatestBySlice.map { item =>
-        item.slice -> item
-      }.toMap
-
-      (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice).foreach { i =>
-        val bindKey = s"@s$i"
-        sliceLookup.get(i) match {
-          case Some(value) => stmt.bind(bindKey, s"${value.pid}-${value.seqNr}")
-          case None        => stmt.bind(bindKey, "-")
-        }
-      }
-    }
-
     stmt
   }
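A closing note on the per-slice deletes introduced above: deleteOldTimestampOffsets now walks the slice range sequentially, chaining one Future per slice instead of issuing all deletes at once, so the background cleanup deliberately progresses one slice at a time. A minimal standalone sketch of that loop pattern (DeleteLoopSketch and deleteAllSlices are names local to this sketch, not part of the patch; the delete function is a stand-in for the per-slice DAO delete):

import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }

object DeleteLoopSketch {
  // chain one Future per slice instead of firing all deletes at once,
  // so the background cleanup progresses one slice at a time
  def deleteAllSlices(minSlice: Int, maxSlice: Int)(deleteForSlice: Int => Future[Long])(
      implicit ec: ExecutionContext): Future[Long] = {
    def loop(slice: Int, count: Long): Future[Long] =
      if (slice > maxSlice) Future.successful(count)
      else deleteForSlice(slice).flatMap(c => loop(slice + 1, count + c))
    loop(minSlice, 0L)
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // stand-in for the per-slice DAO delete: pretend each slice deletes one row
    val total = deleteAllSlices(0, 1023)(_ => Future.successful(1L))
    println(Await.result(total, 3.seconds)) // 1024
  }
}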