diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EvaluationError.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EvaluationError.scala index 4dfe1d69e7..0363143fca 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EvaluationError.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EvaluationError.scala @@ -46,6 +46,19 @@ object EvaluationError { final case class EvaluationFailure[Command](command: Command, errorType: String, errorMessage: String) extends EvaluationError(s"'$command' failed with an error '$errorType' and a message $errorMessage") + /** + * Error when the tagged state can't be correctly computed during a tag operation + * + * @param command + * the command that failed + * @param lastRev + * the found revision for the computed state + */ + final case class EvaluationTagFailure[Command](command: Command, lastRev: Option[Int]) + extends EvaluationError( + s"'$command' could not compute the tagged state, the state could only be found until rev '$lastRev'" + ) + object EvaluationFailure { def apply[Command](command: Command, throwable: Throwable): EvaluationFailure[Command] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala index d4c62538ce..c293c0149d 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala @@ -3,12 +3,13 @@ package ch.epfl.bluebrain.nexus.delta.sourcing import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} +import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError._ import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent import ch.epfl.bluebrain.nexus.delta.sourcing.event.ScopedEventStore import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore @@ -20,6 +21,7 @@ import doobie.implicits._ import doobie.postgres.sqlstate import fs2.Stream +import java.sql.SQLException import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -225,14 +227,20 @@ object ScopedEventLog { override def evaluate(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] = { - def saveTag(event: E, state: S): IO[ConnectionIO[Unit]] = - tagger.tagWhen(event).fold(IO.pure(noop)) { case (tag, rev) => - if (rev == state.rev) - IO.pure(stateStore.save(state, tag, Noop)) - else + def newTaggedState(event: E, state: S): IO[Option[(UserTag, S)]] = + tagger.tagWhen(event) match { + case Some((tag, rev)) if rev == state.rev => + IO.some(tag -> state) + case Some((tag, rev)) => stateMachine .computeState(eventStore.history(ref, id, Some(rev))) - .map(_.fold(noop) { s => stateStore.save(s, tag, Noop) }) + .flatTap { + case stateOpt if !stateOpt.exists(_.rev == rev) => + IO.raiseError(EvaluationTagFailure(command, stateOpt.map(_.rev))) + case _ => IO.unit + } + .map(_.map(tag -> _)) + case None => IO.none } def deleteTag(event: E, state: S): ConnectionIO[Unit] = tagger.untagWhen(event).fold(noop) { tag => @@ -247,33 +255,37 @@ object ScopedEventLog { def persist(event: E, original: Option[S], newState: S): IO[Unit] = { - def queries(tagQuery: ConnectionIO[Unit], init: PartitionInit) = + def queries(newTaggedState: Option[(UserTag, S)], init: PartitionInit) = for { _ <- TombstoneStore.save(entityType, original, newState) _ <- eventStore.save(event, init) _ <- stateStore.save(newState, init) - _ <- tagQuery + _ <- newTaggedState.traverse { case (tag, taggedState) => + stateStore.save(taggedState, tag, Noop) + } _ <- deleteTag(event, newState) _ <- updateDependencies(newState) } yield () { for { - init <- PartitionInit(event.project, xas.cache) - tagQuery <- saveTag(event, newState) - res <- queries(tagQuery, init) - .attemptSomeSqlState { case sqlstate.class23.UNIQUE_VIOLATION => - onUniqueViolation(id, command) - } - .transact(xas.write) - _ <- init.updateCache(xas.cache) + init <- PartitionInit(event.project, xas.cache) + taggedState <- newTaggedState(event, newState) + res <- queries(taggedState, init).transact(xas.write) + _ <- init.updateCache(xas.cache) } yield res + }.recoverWith { + case sql: SQLException if isUniqueViolation(sql) => + logger.error(sql)( + s"A unique constraint violation occurred when persisting an event for '$id' in project '$ref' and rev ${event.rev}." + ) >> + IO.raiseError(onUniqueViolation(id, command)) + case other => + logger.error(other)( + s"An error occurred when persisting an event for '$id' in project '$ref' and rev ${event.rev}." + ) >> + IO.raiseError(other) } - .flatMap { - IO.fromEither(_).onError { _ => - logger.info(s"An event for the '$id' in project '$ref' already exists for rev ${event.rev}.") - } - } } for { @@ -282,11 +294,15 @@ object ScopedEventLog { _ <- persist(result._1, originalState, result._2) } yield result }.adaptError { - case e: Rejection => e - case e: EvaluationTimeout[_] => e - case e => EvaluationFailure(command, e) + case e: Rejection => e + case e: EvaluationTimeout[_] => e + case e: EvaluationTagFailure[_] => e + case e => EvaluationFailure(command, e) } + private def isUniqueViolation(sql: SQLException) = + sql.getSQLState == sqlstate.class23.UNIQUE_VIOLATION.value + override def dryRun(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] = stateStore.get(ref, id).redeem(_ => None, Some(_)).flatMap { state => stateMachine.evaluate(state, command, maxDuration) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala index 6befec57f0..94e4e7dd54 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv -import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} +import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTagFailure, EvaluationTimeout} import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestCommand._ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestTagged} import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestRejection._ @@ -133,6 +133,11 @@ class ScopedEventLogSuite extends NexusSuite with Doobie.Fixture { } yield () } + test("Fail to tag when the tagged value can't be replayed up to the target rev") { + val tagCommand = TagPR(id, proj, 2, 4) + eventLog.evaluate(proj, id, tagCommand).interceptEquals(EvaluationTagFailure(tagCommand, Some(2))) + } + test("Dry run successfully a command without persisting anything") { for { _ <- eventLog.dryRun(proj, id, Merge(id, proj, 3)).assertEquals((merged, state3))