Skip to content

Commit

Permalink
fix: restart ws connection to rpc when it was dropped and sync lost b…
Browse files Browse the repository at this point in the history
…locks (#227)
  • Loading branch information
frolvanya authored Feb 7, 2025
1 parent 3f293c6 commit 2a6a06e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 80 deletions.
3 changes: 2 additions & 1 deletion omni-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{Context, Result};
use clap::Parser;
use log::{error, info};
use omni_types::ChainKind;
use solana_sdk::signature::Signature;

mod config;
mod startup;
Expand All @@ -29,7 +30,7 @@ struct CliArgs {
arb_start_block: Option<u64>,
/// Start signature for Solana indexer
#[clap(long)]
solana_start_signature: Option<String>,
solana_start_signature: Option<Signature>,
}

#[tokio::main]
Expand Down
137 changes: 81 additions & 56 deletions omni-relayer/src/startup/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn start_indexer(
config: config::Config,
redis_client: redis::Client,
chain_kind: ChainKind,
start_block: Option<u64>,
mut start_block: Option<u64>,
) -> Result<()> {
let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

Expand All @@ -49,25 +49,97 @@ pub async fn start_indexer(
_ => anyhow::bail!("Unsupported chain kind: {chain_kind:?}"),
};

let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(format!(
"Failed to parse {chain_kind:?} rpc provider as url",
))?);
let filter = Filter::new()
.address(bridge_token_factory_address)
.event_signature(
[
utils::evm::InitTransfer::SIGNATURE_HASH,
utils::evm::FinTransfer::SIGNATURE_HASH,
utils::evm::DeployToken::SIGNATURE_HASH,
]
.to_vec(),
);

loop {
let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(
format!("Failed to parse {chain_kind:?} rpc provider as url",),
)?);

process_recent_blocks(
&mut redis_connection,
&http_provider,
&filter,
chain_kind,
start_block,
block_processing_batch_size,
expected_finalization_time,
)
.await?;

info!(
"All historical logs processed, starting {:?} WS subscription",
chain_kind
);

let ws_provider = crate::skip_fail!(
ProviderBuilder::new()
.on_ws(WsConnect::new(&rpc_ws_url))
.await,
format!("{chain_kind:?} WebSocket connection failed"),
5
);

let mut stream = crate::skip_fail!(
ws_provider.subscribe_logs(&filter).await,
format!("{chain_kind:?} WebSocket subscription failed"),
5
)
.into_stream();

info!("Subscribed to {:?} logs", chain_kind);

while let Some(log) = stream.next().await {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
log,
expected_finalization_time,
)
.await;
}

error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting...");
start_block = None;

tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

async fn process_recent_blocks(
redis_connection: &mut redis::aio::MultiplexedConnection,
http_provider: &RootProvider<Http<Client>>,
filter: &Filter,
chain_kind: ChainKind,
start_block: Option<u64>,
block_processing_batch_size: u64,
expected_finalization_time: i64,
) -> Result<()> {
let last_processed_block_key = utils::redis::get_last_processed_key(chain_kind);
let latest_block = http_provider.get_block_number().await?;
let from_block = match start_block {
Some(block) => block,
None => {
if let Some(block) = utils::redis::get_last_processed::<&str, u64>(
&mut redis_connection,
redis_connection,
&last_processed_block_key,
)
.await
{
block + 1
} else {
utils::redis::update_last_processed(
&mut redis_connection,
redis_connection,
&last_processed_block_key,
latest_block + 1,
)
Expand All @@ -79,17 +151,6 @@ pub async fn start_indexer(

info!("{chain_kind:?} indexer will start from block: {from_block}");

let filter = Filter::new()
.address(bridge_token_factory_address)
.event_signature(
[
utils::evm::InitTransfer::SIGNATURE_HASH,
utils::evm::FinTransfer::SIGNATURE_HASH,
utils::evm::DeployToken::SIGNATURE_HASH,
]
.to_vec(),
);

for current_block in
(from_block..latest_block).step_by(usize::try_from(block_processing_batch_size)?)
{
Expand All @@ -105,52 +166,16 @@ pub async fn start_indexer(
for log in logs {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
redis_connection,
http_provider,
log,
expected_finalization_time,
)
.await;
}
}

info!(
"All historical logs processed, starting {:?} WS subscription",
chain_kind
);

loop {
let ws_provider = crate::skip_fail!(
ProviderBuilder::new()
.on_ws(WsConnect::new(&rpc_ws_url))
.await,
format!("{chain_kind:?} WebSocket connection failed"),
5
);

let mut stream = crate::skip_fail!(
ws_provider.subscribe_logs(&filter).await,
format!("{chain_kind:?} WebSocket subscription failed"),
5
)
.into_stream();

info!("Subscribed to {:?} logs", chain_kind);

while let Some(log) = stream.next().await {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
log,
expected_finalization_time,
)
.await;
}

error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Ok(())
}

async fn process_log(
Expand Down
50 changes: 27 additions & 23 deletions omni-relayer/src/startup/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn get_keypair(file: Option<&String>) -> Keypair {
pub async fn start_indexer(
config: config::Config,
redis_client: redis::Client,
start_signature: Option<String>,
mut start_signature: Option<Signature>,
) -> Result<()> {
let Some(solana_config) = config.solana else {
anyhow::bail!("Failed to get Solana config");
Expand All @@ -43,27 +43,26 @@ pub async fn start_indexer(
let rpc_ws_url = &solana_config.rpc_ws_url;
let program_id = Pubkey::from_str(&solana_config.program_id)?;

let http_client = RpcClient::new(rpc_http_url.to_string());

if let Err(e) = process_recent_signatures(
&mut redis_connection,
&http_client,
&program_id,
start_signature,
)
.await
{
warn!("Failed to fetch recent logs: {}", e);
}
loop {
crate::skip_fail!(
process_recent_signatures(
&mut redis_connection,
rpc_http_url.clone(),
&program_id,
start_signature,
)
.await,
"Failed to process recent signatures",
5
);

info!("All historical logs processed, starting Solana WS subscription");
info!("All historical logs processed, starting Solana WS subscription");

let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::processed()),
};
let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::processed()),
};

loop {
let ws_client = crate::skip_fail!(
PubsubClient::new(rpc_ws_url).await,
"Solana WebSocket connection failed",
Expand Down Expand Up @@ -96,26 +95,31 @@ pub async fn start_indexer(
}

error!("Solana WebSocket stream closed, reconnecting...");
start_signature = None;

tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

async fn process_recent_signatures(
redis_connection: &mut redis::aio::MultiplexedConnection,
http_client: &RpcClient,
rpc_http_url: String,
program_id: &Pubkey,
start_signature: Option<String>,
start_signature: Option<Signature>,
) -> Result<()> {
let http_client = RpcClient::new(rpc_http_url);

let from_signature = if let Some(signature) = start_signature {
utils::redis::add_event(
redis_connection,
utils::redis::SOLANA_EVENTS,
signature.clone(),
signature.to_string(),
// TODO: It's better to come up with a solution that wouldn't require storing `Null` value
serde_json::Value::Null,
)
.await;

Signature::from_str(&signature)?
signature
} else {
let Some(signature) = utils::redis::get_last_processed::<&str, String>(
redis_connection,
Expand Down

0 comments on commit 2a6a06e

Please sign in to comment.