Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add initial healthcheck endpoint (ampd) #14

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 177 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ampd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rust-version = { workspace = true }
[dependencies]
async-trait = "0.1.59"
axelar-wasm-std = { workspace = true }
axum = "0.7.5"
base64 = "0.21.2"
bcs = "0.1.5"
clap = { version = "4.2.7", features = ["derive", "cargo"] }
Expand Down
2 changes: 2 additions & 0 deletions ampd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Below is the config file format, with explanations for each entry:
tm_jsonrpc=[JSON-RPC URL of Axelar node]
tm_grpc=[gRPC URL of Axelar node]
event_buffer_cap=[max blockchain events to queue. Will error if set too low]
health_check_bind_addr=[bind address of the /status health check endpoint, e.g. "0.0.0.0:3000"]

[service_registry]
cosmwasm_contract=[address of service registry]
Expand Down Expand Up @@ -52,6 +53,7 @@ type=[handler type. Could be EvmMsgWorkerSetVerifier | SuiWorkerSetVerifier]

Below is an example config for connecting to a local axelard node and local tofnd process, and verifying transactions from Avalanche testnet and Sui testnet.
```
health_check_bind_addr = "0.0.0.0:3000"
tm_jsonrpc="http://localhost:26657"
tm_grpc="tcp://localhost:9090"
event_buffer_cap=10000
Expand Down
3 changes: 3 additions & 0 deletions ampd/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;

use serde::{Deserialize, Serialize};
Expand All @@ -11,6 +12,7 @@ use crate::url::Url;
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(default)]
pub struct Config {
pub health_check_bind_addr: SocketAddrV4,
pub tm_jsonrpc: Url,
pub tm_grpc: Url,
pub event_buffer_cap: usize,
Expand All @@ -34,6 +36,7 @@ impl Default for Config {
event_buffer_cap: 100000,
event_stream_timeout: Duration::from_secs(15),
service_registry: ServiceRegistryConfig::default(),
health_check_bind_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 3000),
}
}
}
Expand Down
98 changes: 98 additions & 0 deletions ampd/src/health_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use error_stack::Result;
use std::{io, net::SocketAddrV4};
use thiserror::Error;
use tracing::info;

use axum::{http::StatusCode, routing::get, Json, Router};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

/// Health check HTTP server that owns an already-bound TCP listener.
///
/// Created with [`Server::new`] and consumed by [`Server::run`].
pub struct Server {
// Listener bound in `Server::new`; handed to `axum::serve` in `Server::run`.
listener: tokio::net::TcpListener,
}

/// Errors produced while setting up or running the health check server.
#[derive(Error, Debug)]
pub enum HealthCheckError {
/// An I/O failure reported while binding the listener, reading its local address,
/// or serving requests. `#[from]` lets an `io::Error` convert into this variant.
#[error("Health check HTTP server problem: {0}")]
HTTPServerIO(#[from] io::Error),
}

impl Server {
    /// Binds a TCP listener on `bind_addr` and wraps it in a [`Server`].
    ///
    /// # Errors
    ///
    /// Returns [`HealthCheckError::HTTPServerIO`] when the address cannot be bound.
    pub async fn new(bind_addr: SocketAddrV4) -> Result<Self, HealthCheckError> {
        let listener = tokio::net::TcpListener::bind(bind_addr)
            .await
            .map_err(HealthCheckError::from)?;

        Ok(Self { listener })
    }

    /// Serves the `/status` route on the bound listener until `cancel` is cancelled,
    /// at which point the HTTP server is shut down gracefully.
    ///
    /// # Errors
    ///
    /// Returns [`HealthCheckError::HTTPServerIO`] when the listener's local address
    /// cannot be read or when serving fails with an I/O error.
    pub async fn run(self, cancel: CancellationToken) -> Result<(), HealthCheckError> {
        let Self { listener } = self;

        let router = Router::new().route("/status", get(status));
        let local_addr = listener.local_addr().map_err(HealthCheckError::from)?;

        info!("Starting health check server at: {}", local_addr);

        // The shutdown future resolves once the token is cancelled, which tells
        // axum to stop serving.
        let shutdown_signal = async move { cancel.cancelled().await };

        axum::serve(listener, router)
            .with_graceful_shutdown(shutdown_signal)
            .await
            .map_err(HealthCheckError::from)?;

        Ok(())
    }
}

/// Handler for the `/status` route: always answers `200 OK` with a JSON-encoded
/// [`Status`] whose `ok` field is `true`.
async fn status() -> (StatusCode, Json<Status>) {
    let body = Json(Status { ok: true });
    (StatusCode::OK, body)
}

/// Body returned by the `/status` endpoint.
#[derive(Serialize, Deserialize)]
struct Status {
// Set to `true` by the `status` handler.
ok: bool,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::net::SocketAddr;
    use std::str::FromStr;
    use std::time::Duration;
    use tokio::test as async_test;

    impl Server {
        /// Test-only accessor for the address the listener is actually bound to.
        fn listening_addr(&self) -> io::Result<SocketAddr> {
            self.listener.local_addr()
        }
    }

    /// Starts the server, checks that `/status` answers `200 OK` with `ok == true`,
    /// then cancels it and checks that new connections are refused.
    #[async_test]
    async fn server_lifecycle() {
        // The test binds to port 0 and reads the real address back from the listener.
        let loopback = SocketAddrV4::from_str("127.0.0.1:0").unwrap();
        let server = Server::new(loopback).await.unwrap();
        let bound_addr = server.listening_addr().unwrap();

        let cancel = CancellationToken::new();
        let server_token = cancel.clone();

        tokio::spawn(server.run(server_token));

        let status_url = format!("http://{}/status", bound_addr);

        // Give the spawned server task a moment to start serving.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let response = reqwest::get(&status_url).await.unwrap();
        assert_eq!(reqwest::StatusCode::OK, response.status());

        let body = response.json::<Status>().await.unwrap();
        assert!(body.ok);

        cancel.cancel();

        // Give the server a moment to shut down after cancellation.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let error = match reqwest::get(&status_url).await {
            Err(error) => error,
            Ok(_) => panic!("health check server should be closed by now"),
        };
        assert!(error.is_connect());
    }
}
18 changes: 18 additions & 0 deletions ampd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod event_processor;
mod event_sub;
mod evm;
mod handlers;
mod health_check;
mod json_rpc;
mod queue;
pub mod state;
Expand Down Expand Up @@ -71,6 +72,7 @@ async fn prepare_app(cfg: Config, state: State) -> Result<App<impl Broadcaster>,
event_buffer_cap,
event_stream_timeout,
service_registry: _service_registry,
health_check_bind_addr,
} = cfg;

let tm_client = tendermint_rpc::HttpClient::new(tm_jsonrpc.to_string().as_str())
Expand Down Expand Up @@ -122,6 +124,10 @@ async fn prepare_app(cfg: Config, state: State) -> Result<App<impl Broadcaster>,
.build()
.change_context(Error::Broadcaster)?;

let health_check_server = health_check::Server::new(health_check_bind_addr)
.await
.change_context(Error::HealthCheckError)?;

App::new(
tm_client,
broadcaster,
Expand All @@ -130,6 +136,7 @@ async fn prepare_app(cfg: Config, state: State) -> Result<App<impl Broadcaster>,
broadcast,
event_buffer_cap,
block_height_monitor,
health_check_server,
)
.configure_handlers(worker, handlers, event_stream_timeout)
.await
Expand Down Expand Up @@ -163,6 +170,7 @@ where
state_updater: StateUpdater,
ecdsa_client: SharableEcdsaClient,
block_height_monitor: BlockHeightMonitor<tendermint_rpc::HttpClient>,
health_check_server: health_check::Server,
token: CancellationToken,
}

Expand All @@ -178,6 +186,7 @@ where
broadcast_cfg: broadcaster::Config,
event_buffer_cap: usize,
block_height_monitor: BlockHeightMonitor<tendermint_rpc::HttpClient>,
health_check_server: health_check::Server,
) -> Self {
let token = CancellationToken::new();

Expand All @@ -203,6 +212,7 @@ where
state_updater,
ecdsa_client,
block_height_monitor,
health_check_server,
token,
}
}
Expand Down Expand Up @@ -375,6 +385,7 @@ where
broadcaster,
state_updater,
block_height_monitor,
health_check_server,
token,
..
} = self;
Expand Down Expand Up @@ -407,6 +418,11 @@ where
.run(token)
.change_context(Error::EventPublisher)
}))
.add_task(CancellableTask::create(|token| {
health_check_server
.run(token)
.change_context(Error::HealthCheckError)
}))
.add_task(CancellableTask::create(|token| {
event_processor
.run(token)
Expand Down Expand Up @@ -457,4 +473,6 @@ pub enum Error {
BlockHeightMonitor,
#[error("invalid finalizer type for chain {0}")]
InvalidFinalizerType(ChainName),
#[error("Health check system error")]
HealthCheckError,
}
1 change: 1 addition & 0 deletions ampd/src/tests/config_template.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
health_check_bind_addr = '0.0.0.0:3000'
tm_jsonrpc = 'http://localhost:26657/'
tm_grpc = 'tcp://localhost:9090'
event_buffer_cap = 100000
Expand Down
1 change: 1 addition & 0 deletions contracts/multisig-prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ gateway-api = { workspace = true }
hex = { version = "0.4.3", default-features = false, features = [] }
itertools = "0.11.0"
k256 = { version = "0.13.1", features = ["ecdsa"] }
monitoring = { workspace = true }
multisig = { workspace = true, features = ["library"] }
report = { workspace = true }
serde_json = "1.0.89"
Expand Down
Loading
Loading