diff --git a/aquadoggo/src/graphql/mod.rs b/aquadoggo/src/graphql/mod.rs index 3a8365aa7..85aba3e6b 100644 --- a/aquadoggo/src/graphql/mod.rs +++ b/aquadoggo/src/graphql/mod.rs @@ -13,4 +13,4 @@ mod schema; mod tests; pub mod utils; -pub use schema::{build_root_schema, GraphQLSchemaManager}; +pub use schema::GraphQLSchemaManager; diff --git a/aquadoggo/src/graphql/queries/document.rs b/aquadoggo/src/graphql/queries/document.rs index ea5f89504..bc85e037d 100644 --- a/aquadoggo/src/graphql/queries/document.rs +++ b/aquadoggo/src/graphql/queries/document.rs @@ -1,13 +1,16 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, ResolverContext, TypeRef}; +use async_graphql::dynamic::{ + Field, FieldFuture, InputValue, Object, ResolverContext, Subscription, SubscriptionField, + SubscriptionFieldFuture, TypeRef, +}; use async_graphql::Error; use dynamic_graphql::ScalarValue; use log::debug; use p2panda_rs::schema::Schema; use crate::graphql::constants; -use crate::graphql::resolvers::resolve_document; +use crate::graphql::resolvers::{resolve_document, resolve_document_subscription}; use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; /// Adds a GraphQL query for retrieving a single document selected by its id or @@ -48,6 +51,50 @@ pub fn build_document_query(query: Object, schema: &Schema) -> Object { ) } +/// Builds a subscription object and adds it to the given root subscription. +/// +/// The subscription follows the format `(id: , viewId: )`. +pub fn build_document_subscription_query( + root_subscription: Subscription, + schema: &Schema, +) -> Subscription { + let schema_id = schema.id().clone(); + root_subscription.field( + SubscriptionField::new( + schema_id.to_string(), + TypeRef::named(schema_id.to_string()), + move |ctx| { + SubscriptionFieldFuture::new(async move { + let (document_id, document_view_id) = parse_arguments(&ctx)?; + Ok(resolve_document_subscription( + ctx, + document_id, + document_view_id, + )) + }) + }, + ) + .argument( + InputValue::new( + constants::DOCUMENT_ID_ARG, + TypeRef::named(constants::DOCUMENT_ID), + ) + .description("Specify the id of the document to be streamed"), + ) + .argument( + InputValue::new( + constants::DOCUMENT_VIEW_ID_ARG, + TypeRef::named(constants::DOCUMENT_VIEW_ID), + ) + .description("Specify the view id of the document to be streamed"), + ) + .description(format!( + "Subscription for application fields of a `{}` document.", + schema.id().name() + )), + ) +} + /// Parse and validate the arguments passed into this query. fn parse_arguments( ctx: &ResolverContext, diff --git a/aquadoggo/src/graphql/queries/mod.rs b/aquadoggo/src/graphql/queries/mod.rs index d42500b5e..db43317c1 100644 --- a/aquadoggo/src/graphql/queries/mod.rs +++ b/aquadoggo/src/graphql/queries/mod.rs @@ -5,5 +5,5 @@ mod document; mod next_args; pub use collection::build_collection_query; -pub use document::build_document_query; +pub use document::{build_document_query, build_document_subscription_query}; pub use next_args::build_next_args_query; diff --git a/aquadoggo/src/graphql/resolvers.rs b/aquadoggo/src/graphql/resolvers.rs index 7ce9fc1ea..9432b688b 100644 --- a/aquadoggo/src/graphql/resolvers.rs +++ b/aquadoggo/src/graphql/resolvers.rs @@ -3,10 +3,13 @@ use async_graphql::dynamic::ResolverContext; use async_graphql::Error; use dynamic_graphql::FieldValue; +use futures::Stream; +use log::debug; use p2panda_rs::document::traits::AsDocument; use p2panda_rs::operation::OperationValue; use p2panda_rs::schema::{FieldType, Schema}; use p2panda_rs::storage_provider::traits::DocumentStore; +use p2panda_rs::Human; use crate::db::stores::{PaginationCursor, PaginationData, RelationList}; use crate::db::types::StorageDocument; @@ -49,7 +52,7 @@ pub async fn resolve_document( ctx: ResolverContext<'_>, document_id: Option, document_view_id: Option, -) -> Result, Error> { +) -> Result>, Error> { let store = ctx.data_unchecked::(); let document = match get_document_from_params(store, &document_id, &document_view_id).await? { @@ -69,7 +72,7 @@ pub async fn resolve_document_collection( ctx: ResolverContext<'_>, schema: Schema, list: Option, -) -> Result, Error> { +) -> Result>, Error> { let store = ctx.data_unchecked::(); // Populate query arguments with values from GraphQL query @@ -126,7 +129,7 @@ pub async fn resolve_document_field( let schema = schema_provider .get(document.schema_id()) .await - .expect("Schema should be in store"); + .ok_or(Error::new("Schema not found in store"))?; // Determine name of the field to be resolved let name = ctx.field().name(); @@ -203,3 +206,42 @@ pub async fn resolve_document_field( value => Ok(Some(FieldValue::value(gql_scalar(value)))), } } + +/// Resolve a subscription to a single document. +pub fn resolve_document_subscription( + ctx: ResolverContext<'_>, + document_id: Option, + document_view_id: Option, +) -> impl Stream, Error>> { + async_stream::stream! { + match (&document_id, &document_view_id) { + (_, Some(document_view_id)) => { + debug!("Subscribing to {}", document_view_id.display()); + } + (Some(document_id), _) => { + debug!("Subscribing to {}", document_id.display()); + } + (_, _) => { + yield Err(Error::new("Document id or document view id must be provided")); + return + } + } + + loop { + let store = ctx.data_unchecked::(); + + match get_document_from_params(store, &document_id, &document_view_id).await? { + Some(document) => { + yield Ok(FieldValue::owned_any(Resolved::Document(document))); + } + None => { + yield Err(Error::new("Document not found")); + return + } + }; + + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } +} diff --git a/aquadoggo/src/graphql/scalars/document_id_scalar.rs b/aquadoggo/src/graphql/scalars/document_id_scalar.rs index e432e0adc..bbc7d4b68 100644 --- a/aquadoggo/src/graphql/scalars/document_id_scalar.rs +++ b/aquadoggo/src/graphql/scalars/document_id_scalar.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value}; use p2panda_rs::document::DocumentId; +use p2panda_rs::Human; /// The id of a p2panda document. #[derive(Scalar, Clone, Debug, Eq, PartialEq)] @@ -50,6 +51,12 @@ impl Display for DocumentIdScalar { } } +impl Human for DocumentIdScalar { + fn display(&self) -> String { + self.0.display() + } +} + /// Validation method used internally in `async-graphql` to check scalar values passed into the /// public api. fn validate(value: &Value) -> bool { diff --git a/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs b/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs index 4cdeb0a57..557734bb8 100644 --- a/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs +++ b/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value}; use p2panda_rs::document::DocumentViewId; +use p2panda_rs::Human; /// The document view id of a p2panda document. Refers to a specific point in a documents history /// and can be used to deterministically reconstruct its state at that time. @@ -51,6 +52,12 @@ impl Display for DocumentViewIdScalar { } } +impl Human for DocumentViewIdScalar { + fn display(&self) -> String { + self.0.display() + } +} + /// Validation method used internally in `async-graphql` to check scalar values passed into the /// public api. fn validate(value: &Value) -> bool { diff --git a/aquadoggo/src/graphql/schema.rs b/aquadoggo/src/graphql/schema.rs index 5f385c7f0..226105407 100644 --- a/aquadoggo/src/graphql/schema.rs +++ b/aquadoggo/src/graphql/schema.rs @@ -3,9 +3,11 @@ //! Dynamically create and manage GraphQL schemas. use std::sync::Arc; -use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, TypeRef}; +use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, Subscription, TypeRef}; use async_graphql::{Request, Response, Value}; use dynamic_graphql::internal::Registry; +use futures::stream::BoxStream; +use futures::{FutureExt, Stream, StreamExt}; use log::{debug, info, warn}; use p2panda_rs::Human; use tokio::sync::Mutex; @@ -23,7 +25,8 @@ use crate::graphql::objects::{ build_paginated_document_object, DocumentMeta, }; use crate::graphql::queries::{ - build_collection_query, build_document_query, build_next_args_query, + build_collection_query, build_document_query, build_document_subscription_query, + build_next_args_query, }; use crate::graphql::responses::NextArguments; use crate::graphql::scalars::{ @@ -75,15 +78,23 @@ pub async fn build_root_schema( .register::() .register::(); - let mut schema_builder = Schema::build("Query", Some("MutationRoot"), None); + // Construct the root query object + let mut root_query = Object::new("Query"); + + // Construct the root subscription object + let mut root_subscription = Subscription::new("Subscription"); + + // Start a schema builder with the root objects + let mut schema_builder = Schema::build( + root_query.type_name(), + Some("MutationRoot"), + Some(root_subscription.type_name()), + ); // Populate it with the registered types. We can now use these in any following dynamically // created query object fields. schema_builder = registry.apply_into_schema_builder(schema_builder); - // Construct the root query object - let mut root_query = Object::new("Query"); - // Loop through all schema retrieved from the schema store, dynamically create GraphQL objects, // input values and a query for the documents they describe for schema in all_schema { @@ -120,15 +131,18 @@ pub async fn build_root_schema( // Add a query for retrieving all documents of a certain schema root_query = build_collection_query(root_query, &schema); + + // Add a field for subscribing to changes on a single document + root_subscription = build_document_subscription_query(root_subscription, &schema); } // Add next args to the query object let root_query = build_next_args_query(root_query); - // Build the GraphQL schema. We can unwrap here since it will only fail if we forgot to - // register all required types above + // Build the GraphQL schema schema_builder .register(root_query) + .register(root_subscription) .data(store) .data(schema_provider) .data(tx) @@ -211,15 +225,23 @@ impl GraphQLSchemaManager { let mut on_schema_added = shared.schema_provider.on_schema_added(); // Create the new GraphQL based on the current state of known p2panda application schemas - async fn rebuild(shared: GraphQLSharedData, schemas: GraphQLSchemas) { + async fn rebuild( + shared: GraphQLSharedData, + schemas: GraphQLSchemas, + ) -> Result<(), async_graphql::dynamic::SchemaError> { match build_root_schema(shared.store, shared.tx, shared.schema_provider).await { - Ok(schema) => schemas.lock().await.push(schema), - Err(err) => warn!("Error building GraphQL schema: {}", err), + Ok(schema) => { + schemas.lock().await.push(schema); + Ok(()) + } + Err(err) => Err(err), } } // Always build a schema right at the beginning as we don't have one yet - rebuild(shared.clone(), schemas.clone()).await; + if let Err(err) = rebuild(shared.clone(), schemas.clone()).await { + panic!("Failed building initial GraphQL schema: {}", err); + } debug!("Finished building initial GraphQL schema"); // Spawn a task which reacts to newly registered p2panda schemas @@ -231,7 +253,9 @@ impl GraphQLSchemaManager { "Changed schema {}, rebuilding GraphQL API", schema_id.display() ); - rebuild(shared.clone(), schemas.clone()).await; + if let Err(err) = rebuild(shared.clone(), schemas.clone()).await { + warn!("Failed building GraphQL schema: {}", err); + } } Err(err) => { panic!("Failed receiving schema updates: {}", err) @@ -254,6 +278,41 @@ impl GraphQLSchemaManager { .execute(request) .await } + + /// Executes an incoming GraphQL query. + /// + /// This method makes sure the GraphQL query will be executed by the latest given schema the + /// manager knows about. + pub async fn stream( + self, + request: impl Into, + session_data: Arc, + ) -> impl Stream { + self.schemas + .lock() + .await + .last() + .expect("No schema given yet") + .execute_stream_with_session_data(request, session_data) + } +} + +#[async_trait::async_trait] +impl async_graphql::Executor for GraphQLSchemaManager { + async fn execute(&self, request: Request) -> Response { + self.execute(request).await + } + + fn execute_stream( + &self, + request: Request, + session_data: Option>, + ) -> BoxStream<'static, Response> { + self.clone() + .stream(request, session_data.unwrap_or_default()) + .flatten_stream() + .boxed() + } } impl std::fmt::Debug for GraphQLSchemaManager { diff --git a/aquadoggo/src/http/api.rs b/aquadoggo/src/http/api.rs index 71c5a3336..3d4476ec5 100644 --- a/aquadoggo/src/http/api.rs +++ b/aquadoggo/src/http/api.rs @@ -25,8 +25,12 @@ use tokio_util::io::ReaderStream; use crate::http::context::HttpServiceContext; /// Handle GraphQL playground requests at the given path. -pub async fn handle_graphql_playground(path: &str) -> impl IntoResponse { - response::Html(playground_source(GraphQLPlaygroundConfig::new(path))) +pub async fn handle_graphql_playground(path: &str, subscription_path: &str) -> impl IntoResponse { + response::Html(playground_source( + GraphQLPlaygroundConfig::new(path) + .subscription_endpoint(subscription_path) + .title("p2panda GraphQL Playground"), + )) } /// Handle GraphQL requests. diff --git a/aquadoggo/src/http/service.rs b/aquadoggo/src/http/service.rs index 855d61193..8fd1dc460 100644 --- a/aquadoggo/src/http/service.rs +++ b/aquadoggo/src/http/service.rs @@ -3,6 +3,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use anyhow::Result; +use async_graphql_axum::GraphQLSubscription; use axum::extract::Extension; use axum::http::Method; use axum::routing::get; @@ -23,6 +24,9 @@ use crate::manager::{ServiceReadySender, Shutdown}; /// Route to the GraphQL playground const GRAPHQL_ROUTE: &str = "/graphql"; +/// Route to the GraphQL subscription endpoint +const GRAPHQL_SUBSCRIPTION_ROUTE: &str = "/graphql/ws"; + /// Build HTTP server with GraphQL API. pub fn build_server(http_context: HttpServiceContext) -> Router { // Configure CORS middleware @@ -36,7 +40,12 @@ pub fn build_server(http_context: HttpServiceContext) -> Router { // Add GraphQL routes .route( GRAPHQL_ROUTE, - get(|| handle_graphql_playground(GRAPHQL_ROUTE)).post(handle_graphql_query), + get(|| handle_graphql_playground(GRAPHQL_ROUTE, GRAPHQL_SUBSCRIPTION_ROUTE)) + .post(handle_graphql_query), + ) + .route_service( + GRAPHQL_SUBSCRIPTION_ROUTE, + GraphQLSubscription::new(http_context.schema.clone()), ) // Add blob routes .route("/blobs/:document_id", get(handle_blob_document))