Skip to content

Commit

Permalink
Cell upload metric re-implementation (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c authored Dec 5, 2023
1 parent 801b458 commit f407300
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 67 deletions.
6 changes: 4 additions & 2 deletions src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::types::{LibP2PConfig, SecretKey};
pub use client::Client;
use event_loop::EventLoop;

use self::client::BlockStat;

#[derive(Debug)]
pub enum QueryChannel {
GetRecord(oneshot::Sender<Result<PeerRecord>>),
Expand All @@ -37,15 +39,15 @@ pub struct EventLoopEntries<'a> {
pending_kad_queries: &'a mut HashMap<QueryId, QueryChannel>,
pending_swarm_events: &'a mut HashMap<PeerId, oneshot::Sender<Result<()>>>,
/// Blocks we monitor for PUT success rate: <block_num, BlockStat>
active_blocks: &'a mut HashMap<u32, (usize, usize, u64)>,
active_blocks: &'a mut HashMap<u32, BlockStat>,
}

impl<'a> EventLoopEntries<'a> {
pub fn new(
swarm: &'a mut Swarm<Behaviour>,
pending_kad_queries: &'a mut HashMap<QueryId, QueryChannel>,
pending_swarm_events: &'a mut HashMap<PeerId, oneshot::Sender<Result<()>>>,
active_blocks: &'a mut HashMap<u32, (usize, usize, u64)>,
active_blocks: &'a mut HashMap<u32, BlockStat>,
) -> Self {
Self {
swarm,
Expand Down
27 changes: 24 additions & 3 deletions src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ impl DHTRow {
}
}

/// Upload bookkeeping for a single block whose cell/row records are being
/// PUT into the DHT. Tracks how many records were queued, how many are still
/// outstanding, and how the finished ones fared.
#[derive(Debug)]
pub struct BlockStat {
	/// Total number of records queued for upload for this block.
	pub total_count: usize,
	/// Records still awaiting a PUT result.
	pub remaining_counter: usize,
	/// Records confirmed as successfully stored.
	pub success_counter: usize,
	/// Records whose PUT reported an error.
	pub error_counter: usize,
	/// Duration (seconds) of the most recently observed PUT query.
	pub time_stat: u64,
}

impl BlockStat {
	/// Queues `cell_number` additional records for this block, growing both
	/// the expected total and the outstanding (remaining) counters.
	pub fn increase_block_stat_counters(&mut self, cell_number: usize) {
		let Self {
			total_count,
			remaining_counter,
			..
		} = self;
		*total_count += cell_number;
		*remaining_counter += cell_number;
	}
}

struct StartListening {
addr: Multiaddr,
response_sender: Option<oneshot::Sender<Result<()>>>,
Expand Down Expand Up @@ -180,14 +196,19 @@ struct PutKadRecord {
// `active_blocks` is a list of cell counts for each block we monitor for PUT op. results
impl Command for PutKadRecord {
fn run(&mut self, mut entries: EventLoopEntries) -> Result<()> {
// Each `active_blocks` entry is a `BlockStat` tracking total/remaining/success/error counts and timing
entries
.active_blocks
.entry(self.block_num)
// Increase the total cell count we monitor if the block entry already exists
.and_modify(|(total_cells, _, _)| *total_cells += self.records.len())
.and_modify(|block| block.increase_block_stat_counters(self.records.len()))
// Initiate counting for the new block if the block doesn't exist
.or_insert((self.records.len(), 0, 0));
.or_insert(BlockStat {
total_count: self.records.len(),
remaining_counter: self.records.len(),
success_counter: 0,
error_counter: 0,
time_stat: 0,
});

for record in self.records.clone() {
let query_id = entries
Expand Down
136 changes: 74 additions & 62 deletions src/network/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use libp2p::{
autonat::{self, NatStatus},
dcutr,
identify::{self, Info},
kad::{self, BootstrapOk, GetRecordOk, InboundRequest, QueryId, QueryResult, RecordKey},
kad::{
self, BootstrapOk, GetRecordOk, InboundRequest, QueryId, QueryResult, QueryStats, RecordKey,
},
mdns,
multiaddr::Protocol,
swarm::{
Expand All @@ -24,7 +26,8 @@ use tracing::{debug, error, info, trace, warn};
use crate::telemetry::{MetricCounter, MetricValue, Metrics};

use super::{
Behaviour, BehaviourEvent, CommandReceiver, EventLoopEntries, QueryChannel, SendableCommand,
client::BlockStat, Behaviour, BehaviourEvent, CommandReceiver, EventLoopEntries, QueryChannel,
SendableCommand,
};

// RelayState keeps track of all things relay related
Expand Down Expand Up @@ -77,7 +80,7 @@ pub struct EventLoop {
kad_remove_local_record: bool,
/// Blocks we monitor for PUT success rate
/// <block_num, BlockStat>
active_blocks: HashMap<u32, (usize, usize, u64)>,
active_blocks: HashMap<u32, BlockStat>,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -218,72 +221,26 @@ impl EventLoop {
_ => (),
},
QueryResult::PutRecord(Err(error)) => {
_ = self.pending_kad_queries.remove(&id);
trace!("Unable to put record: {error}");
},

QueryResult::PutRecord(Ok(record)) => {
if self.pending_kad_queries.remove(&id).is_none() {
return;
};

let block_num = match record.key.clone().try_into() {
Ok(DHTKey::Cell(block_num, _, _)) => block_num,
Ok(DHTKey::Row(block_num, _)) => block_num,
Err(error) => {
warn!("Unable to cast Kademlia key to DHT key: {error}");
return;
match error {
kad::PutRecordError::QuorumFailed { key, .. } => {
self.handle_put_result(key, stats, true, metrics).await;
},
kad::PutRecordError::Timeout { key, .. } => {
self.handle_put_result(key, stats, true, metrics).await;
},
};

let previous_block_num = block_num - 1;

// If the new block record is arrived, and previous block is still in the map,
// we take that as the cut-off point for previous blocks records delivery
if let Some(&(total_records, record_counter, time_stat)) = self
.active_blocks
.get(&block_num)
.and_then(|_| self.active_blocks.get(&previous_block_num))
{
info!("Number of confirmed uploaded records from the prev. block {previous_block_num} sent {record_counter}/{total_records}. Duration: {time_stat}");
let success_rate = record_counter as f64 / total_records as f64;

_ = metrics
.record(MetricValue::DHTPutSuccess(success_rate))
.await;

_ = metrics
.record(MetricValue::DHTPutDuration(time_stat as f64))
.await;

self.active_blocks.remove(&previous_block_num);
};

// Get record counter data for current block
// This value has already been added during the PUT operation
if let Some((_, record_counter, time_stat)) =
self.active_blocks.get_mut(&block_num)
{
// Increment the record counter for current block
*record_counter += 1;
*time_stat = stats
.duration()
.as_ref()
.map(Duration::as_secs)
.unwrap_or_default();
} else {
// If we're still receving data from one of the previous blocks, log it here
trace!("{block_num}: not in active blocks list anymore. ");
}
},

if self.kad_remove_local_record {
// Remove local records for fat clients (memory optimization)
debug!("Pruning local records on fat client");
self.swarm
.behaviour_mut()
.kademlia
.remove_record(&record.key);
}
QueryResult::PutRecord(Ok(record)) => {
if self.pending_kad_queries.remove(&id).is_none() {
return;
};
self.handle_put_result(record.key.clone(), stats, false, metrics)
.await;
},
QueryResult::Bootstrap(result) => match result {
Ok(BootstrapOk {
Expand Down Expand Up @@ -591,6 +548,61 @@ impl EventLoop {
},
}
}

async fn handle_put_result(
&mut self,
key: RecordKey,
stats: QueryStats,
is_error: bool,
metrics: Arc<impl Metrics>,
) {
let block_num = match key.clone().try_into() {
Ok(DHTKey::Cell(block_num, _, _)) => block_num,
Ok(DHTKey::Row(block_num, _)) => block_num,
Err(error) => {
warn!("Unable to cast Kademlia key to DHT key: {error}");
return;
},
};
if let Some(block) = self.active_blocks.get_mut(&block_num) {
// Decrement record counter for this block
block.remaining_counter -= 1;
if is_error {
block.error_counter += 1;
} else {
block.success_counter += 1;
}

block.time_stat = stats
.duration()
.as_ref()
.map(Duration::as_secs)
.unwrap_or_default();

if block.remaining_counter == 0 {
let success_rate = block.error_counter as f64 / block.total_count as f64;
info!(
"Cell upload success rate for block {block_num}: {}/{}. Duration: {}",
block.success_counter, block.total_count, block.time_stat
);
_ = metrics
.record(MetricValue::DHTPutSuccess(success_rate))
.await;

_ = metrics
.record(MetricValue::DHTPutDuration(block.time_stat as f64))
.await;
}

if self.kad_remove_local_record {
// Remove local records for fat clients (memory optimization)
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}
} else {
debug!("Can't find block in the active blocks list")
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit f407300

Please sign in to comment.