Skip to content

Commit

Permalink
pull max height from rpc endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ppca committed Oct 11, 2024
1 parent f52b8ee commit d88e08a
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 74 deletions.
15 changes: 9 additions & 6 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,21 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
.build()?;
let gcp_service =
rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?;

let mut rpc_client = near_fetch::Client::new(&near_rpc);
if let Some(referer_param) = client_header_referer {
let client_headers = rpc_client.inner_mut().headers_mut();
client_headers.insert(http::header::REFERER, referer_param.parse().unwrap());
}
tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");

let (indexer_handle, indexer) = indexer::run(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
rpc_client.clone(),
&rt,
)?;

Expand All @@ -212,13 +221,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
let (sender, receiver) = mpsc::channel(16384);

tracing::info!(%my_address, "address detected");
let mut rpc_client = near_fetch::Client::new(&near_rpc);
if let Some(referer_param) = client_header_referer {
let client_headers = rpc_client.inner_mut().headers_mut();
client_headers.insert(http::header::REFERER, referer_param.parse().unwrap());
}

tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
Expand Down
38 changes: 34 additions & 4 deletions chain-signatures/node/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::gcp::error::DatastoreStorageError;
use crate::gcp::GcpService;
use crate::protocol::{SignQueue, SignRequest};
use crate::rpc_client;
use crate::types::LatestBlockHeight;
use crypto_shared::{derive_epsilon, ScalarExt};
use k256::Scalar;
Expand Down Expand Up @@ -53,6 +54,14 @@ pub struct Options {
/// The threshold in seconds to check if the indexer needs to be restarted due to it stalling.
#[clap(long, env("MPC_INDEXER_RUNNING_THRESHOLD"), default_value = "300")]
pub running_threshold: u64,

/// The threshold in block height lag to check if the indexer has caught up.
#[clap(
long,
env("MPC_INDEXER_BLOCK_HEIGHT_LAG_THRESHOLD"),
default_value = "50"
)]
pub block_height_lag_threshold: u64,
}

impl Options {
Expand All @@ -68,6 +77,8 @@ impl Options {
self.behind_threshold.to_string(),
"--running-threshold".to_string(),
self.running_threshold.to_string(),
"--block-height-lag-threshold".to_string(),
self.block_height_lag_threshold.to_string(),
];

if let Some(s3_url) = self.s3_url {
Expand Down Expand Up @@ -105,10 +116,16 @@ pub struct Indexer {
last_updated_timestamp: Arc<RwLock<Instant>>,
running_threshold: Duration,
behind_threshold: Duration,
block_height_lag_threshold: u64,
rpc_client: near_fetch::Client,
}

impl Indexer {
fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self {
fn new(
latest_block_height: LatestBlockHeight,
options: &Options,
rpc_client: near_fetch::Client,
) -> Self {
tracing::info!(
"creating new indexer, latest block height: {}",
latest_block_height.block_height
Expand All @@ -118,6 +135,8 @@ impl Indexer {
last_updated_timestamp: Arc::new(RwLock::new(Instant::now())),
running_threshold: Duration::from_secs(options.running_threshold),
behind_threshold: Duration::from_secs(options.behind_threshold),
block_height_lag_threshold: options.block_height_lag_threshold,
rpc_client,
}
}

Expand All @@ -126,7 +145,7 @@ impl Indexer {
self.latest_block_height.read().await.block_height
}

/// Check whether the indexer is on track with the latest block height from the chain.
/// Check whether the indexer block height has been updated recently.
pub async fn is_on_track(&self) -> bool {
self.last_updated_timestamp.read().await.elapsed() <= self.behind_threshold
}
Expand All @@ -138,7 +157,17 @@ impl Indexer {

/// Check whether the indexer is behind with the latest block height from the chain.
pub async fn is_behind(&self) -> bool {
self.last_updated_timestamp.read().await.elapsed() > self.behind_threshold
let network_latest_height = rpc_client::fetch_latest_block_height(&self.rpc_client).await;
if let Ok(network_latest_height) = network_latest_height {
self.latest_block_height().await
< network_latest_height - self.block_height_lag_threshold
} else {
false
}
}

pub async fn is_stable(&self) -> bool {
!self.is_behind().await && self.is_on_track().await
}

async fn update_block_height(
Expand Down Expand Up @@ -287,6 +316,7 @@ pub fn run(
node_account_id: &AccountId,
queue: &Arc<RwLock<SignQueue>>,
gcp_service: &crate::gcp::GcpService,
rpc_client: near_fetch::Client,
rt: &tokio::runtime::Runtime,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Indexer)> {
tracing::info!(
Expand All @@ -311,7 +341,7 @@ pub fn run(
}
});

let indexer = Indexer::new(latest_block_height, options);
let indexer = Indexer::new(latest_block_height, options, rpc_client);
let context = Context {
mpc_contract_id: mpc_contract_id.clone(),
node_account_id: node_account_id.clone(),
Expand Down
52 changes: 2 additions & 50 deletions chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::time::{Duration, Instant};

use cait_sith::protocol::Participant;
use near_primitives::types::BlockHeight;
use tokio::sync::RwLock;
use url::Url;

Expand Down Expand Up @@ -162,61 +161,14 @@ impl Pool {
self.potential_connections.read().await.clone()
}

async fn max_block_height_among_participants(&self) -> BlockHeight {
self.status
.read()
.await
.values()
.filter_map(|state| {
if let StateView::Running {
latest_block_height,
..
} = state
{
Some(*latest_block_height)
} else {
None
}
})
.max()
.unwrap_or(0)
}

pub async fn is_participant_indexer_progressing(&self, participant: &Participant) -> bool {
pub async fn is_participant_stable(&self, participant: &Participant) -> bool {
self.status
.read()
.await
.get(participant)
.map_or(false, |state| match state {
StateView::Running {
is_indexer_progressing,
..
} => *is_indexer_progressing,
StateView::Running { is_stable, .. } => *is_stable,
_ => false,
})
}

pub async fn is_participant_indexer_caught_up(&self, participant: &Participant) -> bool {
let max_block_height = self.max_block_height_among_participants().await;

if max_block_height == 0 {
return false;
}

let my_block_height = self
.status
.read()
.await
.get(participant)
.and_then(|state| match state {
StateView::Running {
latest_block_height,
..
} => Some(*latest_block_height),
_ => None,
})
.unwrap_or(0);

(max_block_height - my_block_height) < 50
}
}
10 changes: 1 addition & 9 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,7 @@ impl Mesh {
pub async fn stable_participants(&self) -> Participants {
let mut stable = Participants::default();
for (participant, info) in self.active_participants().iter() {
if self
.connections
.is_participant_indexer_progressing(participant)
.await
&& self
.connections
.is_participant_indexer_caught_up(participant)
.await
{
if self.connections.is_participant_stable(participant).await {
stable.insert(participant, info.clone());
}
}
Expand Down
17 changes: 17 additions & 0 deletions chain-signatures/node/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::protocol::ProtocolState;
use near_account_id::AccountId;
use near_crypto::InMemorySigner;

use near_primitives::types::BlockHeight;
use serde_json::json;

pub async fn fetch_mpc_contract_state(
Expand Down Expand Up @@ -99,3 +100,19 @@ pub async fn vote_reshared(

Ok(result)
}

pub async fn fetch_latest_block_height(
rpc_client: &near_fetch::Client,
) -> anyhow::Result<BlockHeight> {
let latest_block_height: BlockHeight = rpc_client
.view_block()
.await
.map_err(|e| {
tracing::warn!(%e, "failed to fetch latest block");
e
})?
.header
.height;
tracing::debug!(latest_block_height, "latest block height");
Ok(latest_block_height)
}
10 changes: 5 additions & 5 deletions chain-signatures/node/src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ pub enum StateView {
presignature_mine_count: usize,
presignature_potential_count: usize,
latest_block_height: BlockHeight,
is_indexer_progressing: bool,
is_stable: bool,
},
Resharing {
old_participants: Vec<Participant>,
new_participants: Vec<Participant>,
latest_block_height: BlockHeight,
is_indexer_progressing: bool,
is_stable: bool,
},
Joining {
participants: Vec<Participant>,
Expand All @@ -131,7 +131,7 @@ pub enum StateView {
async fn state(Extension(state): Extension<Arc<AxumState>>) -> Result<Json<StateView>> {
tracing::debug!("fetching state");
let latest_block_height = state.indexer.latest_block_height().await;
let is_indexer_progressing = state.indexer.is_on_track().await;
let is_stable = state.indexer.is_stable().await;
let protocol_state = state.protocol_state.read().await;

match &*protocol_state {
Expand All @@ -155,7 +155,7 @@ async fn state(Extension(state): Extension<Arc<AxumState>>) -> Result<Json<State
presignature_mine_count,
presignature_potential_count,
latest_block_height,
is_indexer_progressing,
is_stable,
}))
}
NodeState::Resharing(state) => {
Expand All @@ -165,7 +165,7 @@ async fn state(Extension(state): Extension<Arc<AxumState>>) -> Result<Json<State
old_participants,
new_participants,
latest_block_height,
is_indexer_progressing,
is_stable,
}))
}
NodeState::Joining(state) => {
Expand Down

0 comments on commit d88e08a

Please sign in to comment.