diff --git a/src/bin/avail-light.rs b/src/bin/avail-light.rs
index 5a53cf8b8..ed11689b2 100644
--- a/src/bin/avail-light.rs
+++ b/src/bin/avail-light.rs
@@ -7,7 +7,7 @@ use avail_light::{api, data, network::rpc, telemetry};
 use avail_light::{
 consts::EXPECTED_NETWORK_VERSION,
 network::p2p,
- types::{CliOpts, Mode, RuntimeConfig, State},
+ types::{CliOpts, RuntimeConfig, State},
 };
 use clap::Parser;
 use color_eyre::{
@@ -204,7 +204,6 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
 rpc::init(db.clone(), state.clone(), &cfg.full_node_ws);
 let publish_rpc_event_receiver = rpc_events.subscribe();
- let lc_rpc_event_receiver = rpc_events.subscribe();
 let first_header_rpc_event_receiver = rpc_events.subscribe();
 #[cfg(feature = "crawl")]
 let crawler_rpc_event_receiver = rpc_events.subscribe();
@@ -244,28 +243,25 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
 tokio::task::spawn(server.run());
- let (block_tx, data_rx) = if let Mode::AppClient(app_id) = Mode::from(cfg.app_id) {
- // communication channels being established for talking to
- // libp2p backed application client
- let (block_tx, block_rx) = broadcast::channel::<BlockVerified>(1 << 7);
+ let (block_tx, block_rx) = broadcast::channel::<BlockVerified>(1 << 7);
+
+ let data_rx = cfg.app_id.map(AppId).map(|app_id| {
 let (data_tx, data_rx) = broadcast::channel::<(u32, AppData)>(1 << 7);
 tokio::task::spawn(avail_light::app_client::run(
 (&cfg).into(),
 db.clone(),
 p2p_client.clone(),
 rpc_client.clone(),
- AppId(app_id),
- block_rx,
+ app_id,
+ block_tx.subscribe(),
 pp.clone(),
 state.clone(),
 sync_range.clone(),
 data_tx,
 error_sender.clone(),
 ));
- (Some(block_tx), Some(data_rx))
- } else {
- (None, None)
- };
+ data_rx
+ });
 tokio::task::spawn(api::v2::publish(
 api::v2::types::Topic::HeaderVerified,
@@ -273,13 +269,11 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
 ws_clients.clone(),
 ));
- if let Some(sender) = block_tx.as_ref() {
- tokio::task::spawn(api::v2::publish(
- api::v2::types::Topic::ConfidenceAchieved,
- sender.subscribe(),
- ws_clients.clone(),
- ));
- }
+ tokio::task::spawn(api::v2::publish(
+ api::v2::types::Topic::ConfidenceAchieved,
+ block_tx.subscribe(),
+ ws_clients.clone(),
+ ));
 if let Some(data_rx) = data_rx {
 tokio::task::spawn(api::v2::publish(
@@ -337,25 +331,44 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
 s.finality_synced = true;
 }
- let light_client =
- avail_light::light_client::new(db.clone(), p2p_client.clone(), rpc_client.clone());
-
- let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);
+ tokio::task::spawn(avail_light::maintenance::run(
+ p2p_client.clone(),
+ ot_metrics.clone(),
+ block_rx,
+ error_sender.clone(),
+ ));
- let lc_channels = avail_light::light_client::Channels {
+ let channels = avail_light::types::ClientChannels {
 block_sender: block_tx,
- rpc_event_receiver: lc_rpc_event_receiver,
+ rpc_event_receiver: rpc_events.subscribe(),
 error_sender: error_sender.clone(),
 };
- tokio::task::spawn(avail_light::light_client::run(
- light_client,
- light_network_client,
- (&cfg).into(),
- ot_metrics,
- state.clone(),
- lc_channels,
- ));
+ if let Some(partition) = cfg.block_matrix_partition {
+ let fat_client =
+ avail_light::fat_client::new(db.clone(), p2p_client.clone(), rpc_client.clone());
+
+ tokio::task::spawn(avail_light::fat_client::run(
+ fat_client,
+ (&cfg).into(),
+ ot_metrics.clone(),
+ channels,
+ partition,
+ ));
+ } else {
+ let light_client = avail_light::light_client::new(db.clone());
+
+ let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);
+
+ tokio::task::spawn(avail_light::light_client::run(
+ light_client,
+ light_network_client,
+ (&cfg).into(),
+ ot_metrics,
+ state.clone(),
+ channels,
+ ));
+ }
 Ok(())
 }
diff --git a/src/fat_client.rs b/src/fat_client.rs
new file mode 100644
index 000000000..d24ee68d7
--- /dev/null
+++ b/src/fat_client.rs
@@ -0,0 +1,396 @@
+//! Fat client for fetching the data partition and inserting it into the DHT.
+//!
+//! # Flow
+//!
+//! * Fetches assigned block partition when finalized header is available and
+//! * inserts data rows and cells into the DHT for remote fetch.
+//!
+//! # Notes
+//!
+//! In case delay is configured, block processing is delayed for configured time.
+
+use async_trait::async_trait;
+use avail_subxt::{primitives::Header, utils::H256};
+use codec::Encode;
+use color_eyre::{eyre::WrapErr, Result};
+use futures::future::join_all;
+use kate_recovery::{
+ data,
+ matrix::{Dimensions, Partition, Position},
+};
+use kate_recovery::{data::Cell, matrix::RowIndex};
+use mockall::automock;
+use rocksdb::DB;
+use sp_core::blake2_256;
+use std::{sync::Arc, time::Instant};
+use tracing::{debug, error, info, warn};
+
+use crate::{
+ data::store_block_header_in_db,
+ network::{
+ p2p::Client as P2pClient,
+ rpc::{Client as RpcClient, Event},
+ },
+ telemetry::{MetricCounter, MetricValue, Metrics},
+ types::{BlockVerified, ClientChannels, FatClientConfig},
+ utils::extract_kate,
+};
+
+#[async_trait]
+#[automock]
+pub trait FatClient {
+ async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()>;
+ async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()>;
+ async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>>;
+ fn store_block_header_in_db(&self, header: &Header, block_number: u32) -> Result<()>;
+}
+
+#[derive(Clone)]
+struct FatClientImpl {
+ db: Arc<DB>,
+ p2p_client: P2pClient,
+ rpc_client: RpcClient,
+}
+
+pub fn new(db: Arc<DB>, p2p_client: P2pClient, rpc_client: RpcClient) -> impl FatClient {
+ FatClientImpl {
+ db,
+ p2p_client,
+ rpc_client,
+ }
+}
+
+#[async_trait]
+impl FatClient for FatClientImpl {
+ async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()> {
+ self.p2p_client.insert_cells_into_dht(block, cells).await
+ }
+ async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()> {
+ self.p2p_client.insert_rows_into_dht(block, rows).await
+ }
+ async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>> {
+ self.rpc_client.request_kate_proof(hash, positions).await
+ }
+ fn store_block_header_in_db(&self, header: &Header, block_number: u32) -> Result<()> {
+ store_block_header_in_db(self.db.clone(), block_number, header)
+ .wrap_err("Failed to store block header in DB")
+ }
+}
+
+pub async fn process_block(
+ fat_client: &impl FatClient,
+ metrics: &Arc<impl Metrics>,
+ cfg: &FatClientConfig,
+ header: &Header,
+ received_at: Instant,
+ partition: Partition,
+) -> Result<()> {
+ metrics.count(MetricCounter::SessionBlock).await;
+ metrics
+ .record(MetricValue::TotalBlockNumber(header.number))
+ .await?;
+
+ let block_number = header.number;
+ let header_hash: H256 = Encode::using_encoded(header, blake2_256).into();
+ let block_delay = received_at.elapsed().as_secs();
+ info!(block_number, block_delay, "Processing finalized block",);
+
+ let (rows, cols, _, _) = extract_kate(&header.extension);
+ let Some(dimensions) = Dimensions::new(rows, cols) else {
+ info!(
+ block_number,
+ "Skipping block with invalid dimensions {rows}x{cols}",
+ );
+ return Ok(());
+ };
+
+ if dimensions.cols().get() <= 2 {
+ error!(block_number, "More than 2 columns are required");
+ return Ok(());
+ }
+
+ // push latest mined block's header into column family specified
+ // for keeping block headers, to be used
+ // later for verifying DHT stored data
+ //
+ // @note this same data store is also written to in
+ // another competing thread, which syncs all block headers
+ // in range [0, LATEST], where LATEST = latest block number
+ // when this process started
+ fat_client
+ .store_block_header_in_db(header, block_number)
+ .wrap_err("Failed to store block header in DB")?;
+
+ // Fat client partition upload logic
+ let positions: Vec<Position> = dimensions
+ .iter_extended_partition_positions(&partition)
+ .collect();
+ let Partition { number, fraction } = partition;
+ info!(
+ block_number,
+ "partition_cells_requested" = positions.len(),
+ "Fetching partition ({number}/{fraction}) from RPC",
+ );
+
+ let begin = Instant::now();
+ let mut rpc_fetched: Vec<Cell> = vec![];
+
+ let get_kate_proof = |&n| fat_client.get_kate_proof(header_hash, n);
+
+ let rpc_batches = positions.chunks(cfg.max_cells_per_rpc).collect::<Vec<_>>();
+ let parallel_batches = rpc_batches
+ .chunks(cfg.query_proof_rpc_parallel_tasks)
+ .map(|batch| join_all(batch.iter().map(get_kate_proof)));
+
+ for batch in parallel_batches {
+ for (i, result) in batch.await.into_iter().enumerate() {
+ let batch_rpc_fetched =
+ result.wrap_err(format!("Failed to fetch cells from node RPC at batch {i}"))?;
+
+ rpc_fetched.extend(batch_rpc_fetched.clone());
+ }
+ }
+
+ let partition_rpc_retrieve_time_elapsed = begin.elapsed();
+ let partition_rpc_cells_fetched = rpc_fetched.len();
+ info!(
+ block_number,
+ ?partition_rpc_retrieve_time_elapsed,
+ partition_rpc_cells_fetched,
+ "Partition cells received from RPC",
+ );
+ metrics
+ .record(MetricValue::RPCCallDuration(
+ partition_rpc_retrieve_time_elapsed.as_secs_f64(),
+ ))
+ .await?;
+
+ if rpc_fetched.len() >= dimensions.cols().get().into() {
+ let data_cells = rpc_fetched
+ .iter()
+ .filter(|cell| !cell.position.is_extended())
+ .collect::<Vec<_>>();
+
+ let data_rows = data::rows(dimensions, &data_cells);
+
+ if let Err(e) = fat_client
+ .insert_rows_into_dht(block_number, data_rows)
+ .await
+ {
+ debug!("Error inserting rows into DHT: {e}");
+ }
+ } else {
+ warn!("No rows have been inserted into DHT since partition size is less than one row.")
+ }
+
+ if let Err(e) = fat_client
+ .insert_cells_into_dht(block_number, rpc_fetched)
+ .await
+ {
+ debug!("Error inserting cells into DHT: {e}");
+ }
+
+ Ok(())
+}
+
+/// Runs the fat client.
+/// +/// # Arguments +/// +/// * `fat_client` - Fat client implementation +/// * `cfg` - Fat client configuration +/// * `metrics` - Metrics registry +/// * `channels` - Communitaction channels +/// * `partition` - Assigned fat client partition +pub async fn run( + fat_client: impl FatClient, + cfg: FatClientConfig, + metrics: Arc, + mut channels: ClientChannels, + partition: Partition, +) { + info!("Starting fat client..."); + + loop { + let (header, received_at) = match channels.rpc_event_receiver.recv().await { + Ok(event) => match event { + Event::HeaderUpdate { + header, + received_at, + } => (header, received_at), + }, + Err(error) => { + error!("Cannot receive message: {error}"); + return; + }, + }; + + if let Some(seconds) = cfg.block_processing_delay.sleep_duration(received_at) { + if let Err(error) = metrics + .record(MetricValue::BlockProcessingDelay(seconds.as_secs_f64())) + .await + { + error!("Cannot record block processing delay: {}", error); + } + info!("Sleeping for {seconds:?} seconds"); + tokio::time::sleep(seconds).await; + } + + if let Err(error) = + process_block(&fat_client, &metrics, &cfg, &header, received_at, partition).await + { + error!("Cannot process block: {error}"); + if let Err(error) = channels.error_sender.send(error).await { + error!("Cannot send error message: {error}"); + } + return; + }; + + let Ok(client_msg) = BlockVerified::try_from((header, None)) else { + error!("Cannot create message from header"); + continue; + }; + + // notify dht-based application client + // that newly mined block has been received + if let Err(error) = channels.block_sender.send(client_msg) { + error!("Cannot send block verified message: {error}"); + continue; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{telemetry, types::RuntimeConfig}; + use avail_subxt::{ + api::runtime_types::avail_core::{ + data_lookup::compact::CompactDataLookup, + header::extension::{v1::HeaderExtension, HeaderExtension::V1}, + kate_commitment::v1::KateCommitment, + }, + config::substrate::Digest, + }; + use hex_literal::hex; + + fn default_header() -> Header { + Header { + parent_hash: hex!("c454470d840bc2583fcf881be4fd8a0f6daeac3a20d83b9fd4865737e56c9739") + .into(), + number: 57, + state_root: hex!("7dae455e5305263f29310c60c0cc356f6f52263f9f434502121e8a40d5079c32") + .into(), + extrinsics_root: hex!( + "bf1c73d4d09fa6a437a411a935ad3ec56a67a35e7b21d7676a5459b55b397ad4" + ) + .into(), + digest: Digest { logs: vec![] }, + extension: V1(HeaderExtension { + commitment: KateCommitment { + rows: 1, + cols: 4, + data_root: hex!( + "0000000000000000000000000000000000000000000000000000000000000000" + ) + .into(), + commitment: [ + 128, 34, 252, 194, 232, 229, 27, 124, 216, 33, 253, 23, 251, 126, 112, 244, + 7, 231, 73, 242, 0, 20, 5, 116, 175, 104, 27, 50, 45, 111, 127, 123, 202, + 255, 63, 192, 243, 236, 62, 75, 104, 86, 36, 198, 134, 27, 182, 224, 128, + 34, 252, 194, 232, 229, 27, 124, 216, 33, 253, 23, 251, 126, 112, 244, 7, + 231, 73, 242, 0, 20, 5, 116, 175, 104, 27, 50, 45, 111, 127, 123, 202, 255, + 63, 192, 243, 236, 62, 75, 104, 86, 36, 198, 134, 27, 182, 224, + ] + .to_vec(), + }, + app_lookup: CompactDataLookup { + size: 1, + index: vec![], + }, + }), + } + } + + const DEFAULT_CELLS: [Cell; 4] = [ + Cell { + position: Position { row: 0, col: 2 }, + content: [ + 183, 215, 10, 175, 218, 48, 236, 18, 30, 163, 215, 125, 205, 130, 176, 227, 133, + 157, 194, 35, 153, 144, 141, 7, 208, 133, 170, 79, 27, 176, 202, 22, 111, 63, 107, + 147, 93, 44, 82, 137, 78, 32, 161, 175, 214, 
152, 125, 50, 247, 52, 138, 161, 52, + 83, 193, 255, 17, 235, 98, 10, 88, 241, 25, 186, 3, 174, 139, 200, 128, 117, 255, + 213, 200, 4, 46, 244, 219, 5, 131, 0, + ], + }, + Cell { + position: Position { row: 1, col: 1 }, + content: [ + 172, 213, 85, 167, 89, 247, 11, 125, 149, 170, 217, 222, 86, 157, 11, 20, 154, 21, + 173, 247, 193, 99, 189, 7, 225, 80, 156, 94, 83, 213, 217, 185, 113, 187, 112, 20, + 170, 120, 50, 171, 52, 178, 209, 244, 158, 24, 129, 236, 83, 4, 110, 41, 9, 29, 26, + 180, 156, 219, 69, 155, 148, 49, 78, 25, 165, 147, 150, 253, 251, 174, 49, 215, + 191, 142, 169, 70, 17, 86, 218, 0, + ], + }, + Cell { + position: Position { row: 0, col: 3 }, + content: [ + 132, 180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, 161, + 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, 183, 146, + 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, 211, 97, 111, + 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, 31, 54, 53, 79, + 196, 5, 122, 206, 127, 226, 224, 70, 0, + ], + }, + Cell { + position: Position { row: 1, col: 3 }, + content: [ + 132, 180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, 161, + 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, 183, 146, + 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, 211, 97, 111, + 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, 31, 54, 53, 79, + 196, 5, 122, 206, 127, 226, 224, 70, 0, + ], + }, + ]; + + fn entire_block() -> Partition { + Partition { + number: 1, + fraction: 1, + } + } + + #[tokio::test] + async fn process_block_successful() { + let mut mock_client = MockFatClient::new(); + mock_client + .expect_get_kate_proof() + .returning(move |_, _| Box::pin(async move { Ok(DEFAULT_CELLS.to_vec()) })); + mock_client + .expect_store_block_header_in_db() + .returning(|_, _| Ok(())); + mock_client + .expect_insert_rows_into_dht() + .returning(|_, _| Box::pin(async move { Ok(()) })); + mock_client + .expect_insert_cells_into_dht() + .returning(|_, _| Box::pin(async move { Ok(()) })); + + let mut mock_metrics = telemetry::MockMetrics::new(); + mock_metrics.expect_count().returning(|_| ()); + mock_metrics.expect_record().returning(|_| Ok(())); + + process_block( + &mock_client, + &Arc::new(mock_metrics), + &FatClientConfig::from(&RuntimeConfig::default()), + &default_header(), + Instant::now(), + entire_block(), + ) + .await + .unwrap(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 0d077eb8b..e57dbdaa5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,9 @@ pub mod consts; #[cfg(feature = "crawl")] pub mod crawl_client; pub mod data; +pub mod fat_client; pub mod light_client; +pub mod maintenance; pub mod network; pub mod proof; pub mod sync_client; diff --git a/src/light_client.rs b/src/light_client.rs index 6210ec979..e035eb5fc 100644 --- a/src/light_client.rs +++ b/src/light_client.rs @@ -16,18 +16,12 @@ //! //! In case delay is configured, block processing is delayed for configured time. //! In case RPC is disabled, RPC calls will be skipped. -//! In case partition is configured, block partition is fetched and inserted into DHT. 
 use async_trait::async_trait;
 use avail_subxt::{primitives::Header, utils::H256};
 use codec::Encode;
-use color_eyre::{eyre::WrapErr, Report, Result};
-use futures::future::join_all;
-use kate_recovery::{
- commitments, data,
- matrix::{Dimensions, Position},
-};
-use kate_recovery::{data::Cell, matrix::RowIndex};
+use color_eyre::{eyre::WrapErr, Result};
+use kate_recovery::{commitments, matrix::Dimensions};
 use mockall::automock;
 use rocksdb::DB;
 use sp_core::blake2_256;
@@ -35,30 +29,22 @@ use std::{
 sync::{Arc, Mutex},
 time::Instant,
 };
-use tokio::sync::{broadcast, mpsc::Sender};
-use tracing::{debug, error, info};
+use tracing::{error, info};
 use crate::{
 data::{store_block_header_in_db, store_confidence_in_db},
 network::{
 self,
- p2p::Client as P2pClient,
- rpc::{self, Client as RpcClient, Event},
+ rpc::{self, Event},
 },
 telemetry::{MetricCounter, MetricValue, Metrics},
- types::{self, BlockVerified, LightClientConfig, OptionBlockRange, State},
+ types::{self, ClientChannels, LightClientConfig, OptionBlockRange, State},
 utils::{calculate_confidence, extract_kate},
 };
 #[async_trait]
 #[automock]
 pub trait LightClient {
- async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()>;
- async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()>;
- async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>>;
- async fn shrink_kademlia_map(&self) -> Result<()>;
- async fn get_multiaddress_and_ip(&self) -> Result<(String, String)>;
- async fn count_dht_entries(&self) -> Result<usize>;
 fn store_block_header_in_db(&self, header: &Header, block_number: u32) -> Result<()>;
 fn store_confidence_in_db(&self, count: u32, block_number: u32) -> Result<()>;
 }
@@ -66,38 +52,14 @@ pub trait LightClient {
 #[derive(Clone)]
 struct LightClientImpl {
 db: Arc<DB>,
- p2p_client: P2pClient,
- rpc_client: RpcClient,
 }
-pub fn new(db: Arc<DB>, p2p_client: P2pClient, rpc_client: RpcClient) -> impl LightClient {
- LightClientImpl {
- db,
- p2p_client,
- rpc_client,
- }
+pub fn new(db: Arc<DB>) -> impl LightClient {
+ LightClientImpl { db }
 }
 #[async_trait]
 impl LightClient for LightClientImpl {
- async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()> {
- self.p2p_client.insert_cells_into_dht(block, cells).await
- }
- async fn shrink_kademlia_map(&self) -> Result<()> {
- self.p2p_client.shrink_kademlia_map().await
- }
- async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()> {
- self.p2p_client.insert_rows_into_dht(block, rows).await
- }
- async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>> {
- self.rpc_client.request_kate_proof(hash, positions).await
- }
- async fn get_multiaddress_and_ip(&self) -> Result<(String, String)> {
- self.p2p_client.get_multiaddress_and_ip().await
- }
- async fn count_dht_entries(&self) -> Result<usize> {
- self.p2p_client.count_dht_entries().await
- }
 fn store_confidence_in_db(&self, count: u32, block_number: u32) -> Result<()> {
 store_confidence_in_db(self.db.clone(), block_number, count)
 .wrap_err("Failed to store confidence in DB")
@@ -227,132 +189,25 @@ pub async fn process_block(
 .store_block_header_in_db(header, block_number)
 .wrap_err("Failed to store block header in DB")?;
- let mut rpc_fetched: Vec<Cell> = vec![];
-
- // Fat client partition upload logic
- if let Some(partition) = &cfg.block_matrix_partition {
- let positions: Vec<Position> = dimensions
- .iter_extended_partition_positions(partition)
- .collect();
- info!(
- block_number,
- "partition_cells_requested" = positions.len(),
- "Fetching partition ({}/{}) from RPC",
- partition.number,
- partition.fraction
- );
-
- let begin = Instant::now();
- let rpc_cells = positions.chunks(cfg.max_cells_per_rpc).collect::<Vec<_>>();
- for batch in rpc_cells
- // TODO: Filter already fetched cells since they are verified and in DHT
- .chunks(cfg.query_proof_rpc_parallel_tasks)
- .map(|e| {
- join_all(
- e.iter()
- .map(|n| light_client.get_kate_proof(header_hash, n))
- .collect::<Vec<_>>(),
- )
- }) {
- for partition_fetched in batch
- .await
- .into_iter()
- .enumerate()
- .map(|(i, e)| {
- e.wrap_err(format!("Failed to fetch cells from node RPC at batch {i}"))
- })
- .collect::<Vec<_>>()
- {
- let partition_fetched_filtered = partition_fetched?
- .into_iter()
- .filter(|cell| {
- !rpc_fetched
- .iter()
- .any(move |rpc_cell| rpc_cell.position.eq(&cell.position))
- })
- .collect::<Vec<_>>();
- rpc_fetched.extend(partition_fetched_filtered.clone());
- }
- }
- let rpc_call_duration = begin.elapsed();
- let rpc_fetched_len = rpc_fetched.len();
- info!(
- block_number,
- "partition_rpc_retrieve_time_elapsed" = ?rpc_call_duration,
- "partition_rpc_cells_fetched" = rpc_fetched_len,
- "Partition cells received from RPC",
- );
- metrics
- .record(MetricValue::RPCCallDuration(
- rpc_call_duration.as_secs_f64(),
- ))
- .await?;
-
- let rpc_fetched_data_cells = rpc_fetched
- .iter()
- .filter(|cell| !cell.position.is_extended())
- .collect::<Vec<_>>();
- let rpc_fetched_data_rows = data::rows(dimensions, &rpc_fetched_data_cells);
-
- if let Err(e) = light_client
- .insert_rows_into_dht(block_number, rpc_fetched_data_rows)
- .await
- {
- debug!("Error inserting rows into DHT: {e}");
- }
- }
-
- if let Err(e) = light_client
- .insert_cells_into_dht(block_number, rpc_fetched)
- .await
- {
- debug!("Error inserting cells into DHT: {e}");
- }
-
- light_client
- .shrink_kademlia_map()
- .await
- .wrap_err("Unable to perform Kademlia map shrink")?;
-
- // dump what we have on the current p2p network
- if let Ok((multiaddr, ip)) = light_client.get_multiaddress_and_ip().await {
- // set Multiaddress
- metrics.set_multiaddress(multiaddr).await;
- metrics.set_ip(ip).await;
- }
- if let Ok(counted_peers) = light_client.count_dht_entries().await {
- metrics
- .record(MetricValue::KadRoutingPeerNum(counted_peers))
- .await?
- }
-
- metrics.record(MetricValue::HealthCheck()).await?;
-
 Ok(Some(confidence))
 }
-pub struct Channels {
- pub block_sender: Option<broadcast::Sender<BlockVerified>>,
- pub rpc_event_receiver: broadcast::Receiver<Event>,
- pub error_sender: Sender<Report>,
-}
-
 /// Runs light client.
/// /// # Arguments /// /// * `light_client` - Light client implementation /// * `cfg` - Light client configuration -/// * `block_tx` - Channel used to send header of verified block -/// * `registry` - Prometheus metrics registry +/// * `metrics` - Metrics registry /// * `state` - Processed blocks state +/// * `channels` - Communitaction channels pub async fn run( light_client: impl LightClient, network_client: impl network::Client, cfg: LightClientConfig, metrics: Arc, state: Arc>, - mut channels: Channels, + mut channels: ClientChannels, ) { info!("Starting light client..."); @@ -409,11 +264,9 @@ pub async fn run( // notify dht-based application client // that newly mined block has been received - if let Some(ref channel) = channels.block_sender { - if let Err(error) = channel.send(client_msg) { - error!("Cannot send block verified message: {error}"); - continue; - } + if let Err(error) = channels.block_sender.send(client_msg) { + error!("Cannot send block verified message: {error}"); + continue; } } } @@ -437,6 +290,7 @@ mod tests { config::substrate::Digest, }; use hex_literal::hex; + use kate_recovery::{data::Cell, matrix::Position}; use test_case::test_case; #[test_case(99.9 => 10)] @@ -501,49 +355,6 @@ mod tests { }; let state = Arc::new(Mutex::new(State::default())); let recv = Instant::now(); - let kate_proof = [ - Cell { - position: Position { row: 0, col: 2 }, - content: [ - 183, 215, 10, 175, 218, 48, 236, 18, 30, 163, 215, 125, 205, 130, 176, 227, - 133, 157, 194, 35, 153, 144, 141, 7, 208, 133, 170, 79, 27, 176, 202, 22, 111, - 63, 107, 147, 93, 44, 82, 137, 78, 32, 161, 175, 214, 152, 125, 50, 247, 52, - 138, 161, 52, 83, 193, 255, 17, 235, 98, 10, 88, 241, 25, 186, 3, 174, 139, - 200, 128, 117, 255, 213, 200, 4, 46, 244, 219, 5, 131, 0, - ], - }, - Cell { - position: Position { row: 1, col: 1 }, - content: [ - 172, 213, 85, 167, 89, 247, 11, 125, 149, 170, 217, 222, 86, 157, 11, 20, 154, - 21, 173, 247, 193, 99, 189, 7, 225, 80, 156, 94, 83, 213, 217, 185, 113, 187, - 112, 20, 170, 120, 50, 171, 52, 178, 209, 244, 158, 24, 129, 236, 83, 4, 110, - 41, 9, 29, 26, 180, 156, 219, 69, 155, 148, 49, 78, 25, 165, 147, 150, 253, - 251, 174, 49, 215, 191, 142, 169, 70, 17, 86, 218, 0, - ], - }, - Cell { - position: Position { row: 0, col: 3 }, - content: [ - 132, 180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, - 161, 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, - 183, 146, 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, - 211, 97, 111, 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, - 31, 54, 53, 79, 196, 5, 122, 206, 127, 226, 224, 70, 0, - ], - }, - Cell { - position: Position { row: 1, col: 3 }, - content: [ - 132, 180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, - 161, 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, - 183, 146, 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, - 211, 97, 111, 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, - 31, 54, 53, 79, 196, 5, 122, 206, 127, 226, 224, 70, 0, - ], - }, - ] - .to_vec(); mock_network_client .expect_fetch_verified() .returning(move |_, _, _, _, positions| { @@ -557,172 +368,12 @@ mod tests { ); Box::pin(async move { Ok((fetched, unfetched, stats)) }) }); - mock_client.expect_get_kate_proof().returning(move |_, _| { - let kate_proof = kate_proof.clone(); - Box::pin(async move { Ok(kate_proof) }) - }); mock_client .expect_store_confidence_in_db() .returning(|_, _| 
Ok(())); mock_client .expect_store_block_header_in_db() .returning(|_, _| Ok(())); - mock_client - .expect_insert_rows_into_dht() - .returning(|_, _| Box::pin(async move { Ok(()) })); - mock_client - .expect_insert_cells_into_dht() - .returning(|_, _| Box::pin(async move { Ok(()) })); - mock_client - .expect_shrink_kademlia_map() - .returning(|| Box::pin(async move { Ok(()) })); - mock_client.expect_get_multiaddress_and_ip().returning(|| { - Box::pin(async move { Ok(("multiaddress".to_string(), "ip".to_string())) }) - }); - mock_client - .expect_count_dht_entries() - .returning(|| Box::pin(async move { Ok(1) })); - - let mut mock_metrics = telemetry::MockMetrics::new(); - mock_metrics.expect_count().returning(|_| ()); - mock_metrics.expect_record().returning(|_| Ok(())); - mock_metrics.expect_set_multiaddress().returning(|_| ()); - mock_metrics.expect_set_ip().returning(|_| ()); - process_block( - &mock_client, - &mock_network_client, - &Arc::new(mock_metrics), - &cfg, - &header, - recv, - state, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_process_block_without_rpc() { - let mut mock_client = MockLightClient::new(); - let mut mock_network_client = network::MockClient::new(); - let mut cfg = LightClientConfig::from(&RuntimeConfig::default()); - cfg.disable_rpc = true; - let cells_unfetched: Vec = vec![]; - let header = Header { - parent_hash: hex!("c454470d840bc2583fcf881be4fd8a0f6daeac3a20d83b9fd4865737e56c9739") - .into(), - number: 57, - state_root: hex!("7dae455e5305263f29310c60c0cc356f6f52263f9f434502121e8a40d5079c32") - .into(), - extrinsics_root: hex!( - "bf1c73d4d09fa6a437a411a935ad3ec56a67a35e7b21d7676a5459b55b397ad4" - ) - .into(), - digest: Digest { logs: vec![] }, - extension: V1(HeaderExtension { - commitment: KateCommitment { - rows: 1, - cols: 4, - data_root: hex!( - "0000000000000000000000000000000000000000000000000000000000000000" - ) - .into(), - commitment: [ - 128, 34, 252, 194, 232, 229, 27, 124, 216, 33, 253, 23, 251, 126, 112, 244, - 7, 231, 73, 242, 0, 20, 5, 116, 175, 104, 27, 50, 45, 111, 127, 123, 202, - 255, 63, 192, 243, 236, 62, 75, 104, 86, 36, 198, 134, 27, 182, 224, 128, - 34, 252, 194, 232, 229, 27, 124, 216, 33, 253, 23, 251, 126, 112, 244, 7, - 231, 73, 242, 0, 20, 5, 116, 175, 104, 27, 50, 45, 111, 127, 123, 202, 255, - 63, 192, 243, 236, 62, 75, 104, 86, 36, 198, 134, 27, 182, 224, - ] - .to_vec(), - }, - app_lookup: CompactDataLookup { - size: 1, - index: vec![], - }, - }), - }; - let state = Arc::new(Mutex::new(State::default())); - let recv = Instant::now(); - let cells_fetched = [ - Cell { - position: Position { row: 0, col: 2 }, - content: [ - 183, 215, 10, 175, 218, 48, 236, 18, 30, 163, 215, 125, 205, 130, 176, 227, - 133, 157, 194, 35, 153, 144, 141, 7, 208, 133, 170, 79, 27, 176, 202, 22, 111, - 63, 107, 147, 93, 44, 82, 137, 78, 32, 161, 175, 214, 152, 125, 50, 247, 52, - 138, 161, 52, 83, 193, 255, 17, 235, 98, 10, 88, 241, 25, 186, 3, 174, 139, - 200, 128, 117, 255, 213, 200, 4, 46, 244, 219, 5, 131, 0, - ], - }, - Cell { - position: Position { row: 1, col: 1 }, - content: [ - 172, 213, 85, 167, 89, 247, 11, 125, 149, 170, 217, 222, 86, 157, 11, 20, 154, - 21, 173, 247, 193, 99, 189, 7, 225, 80, 156, 94, 83, 213, 217, 185, 113, 187, - 112, 20, 170, 120, 50, 171, 52, 178, 209, 244, 158, 24, 129, 236, 83, 4, 110, - 41, 9, 29, 26, 180, 156, 219, 69, 155, 148, 49, 78, 25, 165, 147, 150, 253, - 251, 174, 49, 215, 191, 142, 169, 70, 17, 86, 218, 0, - ], - }, - Cell { - position: Position { row: 0, col: 3 }, - content: [ - 132, 
180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, - 161, 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, - 183, 146, 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, - 211, 97, 111, 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, - 31, 54, 53, 79, 196, 5, 122, 206, 127, 226, 224, 70, 0, - ], - }, - Cell { - position: Position { row: 1, col: 3 }, - content: [ - 132, 180, 92, 81, 128, 83, 245, 59, 206, 224, 200, 137, 236, 113, 109, 216, - 161, 248, 236, 252, 252, 22, 140, 107, 203, 161, 33, 18, 100, 189, 157, 58, 7, - 183, 146, 75, 57, 220, 84, 106, 203, 33, 142, 10, 130, 99, 90, 38, 85, 166, - 211, 97, 111, 105, 21, 241, 123, 211, 193, 6, 254, 125, 169, 108, 252, 85, 49, - 31, 54, 53, 79, 196, 5, 122, 206, 127, 226, 224, 70, 0, - ], - }, - ] - .to_vec(); - mock_network_client - .expect_fetch_verified() - .returning(move |_, _, _, _, positions| { - let fetched = cells_fetched.clone(); - let unfetched = cells_unfetched.clone(); - let stats = network::FetchStats::new( - positions.len(), - fetched.len(), - Duration::from_secs(0), - None, - ); - Box::pin(async move { Ok((fetched, unfetched, stats)) }) - }); - mock_client.expect_get_kate_proof().never(); - mock_client - .expect_store_confidence_in_db() - .returning(|_, _| Ok(())); - mock_client - .expect_store_block_header_in_db() - .returning(|_, _| Ok(())); - mock_client - .expect_insert_rows_into_dht() - .returning(|_, _| Box::pin(async move { Ok(()) })); - mock_client - .expect_insert_cells_into_dht() - .returning(|_, _| Box::pin(async move { Ok(()) })); - mock_client - .expect_shrink_kademlia_map() - .returning(|| Box::pin(async move { Ok(()) })); - mock_client.expect_get_multiaddress_and_ip().returning(|| { - Box::pin(async move { Ok(("multiaddress".to_string(), "ip".to_string())) }) - }); - mock_client - .expect_count_dht_entries() - .returning(|| Box::pin(async move { Ok(1) })); let mut mock_metrics = telemetry::MockMetrics::new(); mock_metrics.expect_count().returning(|_| ()); diff --git a/src/maintenance.rs b/src/maintenance.rs new file mode 100644 index 000000000..20700d5d2 --- /dev/null +++ b/src/maintenance.rs @@ -0,0 +1,57 @@ +use color_eyre::{eyre::WrapErr, Report, Result}; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc::Sender}; +use tracing::{debug, error, info}; + +use crate::{ + network::p2p::Client as P2pClient, + telemetry::{MetricValue, Metrics}, + types::BlockVerified, +}; + +pub async fn process_block( + block_number: u32, + p2p_client: &P2pClient, + metrics: &Arc, +) -> Result<()> { + p2p_client + .shrink_kademlia_map() + .await + .wrap_err("Unable to perform Kademlia map shrink")?; + + if let Ok((multiaddr, ip)) = p2p_client.get_multiaddress_and_ip().await { + metrics.set_multiaddress(multiaddr).await; + metrics.set_ip(ip).await; + } + + let peers_num = p2p_client.count_dht_entries().await?; + let peers_num_metric = MetricValue::KadRoutingPeerNum(peers_num); + + metrics.record(peers_num_metric).await?; + metrics.record(MetricValue::HealthCheck()).await?; + + debug!(block_number, "Maintenance completed"); + Ok(()) +} + +pub async fn run( + p2p_client: P2pClient, + metrics: Arc, + mut block_receiver: broadcast::Receiver, + error_sender: Sender, +) -> ! 
{ + info!("Starting maintenance..."); + + loop { + let result = match block_receiver.recv().await { + Ok(block) => process_block(block.block_num, &p2p_client, &metrics).await, + Err(error) => Err(error.into()), + }; + + if let Err(error) = result { + if let Err(error) = error_sender.send(error).await { + error!("Cannot send error message: {error}"); + } + } + } +} diff --git a/src/sync_client.rs b/src/sync_client.rs index 4f80e0df8..6a79536e3 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -107,7 +107,7 @@ async fn process_block( header: DaHeader, header_hash: H256, cfg: &SyncClientConfig, - block_verified_sender: Option>, + block_verified_sender: broadcast::Sender, ) -> Result<()> { let block_number = header.number; let begin = Instant::now(); @@ -150,10 +150,8 @@ async fn process_block( let client_msg = BlockVerified::try_from((header, confidence)).wrap_err("converting to message failed")?; - if let Some(ref channel) = block_verified_sender { - if let Err(error) = channel.send(client_msg) { - error!("Cannot send block verified message: {error}"); - } + if let Err(error) = block_verified_sender.send(client_msg) { + error!("Cannot send block verified message: {error}"); } Ok(()) @@ -172,7 +170,7 @@ pub async fn run( network_client: impl network::Client, cfg: SyncClientConfig, sync_range: Range, - block_verified_sender: Option>, + block_verified_sender: broadcast::Sender, state: Arc>, ) { if sync_range.is_empty() { @@ -233,7 +231,7 @@ pub async fn run( } } - if block_verified_sender.is_none() { + if cfg.is_last_step { state.lock().unwrap().synced.replace(true); } } @@ -377,7 +375,7 @@ mod tests { header, header_hash, &cfg, - Some(block_tx), + block_tx, ) .await .unwrap(); @@ -463,7 +461,7 @@ mod tests { header, header_hash, &cfg, - Some(block_tx), + block_tx, ) .await .unwrap(); diff --git a/src/types.rs b/src/types.rs index 3f2c447af..a6f79ffd6 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,6 @@ //! Shared light client structs and enums. 
-use crate::network::rpc::Node as RpcNode; +use crate::network::rpc::{Event, Node as RpcNode}; use crate::utils::{extract_app_lookup, extract_kate}; use avail_core::DataLookup; use avail_subxt::{primitives::Header as DaHeader, utils::H256}; @@ -26,6 +26,8 @@ use std::ops::Range; use std::str::FromStr; use std::time::{Duration, Instant}; use subxt::ext::sp_core::{sr25519::Pair, Pair as _}; +use tokio::sync::broadcast; +use tokio::sync::mpsc::Sender; const CELL_SIZE: usize = 32; const PROOF_SIZE: usize = 48; @@ -83,6 +85,12 @@ pub struct BlockVerified { pub confidence: Option, } +pub struct ClientChannels { + pub block_sender: broadcast::Sender, + pub rpc_event_receiver: broadcast::Receiver, + pub error_sender: Sender, +} + impl TryFrom<(DaHeader, Option)> for BlockVerified { type Error = Report; fn try_from((header, confidence): (DaHeader, Option)) -> Result { @@ -359,15 +367,8 @@ pub struct Delay(pub Option); /// Light client configuration (see [RuntimeConfig] for details) pub struct LightClientConfig { - pub full_nodes_ws: Vec, pub confidence: f64, - pub disable_rpc: bool, - pub dht_parallelization_limit: usize, - pub query_proof_rpc_parallel_tasks: usize, pub block_processing_delay: Delay, - pub block_matrix_partition: Option, - pub max_cells_per_rpc: usize, - pub ttl: u64, } impl Delay { @@ -385,6 +386,32 @@ impl From<&RuntimeConfig> for LightClientConfig { .map(|v| Duration::from_secs(v.into())); LightClientConfig { + confidence: val.confidence, + block_processing_delay: Delay(block_processing_delay), + } + } +} + +/// Fat client configuration (see [RuntimeConfig] for details) +pub struct FatClientConfig { + pub full_nodes_ws: Vec, + pub confidence: f64, + pub disable_rpc: bool, + pub dht_parallelization_limit: usize, + pub query_proof_rpc_parallel_tasks: usize, + pub block_processing_delay: Delay, + pub block_matrix_partition: Option, + pub max_cells_per_rpc: usize, + pub ttl: u64, +} + +impl From<&RuntimeConfig> for FatClientConfig { + fn from(val: &RuntimeConfig) -> Self { + let block_processing_delay = val + .block_processing_delay + .map(|v| Duration::from_secs(v.into())); + + FatClientConfig { full_nodes_ws: val.full_node_ws.clone(), confidence: val.confidence, disable_rpc: val.disable_rpc, @@ -503,6 +530,7 @@ pub struct SyncClientConfig { pub disable_rpc: bool, pub dht_parallelization_limit: usize, pub ttl: u64, + pub is_last_step: bool, } impl From<&RuntimeConfig> for SyncClientConfig { @@ -512,6 +540,7 @@ impl From<&RuntimeConfig> for SyncClientConfig { disable_rpc: val.disable_rpc, dht_parallelization_limit: val.dht_parallelization_limit, ttl: val.kad_record_ttl, + is_last_step: val.app_id.is_none(), } } }
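Note on the channel rewiring in src/bin/avail-light.rs: the patch drops the Option<broadcast::Sender<BlockVerified>> plumbing and creates block_tx unconditionally, with each consumer (app client, ConfidenceAchieved publisher, maintenance task) taking its own receiver via block_tx.subscribe(). The sketch below is a minimal, self-contained illustration of that tokio broadcast fan-out; the Block struct and the two task bodies are simplified stand-ins for this crate's BlockVerified message and spawned services, not code from the repository.

use tokio::sync::broadcast;

// Simplified stand-in for this crate's `BlockVerified` message.
#[derive(Clone, Debug)]
struct Block {
    number: u32,
}

#[tokio::main]
async fn main() {
    // One sender, created unconditionally (mirrors `block_tx` in avail-light.rs).
    let (block_tx, _) = broadcast::channel::<Block>(1 << 7);

    // Each consumer subscribes for an independent receiver, so the producer
    // side never has to carry an `Option<Sender<_>>`.
    let mut maintenance_rx = block_tx.subscribe();
    let mut publisher_rx = block_tx.subscribe();

    let maintenance = tokio::spawn(async move {
        while let Ok(block) = maintenance_rx.recv().await {
            println!("maintenance: processed block {}", block.number);
        }
    });
    let publisher = tokio::spawn(async move {
        while let Ok(block) = publisher_rx.recv().await {
            println!("publish ConfidenceAchieved for block {}", block.number);
        }
    });

    // Producer side (mirrors the light/fat client sending verified blocks).
    for number in 0..3 {
        // `send` fails only when no receivers exist; both tasks hold one here.
        block_tx
            .send(Block { number })
            .expect("at least one receiver is alive");
    }

    // Dropping every sender closes the channel; receivers drain buffered
    // messages, then get `RecvError::Closed`, which ends both loops.
    drop(block_tx);
    let _ = tokio::join!(maintenance, publisher);
}

The practical consequence mirrored in the patch is that senders never need to be optional: send() only errors when every receiver has been dropped, which the clients treat as a log-and-continue condition rather than a configuration branch.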