Skip to content

Commit

Permalink
refactor: use graphql-client for status monitor (#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
taslimmuhammed authored Nov 13, 2024
1 parent 552d16b commit 24b558c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 38 deletions.
68 changes: 30 additions & 38 deletions crates/common/src/subgraph_client/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,30 @@

use std::time::Duration;

use deployment_status_query::Health;
use graphql_client::GraphQLQuery;
use reqwest::Url;
use serde::Deserialize;
use serde_json::json;
use thegraph_core::DeploymentId;
use thegraph_graphql_http::{
http::request::IntoRequestParameters,
http_client::{ReqwestExt, ResponseResult},
};
use tokio::sync::watch::Receiver;

use crate::watcher::new_watcher;

use super::Query;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DeploymentStatusResponse {
indexing_statuses: Vec<DeploymentStatus>,
}
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "../graphql/indexing_status.schema.graphql",
query_path = "../graphql/subgraph_deployment_status.graphql",
response_derives = "Debug",
variables_derives = "Clone"
)]
pub struct DeploymentStatusQuery;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct DeploymentStatus {
pub synced: bool,
pub health: String,
}

async fn query<T: for<'de> Deserialize<'de>>(
url: Url,
query: impl IntoRequestParameters + Send,
) -> Result<ResponseResult<T>, anyhow::Error> {
Ok(reqwest::Client::new().post(url).send_graphql(query).await?)
}

pub async fn monitor_deployment_status(
deployment: DeploymentId,
status_url: Url,
Expand All @@ -50,28 +41,28 @@ pub async fn check_deployment_status(
deployment: DeploymentId,
status_url: Url,
) -> Result<DeploymentStatus, anyhow::Error> {
let body = Query::new_with_variables(
r#"
query indexingStatuses($ids: [String!]!) {
indexingStatuses(subgraphs: $ids) {
synced
health
}
}
"#,
[("ids", json!([deployment.to_string()]))],
);

let response = query::<DeploymentStatusResponse>(status_url, body).await?;

match response {
Ok(deployment_status) => deployment_status
let req_body = DeploymentStatusQuery::build_query(deployment_status_query::Variables {
ids: vec![deployment.to_string()],
});
let client = reqwest::Client::new();
let response = client.post(status_url).json(&req_body).send().await?;
let graphql_response: graphql_client::Response<deployment_status_query::ResponseData> =
response.json().await?;
match graphql_response.data {
Some(data) => data
.indexing_statuses
.first()
.cloned()
.map(|status| DeploymentStatus {
synced: status.synced,
health: match status.health {
Health::healthy => "healthy".to_owned(),
Health::unhealthy => "unhealthy".to_owned(),
_ => "failed".to_owned(),
},
})
.ok_or_else(|| anyhow::anyhow!("Deployment `{deployment}` not found")),
Err(e) => Err(anyhow::anyhow!(
"Failed to query status of deployment `{deployment}`: {e}"
None => Err(anyhow::anyhow!(
"Failed to query status of deployment `{deployment}`"
)),
}
}
Expand All @@ -80,6 +71,7 @@ pub async fn check_deployment_status(
mod tests {
use std::str::FromStr;

use serde_json::json;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

Expand Down
6 changes: 6 additions & 0 deletions crates/graphql/subgraph_deployment_status.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
query DeploymentStatusQuery($ids: [String!]!) {
indexingStatuses(subgraphs: $ids) {
synced
health
}
}

0 comments on commit 24b558c

Please sign in to comment.