Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an endpoint to get the number of ntriples per composite view #5195

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent, DefaultProperties}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphSupervisionRoutes, BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down Expand Up @@ -241,6 +241,18 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)
}

make[BlazegraphSupervisionRoutes].from {
(
views: BlazegraphViews,
client: BlazegraphClient @Id("blazegraph-indexing-client"),
identities: Identities,
aclCheck: AclCheck,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
BlazegraphSupervisionRoutes(views, client, identities, aclCheck)(cr, ordering)
}

make[BlazegraphScopeInitialization].from {
(views: BlazegraphViews, serviceAccount: ServiceAccount, config: BlazegraphViewsConfig) =>
new BlazegraphScopeInitialization(views, serviceAccount, config.defaults)
Expand Down Expand Up @@ -275,6 +287,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
(
bg: BlazegraphViewsRoutes,
indexing: BlazegraphViewsIndexingRoutes,
supervision: BlazegraphSupervisionRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
Expand All @@ -283,7 +296,8 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
BlazegraphViewsRoutesHandler(
schemeDirectives,
bg.routes,
indexing.routes
indexing.routes,
supervision.routes
)(baseUri),
requiresStrictEntity = true
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.{BlazegraphSupervision, BlazegraphViewByNamespace}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import io.circe.syntax.EncoderOps

class BlazegraphSupervisionRoutes(
blazegraphSupervision: BlazegraphSupervision,
identities: Identities,
aclCheck: AclCheck
)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {

def routes: Route = baseUriPrefix(baseUri.prefix) {
def routes: Route =
pathPrefix("supervision") {
extractCaller { implicit caller =>
authorizeFor(AclAddress.Root, supervision.read).apply {
Expand All @@ -33,6 +33,17 @@ class BlazegraphSupervisionRoutes(
}
}
}
}

object BlazegraphSupervisionRoutes {

def apply(views: BlazegraphViews, client: BlazegraphClient, identities: Identities, aclCheck: AclCheck)(implicit
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): BlazegraphSupervisionRoutes = {
val viewsByNameSpace = BlazegraphViewByNamespace(views)
val blazegraphSupervision = BlazegraphSupervision(client, viewsByNameSpace)
new BlazegraphSupervisionRoutes(blazegraphSupervision, identities, aclCheck)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,68 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import io.circe.syntax.KeyOps
import io.circe.{Encoder, Json, JsonObject}

/**
* Gives supervision information for the underlying Blazegraph instance
*/
trait BlazegraphSupervision {
def get: IO[BlazegraphNamespaces]
def get: IO[BlazegraphNamespaceTriples]
}

object BlazegraphSupervision {

final case class BlazegraphNamespaces(assigned: Map[ViewRef, Long], unassigned: Map[String, Long]) {
def +(view: ViewRef, count: Long): BlazegraphNamespaces = copy(assigned = assigned + (view -> count))
def +(namespace: String, count: Long): BlazegraphNamespaces = copy(unassigned = unassigned + (namespace -> count))
/**
* Returns the number of triples
* @param total
* the total number of triples in the blazegraph instances
* @param assigned
* the triples per Blazegraph views
* @param unassigned
* the triples for namespaces which can not be associated to a Blazegraph view
*/
final case class BlazegraphNamespaceTriples(
total: Long,
assigned: Map[ViewRef, Long],
unassigned: Map[String, Long]
) {
def +(view: ViewRef, count: Long): BlazegraphNamespaceTriples =
copy(total = total + count, assigned = assigned + (view -> count))
def +(namespace: String, count: Long): BlazegraphNamespaceTriples =
copy(total = total + count, unassigned = unassigned + (namespace -> count))
}

object BlazegraphNamespaces {
val empty: BlazegraphNamespaces = BlazegraphNamespaces(Map.empty, Map.empty)
object BlazegraphNamespaceTriples {
val empty: BlazegraphNamespaceTriples = BlazegraphNamespaceTriples(0L, Map.empty, Map.empty)

implicit final val blazegraphNamespacesEncoder: Encoder[BlazegraphNamespaces] = Encoder.AsObject.instance { value =>
val assigned = value.assigned.toVector.sortBy(_._1.toString).map { case (view, count) =>
Json.obj("project" := view.project, "view" := view.viewId, "count" := count)
}
implicit final val blazegraphNamespacesEncoder: Encoder[BlazegraphNamespaceTriples] = Encoder.AsObject.instance {
value =>
val assigned = value.assigned.toVector.sortBy(_._1.toString).map { case (view, count) =>
Json.obj("project" := view.project, "view" := view.viewId, "count" := count)
}

val unassigned = value.unassigned.toVector.sortBy(_._1).map { case (namespace, count) =>
Json.obj("namespace" := namespace, "count" := count)
}
val unassigned = value.unassigned.toVector.sortBy(_._1).map { case (namespace, count) =>
Json.obj("namespace" := namespace, "count" := count)
}

JsonObject("assigned" := Json.arr(assigned: _*), "unassigned" := Json.arr(unassigned: _*))
JsonObject(
"total" := value.total,
"assigned" := Json.arr(assigned: _*),
"unassigned" := Json.arr(unassigned: _*)
)
}
}

def apply(client: BlazegraphClient, viewsByNamespace: ViewByNamespace): BlazegraphSupervision =
new BlazegraphSupervision {
override def get: IO[BlazegraphNamespaces] = {
override def get: IO[BlazegraphNamespaceTriples] = {
for {
namespaces <- client.listNamespaces
viewsByNamespace <- viewsByNamespace.get
result <- namespaces.foldLeftM(BlazegraphNamespaces.empty) { case (acc, namespace) =>
result <- namespaces.foldLeftM(BlazegraphNamespaceTriples.empty) { case (acc, namespace) =>
client.count(namespace).map { count =>
viewsByNamespace.get(namespace) match {
case Some(view) => acc + (view, count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
Expand Down Expand Up @@ -36,8 +36,9 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
private val second = ViewRef(project, nxv + "second")

private val blazegraphSupervision = new BlazegraphSupervision {
override def get: IO[BlazegraphSupervision.BlazegraphNamespaces] = IO.pure(
BlazegraphNamespaces(
override def get: IO[BlazegraphSupervision.BlazegraphNamespaceTriples] = IO.pure(
BlazegraphNamespaceTriples(
153L,
Map(first -> 42L, second -> 99L),
Map("kb" -> 0L, "unknown" -> 12L)
)
Expand All @@ -48,7 +49,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {

"The blazegraph supervision endpoint" should {
"be forbidden without supervision/read permission" in {
Get("/v1/supervision/blazegraph") ~> routes ~> check {
Get("/supervision/blazegraph") ~> routes ~> check {
response.shouldBeForbidden
}
}
Expand All @@ -57,6 +58,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
val expected =
json"""
{
"total": 153,
"assigned" : [
{
"count" : 42,
Expand All @@ -81,7 +83,7 @@ class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {
]
}"""

Get("/v1/supervision/blazegraph") ~> asSupervisor ~> routes ~> check {
Get("/supervision/blazegraph") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaceTriples
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
Expand All @@ -25,7 +25,8 @@ class BlazegraphSupervisionSuite extends NexusSuite with BlazegraphClientSetup.F
private lazy val supervision = BlazegraphSupervision(client, viewsByNamespace)

test("Return the supervision for the different namespaces") {
val expected = BlazegraphNamespaces(
val expected = BlazegraphNamespaceTriples(
0L,
Map(first -> 0L, second -> 0L),
Map("kb" -> 0L, "unknown" -> 0L)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ final class CompositeViews private (
def currentViews(project: ProjectRef): SuccessElemStream[CompositeViewDef] =
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
* Return all existing views for all projects in a finite stream
*/
def currentViews: SuccessElemStream[CompositeViewDef] =
log.currentStates(Scope.Root).map(toCompositeViewDef)

/**
* Return the indexing views in a non-ending stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.deletion.CompositeVi
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.{CompositeIndexingDetails, CompositeProjections}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes.{CompositeViewsIndexingRoutes, CompositeViewsRoutes, CompositeViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes.{CompositeSupervisionRoutes, CompositeViewsIndexingRoutes, CompositeViewsRoutes, CompositeViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.{CompositeGraphStream, RemoteGraphStream}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
Expand Down Expand Up @@ -322,6 +322,19 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
)(baseUri, config.pagination, cr, ordering)
}

make[CompositeSupervisionRoutes].from {
(
views: CompositeViews,
client: BlazegraphClient @Id("blazegraph-composite-indexing-client"),
identities: Identities,
aclCheck: AclCheck,
config: CompositeViewsConfig,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
CompositeSupervisionRoutes(views, client, identities, aclCheck, config.prefix)(cr, ordering)
}

make[CompositeView.Shift].from { (views: CompositeViews, base: BaseUri) =>
CompositeView.shift(views)(base)
}
Expand All @@ -336,6 +349,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
(
cv: CompositeViewsRoutes,
indexing: CompositeViewsIndexingRoutes,
supervision: CompositeSupervisionRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
Expand All @@ -344,7 +358,8 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
CompositeViewsRoutesHandler(
schemeDirectives,
cv.routes,
indexing.routes
indexing.routes,
supervision.routes
)(baseUri),
requiresStrictEntity = true
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes

import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.supervision.CompositeViewsByNamespace
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import io.circe.syntax.EncoderOps

class CompositeSupervisionRoutes(
blazegraphSupervision: BlazegraphSupervision,
identities: Identities,
aclCheck: AclCheck
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {

def routes: Route =
pathPrefix("supervision") {
extractCaller { implicit caller =>
authorizeFor(AclAddress.Root, supervision.read).apply {
(pathPrefix("composite-views") & get & pathEndOrSingleSlash) {
emit(blazegraphSupervision.get.map(_.asJson))
}
}
}
}
}

object CompositeSupervisionRoutes {
def apply(
views: CompositeViews,
client: BlazegraphClient,
identities: Identities,
aclCheck: AclCheck,
prefix: String
)(implicit cr: RemoteContextResolution, ordering: JsonKeyOrdering): CompositeSupervisionRoutes = {
val viewsByNameSpace = CompositeViewsByNamespace(views, prefix)
val compositeSupervision = BlazegraphSupervision(client, viewsByNameSpace)
new CompositeSupervisionRoutes(compositeSupervision, identities, aclCheck)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.ViewByNamespace
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{commonNamespace, CompositeViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import fs2.Stream

/**
* Allows to get a mapping for the active composite view and their common namespace
*/
object CompositeViewsByNamespace {

def apply(compositeViews: CompositeViews, prefix: String): ViewByNamespace =
apply(compositeViews.currentViews.map(_.value), prefix)

def apply(stream: Stream[IO, CompositeViewDef], prefix: String): ViewByNamespace = new ViewByNamespace {
override def get: IO[Map[String, ViewRef]] = stream
.fold(Map.empty[String, ViewRef]) {
case (acc, view: ActiveViewDef) =>
val namespace = commonNamespace(view.uuid, view.indexingRev, prefix)
acc + (namespace -> view.ref)
case (acc, _) => acc
}
.compile
.last
.map(_.getOrElse(Map.empty))
}
}
Loading