Skip to content

Commit

Permalink
feat: add dummy health check tasks for state keeper and eth sender
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelmauro committed Oct 31, 2024
1 parent 3d7e82c commit de3a5e1
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 5 deletions.
3 changes: 1 addition & 2 deletions core/node/house_keeper/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use zksync_health_check::{Health, HealthStatus, HealthUpdater};

use crate::periodic_job::PeriodicJob;

/// This struct implements a static health check describing node's version information.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatabaseInfo {
last_migration: DatabaseMigration,
Expand All @@ -29,7 +28,7 @@ impl DatabaseHealthTask {

#[async_trait]
impl PeriodicJob for DatabaseHealthTask {
const SERVICE_NAME: &'static str = "L1BatchMetricsReporter";
const SERVICE_NAME: &'static str = "DatabaseHealth";

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let mut conn = self.connection_pool.connection().await.unwrap();
Expand Down
65 changes: 65 additions & 0 deletions core/node/house_keeper/src/eth_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater};

use crate::periodic_job::PeriodicJob;

#[derive(Debug, Serialize, Deserialize)]
pub struct EthSenderInfo {
failed_l1_txns: Option<()>,
last_created_commit_batch: Option<()>,
last_created_prove_batch: Option<()>,
last_created_execute_batch: Option<()>,
last_executed_commit_batch: Option<()>,
last_executed_prove_batch: Option<()>,
last_executed_execute_batch: Option<()>,
current_nonce: Option<()>,
latest_operator_nonce: Option<()>,
}

impl From<EthSenderInfo> for Health {
fn from(details: EthSenderInfo) -> Self {
Self::from(HealthStatus::Ready).with_details(details)
}
}

#[derive(Debug)]
pub struct EthSenderHealthTask {
pub connection_pool: ConnectionPool<Core>,
pub eth_sender_health_updater: HealthUpdater,
}

impl EthSenderHealthTask {
pub const POLLING_INTERVAL_MS: u64 = 10_000;
}

#[async_trait]
impl PeriodicJob for EthSenderHealthTask {
const SERVICE_NAME: &'static str = "EthSenderHealth";

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let mut conn = self.connection_pool.connection().await.unwrap();
let _last_migration = conn.system_dal().get_last_migration().await.unwrap();

self.eth_sender_health_updater.update(
EthSenderInfo {
failed_l1_txns: None,
last_created_commit_batch: None,
last_created_prove_batch: None,
last_created_execute_batch: None,
last_executed_commit_batch: None,
last_executed_prove_batch: None,
last_executed_execute_batch: None,
current_nonce: None,
latest_operator_nonce: None,
}
.into(),
);
Ok(())
}

fn polling_interval_ms(&self) -> u64 {
Self::POLLING_INTERVAL_MS
}
}
2 changes: 2 additions & 0 deletions core/node/house_keeper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod blocks_state_reporter;
pub mod database;
pub mod eth_sender;
mod metrics;
pub mod periodic_job;
pub mod state_keeper;
pub mod version;
53 changes: 53 additions & 0 deletions core/node/house_keeper/src/state_keeper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater};

use crate::periodic_job::PeriodicJob;

#[derive(Debug, Serialize, Deserialize)]
pub struct StateKeeperInfo {
last_miniblock_protocol_upgrade: Option<()>,
last_miniblock: Option<()>,
batch_number: Option<()>,
}

impl From<StateKeeperInfo> for Health {
fn from(details: StateKeeperInfo) -> Self {
Self::from(HealthStatus::Ready).with_details(details)
}
}

#[derive(Debug)]
pub struct StateKeeperHealthTask {
pub connection_pool: ConnectionPool<Core>,
pub state_keeper_health_updater: HealthUpdater,
}

impl StateKeeperHealthTask {
pub const POLLING_INTERVAL_MS: u64 = 10_000;
}

#[async_trait]
impl PeriodicJob for StateKeeperHealthTask {
const SERVICE_NAME: &'static str = "StateKeeperHealth";

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let mut conn = self.connection_pool.connection().await.unwrap();
let _last_migration = conn.system_dal().get_last_migration().await.unwrap();

self.state_keeper_health_updater.update(
StateKeeperInfo {
last_miniblock_protocol_upgrade: None,
last_miniblock: None,
batch_number: None,
}
.into(),
);
Ok(())
}

fn polling_interval_ms(&self) -> u64 {
Self::POLLING_INTERVAL_MS
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use zksync_config::configs::house_keeper::HouseKeeperConfig;
use zksync_health_check::ReactiveHealthCheck;
use zksync_house_keeper::{
blocks_state_reporter::L1BatchMetricsReporter, database::DatabaseHealthTask,
periodic_job::PeriodicJob, version::NodeVersionInfo,
eth_sender::EthSenderHealthTask, periodic_job::PeriodicJob,
state_keeper::StateKeeperHealthTask, version::NodeVersionInfo,
};

use crate::{
Expand Down Expand Up @@ -40,6 +41,10 @@ pub struct Output {
pub l1_batch_metrics_reporter: L1BatchMetricsReporter,
#[context(task)]
pub database_health_task: DatabaseHealthTask,
#[context(task)]
pub eth_sender_health_task: EthSenderHealthTask,
#[context(task)]
pub state_keeper_health_task: StateKeeperHealthTask,
}

impl HouseKeeperLayer {
Expand Down Expand Up @@ -75,8 +80,7 @@ impl WiringLayer for HouseKeeperLayer {
.insert_custom_component(Arc::new(NodeVersionInfo::default()))
.map_err(WiringError::internal)?;

let (database_health_check, database_health_updater) =
ReactiveHealthCheck::new("database_health");
let (database_health_check, database_health_updater) = ReactiveHealthCheck::new("database");

app_health
.insert_component(database_health_check)
Expand All @@ -87,9 +91,35 @@ impl WiringLayer for HouseKeeperLayer {
database_health_updater,
};

let (eth_sender_health_check, eth_sender_health_updater) =
ReactiveHealthCheck::new("eth_sender");

app_health
.insert_component(eth_sender_health_check)
.map_err(WiringError::internal)?;

let eth_sender_health_task = EthSenderHealthTask {
connection_pool: replica_pool.clone(),
eth_sender_health_updater,
};

let (state_keeper_health_check, state_keeper_health_updater) =
ReactiveHealthCheck::new("state_keeper");

app_health
.insert_component(state_keeper_health_check)
.map_err(WiringError::internal)?;

let state_keeper_health_task = StateKeeperHealthTask {
connection_pool: replica_pool.clone(),
state_keeper_health_updater,
};

Ok(Output {
l1_batch_metrics_reporter,
database_health_task,
eth_sender_health_task,
state_keeper_health_task,
})
}
}
Expand Down Expand Up @@ -119,3 +149,33 @@ impl Task for DatabaseHealthTask {
(*self).run(stop_receiver.0).await
}
}

#[async_trait::async_trait]
impl Task for EthSenderHealthTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"eth_sender_health".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).run(stop_receiver.0).await
}
}

#[async_trait::async_trait]
impl Task for StateKeeperHealthTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"state_keeper_health".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).run(stop_receiver.0).await
}
}

0 comments on commit de3a5e1

Please sign in to comment.