Skip to content

Commit

Permalink
Add an enpoint to get the number of ntriples per composite view (#5195)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Oct 21, 2024
1 parent 3f355af commit d757dd1
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent, DefaultProperties}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphSupervisionRoutes, BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down Expand Up @@ -241,6 +241,18 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)
}

make[BlazegraphSupervisionRoutes].from {
(
views: BlazegraphViews,
client: BlazegraphClient @Id("blazegraph-indexing-client"),
identities: Identities,
aclCheck: AclCheck,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
BlazegraphSupervisionRoutes(views, client, identities, aclCheck)(cr, ordering)
}

make[BlazegraphScopeInitialization].from {
(views: BlazegraphViews, serviceAccount: ServiceAccount, config: BlazegraphViewsConfig) =>
new BlazegraphScopeInitialization(views, serviceAccount, config.defaults)
Expand Down Expand Up @@ -275,6 +287,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
(
bg: BlazegraphViewsRoutes,
indexing: BlazegraphViewsIndexingRoutes,
supervision: BlazegraphSupervisionRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
Expand All @@ -283,7 +296,8 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
BlazegraphViewsRoutesHandler(
schemeDirectives,
bg.routes,
indexing.routes
indexing.routes,
supervision.routes
)(baseUri),
requiresStrictEntity = true
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.{BlazegraphSupervision, BlazegraphViewByNamespace}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import io.circe.syntax.EncoderOps

class BlazegraphSupervisionRoutes(
blazegraphSupervision: BlazegraphSupervision,
identities: Identities,
aclCheck: AclCheck
)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {

def routes: Route = baseUriPrefix(baseUri.prefix) {
def routes: Route =
pathPrefix("supervision") {
extractCaller { implicit caller =>
authorizeFor(AclAddress.Root, supervision.read).apply {
Expand All @@ -33,6 +33,17 @@ class BlazegraphSupervisionRoutes(
}
}
}
}

object BlazegraphSupervisionRoutes {

def apply(views: BlazegraphViews, client: BlazegraphClient, identities: Identities, aclCheck: AclCheck)(implicit
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): BlazegraphSupervisionRoutes = {
val viewsByNameSpace = BlazegraphViewByNamespace(views)
val blazegraphSupervision = BlazegraphSupervision(client, viewsByNameSpace)
new BlazegraphSupervisionRoutes(blazegraphSupervision, identities, aclCheck)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,68 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import io.circe.syntax.KeyOps
import io.circe.{Encoder, Json, JsonObject}

/**
* Gives supervision information for the underlying Blazegraph instance
*/
trait BlazegraphSupervision {
def get: IO[BlazegraphNamespaces]
def get: IO[BlazegraphNamespaceTriples]
}

object BlazegraphSupervision {

final case class BlazegraphNamespaces(assigned: Map[ViewRef, Long], unassigned: Map[String, Long]) {
def +(view: ViewRef, count: Long): BlazegraphNamespaces = copy(assigned = assigned + (view -> count))
def +(namespace: String, count: Long): BlazegraphNamespaces = copy(unassigned = unassigned + (namespace -> count))
/**
* Returns the number of triples
* @param total
* the total number of triples in the blazegraph instances
* @param assigned
* the triples per Blazegraph views
* @param unassigned
* the triples for namespaces which can not be associated to a Blazegraph view
*/
final case class BlazegraphNamespaceTriples(
total: Long,
assigned: Map[ViewRef, Long],
unassigned: Map[String, Long]
) {
def +(view: ViewRef, count: Long): BlazegraphNamespaceTriples =
copy(total = total + count, assigned = assigned + (view -> count))
def +(namespace: String, count: Long): BlazegraphNamespaceTriples =
copy(total = total + count, unassigned = unassigned + (namespace -> count))
}

object BlazegraphNamespaces {
val empty: BlazegraphNamespaces = BlazegraphNamespaces(Map.empty, Map.empty)
object BlazegraphNamespaceTriples {
val empty: BlazegraphNamespaceTriples = BlazegraphNamespaceTriples(0L, Map.empty, Map.empty)

implicit final val blazegraphNamespacesEncoder: Encoder[BlazegraphNamespaces] = Encoder.AsObject.instance { value =>
val assigned = value.assigned.toVector.sortBy(_._1.toString).map { case (view, count) =>
Json.obj("project" := view.project, "view" := view.viewId, "count" := count)
}
implicit final val blazegraphNamespacesEncoder: Encoder[BlazegraphNamespaceTriples] = Encoder.AsObject.instance {
value =>
val assigned = value.assigned.toVector.sortBy(_._1.toString).map { case (view, count) =>
Json.obj("project" := view.project, "view" := view.viewId, "count" := count)
}

val unassigned = value.unassigned.toVector.sortBy(_._1).map { case (namespace, count) =>
Json.obj("namespace" := namespace, "count" := count)
}
val unassigned = value.unassigned.toVector.sortBy(_._1).map { case (namespace, count) =>
Json.obj("namespace" := namespace, "count" := count)
}

JsonObject("assigned" := Json.arr(assigned: _*), "unassigned" := Json.arr(unassigned: _*))
JsonObject(
"total" := value.total,
"assigned" := Json.arr(assigned: _*),
"unassigned" := Json.arr(unassigned: _*)
)
}
}

def apply(client: BlazegraphClient, viewsByNamespace: ViewByNamespace): BlazegraphSupervision =
new BlazegraphSupervision {
override def get: IO[BlazegraphNamespaces] = {
override def get: IO[BlazegraphNamespaceTriples] = {
for {
namespaces <- client.listNamespaces
viewsByNamespace <- viewsByNamespace.get
result <- namespaces.foldLeftM(BlazegraphNamespaces.empty) { case (acc, namespace) =>
result <- namespaces.foldLeftM(BlazegraphNamespaceTriples.empty) { case (acc, namespace) =>
client.count(namespace).map { count =>
viewsByNamespace.get(namespace) match {
case Some(view) => acc + (view, count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
Expand Down Expand Up @@ -36,8 +36,9 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
private val second = ViewRef(project, nxv + "second")

private val blazegraphSupervision = new BlazegraphSupervision {
override def get: IO[BlazegraphSupervision.BlazegraphNamespaces] = IO.pure(
BlazegraphNamespaces(
override def get: IO[BlazegraphSupervision.BlazegraphNamespaceTriples] = IO.pure(
BlazegraphNamespaceTriples(
153L,
Map(first -> 42L, second -> 99L),
Map("kb" -> 0L, "unknown" -> 12L)
)
Expand All @@ -48,7 +49,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {

"The blazegraph supervision endpoint" should {
"be forbidden without supervision/read permission" in {
Get("/v1/supervision/blazegraph") ~> routes ~> check {
Get("/supervision/blazegraph") ~> routes ~> check {
response.shouldBeForbidden
}
}
Expand All @@ -57,6 +58,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
val expected =
json"""
{
"total": 153,
"assigned" : [
{
"count" : 42,
Expand All @@ -81,7 +83,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
]
}"""

Get("/v1/supervision/blazegraph") ~> asSupervisor ~> routes ~> check {
Get("/supervision/blazegraph") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
Expand All @@ -25,7 +25,8 @@ class BlazegraphSupervisionSuite extends NexusSuite with BlazegraphClientSetup.F
private lazy val supervision = BlazegraphSupervision(client, viewsByNamespace)

test("Return the supervision for the different namespaces") {
val expected = BlazegraphNamespaces(
val expected = BlazegraphNamespaceTriples(
0L,
Map(first -> 0L, second -> 0L),
Map("kb" -> 0L, "unknown" -> 0L)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ final class CompositeViews private (
def currentViews(project: ProjectRef): SuccessElemStream[CompositeViewDef] =
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
* Return all existing views for all projects in a finite stream
*/
def currentViews: SuccessElemStream[CompositeViewDef] =
log.currentStates(Scope.Root).map(toCompositeViewDef)

/**
* Return the indexing views in a non-ending stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.deletion.CompositeVi
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.{CompositeIndexingDetails, CompositeProjections}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes.{CompositeViewsIndexingRoutes, CompositeViewsRoutes, CompositeViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes.{CompositeSupervisionRoutes, CompositeViewsIndexingRoutes, CompositeViewsRoutes, CompositeViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.{CompositeGraphStream, RemoteGraphStream}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
Expand Down Expand Up @@ -322,6 +322,19 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
)(baseUri, config.pagination, cr, ordering)
}

make[CompositeSupervisionRoutes].from {
(
views: CompositeViews,
client: BlazegraphClient @Id("blazegraph-composite-indexing-client"),
identities: Identities,
aclCheck: AclCheck,
config: CompositeViewsConfig,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
CompositeSupervisionRoutes(views, client, identities, aclCheck, config.prefix)(cr, ordering)
}

make[CompositeView.Shift].from { (views: CompositeViews, base: BaseUri) =>
CompositeView.shift(views)(base)
}
Expand All @@ -336,6 +349,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
(
cv: CompositeViewsRoutes,
indexing: CompositeViewsIndexingRoutes,
supervision: CompositeSupervisionRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
Expand All @@ -344,7 +358,8 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
CompositeViewsRoutesHandler(
schemeDirectives,
cv.routes,
indexing.routes
indexing.routes,
supervision.routes
)(baseUri),
requiresStrictEntity = true
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes

import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.supervision.CompositeViewsByNamespace
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import io.circe.syntax.EncoderOps

class CompositeSupervisionRoutes(
blazegraphSupervision: BlazegraphSupervision,
identities: Identities,
aclCheck: AclCheck
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {

def routes: Route =
pathPrefix("supervision") {
extractCaller { implicit caller =>
authorizeFor(AclAddress.Root, supervision.read).apply {
(pathPrefix("composite-views") & get & pathEndOrSingleSlash) {
emit(blazegraphSupervision.get.map(_.asJson))
}
}
}
}
}

object CompositeSupervisionRoutes {
def apply(
views: CompositeViews,
client: BlazegraphClient,
identities: Identities,
aclCheck: AclCheck,
prefix: String
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering): CompositeSupervisionRoutes = {
val viewsByNameSpace = CompositeViewsByNamespace(views, prefix)
val compositeSupervision = BlazegraphSupervision(client, viewsByNameSpace)
new CompositeSupervisionRoutes(compositeSupervision, identities, aclCheck)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.ViewByNamespace
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{commonNamespace, CompositeViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import fs2.Stream

/**
* Allows to get a mapping for the active composite view and their common namespace
*/
object CompositeViewsByNamespace {

def apply(compositeViews: CompositeViews, prefix: String): ViewByNamespace =
apply(compositeViews.currentViews.map(_.value), prefix)

def apply(stream: Stream[IO, CompositeViewDef], prefix: String): ViewByNamespace = new ViewByNamespace {
override def get: IO[Map[String, ViewRef]] = stream
.fold(Map.empty[String, ViewRef]) {
case (acc, view: ActiveViewDef) =>
val namespace = commonNamespace(view.uuid, view.indexingRev, prefix)
acc + (namespace -> view.ref)
case (acc, _) => acc
}
.compile
.last
.map(_.getOrElse(Map.empty))
}
}
Loading

0 comments on commit d757dd1

Please sign in to comment.