Skip to content

Commit

Permalink
Merge branch 'p2p-request-handler' into cuprated-startup
Browse files Browse the repository at this point in the history
  • Loading branch information
Boog900 committed Oct 6, 2024
2 parents cb960ab + 4af0145 commit 49b8463
Show file tree
Hide file tree
Showing 14 changed files with 610 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 3 additions & 11 deletions binaries/cuprated/src/blockchain/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
Chain, TransactionVerificationData,
};

use crate::{
Expand Down Expand Up @@ -59,7 +59,7 @@ pub enum IncomingBlockError {
/// - the block's parent is unknown
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
given_txs: HashMap<[u8; 32], TransactionVerificationData>,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
Expand Down Expand Up @@ -99,14 +99,6 @@ pub async fn handle_incoming_block(
}

// TODO: check we actually got given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;

let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manager.
Expand All @@ -126,7 +118,7 @@ pub async fn handle_incoming_block(
incoming_block_tx
.send(BlockchainManagerCommand::AddBlock {
block,
prepped_txs,
prepped_txs: given_txs,
response_tx,
})
.await
Expand Down
289 changes: 289 additions & 0 deletions binaries/cuprated/src/p2p/request_handler.rs
Original file line number Diff line number Diff line change
@@ -1 +1,290 @@
use std::{
collections::HashSet,
future::{ready, Ready},
task::{Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::{block::Block, transaction::Transaction};
use tower::{Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{transactions::new_tx_verification_data, BlockChainContextService};
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_helper::{
asynch::rayon_spawn_async,
cast::usize_to_u64,
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
};
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse};
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs,
};
use cuprate_wire::protocol::{
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
GetObjectsResponse, NewFluffyBlock,
};

use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError};

/// The P2P protocol request handler [`MakeService`](tower::MakeService).
#[derive(Clone)]
pub struct P2pProtocolRequestHandlerMaker {
/// The [`BlockchainReadHandle`]
pub blockchain_read_handle: BlockchainReadHandle,
}

impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
type Response = P2pProtocolRequestHandler<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
// TODO: check sync info?

let blockchain_read_handle = self.blockchain_read_handle.clone();

ready(Ok(P2pProtocolRequestHandler {
peer_information,
blockchain_read_handle,
}))
}
}

/// The P2P protocol request handler.
#[derive(Clone)]
pub struct P2pProtocolRequestHandler<N: NetworkZone> {
/// The [`PeerInformation`] for this peer.
peer_information: PeerInformation<N>,
/// The [`BlockchainReadHandle`]
blockchain_read_handle: BlockchainReadHandle,
}

impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
type Response = ProtocolResponse;
type Error = anyhow::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, request: ProtocolRequest) -> Self::Future {
match request {
ProtocolRequest::GetObjects(r) => {
get_objects(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::GetChain(r) => {
get_chain(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::FluffyMissingTxs(r) => {
fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!(
"Peer sent a full block when we support fluffy blocks"
)))
.boxed(),
ProtocolRequest::NewFluffyBlock(r) => {
new_fluffy_block(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => {
ready(Ok(ProtocolResponse::NA)).boxed()
} // TODO: tx-pool
}
}
}

//---------------------------------------------------------------------------------------------------- Handler functions

/// [`ProtocolRequest::GetObjects`]
async fn get_objects(
request: GetObjectsRequest,
mut blockchain_read_handle: BlockchainReadHandle,
) -> anyhow::Result<ProtocolResponse> {
if request.blocks.len() > MAX_BLOCK_BATCH_LEN {
anyhow::bail!("Peer requested more blocks than allowed.")
}

let block_hashes: Vec<[u8; 32]> = (&request.blocks).into();
// deallocate the backing `Bytes`.
drop(request);

let BlockchainResponse::BlockCompleteEntries {
blocks,
missing_hashes,
blockchain_height,
} = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::BlockCompleteEntries(block_hashes))
.await?
else {
panic!("blockchain returned wrong response!");
};

Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
blocks,
missed_ids: ByteArrayVec::from(missing_hashes),
current_blockchain_height: usize_to_u64(blockchain_height),
}))
}

/// [`ProtocolRequest::GetChain`]
async fn get_chain(
request: ChainRequest,
mut blockchain_read_handle: BlockchainReadHandle,
) -> anyhow::Result<ProtocolResponse> {
if request.block_ids.len() > 25_000 {
anyhow::bail!("Peer sent too many block hashes in chain request.")
}

let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into();
let want_pruned_data = request.prune;
// deallocate the backing `Bytes`.
drop(request);

let BlockchainResponse::NextChainEntry {
start_height,
chain_height,
block_ids,
block_weights,
cumulative_difficulty,
first_block_blob,
} = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000))
.await?
else {
panic!("blockchain returned wrong response!");
};

if start_height == 0 {
anyhow::bail!("The peers chain has a different genesis block than ours.");
}

let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(cumulative_difficulty);

Ok(ProtocolResponse::GetChain(ChainResponse {
start_height: usize_to_u64(start_height),
total_height: usize_to_u64(chain_height),
cumulative_difficulty_low64,
cumulative_difficulty_top64,
m_block_ids: ByteArrayVec::from(block_ids),
first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
// only needed when
m_block_weights: if want_pruned_data {
block_weights.into_iter().map(usize_to_u64).collect()
} else {
vec![]
},
}))
}

/// [`ProtocolRequest::FluffyMissingTxs`]
async fn fluffy_missing_txs(
mut request: FluffyMissingTransactionsRequest,
mut blockchain_read_handle: BlockchainReadHandle,
) -> anyhow::Result<ProtocolResponse> {
let tx_indexes = std::mem::take(&mut request.missing_tx_indices);
let block_hash: [u8; 32] = *request.block_hash;
let current_blockchain_height = request.current_blockchain_height;

// deallocate the backing `Bytes`.
drop(request);

let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::MissingTxsInBlock {
block_hash,
tx_indexes,
})
.await?
else {
panic!("blockchain returned wrong response!");
};

let Some(MissingTxsInBlock { block, txs }) = res else {
anyhow::bail!("The peer requested txs out of range.");
};

Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock {
b: BlockCompleteEntry {
block: Bytes::from(block),
txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()),
pruned: false,
// only needed for pruned blocks.
block_weight: 0,
},
current_blockchain_height,
}))
}

/// [`ProtocolRequest::NewFluffyBlock`]
async fn new_fluffy_block(
request: NewFluffyBlock,
mut blockchain_read_handle: BlockchainReadHandle,
) -> anyhow::Result<ProtocolResponse> {
let current_blockchain_height = request.current_blockchain_height;

let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
let block = Block::read(&mut request.b.block.as_ref())?;

let tx_blobs = request
.b
.txs
.take_normal()
.ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;

let mut txs_in_block = block.transactions.iter().copied().collect::<HashSet<_>>();

// TODO: size check these tx blobs
let txs = tx_blobs
.into_iter()
.map(|tx_blob| {
let tx = Transaction::read(&mut tx_blob.as_ref())?;

let tx = new_tx_verification_data(tx)?;

if !txs_in_block.remove(&tx.tx_hash) {
anyhow::bail!("Peer sent tx in fluffy block that wasn't actually in block")
}

Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()?;

// The backing `Bytes` will be deallocated when this closure returns.

Ok((block, txs))
})
.await?;

let res =
blockchain_interface::handle_incoming_block(block, txs, &mut blockchain_read_handle).await;

match res {
Ok(_) => Ok(ProtocolResponse::NA),
Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok(
ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest {
block_hash: block_hash.into(),
current_blockchain_height,
missing_tx_indices,
}),
),
Err(IncomingBlockError::Orphan) => {
// Block's parent was unknown, could be syncing?
Ok(ProtocolResponse::NA)
}
Err(e) => Err(e.into()),
}
}
2 changes: 1 addition & 1 deletion net/wire/src/p2p/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ epee_object!(
current_blockchain_height: u64,
);

/// A request for Txs we are missing from our `TxPool`
/// A request for txs we are missing from an incoming block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FluffyMissingTransactionsRequest {
/// The Block we are missing the Txs in
Expand Down
4 changes: 4 additions & 0 deletions p2p/p2p-core/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub enum ProtocolResponse {
GetChain(ChainResponse),
NewFluffyBlock(NewFluffyBlock),
NewTransactions(NewTransactions),
FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest),
NA,
}

Expand All @@ -139,6 +140,9 @@ impl PeerResponse {
ProtocolResponse::GetChain(_) => MessageID::GetChain,
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
ProtocolResponse::FluffyMissingTransactionsRequest(_) => {
MessageID::FluffyMissingTxs
}

ProtocolResponse::NA => return None,
},
Expand Down
3 changes: 3 additions & 0 deletions p2p/p2p-core/src/protocol/try_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
ProtocolResponse::FluffyMissingTransactionsRequest(val) => {
Self::FluffyMissingTransactionsRequest(val)
}
ProtocolResponse::NA => return Err(MessageConversionError),
})
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/p2p/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch.
///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
pub const MAX_BLOCK_BATCH_LEN: usize = 100;

/// The timeout that the block downloader will use for requests.
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down
1 change: 1 addition & 0 deletions storage/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ serde = { workspace = true, optional = true }
tower = { workspace = true }
thread_local = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
bytes = "1.7.2"

[dev-dependencies]
cuprate-constants = { path = "../../constants" }
Expand Down
Loading

0 comments on commit 49b8463

Please sign in to comment.