Skip to content

Commit

Permalink
feat(host): rewrite using handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
keroro520 committed Jan 12, 2025
1 parent afc2809 commit d2c12c0
Show file tree
Hide file tree
Showing 23 changed files with 335 additions and 708 deletions.
2 changes: 2 additions & 0 deletions host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ sgx-prover = { path = "../provers/sgx/prover", optional = true }
raiko-lib = { workspace = true }
raiko-core = { workspace = true }
raiko-tasks = { workspace = true }
raiko-reqpool = { workspace = true }
raiko-reqactor = { workspace = true }

# alloy
alloy-rlp = { workspace = true }
Expand Down
62 changes: 50 additions & 12 deletions host/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#![allow(incomplete_features)]
use raiko_host::{interfaces::HostResult, server::serve, ProverState};
use clap::Parser;
use raiko_host::{interfaces::HostResult, server::serve, Opts};
use raiko_lib::consts::SupportedChainSpecs;
use raiko_reqpool::RedisPoolConfig;
use std::path::PathBuf;
use tracing::{debug, info};
use tracing_appender::{
Expand All @@ -14,23 +17,58 @@ async fn main() -> HostResult<()> {
env_logger::Builder::from_default_env()
.target(env_logger::Target::Stdout)
.init();
let state = ProverState::init()?;
let _guard = subscribe_log(
&state.opts.log_path,
&state.opts.log_level,
state.opts.max_log,
let opts = parse_opts()?;
let chain_specs = parse_chain_specs(&opts);

// Start pool
let pool = raiko_reqpool::RedisPool::open(RedisPoolConfig {
redis_url: opts.redis_url.clone(),
redis_ttl: opts.redis_ttl,
})
.map_err(|e| anyhow::anyhow!(e))?;

// Start Actor
let (controller, pause_tx) =
raiko_reqactor::Actor::start(pool.clone(), chain_specs.clone()).await;
let gateway = raiko_reqactor::Gateway::new(
pool,
opts.proof_request_opt.clone(),
chain_specs.clone(),
controller,
pause_tx,
);
debug!("Start config:\n{:#?}", state.opts.proof_request_opt);
debug!("Args:\n{:#?}", state.opts);

info!("Supported chains: {:?}", state.chain_specs);
info!("Start config:\n{:#?}", state.opts.proof_request_opt);
info!("Args:\n{:#?}", state.opts);
let _guard = subscribe_log(&opts.log_path, &opts.log_level, opts.max_log);
debug!("Start config:\n{:#?}", opts.proof_request_opt);
debug!("Args:\n{:#?}", opts);
info!("Supported chains: {:?}", chain_specs);

serve(state).await?;
let address = opts.address.as_str();
let concurrency = opts.concurrency_limit;
let jwt_secret = opts.jwt_secret.clone();
serve(gateway, address, concurrency, jwt_secret).await?;
Ok(())
}

fn parse_opts() -> HostResult<Opts> {
// Read the command line arguments;
let mut opts = Opts::parse();
// Read env supported options.
opts.merge_from_env();
// Read the config file.
opts.merge_from_file()?;

Ok(opts)
}

fn parse_chain_specs(opts: &Opts) -> SupportedChainSpecs {
if let Some(cs_path) = &opts.chain_spec_path {
SupportedChainSpecs::merge_from_file(cs_path.clone()).expect("Failed to parse chain specs")
} else {
SupportedChainSpecs::default()
}
}

fn subscribe_log(
log_path: &Option<PathBuf>,
log_level: &String,
Expand Down
115 changes: 8 additions & 107 deletions host/src/server/api/admin.rs
Original file line number Diff line number Diff line change
@@ -1,114 +1,15 @@
use axum::{extract::State, routing::post, Router};

use crate::{interfaces::HostResult, ProverState};
use crate::interfaces::HostResult;
use raiko_reqactor::Gateway;

pub fn create_router() -> Router<ProverState> {
Router::new()
.route("/admin/pause", post(pause))
.route("/admin/unpause", post(unpause))
pub fn create_router<P: raiko_reqpool::Pool + 'static>() -> Router<Gateway<P>> {
Router::new().route("/admin/pause", post(pause))
}

async fn pause(State(state): State<ProverState>) -> HostResult<&'static str> {
state.set_pause(true).await?;
async fn pause<P: raiko_reqpool::Pool>(
State(gateway): State<Gateway<P>>,
) -> HostResult<&'static str> {
gateway.pause().await.map_err(|e| anyhow::anyhow!(e))?;
Ok("System paused successfully")
}

async fn unpause(State(state): State<ProverState>) -> HostResult<&'static str> {
state.set_pause(false).await?;
Ok("System unpaused successfully")
}

#[cfg(test)]
mod tests {
use super::*;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use clap::Parser;
use std::path::PathBuf;
use tower::ServiceExt;

#[tokio::test]
async fn test_pause() {
let opts = {
let mut opts = crate::Opts::parse();
opts.config_path = PathBuf::from("../host/config/config.json");
opts.merge_from_file().unwrap();
opts
};
let state = ProverState::init_with_opts(opts).unwrap();
let app = Router::new()
.route("/admin/pause", post(pause))
.with_state(state.clone());

let request = Request::builder()
.method("POST")
.uri("/admin/pause")
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert!(state.is_paused());
}

#[tokio::test]
async fn test_pause_when_already_paused() {
let opts = {
let mut opts = crate::Opts::parse();
opts.config_path = PathBuf::from("../host/config/config.json");
opts.merge_from_file().unwrap();
opts
};
let state = ProverState::init_with_opts(opts).unwrap();

state.set_pause(true).await.unwrap();

let app = Router::new()
.route("/admin/pause", post(pause))
.with_state(state.clone());

let request = Request::builder()
.method("POST")
.uri("/admin/pause")
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert!(state.is_paused());
}

#[tokio::test]
async fn test_unpause() {
let opts = {
let mut opts = crate::Opts::parse();
opts.config_path = PathBuf::from("../host/config/config.json");
opts.merge_from_file().unwrap();
opts
};
let state = ProverState::init_with_opts(opts).unwrap();

// Set initial paused state
state.set_pause(true).await.unwrap();
assert!(state.is_paused());

let app = Router::new()
.route("/admin/unpause", post(unpause))
.with_state(state.clone());

let request = Request::builder()
.method("POST")
.uri("/admin/unpause")
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert!(!state.is_paused());
}
}
9 changes: 5 additions & 4 deletions host/src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use axum::{
http::{header, HeaderName, Method, StatusCode, Uri},
Router,
};
use raiko_reqactor::Gateway;
use tower::ServiceBuilder;
use tower_http::{
compression::CompressionLayer,
Expand All @@ -11,17 +12,17 @@ use tower_http::{
validate_request::ValidateRequestHeaderLayer,
};

use crate::ProverState;

pub mod admin;
pub mod util;
pub mod v1;
pub mod v2;
pub mod v3;

pub const MAX_BODY_SIZE: usize = 1 << 20;

pub fn create_router(concurrency_limit: usize, jwt_secret: Option<&str>) -> Router<ProverState> {
pub fn create_router<P: raiko_reqpool::Pool + 'static>(
concurrency_limit: usize,
jwt_secret: Option<&str>,
) -> Router<Gateway<P>> {
let cors = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers([
Expand Down
12 changes: 0 additions & 12 deletions host/src/server/api/util.rs

This file was deleted.

8 changes: 4 additions & 4 deletions host/src/server/api/v1/health.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::{debug_handler, http::StatusCode, routing::get, Router};
use axum::{http::StatusCode, routing::get, Router};
use utoipa::OpenApi;

use crate::ProverState;
use raiko_reqactor::Gateway;

#[utoipa::path(
get,
Expand All @@ -11,7 +11,7 @@ use crate::ProverState;
(status = 200, description = "Proverd server is healthy"),
)
)]
#[debug_handler(state = ProverState)]
// #[debug_handler(state = Gateway)]
/// Health check
///
/// Currently only responds with an OK status.
Expand All @@ -28,6 +28,6 @@ pub fn create_docs() -> utoipa::openapi::OpenApi {
Docs::openapi()
}

pub fn create_router() -> Router<ProverState> {
pub fn create_router<P: raiko_reqpool::Pool + 'static>() -> Router<Gateway<P>> {
Router::new().route("/", get(health_handler))
}
8 changes: 4 additions & 4 deletions host/src/server/api/v1/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use axum::{body::Body, debug_handler, http::header, response::Response, routing::get, Router};
use axum::{body::Body, http::header, response::Response, routing::get, Router};
use prometheus::{Encoder, TextEncoder};
use utoipa::OpenApi;

use crate::{interfaces::HostResult, ProverState};
use crate::interfaces::HostResult;
use raiko_reqactor::Gateway;

#[utoipa::path(
get,
Expand All @@ -12,7 +13,6 @@ use crate::{interfaces::HostResult, ProverState};
(status = 200, description = "The metrics have been captured successfully"),
),
)]
#[debug_handler(state = ProverState)]
/// Get prometheus metrics
///
/// Currently available metrics are:
Expand Down Expand Up @@ -52,6 +52,6 @@ pub fn create_docs() -> utoipa::openapi::OpenApi {
Docs::openapi()
}

pub fn create_router() -> Router<ProverState> {
pub fn create_router<P: raiko_reqpool::Pool + 'static>() -> Router<Gateway<P>> {
Router::new().route("/", get(metrics_handler))
}
7 changes: 5 additions & 2 deletions host/src/server/api/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use utoipa::{OpenApi, ToSchema};
use utoipa_scalar::{Scalar, Servable};
use utoipa_swagger_ui::SwaggerUi;

use crate::{interfaces::HostError, ProverState};
use crate::interfaces::HostError;
use raiko_reqactor::Gateway;

pub mod health;
pub mod metrics;
Expand Down Expand Up @@ -116,7 +117,9 @@ pub fn create_docs() -> utoipa::openapi::OpenApi {
})
}

pub fn create_router(concurrency_limit: usize) -> Router<ProverState> {
pub fn create_router<P: raiko_reqpool::Pool + 'static>(
concurrency_limit: usize,
) -> Router<Gateway<P>> {
let docs = create_docs();

Router::new()
Expand Down
Loading

0 comments on commit d2c12c0

Please sign in to comment.