Skip to content

Commit

Permalink
feat(consensus): run network manager in consensus
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware committed Nov 7, 2024
1 parent 21c050d commit f456a82
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ papyrus_protobuf.workspace = true
serde.workspace = true
starknet_batcher_types.workspace = true
starknet_sequencer_infra.workspace = true
tokio.workspace = true
tracing.workspace = true
validator.workspace = true
24 changes: 20 additions & 4 deletions crates/consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::SinkExt;
use libp2p::PeerId;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use starknet_batcher_types::communication::SharedBatcherClient;
Expand All @@ -18,6 +18,10 @@ use tracing::{error, info};

use crate::config::ConsensusManagerConfig;

// TODO(Dan, Guy): move to config.
pub const BROADCAST_BUFFER_SIZE: usize = 100;
pub const NETWORK_TOPIC: &str = "consensus_proposals";

#[derive(Clone)]
pub struct ConsensusManager {
pub config: ConsensusManagerConfig,
Expand All @@ -30,21 +34,33 @@ impl ConsensusManager {
}

pub async fn run(&self) -> Result<(), ConsensusError> {
let network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
self.config.consensus_config.num_validators,
);

papyrus_consensus::run_consensus(
let mut network_handle = tokio::task::spawn(network_manager.run());
let consensus_task = papyrus_consensus::run_consensus(
context,
self.config.consensus_config.start_height,
self.config.consensus_config.validator_id,
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
create_fake_network_channels(),
futures::stream::pending(),
)
.await
);

tokio::select! {
consensus_result = consensus_task => {
match consensus_result {
Ok(_) => panic!("Consensus task finished unexpectedly"),
Err(e) => Err(e),
}
},
_ = &mut network_handle => panic!("Consensus Network handle finished unexpectedly"),
}
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Expects to receive 2 groupings of arguments:
//! 1. TestConfig - these are prefixed with `--test.` in the command.
//! 2. NodeConfig - any argument lacking the above prefix is assumed to be in NodeConfig.

use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
Expand Down
14 changes: 7 additions & 7 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[path = "run_test.rs"]
mod run_test;

use std::future::pending;
use std::future;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -152,7 +152,7 @@ async fn spawn_rpc_server(
_pending_classes: Arc<RwLock<PendingClasses>>,
_storage_reader: StorageReader,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
Ok(tokio::spawn(pending()))
Ok(tokio::spawn(future::pending()))
}

fn spawn_monitoring_server(
Expand All @@ -178,7 +178,7 @@ fn spawn_consensus(
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let (Some(config), Some(network_manager)) = (config, network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
return Ok(tokio::spawn(future::pending()));
};
let config = config.clone();
debug!("Consensus configuration: {config:?}");
Expand Down Expand Up @@ -248,7 +248,7 @@ async fn spawn_sync_client(
(Some(_), Some(_)) => {
panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on");
}
(None, None) => tokio::spawn(pending()),
(None, None) => tokio::spawn(future::pending()),
(Some(sync_config), None) => {
let configs = (sync_config, config.central.clone(), config.base_layer.clone());
let storage = (storage_reader.clone(), storage_writer);
Expand Down Expand Up @@ -294,7 +294,7 @@ fn spawn_p2p_sync_server(
) -> JoinHandle<anyhow::Result<()>> {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
return tokio::spawn(pending());
return tokio::spawn(future::pending());
};

let header_server_receiver = network_manager
Expand Down Expand Up @@ -404,7 +404,7 @@ async fn run_threads(
} else {
match resources.maybe_network_manager {
Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }),
None => tokio::spawn(pending()),
None => tokio::spawn(future::pending()),
}
};
tokio::select! {
Expand Down Expand Up @@ -460,7 +460,7 @@ fn spawn_storage_metrics_collector(
interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
if !collect_metrics {
return tokio::spawn(pending());
return tokio::spawn(future::pending());
}

tokio::spawn(
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::types::{
// TODO(dvir): add test for this.
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
pub async fn run_consensus<ContextT, SyncReceiverT>(
mut context: ContextT,
start_height: BlockNumber,
Expand Down

0 comments on commit f456a82

Please sign in to comment.