diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala index 210a98240..b15e8d3d2 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala @@ -85,6 +85,8 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match state6.offsetBySlice(slice("p10084")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L, "p10084" -> 9)) } + val allowAll: Any => Boolean = _ => true + "evict old" in { val p1 = "p500" // slice 645 val p2 = "p621" // slice 645 @@ -107,17 +109,17 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) // keep all - state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1 + state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000), allowAll) shouldBe state1 // evict older than time window - val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10)))) - val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L) - val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2)) + val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2), allowAll) state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( p1 -> 1L, p2 -> 2L, @@ -156,7 +158,7 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match val timeWindow = JDuration.ofMillis(1) val state2 = slices.foldLeft(state1) { - case (acc, slice) => acc.evict(slice, timeWindow) + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) } // note that p92 is evicted because it has same slice as p108 // p1 is kept because keeping one for each slice @@ -164,13 +166,35 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) val state3 = slices.foldLeft(state2) { - case (acc, slice) => acc.evict(slice, timeWindow) + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) } // still keeping one for each slice state3.byPid .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) } + "evict old but only those allowed to be evicted" in { + val t0 = TestClock.nowMillis().instant() + val state1 = State.empty.add( + Vector( + createRecord("p92", 2, t0.plusMillis(1)), + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p229", 4, t0.plusMillis(30)))) + + val slices = state1.bySliceSorted.keySet + slices.size shouldBe 1 + + state1 + .evict(slices.head, JDuration.ofMillis(1), allowAll) + .byPid + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p229" -> 4) // p92 and p108 evicted + + state1 + .evict(slices.head, JDuration.ofMillis(1), _.pid == "p108") + .byPid // allow only p108 to be evicted + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p92" -> 2, "p229" -> 4) + } + "find duplicate" in { val t0 = TestClock.nowMillis().instant() val state = diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 0862a6544..932893e55 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -46,23 +46,19 @@ private[projection] object DynamoDBOffsetStore { type Pid = String final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] { - - override def compare(that: Record): Int = { - val result = this.timestamp.compareTo(that.timestamp) - if (result == 0) { - if (this.slice == that.slice) - if (this.pid == that.pid) - if (this.seqNr == that.seqNr) - 0 - else - java.lang.Long.compare(this.seqNr, that.seqNr) - else - this.pid.compareTo(that.pid) - else Integer.compare(this.slice, that.slice) - } else { - result + override def compare(that: Record): Int = + timestamp.compareTo(that.timestamp) match { + case 0 => + Integer.compare(slice, that.slice) match { + case 0 => + pid.compareTo(that.pid) match { + case 0 => java.lang.Long.compare(seqNr, that.seqNr) + case result => result + } + case result => result + } + case result => result } - } } final case class RecordWithOffset( @@ -147,14 +143,27 @@ private[projection] object DynamoDBOffsetStore { } } - def evict(slice: Int, timeWindow: JDuration): State = { + def evict(slice: Int, timeWindow: JDuration, ableToEvictRecord: Record => Boolean): State = { val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record]) if (recordsSortedByTimestamp.isEmpty) { this } else { val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow) - val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until)) - if (filtered.size == recordsSortedByTimestamp.size) { + val filtered = { + // Records comparing >= this record by recordOrdering will definitely be kept, + // Records comparing < this record by recordOrdering are subject to eviction + // Slice will be equal, and pid will compare lexicographically less than any valid pid + val untilRecord = Record(slice, "", 0, until) + val newerRecords = recordsSortedByTimestamp.rangeFrom(untilRecord) // inclusive of until + val olderRecords = recordsSortedByTimestamp.rangeUntil(untilRecord) // exclusive of until + val filteredOlder = olderRecords.filterNot(ableToEvictRecord) + + if (filteredOlder.size == olderRecords.size) recordsSortedByTimestamp + else newerRecords.union(filteredOlder) + } + + // adding back filtered is linear in the size of filtered, but so is checking if we're able to evict + if (filtered eq recordsSortedByTimestamp) { this } else { val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice } @@ -394,11 +403,12 @@ private[projection] class DynamoDBOffsetStore( storeSequenceNumbers: IndexedSeq[Record] => Future[Done], canBeConcurrent: Boolean): Future[Done] = { load(records.map(_.pid)).flatMap { oldState => - val filteredRecords = { + val filteredRecords = if (records.size <= 1) records.filterNot(oldState.isDuplicate) else { - // use last record for each pid + // Can assume (given other projection guarantees) that records for the same pid + // have montonically increasing sequence numbers records .groupBy(_.pid) .valuesIterator @@ -407,7 +417,7 @@ private[projection] class DynamoDBOffsetStore( } .toVector } - } + if (filteredRecords.isEmpty) { FutureDone } else { @@ -417,8 +427,18 @@ private[projection] class DynamoDBOffsetStore( if (filteredRecords.size == 1) Set(filteredRecords.head.slice) else filteredRecords.iterator.map(_.slice).toSet + val currentInflight = getInflight() val evictedNewState = slices.foldLeft(newState) { - case (s, slice) => s.evict(slice, settings.timeWindow) + case (s, slice) => + s.evict( + slice, + settings.timeWindow, + // Only persistence IDs that aren't inflight are evictable, + // if only so that those persistence IDs can be removed from + // inflight... in the absence of further records from that + // persistence ID, the next store will evict (further records + // would make that persistence ID recent enough to not be evicted) + record => !currentInflight.contains(record.pid)) } // FIXME we probably don't have to store the latest offset per slice all the time, but can @@ -463,7 +483,7 @@ private[projection] class DynamoDBOffsetStore( if (newInflight.size >= 10000) { throw new IllegalStateException( s"Too many envelopes in-flight [${newInflight.size}]. " + - "Please report this issue at https://github.com/akka/akka-persistence-dynamodb") + "Please report this issue at https://github.com/akka/akka-projection") } if (!inflight.compareAndSet(currentInflight, newInflight)) cleanupInflight(newState) // CAS retry, concurrent update of inflight