diff --git a/Cargo.lock b/Cargo.lock index c99f7b2f6..5fea18d04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1906,6 +1906,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1943,6 +1953,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.6.0" @@ -2564,6 +2580,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2960,6 +2985,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", ] [[package]] @@ -2968,7 +3005,12 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", "tracing-core", + "tracing-log", ] [[package]] @@ -3051,6 +3093,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.5" diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index c8ccd5a6f..325406bf7 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -23,7 +23,7 @@ cuprate-p2p-core = { path = "../../p2p/p2p-core" } cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-address-book = { path = "../../p2p/address-book" } -cuprate-blockchain = { path = "../../storage/blockchain" } +cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } cuprate-database-service = { path = "../../storage/service" } cuprate-txpool = { path = "../../storage/txpool" } cuprate-database = { path = "../../storage/database" } @@ -70,8 +70,14 @@ tokio-util = { workspace = true } tokio-stream = { workspace = true } tokio = { workspace = true } tower = { workspace = true } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["std", "fmt", "default"] } tracing = { workspace = true } [lints] workspace = true + +[profile.dev] +panic = "abort" + +[profile.release] +panic = "abort" diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 4abebeb61..a06f3fa73 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -1,6 +1,101 @@ //! Blockchain //! -//! Will contain the chain manager and syncer. +//! Contains the blockchain manager, syncer and an interface to mutate the blockchain. +use std::sync::Arc; +use futures::FutureExt; +use tokio::sync::{mpsc, Notify}; +use tower::{BoxError, Service, ServiceExt}; + +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig}; +use cuprate_cryptonight::cryptonight_hash_v0; +use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface}; +use cuprate_p2p_core::{ClearNet, Network}; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainWriteRequest}, + VerifiedBlockInformation, +}; + +use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; + +mod chain_service; +pub mod interface; mod manager; mod syncer; +mod types; + +use types::{ + ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, +}; + +/// Checks if the genesis block is in the blockchain and adds it if not. +pub async fn check_add_genesis( + blockchain_read_handle: &mut BlockchainReadHandle, + blockchain_write_handle: &mut BlockchainWriteHandle, + network: Network, +) { + // Try to get the chain height, will fail if the genesis block is not in the DB. + if blockchain_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainReadRequest::ChainHeight) + .await + .is_ok() + { + return; + } + + let genesis = generate_genesis_block(network); + + assert_eq!(genesis.miner_transaction.prefix().outputs.len(), 1); + assert!(genesis.transactions.is_empty()); + + blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::WriteBlock( + VerifiedBlockInformation { + block_blob: genesis.serialize(), + txs: vec![], + block_hash: genesis.hash(), + pow_hash: cryptonight_hash_v0(&genesis.serialize_pow_hash()), + height: 0, + generated_coins: genesis.miner_transaction.prefix().outputs[0] + .amount + .unwrap(), + weight: genesis.miner_transaction.weight(), + long_term_weight: genesis.miner_transaction.weight(), + cumulative_difficulty: 1, + block: genesis, + }, + )) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); +} + +/// Initializes the consensus services. +pub async fn init_consensus( + blockchain_read_handle: BlockchainReadHandle, + context_config: ContextConfig, +) -> Result< + ( + ConcreteBlockVerifierService, + ConcreteTxVerifierService, + BlockChainContextService, + ), + BoxError, +> { + let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from); + + let ctx_service = + cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone()) + .await?; + + let (block_verifier_svc, tx_verifier_svc) = + cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone()); + + Ok((block_verifier_svc, tx_verifier_svc, ctx_service)) +} diff --git a/binaries/cuprated/src/blockchain/chain_service.rs b/binaries/cuprated/src/blockchain/chain_service.rs new file mode 100644 index 000000000..af862d1d2 --- /dev/null +++ b/binaries/cuprated/src/blockchain/chain_service.rs @@ -0,0 +1,72 @@ +use std::task::{Context, Poll}; + +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use tower::Service; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; + +/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out +/// what blocks we need. +/// +/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. +#[derive(Clone)] +pub struct ChainService(pub BlockchainReadHandle); + +impl Service for ChainService { + type Response = ChainSvcResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: ChainSvcRequest) -> Self::Future { + let map_res = |res: BlockchainResponse| match res { + BlockchainResponse::CompactChainHistory { + block_ids, + cumulative_difficulty, + } => ChainSvcResponse::CompactHistory { + block_ids, + cumulative_difficulty, + }, + BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res), + _ => unreachable!(), + }; + + match req { + ChainSvcRequest::CompactHistory => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::FindFirstUnknown(req) => self + .0 + .call(BlockchainReadRequest::FindFirstUnknown(req)) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::CumulativeDifficulty => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(|res| { + // TODO create a custom request instead of hijacking this one. + // TODO: use the context cache. + let BlockchainResponse::CompactChainHistory { + cumulative_difficulty, + .. + } = res + else { + unreachable!() + }; + + ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty) + }) + .map_err(Into::into) + .boxed(), + } + } +} diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs new file mode 100644 index 000000000..985e60d80 --- /dev/null +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -0,0 +1,161 @@ +//! The blockchain manager interface. +//! +//! This module contains all the functions to mutate the blockchain's state in any way, through the +//! blockchain manager. +use std::{ + collections::{HashMap, HashSet}, + sync::{LazyLock, Mutex, OnceLock}, +}; + +use monero_serai::{block::Block, transaction::Transaction}; +use rayon::prelude::*; +use tokio::sync::{mpsc, oneshot}; +use tower::{Service, ServiceExt}; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_helper::cast::usize_to_u64; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, +}; + +use crate::{ + blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, + constants::PANIC_CRITICAL_SERVICE_ERROR, +}; + +/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. +/// +/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions +/// in this file document what happens if this is not initialized when they are called. +pub(super) static COMMAND_TX: OnceLock> = OnceLock::new(); + +/// An error that can be returned from [`handle_incoming_block`]. +#[derive(Debug, thiserror::Error)] +pub enum IncomingBlockError { + /// Some transactions in the block were unknown. + /// + /// The inner values are the block hash and the indexes of the missing txs in the block. + #[error("Unknown transactions in block.")] + UnknownTransactions([u8; 32], Vec), + /// We are missing the block's parent. + #[error("The block has an unknown parent.")] + Orphan, + /// The block was invalid. + #[error(transparent)] + InvalidBlock(anyhow::Error), +} + +/// Try to add a new block to the blockchain. +/// +/// On success returns [`IncomingBlockOk`]. +/// +/// # Errors +/// +/// This function will return an error if: +/// - the block was invalid +/// - we are missing transactions +/// - the block's parent is unknown +pub async fn handle_incoming_block( + block: Block, + given_txs: Vec, + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result { + /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. + /// + /// This lock prevents sending the same block to the blockchain manager from multiple connections + /// before one of them actually gets added to the chain, allowing peers to do other things. + /// + /// This is used over something like a dashmap as we expect a lot of collisions in a short amount of + /// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` + /// which are also more expensive than `Mutex`s. + static BLOCKS_BEING_HANDLED: LazyLock>> = + LazyLock::new(|| Mutex::new(HashSet::new())); + // FIXME: we should look in the tx-pool for txs when that is ready. + + if !block_exists(block.header.previous, blockchain_read_handle) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + { + return Err(IncomingBlockError::Orphan); + } + + let block_hash = block.hash(); + + if block_exists(block_hash, blockchain_read_handle) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + { + return Ok(IncomingBlockOk::AlreadyHave); + } + + // TODO: remove this when we have a working tx-pool. + if given_txs.len() != block.transactions.len() { + return Err(IncomingBlockError::UnknownTransactions( + block_hash, + (0..usize_to_u64(block.transactions.len())).collect(), + )); + } + + // TODO: check we actually got given the right txs. + let prepped_txs = given_txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + Ok((tx.tx_hash, tx)) + }) + .collect::>() + .map_err(IncomingBlockError::InvalidBlock)?; + + let Some(incoming_block_tx) = COMMAND_TX.get() else { + // We could still be starting up the blockchain manager. + return Ok(IncomingBlockOk::NotReady); + }; + + // Add the blocks hash to the blocks being handled. + if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) { + // If another place is already adding this block then we can stop. + return Ok(IncomingBlockOk::AlreadyHave); + } + + // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`. + + let (response_tx, response_rx) = oneshot::channel(); + + incoming_block_tx + .send(BlockchainManagerCommand::AddBlock { + block, + prepped_txs, + response_tx, + }) + .await + .expect("TODO: don't actually panic here, an err means we are shutting down"); + + let res = response_rx + .await + .expect("The blockchain manager will always respond") + .map_err(IncomingBlockError::InvalidBlock); + + // Remove the block hash from the blocks being handled. + BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash); + + res +} + +/// Check if we have a block with the given hash. +async fn block_exists( + block_hash: [u8; 32], + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result { + let BlockchainResponse::FindBlock(chain) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::FindBlock(block_hash)) + .await? + else { + unreachable!(); + }; + + Ok(chain.is_some()) +} diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 8b1378917..568ed572d 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1 +1,143 @@ +use std::{collections::HashMap, sync::Arc}; +use futures::StreamExt; +use monero_serai::block::Block; +use tokio::sync::{mpsc, oneshot, Notify}; +use tower::{Service, ServiceExt}; +use tracing::error; + +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{ + context::RawBlockChainContext, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, BlockVerifierService, ExtendedConsensusError, TxVerifierService, + VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, +}; +use cuprate_p2p::{ + block_downloader::{BlockBatch, BlockDownloaderConfig}, + BroadcastSvc, NetworkInterface, +}; +use cuprate_p2p_core::ClearNet; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, TransactionVerificationData, +}; + +use crate::{ + blockchain::{ + chain_service::ChainService, + interface::COMMAND_TX, + syncer, + types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, + }, + constants::PANIC_CRITICAL_SERVICE_ERROR, +}; + +mod commands; +mod handler; + +pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; + +/// Initialize the blockchain manager. +/// +/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface) +/// can be called. +pub async fn init_blockchain_manager( + clearnet_interface: NetworkInterface, + blockchain_write_handle: BlockchainWriteHandle, + blockchain_read_handle: BlockchainReadHandle, + mut blockchain_context_service: BlockChainContextService, + block_verifier_service: ConcreteBlockVerifierService, + block_downloader_config: BlockDownloaderConfig, +) { + // TODO: find good values for these size limits + let (batch_tx, batch_rx) = mpsc::channel(1); + let stop_current_block_downloader = Arc::new(Notify::new()); + let (command_tx, command_rx) = mpsc::channel(3); + + COMMAND_TX.set(command_tx).unwrap(); + + tokio::spawn(syncer::syncer( + blockchain_context_service.clone(), + ChainService(blockchain_read_handle.clone()), + clearnet_interface.clone(), + batch_tx, + Arc::clone(&stop_current_block_downloader), + block_downloader_config, + )); + + let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::GetContext) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + let manager = BlockchainManager { + blockchain_write_handle, + blockchain_read_handle, + blockchain_context_service, + cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), + block_verifier_service, + stop_current_block_downloader, + broadcast_svc: clearnet_interface.broadcast_svc(), + }; + + tokio::spawn(manager.run(batch_rx, command_rx)); +} + +/// The blockchain manager. +/// +/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must +/// go through this. +/// +/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface). +pub struct BlockchainManager { + /// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`] + /// is held. + blockchain_write_handle: BlockchainWriteHandle, + /// A [`BlockchainReadHandle`]. + blockchain_read_handle: BlockchainReadHandle, + // TODO: Improve the API of the cache service. + // TODO: rename the cache service -> `BlockchainContextService`. + /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve + /// values without needing to go to a [`BlockchainReadHandle`]. + blockchain_context_service: BlockChainContextService, + /// A cached context representing the current state. + cached_blockchain_context: RawBlockChainContext, + /// The block verifier service, to verify incoming blocks. + block_verifier_service: ConcreteBlockVerifierService, + /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download + /// attempt. + stop_current_block_downloader: Arc, + /// The broadcast service, to broadcast new blocks. + broadcast_svc: BroadcastSvc, +} + +impl BlockchainManager { + /// The [`BlockchainManager`] task. + pub async fn run( + mut self, + mut block_batch_rx: mpsc::Receiver, + mut command_rx: mpsc::Receiver, + ) { + loop { + tokio::select! { + Some(batch) = block_batch_rx.recv() => { + self.handle_incoming_block_batch( + batch, + ).await; + } + Some(incoming_command) = command_rx.recv() => { + self.handle_command(incoming_command).await; + } + else => { + todo!("TODO: exit the BC manager") + } + } + } + } +} diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs new file mode 100644 index 000000000..643ed88cb --- /dev/null +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -0,0 +1,32 @@ +//! This module contains the commands for the blockchain manager. +use std::collections::HashMap; + +use monero_serai::block::Block; +use tokio::sync::oneshot; + +use cuprate_types::TransactionVerificationData; + +/// The blockchain manager commands. +pub enum BlockchainManagerCommand { + /// Attempt to add a new block to the blockchain. + AddBlock { + /// The [`Block`] to add. + block: Block, + /// All the transactions defined in [`Block::transactions`]. + prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, + /// The channel to send the response down. + response_tx: oneshot::Sender>, + }, +} + +/// The [`Ok`] response for an incoming block. +pub enum IncomingBlockOk { + /// The block was added to the main-chain. + AddedToMainChain, + /// The blockchain manager is not ready yet. + NotReady, + /// The block was added to an alt-chain. + AddedToAltChain, + /// We already have the block. + AlreadyHave, +} diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs new file mode 100644 index 000000000..303e2e492 --- /dev/null +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -0,0 +1,484 @@ +//! The blockchain manager handler functions. +use bytes::Bytes; +use futures::{TryFutureExt, TryStreamExt}; +use monero_serai::{block::Block, transaction::Transaction}; +use rayon::prelude::*; +use std::ops::ControlFlow; +use std::{collections::HashMap, sync::Arc}; +use tower::{Service, ServiceExt}; +use tracing::info; + +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{ + block::PreparedBlock, context::NewBlockData, transactions::new_tx_verification_data, + BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, + ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, + VerifyTxResponse, +}; +use cuprate_helper::cast::usize_to_u64; +use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, + AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, +}; + +use crate::blockchain::manager::commands::IncomingBlockOk; +use crate::{ + blockchain::{ + manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle, + }, + constants::PANIC_CRITICAL_SERVICE_ERROR, + signals::REORG_LOCK, +}; + +impl super::BlockchainManager { + /// Handle an incoming command from another part of Cuprate. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { + match command { + BlockchainManagerCommand::AddBlock { + block, + prepped_txs, + response_tx, + } => { + let res = self.handle_incoming_block(block, prepped_txs).await; + + drop(response_tx.send(res)); + } + } + } + + /// Broadcast a valid block to the network. + async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) { + self.broadcast_svc + .ready() + .await + .expect("Broadcast service is Infallible.") + .call(BroadcastRequest::Block { + block_bytes, + current_blockchain_height: usize_to_u64(blockchain_height), + }) + .await + .expect("Broadcast service is Infallible."); + } + + /// Handle an incoming [`Block`]. + /// + /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow + /// the top of the main chain. + /// + /// Otherwise, this function will validate and add the block to the main chain. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + pub async fn handle_incoming_block( + &mut self, + block: Block, + prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, + ) -> Result { + if block.header.previous != self.cached_blockchain_context.top_hash { + self.handle_incoming_alt_block(block, prepared_txs).await?; + return Ok(IncomingBlockOk::AddedToAltChain); + } + + let VerifyBlockResponse::MainChain(verified_block) = self + .block_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyBlockRequest::MainChain { + block, + prepared_txs, + }) + .await? + else { + unreachable!(); + }; + + let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); + self.add_valid_block_to_main_chain(verified_block).await; + + self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) + .await; + + Ok(IncomingBlockOk::AddedToMainChain) + } + + /// Handle an incoming [`BlockBatch`]. + /// + /// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`] + /// depending on if the first block in the batch follows from the top of our chain. + /// + /// # Panics + /// + /// This function will panic if the batch is empty or if any internal service returns an unexpected + /// error that we cannot recover from or if the incoming batch contains no blocks. + pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { + let (first_block, _) = batch + .blocks + .first() + .expect("Block batch should not be empty"); + + if first_block.header.previous == self.cached_blockchain_context.top_hash { + self.handle_incoming_block_batch_main_chain(batch).await; + } else { + self.handle_incoming_block_batch_alt_chain(batch).await; + } + } + + /// Handles an incoming [`BlockBatch`] that follows the main chain. + /// + /// This function will handle validating the blocks in the batch and adding them to the blockchain + /// database and context cache. + /// + /// This function will also handle banning the peer and canceling the block downloader if the + /// block is invalid. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from or if the incoming batch contains no blocks. + async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { + info!( + "Handling batch to main chain height: {}", + batch.blocks.first().unwrap().0.number().unwrap() + ); + + let batch_prep_res = self + .block_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { + blocks: batch.blocks, + }) + .await; + + let prepped_blocks = match batch_prep_res { + Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks, + Err(_) => { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; + } + _ => unreachable!(), + }; + + for (block, txs) in prepped_blocks { + let verify_res = self + .block_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyBlockRequest::MainChainPrepped { block, txs }) + .await; + + let verified_block = match verify_res { + Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block, + Err(_) => { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; + } + _ => unreachable!(), + }; + + self.add_valid_block_to_main_chain(verified_block).await; + } + } + + /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. + /// + /// This function will handle validating the alt-blocks to add them to our cache and reorging the + /// chain if the alt-chain has a higher cumulative difficulty. + /// + /// This function will also handle banning the peer and canceling the block downloader if the + /// alt block is invalid or if a reorg fails. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) { + // TODO: this needs testing (this whole section does but alt-blocks specifically). + + let mut blocks = batch.blocks.into_iter(); + + while let Some((block, txs)) = blocks.next() { + // async blocks work as try blocks. + let res = async { + let txs = txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + Ok((tx.tx_hash, tx)) + }) + .collect::>()?; + + let reorged = self.handle_incoming_alt_block(block, txs).await?; + + Ok::<_, anyhow::Error>(reorged) + } + .await; + + match res { + Err(e) => { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; + } + Ok(AddAltBlock::Reorged) => { + // Collect the remaining blocks and add them to the main chain instead. + batch.blocks = blocks.collect(); + self.handle_incoming_block_batch_main_chain(batch).await; + return; + } + // continue adding alt blocks. + Ok(AddAltBlock::Cached) => (), + } + } + } + + /// Handles an incoming alt [`Block`]. + /// + /// This function will do some pre-validation of the alt block, then if the cumulative difficulty + /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add + /// the alt block to the alt block cache. + /// + /// # Errors + /// + /// This will return an [`Err`] if: + /// - The alt block was invalid. + /// - An attempt to reorg the chain failed. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn handle_incoming_alt_block( + &mut self, + block: Block, + prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, + ) -> Result { + let VerifyBlockResponse::AltChain(alt_block_info) = self + .block_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyBlockRequest::AltChain { + block, + prepared_txs, + }) + .await? + else { + unreachable!(); + }; + + // TODO: check in consensus crate if alt block with this hash already exists. + + // If this alt chain + if alt_block_info.cumulative_difficulty + > self.cached_blockchain_context.cumulative_difficulty + { + self.try_do_reorg(alt_block_info).await?; + return Ok(AddAltBlock::Reorged); + } + + self.blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) + .await?; + + Ok(AddAltBlock::Cached) + } + + /// Attempt a re-org with the given top block of the alt-chain. + /// + /// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database + /// and context cache to verify the alt-chain. It will then attempt to verify and add each block + /// in the alt-chain to the main-chain. Releasing the lock on [`REORG_LOCK`] when finished. + /// + /// # Errors + /// + /// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain + /// will be returned back into its state it was at when then function was called. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn try_do_reorg( + &mut self, + top_alt_block: AltBlockInformation, + ) -> Result<(), anyhow::Error> { + let _guard = REORG_LOCK.write().await; + + let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self + .blockchain_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainReadRequest::AltBlocksInChain( + top_alt_block.chain_id, + )) + .await? + else { + unreachable!(); + }; + + alt_blocks.push(top_alt_block); + + let split_height = alt_blocks[0].height; + let current_main_chain_height = self.cached_blockchain_context.chain_height; + + let BlockchainResponse::PopBlocks(old_main_chain_id) = self + .blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::PopBlocks( + current_main_chain_height - split_height + 1, + )) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!(); + }; + + self.blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::PopBlocks { + numb_blocks: current_main_chain_height - split_height + 1, + }) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + + let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await; + + match reorg_res { + Ok(()) => Ok(()), + Err(e) => { + todo!("Reverse reorg") + } + } + } + + /// Verify and add a list of [`AltBlockInformation`]s to the main-chain. + /// + /// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain + /// for the blockchain database and the context cache, or in other words that the blockchain database + /// and context cache have already had the top blocks popped to where the alt-chain meets the main-chain. + /// + /// # Errors + /// + /// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should + /// be aborted and the chain should be returned to its previous state. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn verify_add_alt_blocks_to_main_chain( + &mut self, + alt_blocks: Vec, + ) -> Result<(), anyhow::Error> { + for mut alt_block in alt_blocks { + let prepped_txs = alt_block + .txs + .drain(..) + .map(|tx| Ok(Arc::new(tx.try_into()?))) + .collect::>()?; + + let prepped_block = PreparedBlock::new_alt_block(alt_block)?; + + let VerifyBlockResponse::MainChain(verified_block) = self + .block_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyBlockRequest::MainChainPrepped { + block: prepped_block, + txs: prepped_txs, + }) + .await? + else { + unreachable!(); + }; + + self.add_valid_block_to_main_chain(verified_block).await; + } + + Ok(()) + } + + /// Adds a [`VerifiedBlockInformation`] to the main-chain. + /// + /// This function will update the blockchain database and the context cache, it will also + /// update [`Self::cached_blockchain_context`]. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + pub async fn add_valid_block_to_main_chain( + &mut self, + verified_block: VerifiedBlockInformation, + ) { + self.blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Update(NewBlockData { + block_hash: verified_block.block_hash, + height: verified_block.height, + timestamp: verified_block.block.header.timestamp, + weight: verified_block.weight, + long_term_weight: verified_block.long_term_weight, + generated_coins: verified_block.generated_coins, + vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), + cumulative_difficulty: verified_block.cumulative_difficulty, + })) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + + self.blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::WriteBlock(verified_block)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + + let BlockChainContextResponse::Context(blockchain_context) = self + .blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::GetContext) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!(); + }; + + self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); + } +} + +/// The result from successfully adding an alt-block. +enum AddAltBlock { + /// The alt-block was cached. + Cached, + /// The chain was reorged. + Reorged, +} diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 8b1378917..9de39a8ce 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1 +1,143 @@ +// FIXME: This whole module is not great and should be rewritten when the PeerSet is made. +use std::{pin::pin, sync::Arc, time::Duration}; +use futures::StreamExt; +use tokio::time::interval; +use tokio::{ + sync::{mpsc, Notify}, + time::sleep, +}; +use tower::{Service, ServiceExt}; +use tracing::instrument; + +use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_p2p::{ + block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, + NetworkInterface, +}; +use cuprate_p2p_core::ClearNet; + +const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30); + +/// An error returned from the [`syncer`]. +#[derive(Debug, thiserror::Error)] +pub enum SyncerError { + #[error("Incoming block channel closed.")] + IncomingBlockChannelClosed, + #[error("One of our services returned an error: {0}.")] + ServiceError(#[from] tower::BoxError), +} + +/// The syncer tasks that makes sure we are fully synchronised with our connected peers. +#[expect( + clippy::significant_drop_tightening, + reason = "Client pool which will be removed" +)] +#[instrument(level = "debug", skip_all)] +pub async fn syncer( + mut context_svc: C, + our_chain: CN, + clearnet_interface: NetworkInterface, + incoming_block_batch_tx: mpsc::Sender, + stop_current_block_downloader: Arc, + block_downloader_config: BlockDownloaderConfig, +) -> Result<(), SyncerError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, + CN: Service + + Clone + + Send + + 'static, + CN::Future: Send + 'static, +{ + tracing::info!("Starting blockchain syncer"); + + let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY); + + let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!(); + }; + + let client_pool = clearnet_interface.client_pool(); + + tracing::debug!("Waiting for new sync info in top sync channel"); + + loop { + check_sync_interval.tick().await; + + tracing::trace!("Checking connected peers to see if we are behind",); + + check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; + let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + + if !client_pool.contains_client_with_more_cumulative_difficulty( + raw_blockchain_context.cumulative_difficulty, + ) { + continue; + } + + tracing::debug!( + "We are behind peers claimed cumulative difficulty, starting block downloader" + ); + let mut block_batch_stream = + clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); + + loop { + tokio::select! { + () = stop_current_block_downloader.notified() => { + tracing::info!("Stopping block downloader"); + break; + } + batch = block_batch_stream.next() => { + let Some(batch) = batch else { + break; + }; + + tracing::debug!("Got batch, len: {}", batch.blocks.len()); + if incoming_block_batch_tx.send(batch).await.is_err() { + return Err(SyncerError::IncomingBlockChannelClosed); + } + } + } + } + } +} + +/// Checks if we should update the given [`BlockChainContext`] and updates it if needed. +async fn check_update_blockchain_context( + context_svc: C, + old_context: &mut BlockChainContext, +) -> Result<(), tower::BoxError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, +{ + if old_context.blockchain_context().is_ok() { + return Ok(()); + } + + let BlockChainContextResponse::Context(ctx) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!(); + }; + + *old_context = ctx; + + Ok(()) +} diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs new file mode 100644 index 000000000..e3ee62b3c --- /dev/null +++ b/binaries/cuprated/src/blockchain/types.rs @@ -0,0 +1,24 @@ +use std::task::{Context, Poll}; + +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use tower::{util::MapErr, Service}; + +use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle}; +use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; +use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; + +/// The [`BlockVerifierService`] with all generic types defined. +pub type ConcreteBlockVerifierService = BlockVerifierService< + BlockChainContextService, + ConcreteTxVerifierService, + ConsensusBlockchainReadHandle, +>; + +/// The [`TxVerifierService`] with all generic types defined. +pub type ConcreteTxVerifierService = TxVerifierService; + +/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires. +pub type ConsensusBlockchainReadHandle = + MapErr tower::BoxError>; diff --git a/binaries/cuprated/src/constants.rs b/binaries/cuprated/src/constants.rs index 9463d4763..2f3c7bb64 100644 --- a/binaries/cuprated/src/constants.rs +++ b/binaries/cuprated/src/constants.rs @@ -14,6 +14,10 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) { formatcp!("{VERSION}-release") }; +/// The panic message used when cuprated encounters a critical service error. +pub const PANIC_CRITICAL_SERVICE_ERROR: &str = + "A service critical to Cuprate's function returned an unexpected error."; + #[cfg(test)] mod test { use super::*; diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 775843df9..ad7382d74 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -16,6 +16,7 @@ mod config; mod constants; mod p2p; mod rpc; +mod signals; mod statics; mod txpool; diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index f5b72ba3a..f55d41db1 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -2,4 +2,4 @@ //! //! Will handle initiating the P2P and contains a protocol request handler. -mod request_handler; +pub mod request_handler; diff --git a/binaries/cuprated/src/signals.rs b/binaries/cuprated/src/signals.rs new file mode 100644 index 000000000..42148ca83 --- /dev/null +++ b/binaries/cuprated/src/signals.rs @@ -0,0 +1,12 @@ +//! Signals for Cuprate state used throughout the binary. + +use tokio::sync::RwLock; + +/// Reorg lock. +/// +/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken +/// for any operation which must complete without a reorg happening. +/// +/// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool, +/// this can potentially be removed in the future, see: +pub static REORG_LOCK: RwLock<()> = RwLock::const_new(()); diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 3d0db9955..ada46aa42 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -123,7 +123,7 @@ impl PreparedBlock { /// /// The randomX VM must be Some if RX is needed or this will panic. /// The randomX VM must also be initialised with the correct seed. - fn new(block: Block, randomx_vm: Option<&R>) -> Result { + pub fn new(block: Block, randomx_vm: Option<&R>) -> Result { let (hf_version, hf_vote) = HardFork::from_block_header(&block.header) .map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?; @@ -180,6 +180,20 @@ impl PreparedBlock { block: block.block, }) } + + /// Creates a new [`PreparedBlock`] from an [`AltBlockInformation`]. + pub fn new_alt_block(block: AltBlockInformation) -> Result { + Ok(Self { + block_blob: block.block_blob, + hf_vote: HardFork::from_version(block.block.header.hardfork_version) + .map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?, + hf_version: HardFork::from_vote(block.block.header.hardfork_signal), + block_hash: block.block_hash, + pow_hash: block.pow_hash, + miner_tx_weight: block.block.miner_transaction.weight(), + block: block.block, + }) + } } /// A request to verify a block. @@ -246,7 +260,7 @@ where + Clone + Send + 'static, - D: Database + Clone + Send + Sync + 'static, + D: Database + Clone + Send + 'static, D::Future: Send + 'static, { /// Creates a new block verifier. @@ -276,7 +290,7 @@ where + 'static, TxV::Future: Send + 'static, - D: Database + Clone + Send + Sync + 'static, + D: Database + Clone + Send + 'static, D::Future: Send + 'static, { type Response = VerifyBlockResponse; diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index e104cec9e..7280f2ff5 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -37,6 +37,7 @@ pub use context::{ pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; // re-export. +pub use cuprate_consensus_rules::genesis::generate_genesis_block; pub use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, HardFork, @@ -64,17 +65,13 @@ pub enum ExtendedConsensusError { } /// Initialize the 2 verifier [`tower::Service`]s (block and transaction). -#[expect(clippy::type_complexity)] pub fn initialize_verifier( database: D, ctx_svc: Ctx, -) -> Result< - ( - BlockVerifierService, D>, - TxVerifierService, - ), - ConsensusError, -> +) -> ( + BlockVerifierService, D>, + TxVerifierService, +) where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, @@ -90,7 +87,7 @@ where { let tx_svc = TxVerifierService::new(database.clone()); let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database); - Ok((block_svc, tx_svc)) + (block_svc, tx_svc) } use __private::Database; diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 2ae1d7c87..c7c607b34 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -89,6 +89,7 @@ pub use protocol::*; use services::*; //re-export pub use cuprate_helper::network::Network; +pub use cuprate_wire::CoreSyncData; /// The direction of a connection. #[derive(Debug, Copy, Clone, Eq, PartialEq)] diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs index 72eac2859..fcc9eb651 100644 --- a/p2p/p2p/src/block_downloader.rs +++ b/p2p/p2p/src/block_downloader.rs @@ -1,6 +1,6 @@ //! # Block Downloader //! -//! This module contains the [`BlockDownloader`], which finds a chain to +//! This module contains the block downloader, which finds a chain to //! download from our connected peers and downloads it. See the actual //! `struct` documentation for implementation details. //! diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 77d3b6e5c..fc97fc1b6 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -153,6 +153,18 @@ impl ClientPool { self.borrow_clients(&peers).collect() } + + /// Checks all clients in the pool checking if any claim a higher cumulative difficulty than the + /// amount specified. + pub fn contains_client_with_more_cumulative_difficulty( + &self, + cumulative_difficulty: u128, + ) -> bool { + self.clients.iter().any(|element| { + let sync_data = element.value().info.core_sync_data.lock().unwrap(); + sync_data.cumulative_difficulty() > cumulative_difficulty + }) + } } mod sealed { diff --git a/p2p/p2p/src/config.rs b/p2p/p2p/src/config.rs index 97cd61e22..5b4ba4ae3 100644 --- a/p2p/p2p/src/config.rs +++ b/p2p/p2p/src/config.rs @@ -1,8 +1,9 @@ -use cuprate_address_book::AddressBookConfig; use cuprate_helper::network::Network; use cuprate_p2p_core::NetworkZone; use cuprate_wire::{common::PeerSupportFlags, BasicNodeData}; +pub use cuprate_address_book::AddressBookConfig; + /// P2P config. #[derive(Clone, Debug)] pub struct P2PConfig { diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f2349600a..f70d64c92 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -16,14 +16,13 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); /// The durations of a short ban. -#[cfg_attr(not(test), expect(dead_code))] -pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); +pub const SHORT_BAN: Duration = Duration::from_secs(60 * 10); /// The durations of a medium ban. -pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); +pub const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); /// The durations of a long ban. -pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); +pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); /// The default amount of time between inbound diffusion flushes. pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 243115898..b3577a77d 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -12,23 +12,21 @@ use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; use cuprate_p2p_core::{ client::Connector, - client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse}, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, }; -mod block_downloader; +pub mod block_downloader; mod broadcast; mod client_pool; pub mod config; pub mod connection_maintainer; -mod constants; +pub mod constants; mod inbound_server; use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}; pub use broadcast::{BroadcastRequest, BroadcastSvc}; -use client_pool::ClientPoolDropGuard; -pub use config::P2PConfig; +pub use config::{AddressBookConfig, P2PConfig}; use connection_maintainer::MakeConnectionRequest; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. @@ -174,9 +172,8 @@ impl NetworkInterface { self.address_book.clone() } - /// Pulls a client from the client pool, returning it in a guard that will return it there when it's - /// dropped. - pub fn borrow_client(&self, peer: &InternalPeerID) -> Option> { - self.pool.borrow_client(peer) + /// Borrows the `ClientPool`, for access to connected peers. + pub const fn client_pool(&self) -> &Arc> { + &self.pool } }