GraphQL subscription support #604

Draft · wants to merge 3 commits into base: main

Showing changes from all commits
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/mod.rs
@@ -13,4 +13,4 @@ mod schema;
 mod tests;
 pub mod utils;

-pub use schema::{build_root_schema, GraphQLSchemaManager};
+pub use schema::GraphQLSchemaManager;
51 changes: 49 additions & 2 deletions aquadoggo/src/graphql/queries/document.rs
@@ -1,13 +1,16 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later

-use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, ResolverContext, TypeRef};
+use async_graphql::dynamic::{
+    Field, FieldFuture, InputValue, Object, ResolverContext, Subscription, SubscriptionField,
+    SubscriptionFieldFuture, TypeRef,
+};
 use async_graphql::Error;
 use dynamic_graphql::ScalarValue;
 use log::debug;
 use p2panda_rs::schema::Schema;

 use crate::graphql::constants;
-use crate::graphql::resolvers::resolve_document;
+use crate::graphql::resolvers::{resolve_document, resolve_document_subscription};
 use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar};
 /// Adds a GraphQL query for retrieving a single document selected by its id or
@@ -48,6 +51,50 @@ pub fn build_document_query(query: Object, schema: &Schema) -> Object {
     )
 }

+/// Builds a subscription object and adds it to the given root subscription.
+///
+/// The subscription follows the format `<SCHEMA_ID>(id: <DOCUMENT_ID>, viewId: <DOCUMENT_VIEW_ID>)`.
+pub fn build_document_subscription_query(
+    root_subscription: Subscription,
+    schema: &Schema,
+) -> Subscription {
+    let schema_id = schema.id().clone();
+    root_subscription.field(
+        SubscriptionField::new(
+            schema_id.to_string(),
+            TypeRef::named(schema_id.to_string()),
+            move |ctx| {
+                SubscriptionFieldFuture::new(async move {
+                    let (document_id, document_view_id) = parse_arguments(&ctx)?;
+                    Ok(resolve_document_subscription(
+                        ctx,
+                        document_id,
+                        document_view_id,
+                    ))
+                })
+            },
+        )
+        .argument(
+            InputValue::new(
+                constants::DOCUMENT_ID_ARG,
+                TypeRef::named(constants::DOCUMENT_ID),
+            )
+            .description("Specify the id of the document to be streamed"),
+        )
+        .argument(
+            InputValue::new(
+                constants::DOCUMENT_VIEW_ID_ARG,
+                TypeRef::named(constants::DOCUMENT_VIEW_ID),
+            )
+            .description("Specify the view id of the document to be streamed"),
+        )
+        .description(format!(
+            "Subscription for application fields of a `{}` document.",
+            schema.id().name()
+        )),
+    )
+}

 /// Parse and validate the arguments passed into this query.
 fn parse_arguments(
     ctx: &ResolverContext,
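For orientation, a sketch (not part of this diff) of how the generated subscription field could be exercised against a built dynamic schema. The schema id `chat_0020abc123def456` and the `message` field are hypothetical placeholders; `execute_stream` is async-graphql's standard entry point for subscription operations:

```rust
use async_graphql::Request;
use futures::StreamExt;

async fn watch_document(schema: async_graphql::dynamic::Schema) {
    // Hypothetical schema id and field name; p2panda schema ids embed a hash
    let request = Request::new(
        r#"subscription {
            chat_0020abc123def456(id: "<DOCUMENT_ID>") {
                fields {
                    message
                }
            }
        }"#,
    );

    // Each response carries the latest materialized state of the document
    let mut stream = Box::pin(schema.execute_stream(request));
    while let Some(response) = stream.next().await {
        println!("{:?}", response.data);
    }
}
```

With the resolver added below, this stream re-emits the document state once per second; deduplication or push-based updates would come with the subscription service discussed in the review thread further down.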
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/queries/mod.rs
@@ -5,5 +5,5 @@ mod document;
 mod next_args;

 pub use collection::build_collection_query;
-pub use document::build_document_query;
+pub use document::{build_document_query, build_document_subscription_query};
 pub use next_args::build_next_args_query;
48 changes: 45 additions & 3 deletions aquadoggo/src/graphql/resolvers.rs
@@ -3,10 +3,13 @@
 use async_graphql::dynamic::ResolverContext;
 use async_graphql::Error;
 use dynamic_graphql::FieldValue;
+use futures::Stream;
 use log::debug;
 use p2panda_rs::document::traits::AsDocument;
 use p2panda_rs::operation::OperationValue;
 use p2panda_rs::schema::{FieldType, Schema};
 use p2panda_rs::storage_provider::traits::DocumentStore;
+use p2panda_rs::Human;

 use crate::db::stores::{PaginationCursor, PaginationData, RelationList};
 use crate::db::types::StorageDocument;
@@ -49,7 +52,7 @@ pub async fn resolve_document(
     ctx: ResolverContext<'_>,
     document_id: Option<DocumentIdScalar>,
     document_view_id: Option<DocumentViewIdScalar>,
-) -> Result<Option<FieldValue>, Error> {
+) -> Result<Option<FieldValue<'_>>, Error> {
     let store = ctx.data_unchecked::<SqlStore>();

     let document = match get_document_from_params(store, &document_id, &document_view_id).await? {
@@ -69,7 +72,7 @@ pub async fn resolve_document_collection(
     ctx: ResolverContext<'_>,
     schema: Schema,
     list: Option<RelationList>,
-) -> Result<Option<FieldValue>, Error> {
+) -> Result<Option<FieldValue<'_>>, Error> {
     let store = ctx.data_unchecked::<SqlStore>();

     // Populate query arguments with values from GraphQL query
@@ -126,7 +129,7 @@ pub async fn resolve_document_field(
     let schema = schema_provider
         .get(document.schema_id())
         .await
-        .expect("Schema should be in store");
+        .ok_or(Error::new("Schema not found in store"))?;

     // Determine name of the field to be resolved
     let name = ctx.field().name();
@@ -203,3 +206,42 @@ pub async fn resolve_document_field(
         value => Ok(Some(FieldValue::value(gql_scalar(value)))),
     }
 }

+/// Resolve a subscription to a single document.
+pub fn resolve_document_subscription(
+    ctx: ResolverContext<'_>,
+    document_id: Option<DocumentIdScalar>,
+    document_view_id: Option<DocumentViewIdScalar>,
+) -> impl Stream<Item = Result<FieldValue<'_>, Error>> {
+    async_stream::stream! {
+        match (&document_id, &document_view_id) {
+            (_, Some(document_view_id)) => {
+                debug!("Subscribing to {}", document_view_id.display());
+            }
+            (Some(document_id), _) => {
+                debug!("Subscribing to {}", document_id.display());
+            }
+            (_, _) => {
+                yield Err(Error::new("Document id or document view id must be provided"));
+                return
+            }
+        }
+
+        loop {
+            let store = ctx.data_unchecked::<SqlStore>();
+
+            match get_document_from_params(store, &document_id, &document_view_id).await? {
+                Some(document) => {
+                    yield Ok(FieldValue::owned_any(Resolved::Document(document)));
+                }
+                None => {
+                    yield Err(Error::new("Document not found"));
+                    return
+                }
+            };
+
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+        }
+    }
+}

Comment on lines +230 to +245

Member:

I was thinking about push-based replication in another issue which is slightly related to this, maybe it's helpful for you:

We could introduce a "subscription service" or struct on the global Context which simply keeps a map of who's interested in what ("node c is interested in schema p", "client k is interested in document y", etc.). The node interests we know through Announcement messages (similar to search queries on a network), so it'll be quite easy to implement that; client interests we will learn through new subscriptions.

The materializer service informs the subscription service about any finished materialization via the bus, by document id and by schema id.

The subscription service fires the corresponding callbacks of the subscribers, which in turn trigger the logic needed (make a db request, push the result to the client / initiate replication with the node; later we'll have a "live mode" which just pushes the new data without any fancy protocol).

Member @adzialocha (Dec 9, 2023):

I wanted to look at this service as part of #589 so we could tie this together at some point.

Member Author:

Ahh, interesting. I built a simpler integration here, but yeah, that could be upgraded to a proper subscription service. I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex. I also looked into using database queries as triggers for subscriptions, so that you register a query and get updated whenever its results change, but I haven't found anything in sqlx or postgres for that yet.

Member:

> Ahh, interesting. I built a simpler integration here

Looking already very much like that! Awesome!

Member:

> I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex

That will be pretty complex indeed 🤔 I think it's enough to scope things by schema id (collection query) or document id (document query) for now, even if it's ignoring the filter.

Member Author @cafca (Dec 11, 2023):

Maybe you can tell me a bit more about what you imagine the subscription service to offer, then I can have a go at a first version. Edit: actually, maybe I have an idea of what you mean.

Member @adzialocha (Dec 12, 2023):

> Maybe you can tell me a bit more about what you imagine the subscription service to offer, then I can have a go at a first version. Edit: actually, maybe I have an idea of what you mean.

Off the top of my head (no laptop with me right now) some rough ideas, probably in need of improvement (see the sketch after this list):

  • Keep a new struct Connections / Peers etc. in the global Context, similar to the schema manager
  • Its purpose is to manage a list of all currently connected / subscribed clients and (later) nodes, probably in the form of a map from some sort of unique subscription id to a subscription enum plus callback list, where the enum is either a SchemaIdSet (collection queries and node announcements) or a DocumentId (document queries)
  • Since this thing lives in the global context we can reach it from anywhere: the replication service can populate it with currently known nodes (it learns about them through peer dis-/connected messages), and so can the graphql service (whenever a subscription starts or ends)
  • Later (not for now) we can use this state to learn how many nodes we're currently connected to etc. (could be a protected graphql or public crate method)
  • Callbacks are added to and removed from the map, scoped by this unique id and sets of interest
  • A small subscription service is introduced (like all other services). It hooks into the message bus and waits for materializer events. Whenever a document gets touched we want to know its schema id and document id (probably that's part of the message itself), so this needs to be sent from the materializer and received here
  • The service, like all others, has access to the context and therefore the connection table. On each incoming message it looks up whether someone is subscribed to that document or that schema and calls the callbacks accordingly
  • For nodes we probably then just want to send a message to the replication service, telling it to begin replication
  • For clients the callback probably just kicks off the subscription callback, which repeats the query over the collection or document. As mentioned already, I think it's fine if filters are ignored at this stage
  • That's probably not part of your PRs, but a note for later: our replication service should know if it is already in live mode with a peer; if yes it will just send the new data (later just via a gossip overlay broadcast, which is also a sort of subscription over a topic / schema id set), if not it will initiate a normal 1:1 replication session as we currently do, but it would become push-based, which is nice
  • Surely that subscription logic could live in another service rather than its own; not sure if it needs its own place already. On the other hand, it doesn't do anything clearly related to one service, and on top it might handle other things in the future, like automatically removing timed-out callbacks
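A minimal sketch of the registry described above (an illustration, not part of the PR): `Interest`, `Subscriptions` and `notify` are hypothetical names, and plain strings stand in for p2panda's `SchemaIdSet` and `DocumentId` types to keep the example self-contained.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

type SubscriptionId = u64;
type Callback = Box<dyn Fn() + Send + Sync>;

/// What a subscriber is interested in.
enum Interest {
    /// Collection queries and node announcements, scoped by schema id
    Schema(String),
    /// Document queries, scoped by document id
    Document(String),
}

/// Lives on the global `Context`: populated by the GraphQL and replication
/// services, read by a small subscription service listening on the bus.
#[derive(Clone, Default)]
struct Subscriptions {
    inner: Arc<Mutex<HashMap<SubscriptionId, (Interest, Vec<Callback>)>>>,
}

impl Subscriptions {
    /// Fire all callbacks whose interest matches a freshly materialized document.
    async fn notify(&self, schema_id: &str, document_id: &str) {
        for (interest, callbacks) in self.inner.lock().await.values() {
            let matches = match interest {
                Interest::Schema(id) => id == schema_id,
                Interest::Document(id) => id == document_id,
            };
            if matches {
                callbacks.iter().for_each(|callback| callback());
            }
        }
    }
}
```

The GraphQL service would add an entry whenever a subscription starts and remove it when it ends, while the bus-listening service calls `notify` after each finished materialization.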

7 changes: 7 additions & 0 deletions aquadoggo/src/graphql/scalars/document_id_scalar.rs
@@ -5,6 +5,7 @@ use std::str::FromStr;

 use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value};
 use p2panda_rs::document::DocumentId;
+use p2panda_rs::Human;

 /// The id of a p2panda document.
 #[derive(Scalar, Clone, Debug, Eq, PartialEq)]
@@ -50,6 +51,12 @@ impl Display for DocumentIdScalar {
     }
 }

+impl Human for DocumentIdScalar {
+    fn display(&self) -> String {
+        self.0.display()
+    }
+}

 /// Validation method used internally in `async-graphql` to check scalar values passed into the
 /// public api.
 fn validate(value: &Value) -> bool {
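The new `Human` impl provides what the subscription resolver's `debug!` lines use: a shortened, log-friendly form of the id. A small sketch of the difference (the exact shortened format is up to `p2panda_rs`):

```rust
use p2panda_rs::Human;

use crate::graphql::scalars::DocumentIdScalar;

fn log_subscription_target(document_id: &DocumentIdScalar) {
    // `Display` prints the full document id hash
    println!("subscribing to {}", document_id);
    // `Human::display` delegates to the inner `DocumentId` and yields a
    // shortened representation suitable for logs
    println!("subscribing to {}", document_id.display());
}
```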
7 changes: 7 additions & 0 deletions aquadoggo/src/graphql/scalars/document_view_id_scalar.rs
@@ -5,6 +5,7 @@ use std::str::FromStr;

 use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value};
 use p2panda_rs::document::DocumentViewId;
+use p2panda_rs::Human;

 /// The document view id of a p2panda document. Refers to a specific point in a document's history
 /// and can be used to deterministically reconstruct its state at that time.
@@ -51,6 +52,12 @@ impl Display for DocumentViewIdScalar {
     }
 }

+impl Human for DocumentViewIdScalar {
+    fn display(&self) -> String {
+        self.0.display()
+    }
+}

 /// Validation method used internally in `async-graphql` to check scalar values passed into the
 /// public api.
 fn validate(value: &Value) -> bool {
85 changes: 72 additions & 13 deletions aquadoggo/src/graphql/schema.rs
@@ -3,9 +3,11 @@
 //! Dynamically create and manage GraphQL schemas.
 use std::sync::Arc;

-use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, TypeRef};
+use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, Subscription, TypeRef};
 use async_graphql::{Request, Response, Value};
 use dynamic_graphql::internal::Registry;
+use futures::stream::BoxStream;
+use futures::{FutureExt, Stream, StreamExt};
 use log::{debug, info, warn};
 use p2panda_rs::Human;
 use tokio::sync::Mutex;
@@ -23,7 +25,8 @@ use crate::graphql::objects::{
     build_paginated_document_object, DocumentMeta,
 };
 use crate::graphql::queries::{
-    build_collection_query, build_document_query, build_next_args_query,
+    build_collection_query, build_document_query, build_document_subscription_query,
+    build_next_args_query,
 };
 use crate::graphql::responses::NextArguments;
 use crate::graphql::scalars::{
@@ -75,15 +78,23 @@ pub async fn build_root_schema(
         .register::<PublicKeyScalar>()
         .register::<SeqNumScalar>();

-    let mut schema_builder = Schema::build("Query", Some("MutationRoot"), None);
+    // Construct the root query object
+    let mut root_query = Object::new("Query");
+
+    // Construct the root subscription object
+    let mut root_subscription = Subscription::new("Subscription");
+
+    // Start a schema builder with the root objects
+    let mut schema_builder = Schema::build(
+        root_query.type_name(),
+        Some("MutationRoot"),
+        Some(root_subscription.type_name()),
+    );

     // Populate it with the registered types. We can now use these in any following dynamically
     // created query object fields.
     schema_builder = registry.apply_into_schema_builder(schema_builder);

-    // Construct the root query object
-    let mut root_query = Object::new("Query");
-
     // Loop through all schema retrieved from the schema store, dynamically create GraphQL objects,
     // input values and a query for the documents they describe
     for schema in all_schema {
@@ -120,15 +131,18 @@

         // Add a query for retrieving all documents of a certain schema
         root_query = build_collection_query(root_query, &schema);
+
+        // Add a field for subscribing to changes on a single document
+        root_subscription = build_document_subscription_query(root_subscription, &schema);
     }

     // Add next args to the query object
     let root_query = build_next_args_query(root_query);

-    // Build the GraphQL schema. We can unwrap here since it will only fail if we forgot to
-    // register all required types above
+    // Build the GraphQL schema
    schema_builder
         .register(root_query)
+        .register(root_subscription)
         .data(store)
         .data(schema_provider)
         .data(tx)
@@ -211,15 +225,23 @@ impl GraphQLSchemaManager {
         let mut on_schema_added = shared.schema_provider.on_schema_added();

         // Create the new GraphQL based on the current state of known p2panda application schemas
-        async fn rebuild(shared: GraphQLSharedData, schemas: GraphQLSchemas) {
+        async fn rebuild(
+            shared: GraphQLSharedData,
+            schemas: GraphQLSchemas,
+        ) -> Result<(), async_graphql::dynamic::SchemaError> {
             match build_root_schema(shared.store, shared.tx, shared.schema_provider).await {
-                Ok(schema) => schemas.lock().await.push(schema),
-                Err(err) => warn!("Error building GraphQL schema: {}", err),
+                Ok(schema) => {
+                    schemas.lock().await.push(schema);
+                    Ok(())
+                }
+                Err(err) => Err(err),
             }
         }

         // Always build a schema right at the beginning as we don't have one yet
-        rebuild(shared.clone(), schemas.clone()).await;
+        if let Err(err) = rebuild(shared.clone(), schemas.clone()).await {
+            panic!("Failed building initial GraphQL schema: {}", err);
+        }
         debug!("Finished building initial GraphQL schema");

         // Spawn a task which reacts to newly registered p2panda schemas
@@ -231,7 +253,9 @@
                         "Changed schema {}, rebuilding GraphQL API",
                         schema_id.display()
                     );
-                    rebuild(shared.clone(), schemas.clone()).await;
+                    if let Err(err) = rebuild(shared.clone(), schemas.clone()).await {
+                        warn!("Failed building GraphQL schema: {}", err);
+                    }
                 }
                 Err(err) => {
                     panic!("Failed receiving schema updates: {}", err)
@@ -254,6 +278,41 @@
             .execute(request)
             .await
     }
+
+    /// Executes an incoming GraphQL subscription and streams its responses.
+    ///
+    /// This method makes sure the GraphQL subscription will be executed by the latest given
+    /// schema the manager knows about.
+    pub async fn stream(
+        self,
+        request: impl Into<Request>,
+        session_data: Arc<async_graphql::Data>,
+    ) -> impl Stream<Item = Response> {
+        self.schemas
+            .lock()
+            .await
+            .last()
+            .expect("No schema given yet")
+            .execute_stream_with_session_data(request, session_data)
+    }
 }

+#[async_trait::async_trait]
+impl async_graphql::Executor for GraphQLSchemaManager {
+    async fn execute(&self, request: Request) -> Response {
+        self.execute(request).await
+    }
+
+    fn execute_stream(
+        &self,
+        request: Request,
+        session_data: Option<Arc<async_graphql::Data>>,
+    ) -> BoxStream<'static, Response> {
+        self.clone()
+            .stream(request, session_data.unwrap_or_default())
+            .flatten_stream()
+            .boxed()
+    }
+}

Comment on lines +311 to +314

Member Author:

These four lines!! This drove me crazy lol
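What those four lines do: `stream` is an `async fn`, so calling it returns a future that resolves to a stream (the schemas lock must be awaited before any stream exists). `FutureExt::flatten_stream` fuses the two layers into a single stream, and `.boxed()` erases the type into the `BoxStream<'static, Response>` the `Executor` trait demands. A standalone illustration (not from the codebase):

```rust
use futures::{stream, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    // A future which, only once awaited, produces a stream of three numbers --
    // analogous to `stream()` having to await the schemas lock first
    let future_of_stream = async { stream::iter(vec![1, 2, 3]) };

    // `flatten_stream` drives the future to completion, then yields the
    // items of the inner stream as one single `Stream`
    let items: Vec<i32> = future_of_stream.flatten_stream().collect().await;
    assert_eq!(items, vec![1, 2, 3]);
}
```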

impl std::fmt::Debug for GraphQLSchemaManager {
8 changes: 6 additions & 2 deletions aquadoggo/src/http/api.rs
@@ -25,8 +25,12 @@ use tokio_util::io::ReaderStream;

 use crate::http::context::HttpServiceContext;

 /// Handle GraphQL playground requests at the given path.
-pub async fn handle_graphql_playground(path: &str) -> impl IntoResponse {
-    response::Html(playground_source(GraphQLPlaygroundConfig::new(path)))
+pub async fn handle_graphql_playground(path: &str, subscription_path: &str) -> impl IntoResponse {
+    response::Html(playground_source(
+        GraphQLPlaygroundConfig::new(path)
+            .subscription_endpoint(subscription_path)
+            .title("p2panda GraphQL Playground"),
+    ))
 }

/// Handle GraphQL requests.
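The `Executor` implementation above is what makes this endpoint wiring possible: `async-graphql-axum`'s `GraphQLSubscription` service accepts any `async_graphql::Executor`. `subscription_endpoint` only tells the playground UI where to open the websocket; the endpoint itself still has to be mounted. A rough sketch of how the routes could be wired (the paths and this exact setup are assumptions, not part of the diff):

```rust
use async_graphql_axum::GraphQLSubscription;
use axum::routing::get;
use axum::Router;

use crate::graphql::GraphQLSchemaManager;
use crate::http::api::handle_graphql_playground;

// Hypothetical route wiring; paths are placeholders
fn build_router(schema_manager: GraphQLSchemaManager) -> Router {
    Router::new()
        // Serve the playground, pointing it at the websocket endpoint below
        .route(
            "/graphql",
            get(|| handle_graphql_playground("/graphql", "/graphql/ws")),
        )
        // `GraphQLSubscription` wraps any `async_graphql::Executor`, which is
        // exactly what the `Executor` impl on `GraphQLSchemaManager` provides
        .route_service("/graphql/ws", GraphQLSubscription::new(schema_manager))
}
```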