Skip to content

Commit

Permalink
Split fat client and maintenance from light client.
Browse files Browse the repository at this point in the history
* Split fat client from light client.

* Split maintenance from light client.
  • Loading branch information
aterentic-ethernal authored Dec 5, 2023
1 parent 9656477 commit a429a72
Show file tree
Hide file tree
Showing 7 changed files with 559 additions and 413 deletions.
79 changes: 46 additions & 33 deletions src/bin/avail-light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use avail_light::{api, data, network::rpc, telemetry};
use avail_light::{
consts::EXPECTED_NETWORK_VERSION,
network::p2p,
types::{CliOpts, Mode, RuntimeConfig, State},
types::{CliOpts, RuntimeConfig, State},
};
use clap::Parser;
use color_eyre::{
Expand Down Expand Up @@ -231,7 +231,6 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
rpc::init(db.clone(), state.clone(), &cfg.full_node_ws);

let publish_rpc_event_receiver = rpc_events.subscribe();
let lc_rpc_event_receiver = rpc_events.subscribe();
let first_header_rpc_event_receiver = rpc_events.subscribe();
#[cfg(feature = "crawl")]
let crawler_rpc_event_receiver = rpc_events.subscribe();
Expand Down Expand Up @@ -271,42 +270,37 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {

tokio::task::spawn(server.run());

let (block_tx, data_rx) = if let Mode::AppClient(app_id) = Mode::from(cfg.app_id) {
// communication channels being established for talking to
// libp2p backed application client
let (block_tx, block_rx) = broadcast::channel::<avail_light::types::BlockVerified>(1 << 7);
let (block_tx, block_rx) = broadcast::channel::<avail_light::types::BlockVerified>(1 << 7);

let data_rx = cfg.app_id.map(AppId).map(|app_id| {
let (data_tx, data_rx) = broadcast::channel::<(u32, AppData)>(1 << 7);
tokio::task::spawn(avail_light::app_client::run(
(&cfg).into(),
db.clone(),
p2p_client.clone(),
rpc_client.clone(),
AppId(app_id),
block_rx,
app_id,
block_tx.subscribe(),
pp.clone(),
state.clone(),
sync_range.clone(),
data_tx,
error_sender.clone(),
));
(Some(block_tx), Some(data_rx))
} else {
(None, None)
};
data_rx
});

tokio::task::spawn(api::v2::publish(
api::v2::types::Topic::HeaderVerified,
publish_rpc_event_receiver,
ws_clients.clone(),
));

if let Some(sender) = block_tx.as_ref() {
tokio::task::spawn(api::v2::publish(
api::v2::types::Topic::ConfidenceAchieved,
sender.subscribe(),
ws_clients.clone(),
));
}
tokio::task::spawn(api::v2::publish(
api::v2::types::Topic::ConfidenceAchieved,
block_tx.subscribe(),
ws_clients.clone(),
));

if let Some(data_rx) = data_rx {
tokio::task::spawn(api::v2::publish(
Expand Down Expand Up @@ -364,25 +358,44 @@ async fn run(error_sender: Sender<Report>) -> Result<()> {
s.finality_synced = true;
}

let light_client =
avail_light::light_client::new(db.clone(), p2p_client.clone(), rpc_client.clone());

let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);
tokio::task::spawn(avail_light::maintenance::run(
p2p_client.clone(),
ot_metrics.clone(),
block_rx,
error_sender.clone(),
));

let lc_channels = avail_light::light_client::Channels {
let channels = avail_light::types::ClientChannels {
block_sender: block_tx,
rpc_event_receiver: lc_rpc_event_receiver,
rpc_event_receiver: rpc_events.subscribe(),
error_sender: error_sender.clone(),
};

tokio::task::spawn(avail_light::light_client::run(
light_client,
light_network_client,
(&cfg).into(),
ot_metrics,
state.clone(),
lc_channels,
));
if let Some(partition) = cfg.block_matrix_partition {
let fat_client =
avail_light::fat_client::new(db.clone(), p2p_client.clone(), rpc_client.clone());

tokio::task::spawn(avail_light::fat_client::run(
fat_client,
(&cfg).into(),
ot_metrics.clone(),
channels,
partition,
));
} else {
let light_client = avail_light::light_client::new(db.clone());

let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);

tokio::task::spawn(avail_light::light_client::run(
light_client,
light_network_client,
(&cfg).into(),
ot_metrics,
state.clone(),
channels,
));
}

Ok(())
}
Expand Down
Loading

0 comments on commit a429a72

Please sign in to comment.