Skip to content

Commit

Permalink
Retrieve tagged value in a previous transaction, better error handling (
Browse files Browse the repository at this point in the history
#4690)

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jan 26, 2024
1 parent 848012e commit a2ba1fc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ object EvaluationError {
final case class EvaluationFailure[Command](command: Command, errorType: String, errorMessage: String)
extends EvaluationError(s"'$command' failed with an error '$errorType' and a message $errorMessage")

/**
* Error when the tagged state can't be correctly computed during a tag operation
*
* @param command
* the command that failed
* @param lastRev
* the found revision for the computed state
*/
final case class EvaluationTagFailure[Command](command: Command, lastRev: Option[Int])
extends EvaluationError(
s"'$command' could not compute the tagged state, the state could only be found until rev '$lastRev'"
)

object EvaluationFailure {

def apply[Command](command: Command, throwable: Throwable): EvaluationFailure[Command] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package ch.epfl.bluebrain.nexus.delta.sourcing
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout}
import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError._
import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.event.ScopedEventStore
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore
Expand All @@ -20,6 +21,7 @@ import doobie.implicits._
import doobie.postgres.sqlstate
import fs2.Stream

import java.sql.SQLException
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

Expand Down Expand Up @@ -225,14 +227,20 @@ object ScopedEventLog {

override def evaluate(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] = {

def saveTag(event: E, state: S): IO[ConnectionIO[Unit]] =
tagger.tagWhen(event).fold(IO.pure(noop)) { case (tag, rev) =>
if (rev == state.rev)
IO.pure(stateStore.save(state, tag, Noop))
else
def newTaggedState(event: E, state: S): IO[Option[(UserTag, S)]] =
tagger.tagWhen(event) match {
case Some((tag, rev)) if rev == state.rev =>
IO.some(tag -> state)
case Some((tag, rev)) =>
stateMachine
.computeState(eventStore.history(ref, id, Some(rev)))
.map(_.fold(noop) { s => stateStore.save(s, tag, Noop) })
.flatTap {
case stateOpt if !stateOpt.exists(_.rev == rev) =>
IO.raiseError(EvaluationTagFailure(command, stateOpt.map(_.rev)))
case _ => IO.unit
}
.map(_.map(tag -> _))
case None => IO.none
}

def deleteTag(event: E, state: S): ConnectionIO[Unit] = tagger.untagWhen(event).fold(noop) { tag =>
Expand All @@ -247,33 +255,37 @@ object ScopedEventLog {

def persist(event: E, original: Option[S], newState: S): IO[Unit] = {

def queries(tagQuery: ConnectionIO[Unit], init: PartitionInit) =
def queries(newTaggedState: Option[(UserTag, S)], init: PartitionInit) =
for {
_ <- TombstoneStore.save(entityType, original, newState)
_ <- eventStore.save(event, init)
_ <- stateStore.save(newState, init)
_ <- tagQuery
_ <- newTaggedState.traverse { case (tag, taggedState) =>
stateStore.save(taggedState, tag, Noop)
}
_ <- deleteTag(event, newState)
_ <- updateDependencies(newState)
} yield ()

{
for {
init <- PartitionInit(event.project, xas.cache)
tagQuery <- saveTag(event, newState)
res <- queries(tagQuery, init)
.attemptSomeSqlState { case sqlstate.class23.UNIQUE_VIOLATION =>
onUniqueViolation(id, command)
}
.transact(xas.write)
_ <- init.updateCache(xas.cache)
init <- PartitionInit(event.project, xas.cache)
taggedState <- newTaggedState(event, newState)
res <- queries(taggedState, init).transact(xas.write)
_ <- init.updateCache(xas.cache)
} yield res
}.recoverWith {
case sql: SQLException if isUniqueViolation(sql) =>
logger.error(sql)(
s"A unique constraint violation occurred when persisting an event for '$id' in project '$ref' and rev ${event.rev}."
) >>
IO.raiseError(onUniqueViolation(id, command))
case other =>
logger.error(other)(
s"An error occurred when persisting an event for '$id' in project '$ref' and rev ${event.rev}."
) >>
IO.raiseError(other)
}
.flatMap {
IO.fromEither(_).onError { _ =>
logger.info(s"An event for the '$id' in project '$ref' already exists for rev ${event.rev}.")
}
}
}

for {
Expand All @@ -282,11 +294,15 @@ object ScopedEventLog {
_ <- persist(result._1, originalState, result._2)
} yield result
}.adaptError {
case e: Rejection => e
case e: EvaluationTimeout[_] => e
case e => EvaluationFailure(command, e)
case e: Rejection => e
case e: EvaluationTimeout[_] => e
case e: EvaluationTagFailure[_] => e
case e => EvaluationFailure(command, e)
}

private def isUniqueViolation(sql: SQLException) =
sql.getSQLState == sqlstate.class23.UNIQUE_VIOLATION.value

override def dryRun(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] =
stateStore.get(ref, id).redeem(_ => None, Some(_)).flatMap { state =>
stateMachine.evaluate(state, command, maxDuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing

import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout}
import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTagFailure, EvaluationTimeout}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestCommand._
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestTagged}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestRejection._
Expand Down Expand Up @@ -133,6 +133,11 @@ class ScopedEventLogSuite extends NexusSuite with Doobie.Fixture {
} yield ()
}

test("Fail to tag when the tagged value can't be replayed up to the target rev") {
val tagCommand = TagPR(id, proj, 2, 4)
eventLog.evaluate(proj, id, tagCommand).interceptEquals(EvaluationTagFailure(tagCommand, Some(2)))
}

test("Dry run successfully a command without persisting anything") {
for {
_ <- eventLog.dryRun(proj, id, Merge(id, proj, 3)).assertEquals((merged, state3))
Expand Down

0 comments on commit a2ba1fc

Please sign in to comment.