Health endpoint #442

Merged: 3 commits, Jan 30, 2025
52 changes: 32 additions & 20 deletions backend-rust/src/bin/ccdscan-api.rs
@@ -1,4 +1,5 @@
use anyhow::Context;
use axum::{http::StatusCode, Json};
use clap::Parser;
use concordium_scan::{
graphql_api, graphql_api::node_status::NodeInfoReceiver, migrations, router,
@@ -161,7 +162,7 @@ async fn main() -> anyhow::Result<()> {
let (subscription, subscription_listener) =
graphql_api::Subscription::new(cli.database_retry_delay_secs);
let (nodes_status_sender, nodes_status_receiver) = tokio::sync::watch::channel(None);
let block_receiver_health = nodes_status_receiver.clone();
let node_status_receiver = nodes_status_receiver.clone();
let mut pgnotify_listener = {
let pool = pool.clone();
let stop_signal = cancel_token.child_token();
@@ -183,10 +184,23 @@ async fn main() -> anyhow::Result<()> {
info!("Server is running at {:?}", cli.listen);
tokio::spawn(async move { service.serve(tcp_listener, stop_signal).await })
};
let mut node_collector_task = {
let stop_signal = cancel_token.child_token();
let service = graphql_api::node_status::Service::new(
nodes_status_sender,
&cli.node_collector_backend_origin,
Duration::from_secs(cli.node_collector_backend_pull_frequency_sec),
client,
cli.node_collector_connection_max_content_length,
stop_signal,
&mut registry,
);
tokio::spawn(service.serve())
};
let mut monitoring_task = {
let state = HealthState {
pool,
node_status_receiver: block_receiver_health,
node_status_receiver,
};
let health_routes =
axum::Router::new().route("/", axum::routing::get(health)).with_state(state);
@@ -197,18 +211,6 @@ async fn main() -> anyhow::Result<()> {
info!("Monitoring server is running at {:?}", cli.monitoring_listen);
tokio::spawn(router::serve(registry, tcp_listener, stop_signal, health_routes))
};
let mut node_collector_task = {
let stop_signal = cancel_token.child_token();
let service = graphql_api::node_status::Service::new(
nodes_status_sender,
&cli.node_collector_backend_origin,
Duration::from_secs(cli.node_collector_backend_pull_frequency_sec),
client,
cli.node_collector_connection_max_content_length,
stop_signal,
);
tokio::spawn(service.serve())
};

// Wait for a shutdown signal or for any of the tasks to stop.
tokio::select! {
@@ -265,15 +267,25 @@ struct HealthState {
/// Verifying the API service state is as expected.
async fn health(
axum::extract::State(state): axum::extract::State<HealthState>,
) -> axum::Json<serde_json::Value> {
) -> (StatusCode, Json<serde_json::Value>) {
let node_status_connected = state.node_status_receiver.borrow().is_some();
let database_connected =
migrations::ensure_compatible_schema_version(&state.pool, SUPPORTED_SCHEMA_VERSION)
.await
.is_ok();
axum::Json(json!({
"status": if node_status_connected && database_connected {"ok"} else {"error"},
"node_status": if node_status_connected {"connected"} else {"not connected"},
"database": if database_connected {"connected"} else {"not connected"},
}))

let is_healthy = node_status_connected && database_connected;

let status_code = if is_healthy {
StatusCode::OK
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(
status_code,
Json(json!({
"node_status": if node_status_connected {"connected"} else {"not connected"},
"database": if database_connected {"connected"} else {"not connected"},
})),
)
}
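A note on the return-type change above: axum implements `IntoResponse` for a `(StatusCode, Json<T>)` tuple, so the handler can now signal failure through the HTTP status itself instead of always answering 200 with a JSON `status` field. The following is a minimal, self-contained sketch of that pattern, not code from this PR; the hard-coded check results, the listen address, and the axum 0.7 style `axum::serve` call are assumptions.

```rust
use axum::{http::StatusCode, routing::get, Json, Router};
use serde_json::{json, Value};

// Stand-ins for the real database / node-status checks in ccdscan-api.
async fn health() -> (StatusCode, Json<Value>) {
    let database_connected = true; // assumed result of the schema-version check
    let node_status_connected = false; // assumed result of the node-status watch channel

    let is_healthy = database_connected && node_status_connected;
    let status_code = if is_healthy {
        StatusCode::OK
    } else {
        StatusCode::INTERNAL_SERVER_ERROR
    };

    // The (StatusCode, Json<_>) tuple becomes a response: status line plus JSON body.
    (
        status_code,
        Json(json!({
            "node_status": if node_status_connected { "connected" } else { "not connected" },
            "database": if database_connected { "connected" } else { "not connected" },
        })),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(health));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8003")
        .await
        .expect("bind monitoring address");
    axum::serve(listener, app).await.expect("serve health endpoint");
}
```

With this shape, a Kubernetes probe or a load balancer can act on the status code alone, while the JSON body keeps the per-dependency detail for debugging.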
24 changes: 14 additions & 10 deletions backend-rust/src/bin/ccdscan-indexer.rs
@@ -1,4 +1,5 @@
use anyhow::Context;
use axum::{http::StatusCode, Json};
use clap::Parser;
use concordium_rust_sdk::v2;
use concordium_scan::{
@@ -177,15 +178,18 @@ async fn main() -> anyhow::Result<()> {
/// Verifying the indexer service state is as expected.
async fn health(
axum::extract::State(pool): axum::extract::State<sqlx::PgPool>,
) -> axum::Json<serde_json::Value> {
    match migrations::ensure_latest_schema_version(&pool).await {
        Ok(_) => axum::Json(json!({
            "status": "ok",
            "database": "connected"
        })),
        Err(err) => axum::Json(json!({
            "status": "error",
            "database": format!("not connected: {}", err)
        })),
    }
}
) -> (StatusCode, Json<serde_json::Value>) {
    let database_connected = migrations::ensure_latest_schema_version(&pool).await.is_ok();
    let is_healthy = database_connected;
    let status_code = if is_healthy {
        StatusCode::OK
    } else {
        StatusCode::INTERNAL_SERVER_ERROR
    };
    (
        status_code,
        Json(json!({
            "database": if database_connected {"connected"} else {"not connected"},
        })),
    )
}
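The indexer handler gets the same treatment, so an external check no longer needs to parse the removed `status` field. A hypothetical probe written with reqwest (the address, and the use of reqwest and anyhow at all, are assumptions for illustration) could look like this:

```rust
use reqwest::Client;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Address is illustrative; point it at the indexer's monitoring listener.
    let response = Client::new().get("http://127.0.0.1:8003/").send().await?;

    if response.status().is_success() {
        println!("indexer healthy");
    } else {
        let status = response.status();
        // The JSON body still carries the per-dependency detail for debugging.
        let body = response.text().await?;
        println!("indexer unhealthy ({status}): {body}");
    }
    Ok(())
}
```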
18 changes: 15 additions & 3 deletions backend-rust/src/graphql_api/node_status.rs
@@ -1,6 +1,7 @@
use super::{ApiError, ApiResult};
use crate::connection::connection_from_slice;
use async_graphql::{connection, types, ComplexObject, Context, Enum, Object, SimpleObject};
use prometheus_client::{metrics::counter::Counter, registry::Registry};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering::Equal, time::Duration};
@@ -81,10 +82,11 @@ enum NodeSortDirection {
}

pub struct Service {
    sender: Sender<Option<Vec<NodeStatus>>>,
    node_collector_backend: NodeCollectorBackendClient,
    pull_frequency: Duration,
    cancellation_token: CancellationToken,
    failed_node_status_fetch_counter: Counter,
}

impl Service {
@@ -95,14 +97,23 @@ impl Service {
client: Client,
max_content_length: u64,
cancellation_token: CancellationToken,
registry: &mut Registry,
) -> Self {
let failed_node_status_fetch_counter = Counter::default();
registry.register(
"failed_node_status_fetch_counter",
"Number of failed attempts to retrieve data from the node status collector",
failed_node_status_fetch_counter.clone(),
);

let node_collector_backend =
NodeCollectorBackendClient::new(client, origin, max_content_length);
Self {
sender,
node_collector_backend,
pull_frequency,
cancellation_token,
failed_node_status_fetch_counter,
}
}

@@ -121,6 +132,7 @@ impl Service {
}
}
Err(err) => {
self.failed_node_status_fetch_counter.inc();
error!("Error querying node summary: {}", err);
}
}
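For completeness, here is a self-contained sketch of the `prometheus_client` pattern the diff wires in: a `Counter` is cheap to clone, one clone is registered in the `Registry` scraped by the monitoring server and the other stays with the `Service`, which increments it whenever a pull from the node collector backend fails. Only the metric name and help text are taken from the PR; the `main` scaffold and the explicit `encode` call are illustrative.

```rust
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};

fn main() {
    let mut registry = Registry::default();

    // Same registration shape as Service::new in the diff.
    let failed_node_status_fetch_counter = Counter::default();
    registry.register(
        "failed_node_status_fetch_counter",
        "Number of failed attempts to retrieve data from the node status collector",
        failed_node_status_fetch_counter.clone(),
    );

    // Each failed pull from the node collector backend bumps the counter.
    failed_node_status_fetch_counter.inc();
    failed_node_status_fetch_counter.inc();

    // Render the registry in the text exposition format, as a /metrics route would.
    let mut buffer = String::new();
    encode(&mut buffer, &registry).expect("encoding metrics");
    println!("{buffer}");
}
```

In the text exposition output, prometheus_client renders the counter with a `_total` suffix, i.e. `failed_node_status_fetch_counter_total`.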