diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala index a58ae9c59..845e86ba9 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala @@ -4,7 +4,6 @@ package akka.projection.dynamodb -import java.lang import java.util.UUID import java.util.concurrent.CompletionException import java.util.concurrent.ConcurrentHashMap @@ -323,7 +322,7 @@ class EventSourcedEndToEndSpec private var processedEventsPerProjection: Map[ProjectionId, ConcurrentHashMap[String, java.lang.Boolean]] = Map.empty - private def processedEvents(projectionId: ProjectionId): ConcurrentHashMap[String, lang.Boolean] = { + private def processedEvents(projectionId: ProjectionId): ConcurrentHashMap[String, java.lang.Boolean] = { processedEventsPerProjection.get(projectionId) match { case None => val processedEvents = new ConcurrentHashMap[String, java.lang.Boolean] @@ -492,7 +491,7 @@ class EventSourcedEndToEndSpec Thread.sleep(1500) } - val projections = startProjectionsFactory() + var projections = startProjectionsFactory() // give them some time to start before writing more events Thread.sleep(200) @@ -518,7 +517,7 @@ class EventSourcedEndToEndSpec // resume projections again if (n == (numberOfEvents / 2) + 20) - startProjectionsFactory() + projections = startProjectionsFactory() if (n % 10 == 0) Thread.sleep(50) diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala index 41d6930a9..9aa866d42 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala @@ -6,6 +6,7 @@ package akka.projection.r2dbc import java.time.Instant import java.util.UUID +import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.concurrent.duration._ @@ -23,26 +24,33 @@ import akka.persistence.query.TimestampOffset.toTimestampOffset import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.projection.Projection import akka.projection.ProjectionBehavior +import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.r2dbc.scaladsl.R2dbcHandler import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.r2dbc.scaladsl.R2dbcSession +import akka.projection.scaladsl.Handler +import akka.projection.scaladsl.SourceProvider import akka.serialization.SerializationExtension +import akka.stream.scaladsl.FlowWithContext import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement - object EventSourcedEndToEndSpec { + private val log = LoggerFactory.getLogger(classOf[EventSourcedEndToEndSpec]) + val config: Config = ConfigFactory .parseString(""" akka.persistence.r2dbc { @@ -106,22 +114,99 @@ object EventSourcedEndToEndSpec { final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String]) - class TestHandler(projectionId: ProjectionId, probe: ActorRef[Processed]) + final case class StartParams( + entityType: String, + projectionName: String, + nrOfProjections: Int, + processedProbe: TestProbe[Processed]) + + class AsyncTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean], + exactlyOnce: Boolean = false) + extends Handler[EventEnvelope[String]] { + + override def process(envelope: EventEnvelope[String]): Future[Done] = { + log.debug( + "{} Processed {} [{}], pid [{}], seqNr [{}]", + projectionId.key, + if (processedEvents.containsKey(envelope.event)) "duplicate event" else "event", + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + val wasAbsent = processedEvents.putIfAbsent(envelope.event, true) == null + if (exactlyOnce || wasAbsent) { + // if at-least-once, only mark processed the first time + // if exactly-once, test will fail on duplicate processing + probe ! Processed(projectionId, envelope) + } + Future.successful(Done) + } + } + + class R2dbcTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean], + exactlyOnce: Boolean = true) extends R2dbcHandler[EventEnvelope[String]] { - private val log = LoggerFactory.getLogger(getClass) - override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = { - log.debug("{} Processed {}", projectionId.key, envelope.event) - probe ! Processed(projectionId, envelope) + val delegate = new AsyncTestHandler(projectionId, probe, processedEvents, exactlyOnce) + + override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = + delegate.process(envelope) + } + + class GroupedAsyncTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) + extends Handler[Seq[EventEnvelope[String]]] { + + val delegate = new AsyncTestHandler(projectionId, probe, processedEvents) + + override def process(envelopes: Seq[EventEnvelope[String]]): Future[Done] = { + envelopes.foreach(delegate.process) Future.successful(Done) } } + class GroupedR2dbcTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) + extends R2dbcHandler[Seq[EventEnvelope[String]]] { + + val delegate = new GroupedAsyncTestHandler(projectionId, probe, processedEvents) + + override def process(session: R2dbcSession, envelopes: Seq[EventEnvelope[String]]): Future[Done] = + delegate.process(envelopes) + } + + private def flowTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) = + FlowWithContext[EventEnvelope[String], ProjectionContext].map { envelope => + log.debug( + "{} Processed {} [{}], pid [{}], seqNr [{}]", + projectionId.key, + if (processedEvents.containsKey(envelope.event)) "duplicate event" else "event", + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + if (processedEvents.putIfAbsent(envelope.event, true) == null) + probe ! Processed(projectionId, envelope) + Done + } + } class EventSourcedEndToEndSpec extends ScalaTestWithActorTestKit(EventSourcedEndToEndSpec.config) with AnyWordSpecLike + with BeforeAndAfterEach with TestDbLifecycle with TestData with LogCapturing { @@ -137,8 +222,22 @@ class EventSourcedEndToEndSpec import journalSettings.codecSettings.JournalImplicits._ - override protected def beforeAll(): Unit = { - super.beforeAll() + private var processedEventsPerProjection: Map[ProjectionId, ConcurrentHashMap[String, java.lang.Boolean]] = Map.empty + + override protected def beforeEach(): Unit = { + processedEventsPerProjection = Map.empty + super.beforeEach() + } + + private def processedEvents(projectionId: ProjectionId): ConcurrentHashMap[String, java.lang.Boolean] = { + processedEventsPerProjection.get(projectionId) match { + case None => + val processedEvents = new ConcurrentHashMap[String, java.lang.Boolean] + processedEventsPerProjection = processedEventsPerProjection.updated(projectionId, processedEvents) + processedEvents + case Some(processedEvents) => + processedEvents + } } // to be able to store events with specific timestamps @@ -166,24 +265,110 @@ class EventSourcedEndToEndSpec result.futureValue shouldBe 1 } + private def startExactlyOnceProjections(startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .exactlyOnce( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = + () => new R2dbcTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId)))) + } + + private def startAtLeastOnceProjections(startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .atLeastOnce( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = () => + new R2dbcTestHandler( + projectionId, + startParams.processedProbe.ref, + processedEvents(projectionId), + exactlyOnce = false))) + } + + private def startAtLeastOnceAsyncProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .atLeastOnceAsync( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = + () => new AsyncTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId)))) + } + + private def startGroupedWithinProjections(startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .groupedWithin( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = () => + new GroupedR2dbcTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId))) + .withGroup(3, groupAfterDuration = 200.millis)) + } + + private def startGroupedWithinAsyncProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .groupedWithinAsync( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = () => + new GroupedAsyncTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId))) + .withGroup(3, groupAfterDuration = 200.millis)) + } + + private def startAtLeastOnceFlowProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + R2dbcProjection + .atLeastOnceFlow( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + flowTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId)))) + } + private def startProjections( - entityType: String, - projectionName: String, - nrOfProjections: Int, - processedProbe: ActorRef[Processed]): Vector[ActorRef[ProjectionBehavior.Command]] = { + startParams: StartParams, + projectionFactory: ( + ProjectionId, + SourceProvider[Offset, EventEnvelope[String]]) => Projection[EventEnvelope[String]]) + : Vector[ActorRef[ProjectionBehavior.Command]] = { + import startParams._ + val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, nrOfProjections) sliceRanges.map { range => val projectionId = ProjectionId(projectionName, s"${range.min}-${range.max}") + val sourceProvider = EventSourcedProvider .eventsBySlices[String](system, R2dbcReadJournal.Identifier, entityType, range.min, range.max) - val projection = R2dbcProjection - .exactlyOnce( - projectionId, - Some(projectionSettings), - sourceProvider = sourceProvider, - handler = () => new TestHandler(projectionId, processedProbe.ref)) + + val projection = projectionFactory(projectionId, sourceProvider) spawn(ProjectionBehavior(projection)) }.toVector } @@ -214,77 +399,130 @@ class EventSourcedEndToEndSpec if (verifyProjectionId) { val byPid = processed.groupBy(_.envelope.persistenceId) byPid.foreach { - case (_, processedByPid) => - // all events of a pid must be processed by the same projection instance - processedByPid.map(_.projectionId).toSet.size shouldBe 1 - // processed events in right order - processedByPid.map(_.envelope.sequenceNr).toVector shouldBe (1 to processedByPid.size).toVector + case (pid, processedByPid) => + withClue(s"PersistenceId [$pid]: ") { + // all events of a pid must be processed by the same projection instance + processedByPid.map(_.projectionId).toSet.size shouldBe 1 + // processed events in right order + processedByPid.map(_.envelope.sequenceNr) shouldBe (1 to processedByPid.size).toVector + } } } } - s"A R2DBC projection with eventsBySlices source (dialect ${r2dbcSettings.dialectName})" must { + private def test( + startParams: StartParams, + startProjectionsFactory: () => Vector[ActorRef[ProjectionBehavior.Command]]): Unit = { + val numberOfEntities = 20 // increase this for longer testing + val numberOfEvents = numberOfEntities * 10 - "handle all events exactlyOnce" in { - val numberOfEntities = 20 - val numberOfEvents = numberOfEntities * 10 - val entityType = nextEntityType() + val entities = (0 until numberOfEntities).map { n => + val persistenceId = PersistenceId(startParams.entityType, s"p$n") + spawn(Persister(persistenceId), s"${startParams.entityType}-p$n") + } - val entities = (0 until numberOfEntities).map { n => - val persistenceId = PersistenceId(entityType, s"p$n") - spawn(Persister(persistenceId), s"$entityType-p$n") + // write some before starting the projections + var n = 1 + while (n <= numberOfEvents / 4) { + val p = n % numberOfEntities + // mix some persist 1 and persist 3 events + if (n % 7 == 0) { + entities(p) ! Persister.PersistAll((0 until 3).map(i => mkEvent(n + i)).toList) + n += 3 + } else { + entities(p) ! Persister.Persist(mkEvent(n)) + n += 1 } - // write some before starting the projections - var n = 1 - while (n <= 50) { - val p = n % numberOfEntities - // mix some persist 1 and persist 3 events - if (n % 7 == 0) { - entities(p) ! Persister.PersistAll((0 until 3).map(i => mkEvent(n + i)).toList) - n += 3 - } else { - entities(p) ! Persister.Persist(mkEvent(n)) - n += 1 + if (n % 10 == 0) + Thread.sleep(50) + else if (n % 25 == 0) + Thread.sleep(1500) + } + + var projections = startProjectionsFactory() + + // give them some time to start before writing more events + Thread.sleep(500) + + while (n <= numberOfEvents) { + val p = n % numberOfEntities + entities(p) ! Persister.Persist(mkEvent(n)) + + // stop projections + if (n == numberOfEvents / 2) { + projections.foreach { ref => + ref ! ProjectionBehavior.Stop } } - val projectionName = UUID.randomUUID().toString - val processedProbe = createTestProbe[Processed]() - var projections = startProjections(entityType, projectionName, nrOfProjections = 4, processedProbe.ref) + // wait until stopped + if (n == (numberOfEvents / 2) + 10) { + val probe = createTestProbe() + projections.foreach { ref => + probe.expectTerminated(ref) + } + } - // give them some time to start before writing more events - Thread.sleep(500) + // resume projections again + if (n == (numberOfEvents / 2) + 20) + projections = startProjectionsFactory() - while (n <= numberOfEvents) { - val p = n % numberOfEntities - entities(p) ! Persister.Persist(mkEvent(n)) + if (n % 10 == 0) + Thread.sleep(50) + else if (n % 25 == 0) + Thread.sleep(1500) - // stop projections - if (n == numberOfEvents / 2) { - val probe = createTestProbe() - projections.foreach { ref => - ref ! ProjectionBehavior.Stop - probe.expectTerminated(ref) - } - } + n += 1 + } - // resume projections again - if (n == (numberOfEvents / 2) + 20) - projections = startProjections(entityType, projectionName, nrOfProjections = 4, processedProbe.ref) + val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector + assertEventsProcessed(expectedEvents, startParams.processedProbe, verifyProjectionId = true) - if (n % 10 == 0) - Thread.sleep(50) - else if (n % 25 == 0) - Thread.sleep(1500) + projections.foreach(_ ! ProjectionBehavior.Stop) + val probe = createTestProbe() + projections.foreach { ref => + probe.expectTerminated(ref) + } + } - n += 1 - } + private def newStartParams(): StartParams = { + val entityType = nextEntityType() + val projectionName = UUID.randomUUID().toString + val processedProbe = createTestProbe[Processed]() + StartParams(entityType, projectionName, nrOfProjections = 4, processedProbe) + } + + s"An R2DBC projection with eventsBySlices source (dialect ${r2dbcSettings.dialectName})" must { + + "handle all events exactlyOnce" in { + val startParams = newStartParams() + test(startParams, () => startExactlyOnceProjections(startParams)) + } + + "handle all events atLeastOnce" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceProjections(startParams)) + } + + "handle all events atLeastOnceAsync" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceAsyncProjections(startParams)) + } - val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector - assertEventsProcessed(expectedEvents, processedProbe, verifyProjectionId = true) + "handle all events groupedWithin" in { + val startParams = newStartParams() + test(startParams, () => startGroupedWithinProjections(startParams)) + } + + "handle all events groupedWithinAsync" in { + val startParams = newStartParams() + test(startParams, () => startGroupedWithinAsyncProjections(startParams)) + } - projections.foreach(_ ! ProjectionBehavior.Stop) + "handle all events atLeastOnceFlow" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceFlowProjections(startParams)) } "accept unknown sequence number if previous is old" in { @@ -299,7 +537,8 @@ class EventSourcedEndToEndSpec val projectionName = UUID.randomUUID().toString val processedProbe = createTestProbe[Processed]() - val projection = startProjections(entityType, projectionName, nrOfProjections = 1, processedProbe.ref).head + val projection = + startExactlyOnceProjections(StartParams(entityType, projectionName, nrOfProjections = 1, processedProbe)).head processedProbe.receiveMessage().envelope.event shouldBe "e1-1" @@ -366,7 +605,8 @@ class EventSourcedEndToEndSpec projectionId, Some(projectionSettings), sourceProvider = sourceProvider, - handler = () => new TestHandler(projectionId, processedProbe.ref)) + handler = () => + new R2dbcTestHandler(projectionId, processedProbe.ref, processedEvents(projectionId), exactlyOnce = true)) spawn(ProjectionBehavior(projection)) } @@ -411,7 +651,8 @@ class EventSourcedEndToEndSpec projectionId, Some(projectionSettings), sourceProvider = sourceProvider, - handler = () => new TestHandler(projectionId, processedProbe.ref)) + handler = () => + new R2dbcTestHandler(projectionId, processedProbe.ref, processedEvents(projectionId), exactlyOnce = true)) spawn(ProjectionBehavior(projection)) } diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala index a59c3052c..dfe3fddbd 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala @@ -34,7 +34,6 @@ object StartFromSnapshotEndToEndSpec { val config: Config = ConfigFactory .parseString(""" - akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" akka.persistence.r2dbc { query { refresh-interval = 500 millis diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestConfig.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestConfig.scala index fbcc97fb7..842514169 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestConfig.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestConfig.scala @@ -12,6 +12,7 @@ object TestConfig { ConfigFactory .parseString(""" akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal" + akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" akka.persistence.state.plugin = "akka.persistence.r2dbc.state" akka.persistence.r2dbc { query {