From 6e8fbf04f633f4425fc606d6dc1d3c8f88410ff8 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 4 Oct 2024 02:45:42 +0100 Subject: [PATCH 1/5] add `get_objects` handler --- Cargo.lock | 1 + binaries/cuprated/src/p2p/request_handler.rs | 106 +++++++++++++++++++ p2p/p2p/src/constants.rs | 2 +- storage/blockchain/Cargo.toml | 1 + storage/blockchain/src/ops/block.rs | 62 ++++++++++- storage/blockchain/src/service/read.rs | 38 ++++++- types/src/blockchain.rs | 10 +- 7 files changed, 214 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd303bf69..8ba7ed84b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,6 +547,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-database", "cuprate-database-service", "cuprate-helper", diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 8b1378917..818700d76 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1 +1,107 @@ +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; +use futures::future::BoxFuture; +use futures::FutureExt; +use tower::{Service, ServiceExt}; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; +use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; + +#[derive(Clone)] +pub struct P2pProtocolRequestHandlerMaker { + pub blockchain_read_handle: BlockchainReadHandle, +} + +impl Service> for P2pProtocolRequestHandlerMaker { + type Response = P2pProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + // TODO: check sync info? + + let blockchain_read_handle = self.blockchain_read_handle.clone(); + + ready(Ok(P2pProtocolRequestHandler { + peer_information, + blockchain_read_handle, + })) + } +} + +#[derive(Clone)] +pub struct P2pProtocolRequestHandler { + peer_information: PeerInformation, + blockchain_read_handle: BlockchainReadHandle, +} + +impl Service for P2pProtocolRequestHandler { + type Response = ProtocolResponse; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: ProtocolRequest) -> Self::Future { + match request { + ProtocolRequest::GetObjects(r) => { + get_objects(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::GetChain(_) => todo!(), + ProtocolRequest::FluffyMissingTxs(_) => todo!(), + ProtocolRequest::GetTxPoolCompliment(_) => todo!(), + ProtocolRequest::NewBlock(_) => todo!(), + ProtocolRequest::NewFluffyBlock(_) => todo!(), + ProtocolRequest::NewTransactions(_) => todo!(), + } + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions + +/// [`ProtocolRequest::GetObjects`] +async fn get_objects( + request: GetObjectsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.blocks.len() > MAX_BLOCK_BATCH_LEN { + anyhow::bail!("Peer requested more blocks than allowed.") + } + + let block_ids: Vec<[u8; 32]> = (&request.blocks).into(); + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::BlockCompleteEntries(block_ids)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + Ok(ProtocolResponse::GetObjects(GetObjectsResponse { + blocks, + missed_ids: ByteArrayVec::from(missing_hashes), + current_blockchain_height, + })) +} diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f70d64c92..d1060aea0 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 6eecb892b..073b41849 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -35,6 +35,7 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } +bytes = "1.7.2" [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index 6d32fd816..d038d1a1e 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -1,7 +1,9 @@ //! Block functions. +use std::slice; //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; +use bytes::{Bytes, BytesMut}; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, @@ -10,15 +12,17 @@ use monero_serai::{ use cuprate_database::{ RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, }; +use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, tx::tx_fee, }; use cuprate_types::{ - AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, - VerifiedTransactionInformation, + AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork, + TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; +use crate::tables::TablesIter; use crate::{ ops::{ alt_block, @@ -224,6 +228,60 @@ pub fn pop_block( Ok((block_height, block_info.block_hash, block)) } +//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes` +pub fn get_block_blob_with_tx_indexes( + block_height: &BlockHeight, + tables: &impl Tables, +) -> Result<(Vec, u64, usize), RuntimeError> { + use monero_serai::io::write_varint; + + let block_info = tables.block_infos().get(&block_height)?; + + let miner_tx_idx = block_info.mining_tx_index; + let mut block_txs = tables.block_txs_hashes().get(&block_height)?.0; + let numb_txs = block_txs.len(); + + // Get the block header + let mut block = tables.block_header_blobs().get(&block_height)?.0; + + // Add the miner tx to the blob. + let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; + block.append(&mut miner_tx_blob); + + // Add the blocks tx hashes. + write_varint(&block_txs.len(), &mut block) + .expect("The number of txs per block will not exceed u64::MAX"); + for tx in block_txs { + block.extend_from_slice(&tx); + } + + Ok((block, miner_tx_idx, numb_txs)) +} + +//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` +pub fn get_block_complete_entry( + block_hash: &BlockHash, + tables: &impl TablesIter, +) -> Result { + let block_height = tables.block_heights().get(block_hash)?; + let (block_blob, miner_tx_idx, numb_non_miner_txs) = + get_block_blob_with_tx_indexes(&block_height, tables)?; + + let first_tx_idx = miner_tx_idx + 1; + + let tx_blobs = tables + .tx_blobs_iter() + .get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))? + .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) + .collect::>()?; + + Ok(BlockCompleteEntry { + block: Bytes::from(block_blob), + txs: TransactionBlobs::Normal(tx_blobs), + pruned: false, + block_weight: 0, + }) +} //---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` /// Retrieve a [`ExtendedBlockHeader`] from the database. diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index b0e7e044c..171513519 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -7,7 +7,7 @@ use std::{ }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator}, + iter::{IntoParallelIterator, ParallelIterator, Either}, prelude::*, ThreadPool, }; @@ -18,9 +18,10 @@ use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThre use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + BlockCompleteEntry, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, }; +use crate::ops::block::get_block_complete_entry; use crate::{ ops::{ alt_block::{ @@ -92,6 +93,7 @@ fn map_request( /* SOMEDAY: pre-request handling, run some code for each request? */ match request { + R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(block_hash) => find_block(env, block_hash), @@ -182,6 +184,38 @@ macro_rules! get_tables { // TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // amount of parallelism. +/// [`BlockchainReadRequest::BlockCompleteEntries`]. +fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + let (missing_hashes, blocks) = block_hashes + .into_par_iter() + .map(|block_hash| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + match get_block_complete_entry(&block_hash, tables) { + Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)), + res => res.map(Either::Right), + } + }) + .collect::>()?; + + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?; + + Ok(BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + }) +} + /// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index f2b96db00..111270575 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -10,7 +10,7 @@ use std::{ use crate::{ types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, - AltBlockInformation, ChainId, + AltBlockInformation, BlockCompleteEntry, ChainId, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -24,6 +24,8 @@ use crate::{ /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainReadRequest { + BlockCompleteEntries(Vec<[u8; 32]>), + /// Request a block's extended header. /// /// The input is the block's height. @@ -149,6 +151,12 @@ pub enum BlockchainWriteRequest { #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainResponse { //------------------------------------------------------ Reads + BlockCompleteEntries { + blocks: Vec, + missing_hashes: Vec<[u8; 32]>, + blockchain_height: usize, + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. From f158f869b453b4403805a9186d88cc45a21195d9 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 4 Oct 2024 17:21:57 +0100 Subject: [PATCH 2/5] add `get_chain` handler --- binaries/cuprated/src/p2p/request_handler.rs | 59 ++++++++++- storage/blockchain/src/ops/block.rs | 16 ++- storage/blockchain/src/ops/blockchain.rs | 41 ++++++++ storage/blockchain/src/service/read.rs | 105 ++++++++++++++----- types/src/blockchain.rs | 22 ++++ 5 files changed, 205 insertions(+), 38 deletions(-) diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 818700d76..a109ae1b5 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -10,10 +10,12 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::BlockChainContextService; use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::cast::usize_to_u64; +use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; +use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { @@ -82,7 +84,7 @@ async fn get_objects( anyhow::bail!("Peer requested more blocks than allowed.") } - let block_ids: Vec<[u8; 32]> = (&request.blocks).into(); + let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); // de-allocate the backing `Bytes`. drop(request); @@ -93,7 +95,7 @@ async fn get_objects( } = blockchain_read_handle .ready() .await? - .call(BlockchainReadRequest::BlockCompleteEntries(block_ids)) + .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes)) .await? else { panic!("blockchain returned wrong response!"); @@ -102,6 +104,55 @@ async fn get_objects( Ok(ProtocolResponse::GetObjects(GetObjectsResponse { blocks, missed_ids: ByteArrayVec::from(missing_hashes), - current_blockchain_height, + current_blockchain_height: usize_to_u64(blockchain_height), + })) +} + +/// [`ProtocolRequest::GetChain`] +async fn get_chain( + request: ChainRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.block_ids.len() > 25_000 { + anyhow::bail!("Peer sent too many block hashes in chain request.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); + let want_pruned_data = request.prune; + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::NextChainEntry { + start_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty, + first_block_blob, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(cumulative_difficulty); + + Ok(ProtocolResponse::GetChain(ChainResponse { + start_height: usize_to_u64(start_height), + total_height: usize_to_u64(chain_height), + cumulative_difficulty_low64, + cumulative_difficulty_top64, + m_block_ids: ByteArrayVec::from(block_ids), + first_block: Default::default(), + // only needed when + m_block_weights: if want_pruned_data { + block_weights.into_iter().map(usize_to_u64).collect() + } else { + vec![] + }, })) } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index d038d1a1e..070a413c5 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -1,16 +1,15 @@ //! Block functions. -use std::slice; //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, }; use cuprate_database::{ - RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, + RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw}, }; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ @@ -22,7 +21,6 @@ use cuprate_types::{ TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; -use crate::tables::TablesIter; use crate::{ ops::{ alt_block, @@ -31,7 +29,7 @@ use crate::{ output::get_rct_num_outputs, tx::{add_tx, remove_tx}, }, - tables::{BlockHeights, BlockInfos, Tables, TablesMut}, + tables::{BlockHeights, BlockInfos, Tables, TablesIter, TablesMut}, types::{BlockHash, BlockHeight, BlockInfo}, }; @@ -235,14 +233,14 @@ pub fn get_block_blob_with_tx_indexes( ) -> Result<(Vec, u64, usize), RuntimeError> { use monero_serai::io::write_varint; - let block_info = tables.block_infos().get(&block_height)?; + let block_info = tables.block_infos().get(block_height)?; let miner_tx_idx = block_info.mining_tx_index; - let mut block_txs = tables.block_txs_hashes().get(&block_height)?.0; + let block_txs = tables.block_txs_hashes().get(block_height)?.0; let numb_txs = block_txs.len(); // Get the block header - let mut block = tables.block_header_blobs().get(&block_height)?.0; + let mut block = tables.block_header_blobs().get(block_height)?.0; // Add the miner tx to the blob. let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; @@ -273,7 +271,7 @@ pub fn get_block_complete_entry( .tx_blobs_iter() .get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))? .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) - .collect::>()?; + .collect::>()?; Ok(BlockCompleteEntry { block: Bytes::from(block_blob), diff --git a/storage/blockchain/src/ops/blockchain.rs b/storage/blockchain/src/ops/blockchain.rs index 04f8b26d5..664c0d0c3 100644 --- a/storage/blockchain/src/ops/blockchain.rs +++ b/storage/blockchain/src/ops/blockchain.rs @@ -3,6 +3,8 @@ //---------------------------------------------------------------------------------------------------- Import use cuprate_database::{DatabaseRo, RuntimeError}; +use crate::ops::block::block_exists; +use crate::types::BlockHash; use crate::{ ops::macros::doc_error, tables::{BlockHeights, BlockInfos}, @@ -78,6 +80,45 @@ pub fn cumulative_generated_coins( } } +/// Find the split point between our chain and a list of [`BlockHash`]s from another chain. +/// +/// This function can be used accepts chains in chronological and reverse chronological order, however +/// if the wrong order is specified the return value is meaningless. +/// +/// For chronologically ordered chains this will return the index of the first unknown, for reverse +/// chronologically ordered chains this will return the index of the fist known. +/// +/// If all blocks are known for chronologically ordered chains or unknown for reverse chronologically +/// ordered chains then the length of the chain will be returned. +#[doc = doc_error!()] +#[inline] +pub fn find_split_point( + block_ids: &[BlockHash], + chronological_order: bool, + table_block_heights: &impl DatabaseRo, +) -> Result { + let mut err = None; + + // Do a binary search to find the first unknown block in the batch. + let idx = + block_ids.partition_point( + |block_id| match block_exists(block_id, table_block_heights) { + Ok(exists) => exists & chronological_order, + Err(e) => { + err.get_or_insert(e); + // if this happens the search is scrapped, just return `false` back. + false + } + }, + ); + + if let Some(e) = err { + return Err(e); + } + + Ok(idx) +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test { diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 171513519..10bafc29e 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -2,26 +2,26 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ + cmp::min, collections::{HashMap, HashSet}, sync::Arc, }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator, Either}, + iter::{Either, IntoParallelIterator, ParallelIterator}, prelude::*, ThreadPool, }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - BlockCompleteEntry, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + Chain, ChainId, ExtendedBlockHeader, OutputOnChain, }; -use crate::ops::block::get_block_complete_entry; use crate::{ ops::{ alt_block::{ @@ -29,9 +29,10 @@ use crate::{ get_alt_chain_history_ranges, }, block::{ - block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, + block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry, + get_block_extended_header_from_height, get_block_height, get_block_info, }, - blockchain::{cumulative_generated_coins, top_block_height}, + blockchain::{cumulative_generated_coins, find_split_point, top_block_height}, key_image::key_image_exists, output::id_to_output_on_chain, }, @@ -39,7 +40,7 @@ use crate::{ free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, types::{BlockchainReadHandle, ResponseResult}, }, - tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables}, + tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter}, types::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, @@ -107,6 +108,7 @@ fn map_request( R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), + R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), } @@ -552,6 +554,76 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { }) } +/// [`BlockchainReadRequest::NextChainEntry`] +/// +/// # Invariant +/// `block_ids` must be sorted in reverse chronological block order, or else +/// the returned result is unspecified and meaningless, as this function +/// performs a binary search. +fn next_chain_entry( + env: &ConcreteEnv, + block_ids: &[BlockHash], + next_entry_size: usize, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let tables = env_inner.open_tables(&tx_ro)?; + let table_block_heights = tables.block_heights(); + let table_block_infos = tables.block_infos_iter(); + + let idx = find_split_point(block_ids, false, table_block_heights)?; + + // This will happen if we have a different genesis block. + if idx == block_ids.len() { + return Ok(BlockchainResponse::NextChainEntry { + start_height: 0, + chain_height: 0, + block_ids: vec![], + block_weights: vec![], + cumulative_difficulty: 0, + first_block_blob: None, + }); + } + + // The returned chain entry must overlap with one of the blocks we were told about. + let first_known_block_hash = block_ids[idx]; + let first_known_height = table_block_heights.get(&first_known_block_hash)?; + + let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?; + let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height); + + let (block_ids, block_weights) = table_block_infos + .get_range(first_known_height..last_height_in_chain_entry)? + .map(|block_info| { + let block_info = block_info?; + + Ok((block_info.block_hash, block_info.weight)) + }) + .collect::, Vec<_>), RuntimeError>>()?; + + let top_block_info = table_block_infos.get(&(chain_height - 1))?; + + let first_block_blob = if block_ids.len() >= 2 { + Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0) + } else { + None + }; + + Ok(BlockchainResponse::NextChainEntry { + start_height: first_known_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty: combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ), + first_block_blob, + }) +} + /// [`BlockchainReadRequest::FindFirstUnknown`] /// /// # Invariant @@ -564,24 +636,7 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; - let mut err = None; - - // Do a binary search to find the first unknown block in the batch. - let idx = - block_ids.partition_point( - |block_id| match block_exists(block_id, &table_block_heights) { - Ok(exists) => exists, - Err(e) => { - err.get_or_insert(e); - // if this happens the search is scrapped, just return `false` back. - false - } - }, - ); - - if let Some(e) = err { - return Err(e); - } + let idx = find_split_point(block_ids, true, &table_block_heights)?; Ok(if idx == block_ids.len() { BlockchainResponse::FindFirstUnknown(None) diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 111270575..526dc4310 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -95,6 +95,16 @@ pub enum BlockchainReadRequest { /// A request for the compact chain history. CompactChainHistory, + /// A request for the next chain entry. + /// + /// Input is a list of block hashes and the amount of block hashes to return in the next chain entry. + /// + /// # Invariant + /// The [`Vec`] containing the block IDs must be sorted in reverse chronological block + /// order, or else the returned response is unspecified and meaningless, + /// as this request performs a binary search + NextChainEntry(Vec<[u8; 32]>, usize), + /// A request to find the first unknown block ID in a list of block IDs. /// /// # Invariant @@ -223,6 +233,18 @@ pub enum BlockchainResponse { cumulative_difficulty: u128, }, + /// Response to [`BlockchainReadRequest::NextChainEntry`]. + /// + /// If all blocks were unknown `start_height` will be `0`, the other fields will be meaningless. + NextChainEntry { + start_height: usize, + chain_height: usize, + block_ids: Vec<[u8; 32]>, + block_weights: Vec, + cumulative_difficulty: u128, + first_block_blob: Option>, + }, + /// The response for [`BlockchainReadRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height. From aa274ebafee8ca59f084d79f2c2d46b33d192fb8 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 4 Oct 2024 21:45:40 +0100 Subject: [PATCH 3/5] add `fluffy_missing_txs` handler --- Cargo.lock | 1 + binaries/cuprated/src/p2p/request_handler.rs | 76 +++++++++++++++++--- net/wire/src/p2p/protocol.rs | 2 +- storage/blockchain/src/service/read.rs | 49 ++++++++++--- types/src/blockchain.rs | 16 +++++ types/src/lib.rs | 2 +- types/src/types.rs | 6 ++ 7 files changed, 132 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dcaba3ab..e55debeec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-constants", "cuprate-database", "cuprate-database-service", diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index a109ae1b5..3b94770a4 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,10 +1,10 @@ +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; use std::{ future::{ready, Ready}, task::{Context, Poll}, }; - -use futures::future::BoxFuture; -use futures::FutureExt; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; @@ -15,7 +15,11 @@ use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_hig use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; +use cuprate_types::{BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs}; +use cuprate_wire::protocol::{ + ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, + GetObjectsResponse, NewFluffyBlock, +}; #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { @@ -63,12 +67,20 @@ impl Service for P2pProtocolRequestHandler { ProtocolRequest::GetObjects(r) => { get_objects(r, self.blockchain_read_handle.clone()).boxed() } - ProtocolRequest::GetChain(_) => todo!(), - ProtocolRequest::FluffyMissingTxs(_) => todo!(), - ProtocolRequest::GetTxPoolCompliment(_) => todo!(), - ProtocolRequest::NewBlock(_) => todo!(), + ProtocolRequest::GetChain(r) => { + get_chain(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::FluffyMissingTxs(r) => { + fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!( + "Peer sent a full block when we support fluffy blocks" + ))) + .boxed(), ProtocolRequest::NewFluffyBlock(_) => todo!(), - ProtocolRequest::NewTransactions(_) => todo!(), + ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { + ready(Ok(ProtocolResponse::NA)).boxed() + } // TODO: tx-pool } } } @@ -138,6 +150,10 @@ async fn get_chain( panic!("blockchain returned wrong response!"); }; + if start_height == 0 { + anyhow::bail!("The peers chain has a different genesis block than ours."); + } + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = split_u128_into_low_high_bits(cumulative_difficulty); @@ -147,7 +163,7 @@ async fn get_chain( cumulative_difficulty_low64, cumulative_difficulty_top64, m_block_ids: ByteArrayVec::from(block_ids), - first_block: Default::default(), + first_block: first_block_blob.map_or(Bytes::new(), Bytes::from), // only needed when m_block_weights: if want_pruned_data { block_weights.into_iter().map(usize_to_u64).collect() @@ -156,3 +172,43 @@ async fn get_chain( }, })) } + +/// [`ProtocolRequest::FluffyMissingTxs`] +async fn fluffy_missing_txs( + mut request: FluffyMissingTransactionsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let tx_indexes = std::mem::take(&mut request.missing_tx_indices); + let block_hash: [u8; 32] = *request.block_hash; + let current_blockchain_height = request.current_blockchain_height; + + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::MissingTxsInBlock { + block_hash, + tx_indexes, + }) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + let Some(MissingTxsInBlock { block, txs }) = res else { + anyhow::bail!("The peer requested txs out of range."); + }; + + Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock { + b: BlockCompleteEntry { + block: Bytes::from(block), + txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()), + pruned: false, + // only needed for pruned blocks. + block_weight: 0, + }, + current_blockchain_height, + })) +} diff --git a/net/wire/src/p2p/protocol.rs b/net/wire/src/p2p/protocol.rs index 1d1d45ab8..cc4b49d30 100644 --- a/net/wire/src/p2p/protocol.rs +++ b/net/wire/src/p2p/protocol.rs @@ -159,7 +159,7 @@ epee_object!( current_blockchain_height: u64, ); -/// A request for Txs we are missing from our `TxPool` +/// A request for txs we are missing from an incoming block. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FluffyMissingTransactionsRequest { /// The Block we are missing the Txs in diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 10bafc29e..7edf789c6 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -14,14 +14,6 @@ use rayon::{ }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; -use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; -use cuprate_helper::map::combine_low_high_bits_to_u128; -use cuprate_types::{ - blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputOnChain, -}; - use crate::{ ops::{ alt_block::{ @@ -45,6 +37,13 @@ use crate::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, }; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; +use cuprate_helper::map::combine_low_high_bits_to_u128; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, +}; //---------------------------------------------------------------------------------------------------- init_read_service /// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`]. @@ -110,6 +109,10 @@ fn map_request( R::CompactChainHistory => compact_chain_history(env), R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), + R::MissingTxsInBlock { + block_hash, + tx_indexes, + } => missing_txs_in_block(env, block_hash, tx_indexes), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), } @@ -649,6 +652,36 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes }) } +/// [`BlockchainReadRequest::MissingTxsInBlock`] +fn missing_txs_in_block( + env: &ConcreteEnv, + block_hash: [u8; 32], + missing_txs: Vec, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + + let block_height = tables.block_heights().get(&block_hash)?; + + let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?; + let first_tx_index = miner_tx_index + 1; + + if numb_txs < missing_txs.len() { + return Ok(BlockchainResponse::MissingTxsInBlock(None)); + } + + let txs = missing_txs + .into_iter() + .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0)) + .collect::>()?; + + Ok(BlockchainResponse::MissingTxsInBlock(Some( + MissingTxsInBlock { block, txs }, + ))) +} + /// [`BlockchainReadRequest::AltBlocksInChain`] fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 526dc4310..37a04f759 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -8,6 +8,7 @@ use std::{ ops::Range, }; +use crate::types::MissingTxsInBlock; use crate::{ types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, AltBlockInformation, BlockCompleteEntry, ChainId, @@ -113,6 +114,16 @@ pub enum BlockchainReadRequest { /// as this request performs a binary search. FindFirstUnknown(Vec<[u8; 32]>), + /// A request for transactions from a specific block. + MissingTxsInBlock { + /// The block to get transactions from. + block_hash: [u8; 32], + /// The indexes of the transactions from the block. + /// This is not the global index of the txs, instead it is the local index as they appear in + /// the block/ + tx_indexes: Vec, + }, + /// A request for all alt blocks in the chain with the given [`ChainId`]. AltBlocksInChain(ChainId), } @@ -252,6 +263,11 @@ pub enum BlockchainResponse { /// This will be [`None`] if all blocks were known. FindFirstUnknown(Option<(usize, usize)>), + /// The response for [`BlockchainReadRequest::MissingTxsInBlock`]. + /// + /// Will return [`None`] if the request contained an index out of range. + MissingTxsInBlock(Option), + /// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// /// Contains all the alt blocks in the alt-chain in chronological order. diff --git a/types/src/lib.rs b/types/src/lib.rs index 0b0dbe679..e86cf1b4e 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -20,7 +20,7 @@ pub use transaction_verification_data::{ CachedVerificationState, TransactionVerificationData, TxVersion, }; pub use types::{ - AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, VerifiedBlockInformation, VerifiedTransactionInformation, }; diff --git a/types/src/types.rs b/types/src/types.rs index a60ce6c60..f6f19c7c2 100644 --- a/types/src/types.rs +++ b/types/src/types.rs @@ -155,6 +155,12 @@ pub struct OutputOnChain { pub commitment: EdwardsPoint, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MissingTxsInBlock { + pub block: Vec, + pub txs: Vec>, +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test { From 37025bdfc059008da251a8f83c1aef641a339aa0 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 5 Oct 2024 01:07:27 +0100 Subject: [PATCH 4/5] add `new_fluffy_block` handler --- binaries/cuprated/src/blockchain/interface.rs | 14 +--- binaries/cuprated/src/p2p/request_handler.rs | 78 ++++++++++++++++++- p2p/p2p-core/src/protocol.rs | 4 + p2p/p2p-core/src/protocol/try_from.rs | 3 + 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 189828e9c..b36a31ff1 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -17,7 +17,7 @@ use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, + Chain, TransactionVerificationData, }; use crate::{ @@ -65,7 +65,7 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + given_txs: HashMap<[u8; 32], TransactionVerificationData>, blockchain_read_handle: &mut BlockchainReadHandle, ) -> Result { // FIXME: we should look in the tx-pool for txs when that is ready. @@ -95,14 +95,6 @@ pub async fn handle_incoming_block( } // TODO: check we actually got given the right txs. - let prepped_txs = given_txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>() - .map_err(IncomingBlockError::InvalidBlock)?; let Some(incoming_block_tx) = COMMAND_TX.get() else { // We could still be starting up the blockchain manger, so just return this as there is nothing @@ -128,7 +120,7 @@ pub async fn handle_incoming_block( incoming_block_tx .send(BlockchainManagerCommand::AddBlock { block, - prepped_txs, + prepped_txs: given_txs, response_tx, }) .await diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 3b94770a4..c868de94a 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,6 +1,9 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::FutureExt; +use monero_serai::block::Block; +use monero_serai::transaction::Transaction; +use std::collections::HashSet; use std::{ future::{ready, Ready}, task::{Context, Poll}, @@ -8,8 +11,10 @@ use std::{ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::BlockChainContextService; use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; @@ -21,6 +26,9 @@ use cuprate_wire::protocol::{ GetObjectsResponse, NewFluffyBlock, }; +use crate::blockchain::interface as blockchain_interface; +use crate::blockchain::interface::IncomingBlockError; + #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { pub blockchain_read_handle: BlockchainReadHandle, @@ -77,7 +85,9 @@ impl Service for P2pProtocolRequestHandler { "Peer sent a full block when we support fluffy blocks" ))) .boxed(), - ProtocolRequest::NewFluffyBlock(_) => todo!(), + ProtocolRequest::NewFluffyBlock(r) => { + new_fluffy_block(r, self.blockchain_read_handle.clone()).boxed() + } ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { ready(Ok(ProtocolResponse::NA)).boxed() } // TODO: tx-pool @@ -97,7 +107,7 @@ async fn get_objects( } let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::BlockCompleteEntries { @@ -131,7 +141,7 @@ async fn get_chain( let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); let want_pruned_data = request.prune; - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::NextChainEntry { @@ -182,7 +192,7 @@ async fn fluffy_missing_txs( let block_hash: [u8; 32] = *request.block_hash; let current_blockchain_height = request.current_blockchain_height; - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle @@ -212,3 +222,63 @@ async fn fluffy_missing_txs( current_blockchain_height, })) } + +/// [`ProtocolRequest::NewFluffyBlock`] +async fn new_fluffy_block( + request: NewFluffyBlock, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let current_blockchain_height = request.current_blockchain_height; + + let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> { + let block = Block::read(&mut request.b.block.as_ref())?; + + let tx_blobs = request + .b + .txs + .take_normal() + .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?; + + let mut txs_in_block = block.transactions.iter().copied().collect::>(); + + // TODO: size check these tx blobs + let txs = tx_blobs + .into_iter() + .map(|tx_blob| { + let tx = Transaction::read(&mut tx_blob.as_ref())?; + + let tx = new_tx_verification_data(tx)?; + + if !txs_in_block.remove(&tx.tx_hash) { + anyhow::bail!("Peer sent tx in fluffy block that wasn't actually in block") + } + + Ok((tx.tx_hash, tx)) + }) + .collect::>()?; + + // The backing `Bytes` will be deallocated when this closure returns. + + Ok((block, txs)) + }) + .await?; + + let res = + blockchain_interface::handle_incoming_block(block, txs, &mut blockchain_read_handle).await; + + match res { + Ok(_) => Ok(ProtocolResponse::NA), + Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok( + ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest { + block_hash: block_hash.into(), + current_blockchain_height, + missing_tx_indices, + }), + ), + Err(IncomingBlockError::Orphan) => { + // Block's parent was unknown, could be syncing? + Ok(ProtocolResponse::NA) + } + Err(e) => Err(e.into()), + } +} diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 7d8d431b8..82aac8243 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -116,6 +116,7 @@ pub enum ProtocolResponse { GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), NewTransactions(NewTransactions), + FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest), NA, } @@ -139,6 +140,9 @@ impl PeerResponse { ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, + ProtocolResponse::FluffyMissingTransactionsRequest(_) => { + MessageID::FluffyMissingTxs + } ProtocolResponse::NA => return None, }, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index d3a7260fd..2dfc41db5 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -71,6 +71,9 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTransactionsRequest(val) => { + Self::FluffyMissingTransactionsRequest(val) + } ProtocolResponse::NA => return Err(MessageConversionError), }) } From 4af0145baa1a466559a116207ff59456f1e6a459 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 5 Oct 2024 22:23:25 +0100 Subject: [PATCH 5/5] sort imports + docs --- binaries/cuprated/src/p2p/request_handler.rs | 36 ++++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index c868de94a..0bf6ee23e 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,36 +1,39 @@ -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::FutureExt; -use monero_serai::block::Block; -use monero_serai::transaction::Transaction; -use std::collections::HashSet; use std::{ + collections::HashSet, future::{ready, Ready}, task::{Context, Poll}, }; + +use bytes::Bytes; +use futures::{future::BoxFuture, FutureExt}; +use monero_serai::{block::Block, transaction::Transaction}; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::{transactions::new_tx_verification_data, BlockChainContextService}; use cuprate_fixed_bytes::ByteArrayVec; -use cuprate_helper::asynch::rayon_spawn_async; -use cuprate_helper::cast::usize_to_u64; -use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}; +use cuprate_helper::{ + asynch::rayon_spawn_async, + cast::usize_to_u64, + map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, +}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; -use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_types::{BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs}; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs, +}; use cuprate_wire::protocol::{ ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, NewFluffyBlock, }; -use crate::blockchain::interface as blockchain_interface; -use crate::blockchain::interface::IncomingBlockError; +use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError}; +/// The P2P protocol request handler [`MakeService`](tower::MakeService). #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { + /// The [`BlockchainReadHandle`] pub blockchain_read_handle: BlockchainReadHandle, } @@ -55,9 +58,12 @@ impl Service> for P2pProtocolRequestHandlerMa } } +/// The P2P protocol request handler. #[derive(Clone)] pub struct P2pProtocolRequestHandler { + /// The [`PeerInformation`] for this peer. peer_information: PeerInformation, + /// The [`BlockchainReadHandle`] blockchain_read_handle: BlockchainReadHandle, }