Skip to content

Commit

Permalink
Fix computing diff on large resources during updates (#4684)
Browse files Browse the repository at this point in the history
* Fix computing diff on large resources during updates

* Fix case where the context is defined but has no @base

* Fix compilation

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jan 25, 2024
1 parent 69f95ce commit 848012e
Show file tree
Hide file tree
Showing 17 changed files with 300 additions and 174 deletions.
2 changes: 2 additions & 0 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ app {
event-log = ${app.defaults.event-log}
# Reject payloads which contain nexus metadata fields (any field beginning with _)
decoding-option = "strict"
# Do not create a new revision of a resource when the update does not introduce a change
skip-update-no-change = true
}

# Schemas configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverResolution.ResourceRe
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers, ResourceResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceEvent}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
Expand All @@ -40,9 +40,12 @@ object ResourcesModule extends ModuleDef {

make[ResourcesConfig].from { (config: AppConfig) => config.resources }

make[DetectChange].from { (config: ResourcesConfig) => DetectChange(config.skipUpdateNoChange) }

make[Resources].from {
(
validate: ValidateResource,
detectChange: DetectChange,
fetchContext: FetchContext[ContextRejection],
config: ResourcesConfig,
resolverContextResolution: ResolverContextResolution,
Expand All @@ -53,6 +56,7 @@ object ResourcesModule extends ModuleDef {
) =>
ResourcesImpl(
validate,
detectChange,
fetchContext.mapRejection(ProjectContextRejection),
resolverContextResolution,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverResolution.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.NexusSource.DecodingOption
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group, Subject, User}
Expand Down Expand Up @@ -104,9 +104,10 @@ class ResourcesRoutesSpec extends BaseRouteSpec with CatsIOValues {
private def routesWithDecodingOption(implicit decodingOption: DecodingOption): (Route, Resources) = {
val resources = ResourcesImpl(
validator,
DetectChange(enabled = true),
fetchContext,
resolverContextResolution,
ResourcesConfig(eventLogConfig, decodingOption),
ResourcesConfig(eventLogConfig, decodingOption, skipUpdateNoChange = true),
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.sdk.jsonld

import cats.Eq
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.RdfError
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdRejection._
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContext, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.{CompactedJsonLd, ExpandedJsonLd}
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.model.jsonld.RemoteContextRef
import io.circe.Json

Expand All @@ -27,7 +26,7 @@ import io.circe.Json
* @param graph
* its graph representation
* @param remoteContexts
* it
* the resolved remote contexts
*/
final case class JsonLdAssembly(
id: Iri,
Expand All @@ -46,20 +45,6 @@ final case class JsonLdAssembly(

object JsonLdAssembly {

/**
* Defines the equality between two instances
*
* - If the remote contexts and the local context are the same, then the compacted form will be the same
* - If the graph forms are isomorphic then, the expanded form will be the same
*/
implicit val jsonLdAssemblyEq: Eq[JsonLdAssembly] = Eq.instance { (jsonld1, jsonld2) =>
jsonld1.id == jsonld2.id &&
jsonld1.remoteContexts == jsonld2.remoteContexts &&
jsonld1.compacted.ctx == jsonld2.compacted.ctx &&
jsonld1.graph.isIsomorphic(jsonld2.graph) &&
jsonld1.source == jsonld2.source
}

def apply(
iri: Iri,
source: Json,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ sealed abstract class JsonLdSourceProcessor(implicit api: JsonLdApi) {
protected def getOrGenerateId(iri: Option[Iri], context: ProjectContext): IO[Iri] =
iri.fold(uuidF().map(uuid => context.base.iri / uuid.toString))(IO.pure)

protected def expandSource(
context: ProjectContext,
source: Json
)(implicit rcr: RemoteContextResolution): IO[(ContextValue, ExplainResult[ExpandedJsonLd])] = {
implicit val opts: JsonLdOptions = JsonLdOptions(base = Some(context.base.iri))
ExpandedJsonLd
.explain(source)
.flatMap {
case result if result.value.isEmpty && source.topContextValueOrEmpty.isEmpty =>
val ctx = defaultCtx(context)
ExpandedJsonLd.explain(source.addContext(ctx.contextObj)).map(ctx -> _)
case result =>
IO.pure(source.topContextValueOrEmpty -> result)
}
.adaptError { case err: RdfError => InvalidJsonLdFormat(None, err) }
/**
* Expand the source document using the provided project context and remote context resolution.
*
* If the source does not provide a context, one will be injected from the project base and vocab.
*/
protected def expandSource(projectContext: ProjectContext, source: Json)(implicit
rcr: RemoteContextResolution
): IO[(ContextValue, ExplainResult[ExpandedJsonLd])] = {
implicit val opts: JsonLdOptions = JsonLdOptions(base = Some(projectContext.base.iri))
val sourceContext = source.topContextValueOrEmpty
if (sourceContext.isEmpty) {
val defaultContext = defaultCtx(projectContext)
ExpandedJsonLd.explain(source.addContext(defaultContext.contextObj)).map(defaultContext -> _)
} else {
ExpandedJsonLd.explain(source).map(sourceContext -> _)
}.adaptError { case err: RdfError => InvalidJsonLdFormat(None, err) }
}

protected def checkAndSetSameId(iri: Iri, expanded: ExpandedJsonLd): IO[ExpandedJsonLd] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package ch.epfl.bluebrain.nexus.delta.sdk.resources

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.CompactedJsonLd
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly
import ch.epfl.bluebrain.nexus.delta.sdk.model.jsonld.RemoteContextRef
import ch.epfl.bluebrain.nexus.delta.sdk.resources.DetectChange.Current
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState
import io.circe.Json

/**
* Detect if the new json-ld state introduces changes compared to the current state
*/
trait DetectChange {
def apply(newValue: JsonLdAssembly, currentState: ResourceState): IO[Boolean] =
apply(
newValue,
Current(currentState.types, currentState.source, currentState.compacted, currentState.remoteContexts)
)

def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean]
}

object DetectChange {

final case class Current(
types: Set[Iri],
source: Json,
compacted: CompactedJsonLd,
remoteContexts: Set[RemoteContextRef]
)

private val Disabled = new DetectChange {

override def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean] = IO.pure(true)
}

/**
* Default implementation
*
* There will be a change if:
* - If there is a change in the resource types
* - If there is a change in one of the remote JSON-LD contexts
* - If there is a change in the local JSON-LD context
* - If there is a change in the rest of the payload
*
* The implementation uses `IO.cede` as comparing source can induce expensive work in the case of large payloads.
*/
private val Impl = new DetectChange {

override def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean] =
IO.cede
.as(
newValue.types != current.types ||
newValue.remoteContexts != current.remoteContexts ||
newValue.compacted.ctx != current.compacted.ctx ||
newValue.source != current.source
)
.guarantee(IO.cede)
}

def apply(enabled: Boolean): DetectChange = if (enabled) Impl else Disabled

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.sdk.resources

import io.circe.Decoder.Result
import io.circe.{Decoder, DecodingFailure, HCursor, Json}
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure}
import pureconfig.{ConfigCursor, ConfigReader}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert

final case class NexusSource(value: Json) extends AnyVal

Expand All @@ -16,30 +16,20 @@ object NexusSource {

final case object Lenient extends DecodingOption

implicit val decodingOptionConfigReader: ConfigReader[DecodingOption] = {
new ConfigReader[DecodingOption] {
private val stringReader = implicitly[ConfigReader[String]]
override def from(cur: ConfigCursor): ConfigReader.Result[DecodingOption] = {
stringReader.from(cur).flatMap {
case "strict" => Right(Strict)
case "lenient" => Right(Lenient)
case other =>
Left(
ConfigReaderFailures(
ConvertFailure(
CannotConvert(
other,
"DecodingOption",
s"values can only be 'strict' or 'lenient'"
),
cur
)
)
)
}
}
implicit val decodingOptionConfigReader: ConfigReader[DecodingOption] =
ConfigReader.fromString {
case "strict" => Right(Strict)
case "lenient" => Right(Lenient)
case other =>
Left(
CannotConvert(
other,
"DecodingOption",
s"values can only be 'strict' or 'lenient'"
)
)

}
}
}

private val strictDecoder = new Decoder[NexusSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ object Resources {
@SuppressWarnings(Array("OptionGet"))
private[delta] def evaluate(
validateResource: ValidateResource,
detectChange: DetectChange,
clock: Clock[IO]
)(state: Option[ResourceState], cmd: ResourceCommand): IO[ResourceEvent] = {

Expand Down Expand Up @@ -432,10 +433,9 @@ object Resources {
}

for {
state <- stateWhereResourceIsEditable(u)
stateJsonLd <- IO.fromEither(state.toAssembly)
changeDetected = sys.env.get("DISABLE_CHANGE_DETECTION").contains("true") || stateJsonLd =!= u.jsonld
event <- if (u.schemaOpt.isDefined || changeDetected) onChange(state) else fallbackToTag(state)
state <- stateWhereResourceIsEditable(u)
changeDetected <- detectChange(u.jsonld, state)
event <- if (u.schemaOpt.isDefined || changeDetected) onChange(state) else fallbackToTag(state)
} yield event
}

Expand All @@ -453,11 +453,11 @@ object Resources {
def refresh(r: RefreshResource) = {
for {
state <- stateWhereResourceIsEditable(r)
stateJsonLd <- IO.fromEither(state.toAssembly)
_ <- raiseWhenDifferentSchema(r, state)
schemaClaim = SchemaClaim(r.project, r.schemaOpt.getOrElse(state.schema), r.caller)
(schemaRev, schemaProject) <- validate(r.jsonld, schemaClaim, r.projectContext.enforceSchema)
_ <- IO.raiseWhen(stateJsonLd === r.jsonld)(NoChangeDetected(state))
changeDetected <- detectChange(r.jsonld, state)
_ <- IO.raiseUnless(changeDetected)(NoChangeDetected(state))
time <- clock.realTimeInstant
} yield ResourceRefreshed(r.project, schemaRev, schemaProject, r.jsonld, state.rev + 1, time, r.subject)
}
Expand Down Expand Up @@ -513,11 +513,12 @@ object Resources {
*/
def definition(
validateResource: ValidateResource,
detectChange: DetectChange,
clock: Clock[IO]
): ScopedEntityDefinition[Iri, ResourceState, ResourceCommand, ResourceEvent, ResourceRejection] =
ScopedEntityDefinition(
entityType,
StateMachine(None, evaluate(validateResource, clock)(_, _), next),
StateMachine(None, evaluate(validateResource, detectChange, clock)(_, _), next),
ResourceEvent.serializer,
ResourceState.serializer,
Tagger[ResourceEvent](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import pureconfig.generic.semiauto.deriveReader
*
* @param eventLog
* configuration of the event log
* @param decodingOption
* strict/lenient decoding of resources
* @param skipUpdateNoChange
* do not create a new revision when the update does not introduce a change in the current resource state
*/
final case class ResourcesConfig(eventLog: EventLogConfig, decodingOption: DecodingOption)
final case class ResourcesConfig(eventLog: EventLogConfig, decodingOption: DecodingOption, skipUpdateNoChange: Boolean)

object ResourcesConfig {
implicit final val resourcesConfigReader: ConfigReader[ResourcesConfig] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ object ResourcesImpl {
*/
final def apply(
validateResource: ValidateResource,
detectChange: DetectChange,
fetchContext: FetchContext[ProjectContextRejection],
contextResolution: ResolverContextResolution,
config: ResourcesConfig,
Expand All @@ -233,7 +234,7 @@ object ResourcesImpl {
uuidF: UUIDF = UUIDF.random
): Resources =
new ResourcesImpl(
ScopedEventLog(Resources.definition(validateResource, clock), config.eventLog, xas),
ScopedEventLog(Resources.definition(validateResource, detectChange, clock), config.eventLog, xas),
fetchContext,
JsonLdSourceResolvingParser[ResourceRejection](contextResolution, uuidF)
)
Expand Down
Loading

0 comments on commit 848012e

Please sign in to comment.