Skip to content

Commit

Permalink
P2P monitoring tests (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c authored Nov 28, 2024
1 parent 9f05d0b commit 9824c45
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 20 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"core",
"crawler",
"fat",
"monitor-client",
"relay",
]
default-members = ["client"]
Expand Down
6 changes: 2 additions & 4 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use avail_light_core::{
sync_finality::SyncFinality,
telemetry::{self, otlp::Metrics, MetricCounter, MetricValue},
types::{
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, MultiaddrConfig, SecretKey,
Uuid,
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, PeerAddress, SecretKey, Uuid,
},
utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
};
Expand Down Expand Up @@ -60,7 +59,6 @@ mod cli;
mod config;

/// Light Client for Avail Blockchain
async fn run(
cfg: RuntimeConfig,
identity_cfg: IdentityConfig,
Expand Down Expand Up @@ -365,7 +363,7 @@ pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
cfg.rpc.full_node_ws = network.full_node_ws();
cfg.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
cfg.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
cfg.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
cfg.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
1 change: 1 addition & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.5

- Rename `MultiaddrConfig` type to `PeerAddress` for better clarity
- Enable WASM compilation on proof mod
- Enable WASM compilation on utils and shutdown mods
- Allocate new port on each new dial attempt
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop,
MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel,
};
use crate::types::MultiaddrConfig;
use crate::types::PeerAddress;

#[derive(Clone)]
pub struct Client {
Expand Down Expand Up @@ -137,8 +137,8 @@ impl Client {
Box::new(move |context: &mut EventLoop| {
let opts = DialOpts::peer_id(peer_id)
.addresses(peer_address)
.allocate_new_port()
.condition(dial_condition)
.allocate_new_port()
.build();
context.swarm.dial(opts)?;

Expand Down Expand Up @@ -166,7 +166,7 @@ impl Client {

// Bootstrap is triggered automatically on add_address call
// Bootstrap nodes are also used as autonat servers
pub async fn bootstrap_on_startup(&self, bootstraps: &[MultiaddrConfig]) -> Result<()> {
pub async fn bootstrap_on_startup(&self, bootstraps: &[PeerAddress]) -> Result<()> {
for (peer, addr) in bootstraps.iter().map(Into::into) {
self.dial_peer(peer, vec![addr.clone()], PeerCondition::Always)
.await
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/p2p/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{protocol_name, ProvidersConfig};
use crate::network::p2p::MemoryStoreConfig;
use crate::types::{duration_seconds_format, KademliaMode, MultiaddrConfig, SecretKey};
use crate::types::{duration_seconds_format, KademliaMode, PeerAddress, SecretKey};
use libp2p::{kad, multiaddr::Protocol, Multiaddr};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -139,7 +139,7 @@ pub struct LibP2PConfig {
#[serde(flatten)]
pub kademlia: KademliaConfig,
/// Vector of Relay nodes, which are used for hole punching
pub relays: Vec<MultiaddrConfig>,
pub relays: Vec<PeerAddress>,
/// Sets the amount of time to keep connections alive when they're idle. (default: 10s).
#[serde(with = "duration_seconds_format")]
pub connection_idle_timeout: Duration,
Expand All @@ -148,7 +148,7 @@ pub struct LibP2PConfig {
pub per_connection_event_buffer_size: usize,
pub dial_concurrency_factor: NonZeroU8,
/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
pub bootstraps: Vec<MultiaddrConfig>,
pub bootstraps: Vec<PeerAddress>,
/// Maximum number of parallel tasks spawned for GET and PUT operations on DHT (default: 20).
pub dht_parallelization_limit: usize,
}
Expand Down
1 change: 0 additions & 1 deletion core/src/shutdown/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ impl<T: Clone> Signal<T> {
/// If the shutdown initiates before the wrapped future completes, the resulting future yields
/// `Err(reason)` containing the shutdown reason. Upon successful completion of the wrapped future
/// before a shutdown, it yields `Ok(val)`.
pub fn with_cancel<F: Future>(&self, future: F) -> WithCancel<T, F> {
WithCancel {
signal: self.clone(),
Expand Down
10 changes: 5 additions & 5 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,16 @@ impl TryFrom<String> for CompactMultiaddress {
untagged,
expecting = "Valid multiaddress/peer_id string or a tuple (peer_id, multiaddress) expected"
)]
pub enum MultiaddrConfig {
pub enum PeerAddress {
Compact(CompactMultiaddress),
PeerIdAndMultiaddr((PeerId, Multiaddr)),
}

impl From<&MultiaddrConfig> for (PeerId, Multiaddr) {
fn from(value: &MultiaddrConfig) -> Self {
impl From<&PeerAddress> for (PeerId, Multiaddr) {
fn from(value: &PeerAddress) -> Self {
match value {
MultiaddrConfig::Compact(CompactMultiaddress(value)) => value.clone(),
MultiaddrConfig::PeerIdAndMultiaddr(value) => value.clone(),
PeerAddress::Compact(CompactMultiaddress(value)) => value.clone(),
PeerAddress::PeerIdAndMultiaddr(value) => value.clone(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crawler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use avail_light_core::{
Network,
},
telemetry::otlp::OtelConfig,
types::{block_matrix_partition_format, tracing_level_format, MultiaddrConfig, Origin},
types::{block_matrix_partition_format, tracing_level_format, Origin, PeerAddress},
};
use avail_rust::kate_recovery::matrix::Partition;
use clap::{command, Parser};
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
config.rpc.full_node_ws = network.full_node_ws();
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
config.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
4 changes: 2 additions & 2 deletions fat/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use avail_light_core::{
telemetry::otlp::OtelConfig,
types::{
block_matrix_partition_format, option_duration_seconds_format, tracing_level_format,
MultiaddrConfig, SecretKey,
PeerAddress, SecretKey,
},
};
use avail_rust::kate_recovery::matrix::Partition;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
config.rpc.full_node_ws = network.full_node_ws();
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
config.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
21 changes: 21 additions & 0 deletions monitor-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "avail-light-monitor"
version = "0.1.0"
authors.workspace = true
build = "../build.rs"
edition = "2021"
repository.workspace = true

[dependencies]
avail-light-core = { workspace = true }
clap = { workspace = true }
color-eyre = { workspace = true }
libp2p = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

[features]
rocksdb = ["avail-light-core/rocksdb"]
default = ["rocksdb"]
43 changes: 43 additions & 0 deletions monitor-client/src/bootstrap_monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use avail_light_core::{network::p2p::Client, types::PeerAddress};
use color_eyre::Result;
use libp2p::swarm::dial_opts::PeerCondition;
use tokio::time::Interval;
use tracing::{error, info};

pub struct BootstrapMonitor {
bootstraps: Vec<PeerAddress>,
interval: Interval,
p2p_client: Client,
}

impl BootstrapMonitor {
pub fn new(bootstraps: Vec<PeerAddress>, interval: Interval, p2p_client: Client) -> Self {
Self {
bootstraps,
interval,
p2p_client,
}
}

pub async fn start_monitoring(&mut self) -> Result<()> {
info!("Bootstrap monitor started.");
loop {
self.interval.tick().await;

for (peer, addr) in self.bootstraps.iter().map(Into::into) {
match self
.p2p_client
.dial_peer(peer, vec![addr.clone()], PeerCondition::Always)
.await
{
Ok(_) => {
info!("Bootstrap {peer} dialed successfully!");
},
Err(e) => {
error!("Error dialing bootstrap: {e}");
},
}
}
}
}
}
126 changes: 126 additions & 0 deletions monitor-client/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::num::NonZeroUsize;
use std::time::Duration;

use avail_light_core::network::{p2p::configuration::LibP2PConfig, Network};
use avail_light_core::types::{tracing_level_format, PeerAddress, SecretKey};
use clap::Parser;
use color_eyre::{eyre::eyre, Result};
use serde::{Deserialize, Serialize};
use tracing::Level;

#[derive(Parser)]
#[command(version)]
pub struct CliOpts {
/// Sets verbosity level.
#[arg(long)]
pub verbosity: Option<Level>,
/// Sets logs format to JSON.
#[arg(long)]
pub logs_json: bool,
/// Cleans DB state.
#[arg(long)]
pub clean: bool,
/// Testnet or devnet selection.
#[arg(short, long, value_name = "network", default_value = "hex")]
pub network: Network,
/// Time interval for monitoring actions
#[arg(long, default_value = "10")]
pub interval: u64,
/// Seed string for libp2p keypair generation
#[arg(long)]
pub seed: Option<String>,
/// P2P port
#[arg(short, long)]
pub port: Option<u16>,
/// RocksDB store location
#[arg(long, default_value = "./db")]
pub db_path: String,
#[arg(long, default_value = "5")]
pub connection_idle_timeout: Option<u64>,
#[arg(long, default_value = "10000")]
pub max_negotiating_inbound_streams: Option<usize>,
#[arg(long, default_value = "30000")]
pub task_command_buffer_size: Option<usize>,
#[arg(long, default_value = "10000")]
pub per_connection_event_buffer_size: Option<usize>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/// Genesis hash of the network to be connected to.
/// Set to "DEV" to connect to any network.
pub genesis_hash: String,
/// Time interval for monitoring actions.
pub interval: u64,
/// Log level.
#[serde(with = "tracing_level_format")]
pub log_level: Level,
/// Log format: JSON for `true`, plain text for `false`.
pub log_format_json: bool,
/// Database file system path.
pub db_path: String,
#[serde(flatten)]
pub libp2p: LibP2PConfig,
}

impl Default for Config {
fn default() -> Self {
Self {
genesis_hash: "DEV".to_owned(),
log_level: Level::INFO,
log_format_json: false,
db_path: "./db".to_string(),
libp2p: Default::default(),
interval: 10,
}
}
}

pub fn load(opts: &CliOpts) -> Result<Config> {
let mut config = Config::default();

config.log_level = opts.verbosity.unwrap_or(config.log_level);
config.log_format_json = opts.logs_json || config.log_format_json;

let bootstrap = (
opts.network.bootstrap_peer_id(),
opts.network.bootstrap_multiaddr(),
);
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.genesis_hash = opts.network.genesis_hash().to_string();

if let Some(seed) = &opts.seed {
config.libp2p.secret_key = Some(SecretKey::Seed {
seed: seed.to_string(),
})
}

if let Some(port) = opts.port {
config.libp2p.port = port;
}

if let Some(connection_idle_timeout) = opts.connection_idle_timeout {
config.libp2p.connection_idle_timeout = Duration::from_secs(connection_idle_timeout);
}

if let Some(max_negotiating_inbound_streams) = opts.max_negotiating_inbound_streams {
config.libp2p.max_negotiating_inbound_streams = max_negotiating_inbound_streams;
}
if let Some(task_command_buffer_size) = opts.task_command_buffer_size {
config.libp2p.task_command_buffer_size =
NonZeroUsize::new(task_command_buffer_size).unwrap();
}

if let Some(per_connection_event_buffer_size) = opts.per_connection_event_buffer_size {
config.libp2p.per_connection_event_buffer_size = per_connection_event_buffer_size;
}

config.db_path = opts.db_path.to_string();
config.interval = opts.interval;

if config.libp2p.bootstraps.is_empty() {
return Err(eyre!("List of bootstraps must not be empty!"));
}
Ok(config)
}
Loading

0 comments on commit 9824c45

Please sign in to comment.