Skip to content

Commit

Permalink
implement basic bootstrap monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c committed Nov 25, 2024
1 parent faf7b39 commit 4bdca62
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 45 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ members = [
"compatibility-tests",
"core",
"crawler",
"fat",
"fat",
"monitor-client",
"relay",
]
Expand Down
5 changes: 2 additions & 3 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use avail_light_core::{
sync_finality::SyncFinality,
telemetry::{self, otlp::Metrics, MetricCounter, MetricValue},
types::{
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, MultiaddrConfig, SecretKey,
Uuid,
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, PeerAddress, SecretKey, Uuid,
},
utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
};
Expand Down Expand Up @@ -365,7 +364,7 @@ pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
cfg.rpc.full_node_ws = network.full_node_ws();
cfg.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
cfg.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
cfg.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
cfg.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
1 change: 1 addition & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.5

- Rename `MultiaddrConfig` type to `PeerAddress` for better clarity
- Enable WASM compilation on utils and shutdown mods
- Allocate new port on each new dial attempt
- Set different dial conditions for bootstrap process and diagnostics API
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop,
MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel,
};
use crate::types::MultiaddrConfig;
use crate::types::PeerAddress;

#[derive(Clone)]
pub struct Client {
Expand Down Expand Up @@ -137,8 +137,8 @@ impl Client {
Box::new(move |context: &mut EventLoop| {
let opts = DialOpts::peer_id(peer_id)
.addresses(peer_address)
.condition(PeerCondition::Always)
.allocate_new_port()
.condition(dial_condition)
.build();
context.swarm.dial(opts)?;

Expand Down Expand Up @@ -166,7 +166,7 @@ impl Client {

// Bootstrap is triggered automatically on add_address call
// Bootstrap nodes are also used as autonat servers
pub async fn bootstrap_on_startup(&self, bootstraps: &[MultiaddrConfig]) -> Result<()> {
pub async fn bootstrap_on_startup(&self, bootstraps: &[PeerAddress]) -> Result<()> {
for (peer, addr) in bootstraps.iter().map(Into::into) {
self.dial_peer(peer, vec![addr.clone()], PeerCondition::Always)
.await
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/p2p/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{protocol_name, ProvidersConfig};
use crate::network::p2p::MemoryStoreConfig;
use crate::types::{duration_seconds_format, KademliaMode, MultiaddrConfig, SecretKey};
use crate::types::{duration_seconds_format, KademliaMode, PeerAddress, SecretKey};
use libp2p::{kad, multiaddr::Protocol, Multiaddr};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -139,7 +139,7 @@ pub struct LibP2PConfig {
#[serde(flatten)]
pub kademlia: KademliaConfig,
/// Vector of Relay nodes, which are used for hole punching
pub relays: Vec<MultiaddrConfig>,
pub relays: Vec<PeerAddress>,
/// Sets the amount of time to keep connections alive when they're idle. (default: 10s).
#[serde(with = "duration_seconds_format")]
pub connection_idle_timeout: Duration,
Expand All @@ -148,7 +148,7 @@ pub struct LibP2PConfig {
pub per_connection_event_buffer_size: usize,
pub dial_concurrency_factor: NonZeroU8,
/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
pub bootstraps: Vec<MultiaddrConfig>,
pub bootstraps: Vec<PeerAddress>,
/// Maximum number of parallel tasks spawned for GET and PUT operations on DHT (default: 20).
pub dht_parallelization_limit: usize,
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,16 @@ impl TryFrom<String> for CompactMultiaddress {
untagged,
expecting = "Valid multiaddress/peer_id string or a tuple (peer_id, multiaddress) expected"
)]
pub enum MultiaddrConfig {
pub enum PeerAddress {
Compact(CompactMultiaddress),
PeerIdAndMultiaddr((PeerId, Multiaddr)),
}

impl From<&MultiaddrConfig> for (PeerId, Multiaddr) {
fn from(value: &MultiaddrConfig) -> Self {
impl From<&PeerAddress> for (PeerId, Multiaddr) {
fn from(value: &PeerAddress) -> Self {
match value {
MultiaddrConfig::Compact(CompactMultiaddress(value)) => value.clone(),
MultiaddrConfig::PeerIdAndMultiaddr(value) => value.clone(),
PeerAddress::Compact(CompactMultiaddress(value)) => value.clone(),
PeerAddress::PeerIdAndMultiaddr(value) => value.clone(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crawler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use avail_light_core::{
Network,
},
telemetry::otlp::OtelConfig,
types::{block_matrix_partition_format, tracing_level_format, MultiaddrConfig, Origin},
types::{block_matrix_partition_format, tracing_level_format, Origin, PeerAddress},
};
use avail_rust::kate_recovery::matrix::Partition;
use clap::{command, Parser};
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
config.rpc.full_node_ws = network.full_node_ws();
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
config.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
4 changes: 2 additions & 2 deletions fat/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use avail_light_core::{
telemetry::otlp::OtelConfig,
types::{
block_matrix_partition_format, option_duration_seconds_format, tracing_level_format,
MultiaddrConfig, SecretKey,
PeerAddress, SecretKey,
},
};
use avail_rust::kate_recovery::matrix::Partition;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
if let Some(network) = &opts.network {
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
config.rpc.full_node_ws = network.full_node_ws();
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
config.genesis_hash = network.genesis_hash().to_string();
}
Expand Down
7 changes: 6 additions & 1 deletion monitor-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[package]
name = "monitor-client"
name = "avail-light-monitor"
version = "0.1.0"
authors.workspace = true
build = "../build.rs"
edition = "2021"
repository.workspace = true

Expand All @@ -14,3 +15,7 @@ serde = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

[features]
rocksdb = ["avail-light-core/rocksdb"]
default = ["rocksdb"]
38 changes: 38 additions & 0 deletions monitor-client/src/bootstrap_monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use avail_light_core::{network::p2p::Client, types::PeerAddress};
use color_eyre::Result;
use tokio::time::Interval;
use tracing::{error, info};

pub struct BootstrapMonitor {
bootstraps: Vec<PeerAddress>,
interval: Interval,
p2p_client: Client,
}

impl BootstrapMonitor {
pub fn new(bootstraps: Vec<PeerAddress>, interval: Interval, p2p_client: Client) -> Self {
Self {
bootstraps,
interval,
p2p_client,
}
}

pub async fn start_monitoring(&mut self) -> Result<()> {
info!("Bootstrap monitor started.");
loop {
self.interval.tick().await;

for (peer, addr) in self.bootstraps.iter().map(Into::into) {
match self.p2p_client.dial_peer(peer, vec![addr.clone()]).await {

Check failure on line 27 in monitor-client/src/bootstrap_monitor.rs

View workflow job for this annotation

GitHub Actions / cargo test

this method takes 3 arguments but 2 arguments were supplied
Ok(_) => {
info!("Bootstrap {peer} dialed successfully!");
},
Err(e) => {
error!("Error dialing bootstrap: {e}");
},
}
}
}
}
}
13 changes: 10 additions & 3 deletions monitor-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::num::NonZeroUsize;
use std::time::Duration;

use avail_light_core::network::{p2p::configuration::LibP2PConfig, Network};
use avail_light_core::types::{tracing_level_format, MultiaddrConfig, SecretKey};
use avail_light_core::types::{tracing_level_format, PeerAddress, SecretKey};
use clap::Parser;
use color_eyre::{eyre::eyre, Result};
use serde::{Deserialize, Serialize};
Expand All @@ -23,6 +23,9 @@ pub struct CliOpts {
/// Testnet or devnet selection.
#[arg(short, long, value_name = "network", default_value = "hex")]
pub network: Network,
/// Time interval for monitoring actions
#[arg(long, default_value = "10")]
pub interval: u64,
/// Seed string for libp2p keypair generation
#[arg(long)]
pub seed: Option<String>,
Expand All @@ -32,7 +35,7 @@ pub struct CliOpts {
/// RocksDB store location
#[arg(long, default_value = "./db")]
pub db_path: String,
#[arg(long, default_value = "10")]
#[arg(long, default_value = "5")]
pub connection_idle_timeout: Option<u64>,
#[arg(long, default_value = "10000")]
pub max_negotiating_inbound_streams: Option<usize>,
Expand All @@ -48,6 +51,8 @@ pub struct Config {
/// Genesis hash of the network to be connected to.
/// Set to "DEV" to connect to any network.
pub genesis_hash: String,
/// Time interval for monitoring actions.
pub interval: u64,
/// Log level.
#[serde(with = "tracing_level_format")]
pub log_level: Level,
Expand All @@ -67,6 +72,7 @@ impl Default for Config {
log_format_json: false,
db_path: "./db".to_string(),
libp2p: Default::default(),
interval: 10,
}
}
}
Expand All @@ -81,7 +87,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
opts.network.bootstrap_peer_id(),
opts.network.bootstrap_multiaddr(),
);
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
config.genesis_hash = opts.network.genesis_hash().to_string();

if let Some(seed) = &opts.seed {
Expand Down Expand Up @@ -111,6 +117,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
}

config.db_path = opts.db_path.to_string();
config.interval = opts.interval;

if config.libp2p.bootstraps.is_empty() {
return Err(eyre!("List of bootstraps must not be empty!"));
Expand Down
51 changes: 43 additions & 8 deletions monitor-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::time::Duration;

use bootstrap_monitor::BootstrapMonitor;
use tokio::time;

use avail_light_core::{
data,
network::p2p::{self, configuration::LibP2PConfig},
network::p2p::{self},
shutdown::Controller,
types::ProjectName,
utils::spawn_in_span,
utils::{default_subscriber, json_subscriber, spawn_in_span},
};
use clap::Parser;
use color_eyre::{eyre::Context, Result};
use tracing::info;
use tracing::{error, info};

mod bootstrap_monitor;
mod config;

#[tokio::main]
Expand All @@ -17,26 +23,38 @@ async fn main() -> Result<()> {
let version = clap::crate_version!();

let opts = config::CliOpts::parse();
let config = config::load(&opts)?;

let db = data::DB::open(&opts.db_path)?;
if config.log_format_json {
tracing::subscriber::set_global_default(json_subscriber(config.log_level))?;
} else {
tracing::subscriber::set_global_default(default_subscriber(config.log_level))?;
}
info!("Using configuration: {config:?}");

#[cfg(not(feature = "rocksdb"))]
let db = data::DB::default();
#[cfg(feature = "rocksdb")]
let db = data::DB::open(&config.db_path)?;

let config = config::load(&opts)?;
let shutdown = Controller::new();

let (p2p_keypair, p2p_peer_id) = p2p::identity(&config.libp2p, db.clone())?;
let (p2p_keypair, _) = p2p::identity(&config.libp2p, db.clone())?;

let (p2p_client, p2p_event_loop, p2p_event_receiver) = p2p::init(
let (p2p_client, p2p_event_loop, _) = p2p::init(
config.libp2p.clone(),
ProjectName::new("avail".to_string()),
p2p_keypair,
version,
opts.network.genesis_hash(),
&config.genesis_hash,
true,
shutdown.clone(),
db.clone(),
)
.await?;

info!("Starting event loop");

spawn_in_span(shutdown.with_cancel(p2p_event_loop.run()));

let addrs = vec![config.libp2p.tcp_multiaddress()];
Expand All @@ -47,5 +65,22 @@ async fn main() -> Result<()> {
.wrap_err("Error starting listener.")?;
info!("TCP listener started on port {}", config.libp2p.port);

let interval = time::interval(Duration::from_secs(config.interval));

// 1. Test bootstrap availability

let mut bootstrap_monitor =
BootstrapMonitor::new(config.libp2p.bootstraps, interval, p2p_client);
_ = spawn_in_span(shutdown.with_cancel(async move {
if let Err(e) = bootstrap_monitor.start_monitoring().await {
error!("Bootstrap monitor error: {e}");
};
}))
.await?;

// 2. Test the number of discovered clients from the bootstrap
// 3. Test server nodes availability
// 4. Test server latencies

Ok(())
}

0 comments on commit 4bdca62

Please sign in to comment.