Skip to content

Commit

Permalink
Health endpoint (#442)
Browse files Browse the repository at this point in the history
* fixed health endpoint to work with http probe

* bump version

* adjusted according to review
  • Loading branch information
lassemand authored Jan 30, 2025
1 parent 87abd23 commit 2a81136
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 35 deletions.
2 changes: 1 addition & 1 deletion backend-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend-rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "concordium-scan"
version = "0.1.17"
version = "0.1.18"
edition = "2021"
description = "CCDScan: Indexer and API for the Concordium blockchain"
authors = ["Concordium <[email protected]>"]
Expand Down
52 changes: 32 additions & 20 deletions backend-rust/src/bin/ccdscan-api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context;
use axum::{http::StatusCode, Json};
use clap::Parser;
use concordium_scan::{
graphql_api, graphql_api::node_status::NodeInfoReceiver, migrations, router,
Expand Down Expand Up @@ -161,7 +162,7 @@ async fn main() -> anyhow::Result<()> {
let (subscription, subscription_listener) =
graphql_api::Subscription::new(cli.database_retry_delay_secs);
let (nodes_status_sender, nodes_status_receiver) = tokio::sync::watch::channel(None);
let block_receiver_health = nodes_status_receiver.clone();
let node_status_receiver = nodes_status_receiver.clone();
let mut pgnotify_listener = {
let pool = pool.clone();
let stop_signal = cancel_token.child_token();
Expand All @@ -183,10 +184,23 @@ async fn main() -> anyhow::Result<()> {
info!("Server is running at {:?}", cli.listen);
tokio::spawn(async move { service.serve(tcp_listener, stop_signal).await })
};
let mut node_collector_task = {
let stop_signal = cancel_token.child_token();
let service = graphql_api::node_status::Service::new(
nodes_status_sender,
&cli.node_collector_backend_origin,
Duration::from_secs(cli.node_collector_backend_pull_frequency_sec),
client,
cli.node_collector_connection_max_content_length,
stop_signal,
&mut registry,
);
tokio::spawn(service.serve())
};
let mut monitoring_task = {
let state = HealthState {
pool,
node_status_receiver: block_receiver_health,
node_status_receiver,
};
let health_routes =
axum::Router::new().route("/", axum::routing::get(health)).with_state(state);
Expand All @@ -197,18 +211,6 @@ async fn main() -> anyhow::Result<()> {
info!("Monitoring server is running at {:?}", cli.monitoring_listen);
tokio::spawn(router::serve(registry, tcp_listener, stop_signal, health_routes))
};
let mut node_collector_task = {
let stop_signal = cancel_token.child_token();
let service = graphql_api::node_status::Service::new(
nodes_status_sender,
&cli.node_collector_backend_origin,
Duration::from_secs(cli.node_collector_backend_pull_frequency_sec),
client,
cli.node_collector_connection_max_content_length,
stop_signal,
);
tokio::spawn(service.serve())
};

// Wait for a signal to shut down, or for any of the tasks to stop.
tokio::select! {
Expand Down Expand Up @@ -265,15 +267,25 @@ struct HealthState {
/// Verifying the API service state is as expected.
async fn health(
axum::extract::State(state): axum::extract::State<HealthState>,
) -> axum::Json<serde_json::Value> {
) -> (StatusCode, Json<serde_json::Value>) {
let node_status_connected = state.node_status_receiver.borrow().is_some();
let database_connected =
migrations::ensure_compatible_schema_version(&state.pool, SUPPORTED_SCHEMA_VERSION)
.await
.is_ok();
axum::Json(json!({
"status": if node_status_connected && database_connected {"ok"} else {"error"},
"node_status": if node_status_connected {"connected"} else {"not connected"},
"database": if database_connected {"connected"} else {"not connected"},
}))

let is_healthy = node_status_connected && database_connected;

let status_code = if is_healthy {
StatusCode::OK
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(
status_code,
Json(json!({
"node_status": if node_status_connected {"connected"} else {"not connected"},
"database_status": if database_connected {"connected"} else {"not connected"},
})),
)
}
23 changes: 13 additions & 10 deletions backend-rust/src/bin/ccdscan-indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context;
use axum::{http::StatusCode, Json};
use clap::Parser;
use concordium_rust_sdk::v2;
use concordium_scan::{
Expand Down Expand Up @@ -177,15 +178,17 @@ async fn main() -> anyhow::Result<()> {
/// Verifying the indexer service state is as expected.
async fn health(
axum::extract::State(pool): axum::extract::State<sqlx::PgPool>,
) -> axum::Json<serde_json::Value> {
match migrations::ensure_latest_schema_version(&pool).await {
Ok(_) => axum::Json(json!({
"status": "ok",
"database": "connected"
})),
Err(err) => axum::Json(json!({
"status": "error",
"database": format!("not connected: {}", err)
) -> (StatusCode, Json<serde_json::Value>) {
let database_connected = migrations::ensure_latest_schema_version(&pool).await.is_ok();
let status_code = if database_connected {
StatusCode::OK
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(
status_code,
Json(json!({
"database_status": if database_connected {"connected"} else {"not connected"},
})),
}
)
}
18 changes: 15 additions & 3 deletions backend-rust/src/graphql_api/node_status.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{ApiError, ApiResult};
use crate::connection::connection_from_slice;
use async_graphql::{connection, types, ComplexObject, Context, Enum, Object, SimpleObject};
use prometheus_client::{metrics::counter::Counter, registry::Registry};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering::Equal, time::Duration};
Expand Down Expand Up @@ -81,10 +82,11 @@ enum NodeSortDirection {
}

pub struct Service {
sender: Sender<Option<Vec<NodeStatus>>>,
sender: Sender<Option<Vec<NodeStatus>>>,
node_collector_backend: NodeCollectorBackendClient,
pull_frequency: Duration,
cancellation_token: CancellationToken,
pull_frequency: Duration,
cancellation_token: CancellationToken,
failed_node_status_fetch_counter: Counter,
}

impl Service {
Expand All @@ -95,14 +97,23 @@ impl Service {
client: Client,
max_content_length: u64,
cancellation_token: CancellationToken,
registry: &mut Registry,
) -> Self {
let failed_node_status_fetch_counter = Counter::default();
registry.register(
"failed_node_status_fetch_counter",
"Number of failed attempts to retrieve data from the node status collector",
failed_node_status_fetch_counter.clone(),
);

let node_collector_backend =
NodeCollectorBackendClient::new(client, origin, max_content_length);
Self {
sender,
node_collector_backend,
pull_frequency,
cancellation_token,
failed_node_status_fetch_counter,
}
}

Expand All @@ -121,6 +132,7 @@ impl Service {
}
}
Err(err) => {
self.failed_node_status_fetch_counter.inc();
error!("Error querying node summary: {}", err);
}
}
Expand Down

0 comments on commit 2a81136

Please sign in to comment.