Skip to content

Commit

Permalink
removed the remainings of put batching
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c committed Nov 28, 2023
1 parent 1271b8c commit 443f2b0
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 131 deletions.
1 change: 0 additions & 1 deletion src/bin/avail-light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {
(&cfg).into(),
cfg.dht_parallelization_limit,
cfg.kad_record_ttl,
cfg.put_batch_size,
cfg.is_fat_client(),
id_keys,
)
Expand Down
57 changes: 23 additions & 34 deletions src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
time::Instant,
};
use tokio::sync::{broadcast, mpsc::Sender};
use tracing::{error, info};
use tracing::{error, info, warn};

use crate::{
data::{store_block_header_in_db, store_confidence_in_db},
Expand All @@ -53,8 +53,8 @@ use crate::{
#[async_trait]
#[automock]
pub trait LightClient {
async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> f32;
async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> f32;
async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()>;
async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()>;
async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>>;
async fn shrink_kademlia_map(&self) -> Result<()>;
async fn get_multiaddress_and_ip(&self) -> Result<(String, String)>;
Expand All @@ -80,13 +80,13 @@ pub fn new(db: Arc<DB>, p2p_client: P2pClient, rpc_client: RpcClient) -> impl Li

#[async_trait]
impl LightClient for LightClientImpl {
async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> f32 {
async fn insert_cells_into_dht(&self, block: u32, cells: Vec<Cell>) -> Result<()> {
self.p2p_client.insert_cells_into_dht(block, cells).await
}
async fn shrink_kademlia_map(&self) -> Result<()> {
self.p2p_client.shrink_kademlia_map().await
}
async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> f32 {
async fn insert_rows_into_dht(&self, block: u32, rows: Vec<(RowIndex, Vec<u8>)>) -> Result<()> {
self.p2p_client.insert_rows_into_dht(block, rows).await
}
async fn get_kate_proof(&self, hash: H256, positions: &[Position]) -> Result<Vec<Cell>> {
Expand Down Expand Up @@ -192,18 +192,6 @@ pub async fn process_block(
.await?;
}

if let Some(dht_put_success_rate) = fetch_stats.dht_put_success_rate {
metrics
.record(MetricValue::DHTPutSuccess(dht_put_success_rate))
.await?;
}

if let Some(dht_put_duration) = fetch_stats.dht_put_duration {
metrics
.record(MetricValue::DHTPutDuration(dht_put_duration))
.await?;
}

if positions.len() > fetched.len() {
error!(block_number, "Failed to fetch {} cells", unfetched.len());
return Ok(None);
Expand Down Expand Up @@ -305,22 +293,25 @@ pub async fn process_block(
.filter(|cell| !cell.position.is_extended())
.collect::<Vec<_>>();
let rpc_fetched_data_rows = data::rows(dimensions, &rpc_fetched_data_cells);
let rows_len = rpc_fetched_data_rows.len();

let dht_insert_rows_success_rate = light_client
light_client
.insert_rows_into_dht(block_number, rpc_fetched_data_rows)
.await;

info!(
block_number,
"DHT PUT rows operation success rate" = dht_insert_rows_success_rate,
"rows inserted into DHT" = rows_len
);
.await
.map_err(|e| {
warn!("Error inserting rows into DHT: {e}");
e
})
.ok();
}

_ = light_client
light_client
.insert_cells_into_dht(block_number, rpc_fetched)
.await;
.await
.map_err(|e| {
warn!("Error inserting cells into DHT: {e}");
e
})
.ok();

light_client
.shrink_kademlia_map()
Expand Down Expand Up @@ -567,7 +558,6 @@ mod tests {
fetched.len(),
Duration::from_secs(0),
None,
None,
);
Box::pin(async move { Ok((fetched, unfetched, stats)) })
});
Expand All @@ -583,10 +573,10 @@ mod tests {
.returning(|_, _| Ok(()));
mock_client
.expect_insert_rows_into_dht()
.returning(|_, _| Box::pin(async move { 1f32 }));
.returning(|_, _| Box::pin(async move { Ok(()) }));
mock_client
.expect_insert_cells_into_dht()
.returning(|_, _| Box::pin(async move { 1f32 }));
.returning(|_, _| Box::pin(async move { Ok(()) }));
mock_client
.expect_shrink_kademlia_map()
.returning(|| Box::pin(async move { Ok(()) }));
Expand Down Expand Up @@ -712,7 +702,6 @@ mod tests {
fetched.len(),
Duration::from_secs(0),
None,
None,
);
Box::pin(async move { Ok((fetched, unfetched, stats)) })
});
Expand All @@ -725,10 +714,10 @@ mod tests {
.returning(|_, _| Ok(()));
mock_client
.expect_insert_rows_into_dht()
.returning(|_, _| Box::pin(async move { 1f32 }));
.returning(|_, _| Box::pin(async move { Ok(()) }));
mock_client
.expect_insert_cells_into_dht()
.returning(|_, _| Box::pin(async move { 1f32 }));
.returning(|_, _| Box::pin(async move { Ok(()) }));
mock_client
.expect_shrink_kademlia_map()
.returning(|| Box::pin(async move { Ok(()) }));
Expand Down
36 changes: 10 additions & 26 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use mockall::automock;
use sp_core::H256;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tracing::info;
use tracing::{info, warn};

use crate::proof;

Expand All @@ -36,30 +36,23 @@ pub struct FetchStats {
pub dht_fetch_duration: f64,
pub rpc_fetched: Option<f64>,
pub rpc_fetch_duration: Option<f64>,
pub dht_put_success_rate: Option<f64>,
pub dht_put_duration: Option<f64>,
}

type RPCFetchStats = (usize, Duration);

type DHTPutStats = (f64, Duration);

impl FetchStats {
pub fn new(
total: usize,
dht_fetched: usize,
dht_fetch_duration: Duration,
rpc_fetch_stats: Option<RPCFetchStats>,
dht_put_stats: Option<DHTPutStats>,
) -> Self {
FetchStats {
dht_fetched: dht_fetched as f64,
dht_fetched_percentage: dht_fetched as f64 / total as f64,
dht_fetch_duration: dht_fetch_duration.as_secs_f64(),
rpc_fetched: rpc_fetch_stats.map(|(rpc_fetched, _)| rpc_fetched as f64),
rpc_fetch_duration: rpc_fetch_stats.map(|(_, duration)| duration.as_secs_f64()),
dht_put_success_rate: dht_put_stats.map(|(rate, _)| rate),
dht_put_duration: dht_put_stats.map(|(_, duration)| duration.as_secs_f64()),
}
}
}
Expand Down Expand Up @@ -171,13 +164,8 @@ impl Client for DHTWithRPCFallbackClient {
.await?;

if self.disable_rpc {
let stats = FetchStats::new(
positions.len(),
dht_fetched.len(),
dht_fetch_duration,
None,
None,
);
let stats =
FetchStats::new(positions.len(), dht_fetched.len(), dht_fetch_duration, None);
return Ok((dht_fetched, unfetched, stats));
};

Expand All @@ -191,24 +179,20 @@ impl Client for DHTWithRPCFallbackClient {
)
.await?;

let begin = Instant::now();

let dht_put_success_rate = self
.p2p_client
self.p2p_client
.insert_cells_into_dht(block_number, rpc_fetched.clone())
.await;

info!(
block_number,
"DHT PUT operation success rate: {dht_put_success_rate}"
);
.await
.map_err(|e| {
warn!("Error inserting cells into DHT: {e}");
e
})
.ok();

let stats = FetchStats::new(
positions.len(),
dht_fetched.len(),
dht_fetch_duration,
Some((rpc_fetched.len(), rpc_fetch_duration)),
Some((dht_put_success_rate as f64, begin.elapsed())),
);

let mut fetched = vec![];
Expand Down
8 changes: 1 addition & 7 deletions src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ pub fn init(
cfg: LibP2PConfig,
dht_parallelization_limit: usize,
ttl: u64,
put_batch_size: usize,
is_fat_client: bool,
id_keys: libp2p::identity::Keypair,
) -> Result<(Client, EventLoop)> {
Expand Down Expand Up @@ -206,12 +205,7 @@ pub fn init(
let (command_sender, command_receiver) = mpsc::channel(10000);

Ok((
Client::new(
command_sender,
dht_parallelization_limit,
ttl,
put_batch_size,
),
Client::new(command_sender, dht_parallelization_limit, ttl),
EventLoop::new(
swarm,
command_receiver,
Expand Down
Loading

0 comments on commit 443f2b0

Please sign in to comment.