From adf592e5309eba8740356541ac8dd43bd59849da Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 10 Oct 2024 20:12:18 +0100 Subject: [PATCH] init dandelion integration --- Cargo.lock | 1 + Cargo.toml | 1 + binaries/cuprated/Cargo.toml | 1 + binaries/cuprated/src/blockchain.rs | 2 +- binaries/cuprated/src/txpool.rs | 5 + binaries/cuprated/src/txpool/dandelion.rs | 38 +++ .../src/txpool/dandelion/diffuse_service.rs | 32 +++ .../src/txpool/dandelion/stem_service.rs | 54 ++++ .../cuprated/src/txpool/dandelion/tx_store.rs | 47 ++++ binaries/cuprated/src/txpool/incoming_tx.rs | 249 ++++++++++++++++++ binaries/cuprated/src/txpool/manager.rs | 1 + .../cuprated/src/txpool/txs_being_handled.rs | 48 ++++ p2p/dandelion-tower/src/router.rs | 6 + p2p/p2p/src/client_pool.rs | 12 +- storage/service/src/service/write.rs | 2 +- storage/txpool/src/service/interface.rs | 8 + 16 files changed, 504 insertions(+), 3 deletions(-) create mode 100644 binaries/cuprated/src/txpool/dandelion.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/diffuse_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/stem_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/tx_store.rs create mode 100644 binaries/cuprated/src/txpool/incoming_tx.rs create mode 100644 binaries/cuprated/src/txpool/manager.rs create mode 100644 binaries/cuprated/src/txpool/txs_being_handled.rs diff --git a/Cargo.lock b/Cargo.lock index 5fea18d04..5e597f868 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,6 +1008,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "sha3", "thiserror", "thread_local", "tokio", diff --git a/Cargo.toml b/Cargo.toml index fa348ccdc..1de62ec79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ rayon = { version = "1.10.0", default-features = false } serde_bytes = { version = "0.11.15", default-features = false } serde_json = { version = "1.0.128", default-features = false } serde = { version = "1.0.210", default-features = false } +sha3 = { version = "0.10.8", default-features = false } thiserror = { version = "1.0.63", default-features = false } thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 325406bf7..62a10322b 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -64,6 +64,7 @@ rayon = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } +sha3 = { workspace = true, features = ["std"] } thiserror = { workspace = true } thread_local = { workspace = true } tokio-util = { workspace = true } diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index a06f3fa73..c4b75e4ed 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -25,7 +25,7 @@ mod manager; mod syncer; mod types; -use types::{ +pub use types::{ ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index a6f05e751..af5a4205c 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,3 +1,8 @@ //! Transaction Pool //! //! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool. + +mod dandelion; +mod incoming_tx; +mod manager; +mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs new file mode 100644 index 000000000..47d1ca385 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -0,0 +1,38 @@ +use bytes::Bytes; +use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter}; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::ClearNet; +use cuprate_wire::NetworkAddress; + +mod diffuse_service; +mod stem_service; +mod tx_store; + +struct DandelionTx(Bytes); + +type TxId = [u8; 32]; + +pub fn start_dandelion_router( + clear_net: NetworkInterface, +) -> DandelionRouter< + stem_service::OutboundPeerStream, + diffuse_service::DiffuseService, + NetworkAddress, + stem_service::StemPeerService, + DandelionTx, +> { + DandelionRouter::new( + diffuse_service::DiffuseService { + clear_net_broadcast_service: clear_net.broadcast_svc(), + }, + stem_service::OutboundPeerStream { + clear_net: clear_net.clone(), + }, + DandelionConfig { + time_between_hop: Default::default(), + epoch_duration: Default::default(), + fluff_probability: 0.0, + graph: Default::default(), + }, + ) +} diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs new file mode 100644 index 000000000..57b7d2923 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -0,0 +1,32 @@ +use std::task::{Context, Poll}; +use tower::Service; + +use crate::txpool::dandelion::DandelionTx; +use cuprate_dandelion_tower::traits::DiffuseRequest; +use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; +use cuprate_p2p_core::ClearNet; + +pub struct DiffuseService { + pub clear_net_broadcast_service: BroadcastSvc, +} + +impl Service> for DiffuseService { + type Response = BroadcastSvc::Response; + type Error = tower::BoxError; + type Future = BroadcastSvc::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.clear_net_broadcast_service + .poll_ready(cx) + .map_err(Into::into) + } + + fn call(&mut self, req: DiffuseRequest) -> Self::Future { + self.clear_net_broadcast_service + .call(BroadcastRequest::Transaction { + tx_bytes: req.0 .0, + direction: None, + received_from: None, + }) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs new file mode 100644 index 000000000..6970ebfc8 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -0,0 +1,54 @@ +use super::DandelionTx; +use bytes::Bytes; +use cuprate_dandelion_tower::traits::StemRequest; +use cuprate_dandelion_tower::OutboundPeer; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::client::Client; +use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest}; +use cuprate_wire::protocol::NewTransactions; +use cuprate_wire::NetworkAddress; +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; + +pub struct OutboundPeerStream { + pub clear_net: NetworkInterface, +} + +impl Stream for OutboundPeerStream { + type Item = Result>, tower::BoxError>; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Some(Ok(self + .clear_net + .client_pool() + .outbound_client() + .map_or(OutboundPeer::Exhausted, |client| { + OutboundPeer::Peer(client.info.id.into(), StemPeerService(client)) + })))) + } +} + +pub struct StemPeerService(Client); + +impl Service> for StemPeerService { + type Response = (); + type Error = tower::BoxError; + type Future = Client::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, req: StemRequest) -> Self::Future { + self.0 + .call(PeerRequest::Protocol(ProtocolRequest::NewTransactions( + NewTransactions { + txs: vec![req.0 .0], + dandelionpp_fluff: false, + padding: Bytes::new(), + }, + ))) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs new file mode 100644 index 000000000..98de764bd --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -0,0 +1,47 @@ +use crate::txpool::dandelion::{DandelionTx, TxId}; +use bytes::Bytes; +use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse}; +use cuprate_database::RuntimeError; +use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use std::task::{Context, Poll}; +use tower::util::Oneshot; +use tower::{Service, ServiceExt}; + +pub struct TxStoreService { + txpool_read_handle: TxpoolReadHandle, + txpool_write_handle: TxpoolWriteHandle, +} + +impl Service> for TxStoreService { + type Response = TxStoreResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: TxStoreRequest) -> Self::Future { + match req { + TxStoreRequest::Get(tx_id) => self + .txpool_read_handle + .clone() + .oneshot(TxpoolReadRequest::TxBlob(tx_id)) + .map(|res| match res { + Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some( + (DandelionTx(Bytes::from(blob)), todo!()), + ))), + Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)), + Err(e) => Err(e.into()), + Ok(_) => unreachable!(), + }) + .boxed(), + TxStoreRequest::Promote(tx_id) => { + todo!() + } + } + } +} diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs new file mode 100644 index 000000000..37202b06f --- /dev/null +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -0,0 +1,249 @@ +use std::collections::HashSet; +use std::future::ready; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::blockchain::ConcreteTxVerifierService; +use crate::txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled}; +use bytes::Bytes; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + ExtendedConsensusError, TxVerifierService, VerifyTxRequest, VerifyTxResponse, +}; +use cuprate_dandelion_tower::pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}; +use cuprate_dandelion_tower::TxState; +use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_txpool::service::interface::{ + TxpoolReadRequest, TxpoolWriteRequest, TxpoolWriteResponse, +}; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; +use cuprate_wire::NetworkAddress; +use dashmap::DashSet; +use futures::future::BoxFuture; +use futures::FutureExt; +use monero_serai::transaction::Transaction; +use sha3::{Digest, Sha3_256}; +use tower::{Service, ServiceExt}; + +pub enum IncomingTxError { + Parse(std::io::Error), + Consensus(ExtendedConsensusError), + DuplicateTransaction, +} + +pub enum IncomingTxs { + Bytes { + txs: Vec, + state: TxState, + }, +} + +struct DandelionTx(Bytes); + +type TxId = [u8; 32]; + +pub struct IncomingTxHandler { + txs_being_added: Arc, + + blockchain_context_cache: BlockChainContextService, + + dandelion_pool_manager: DandelionPoolService, + tx_verifier_service: ConcreteTxVerifierService, + + txpool_write_handle: TxpoolWriteHandle, + + txpool_read_handle: TxpoolReadHandle, +} + +impl Service for IncomingTxHandler { + type Response = (); + type Error = IncomingTxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: IncomingTxs) -> Self::Future { + let IncomingTxs::Bytes { mut txs, state } = req; + + let mut local_tracker = self.txs_being_added.local_tracker(); + + txs.retain(|bytes| local_tracker.try_add_tx(bytes.as_ref())); + + if txs.is_empty() { + return ready(Ok(())).boxed(); + } + + let mut blockchain_context_cache = self.blockchain_context_cache.clone(); + let mut tx_verifier_service = self.tx_verifier_service.clone(); + let mut txpool_write_handle = self.txpool_write_handle.clone(); + + async move { + let txs = rayon_spawn_async(move || { + txs.into_iter() + .map(|bytes| { + let tx = Transaction::read(&mut bytes.as_ref()) + .map_err(IncomingTxError::Parse)?; + + let tx = new_tx_verification_data(tx) + .map_err(|e| IncomingTxError::Consensus(e.into()))?; + + Ok(Arc::new(tx)) + }) + .collect::, IncomingTxError>>() + }) + .await?; + + let BlockChainContextResponse::Context(context) = blockchain_context_cache + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await? + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + + txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await; + + todo!() + } + .boxed() + } +} + +async fn handle_incoming_txs( + txs: Vec, + state: TxState, + tx_being_handled_locally: TxBeingHandledLocally, + mut blockchain_context_cache: BlockChainContextService, + mut tx_verifier_service: ConcreteTxVerifierService, + mut txpool_write_handle: TxpoolWriteHandle, + mut txpool_read_handle: TxpoolReadHandle, + mut dandelion_pool_manager: DandelionPoolService, +) -> Result<(), IncomingTxError> { + let mut tx_blob_hashes = HashSet::new(); + + let txs = txs + .into_iter() + .map(|tx_blob| { + let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); + if !tx_blob_hashes.insert(tx_blob_hash) { + return Err(IncomingTxError::DuplicateTransaction); + } + + Ok((tx_blob_hash, tx_blob)) + }) + .collect::, _>>()?; + + let TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + .ready() + .await? + .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) + .await? + else { + unreachable!() + }; + + let txs = rayon_spawn_async(move || { + txs.into_iter() + .filter_map(|(tx_blob_hash, tx_blob)| { + if tx_blob_hashes.contains(&tx_blob_hash) { + Some(tx_blob) + } else { + None + } + }) + .map(|bytes| { + let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?; + + let tx = new_tx_verification_data(tx) + .map_err(|e| IncomingTxError::Consensus(e.into()))?; + + Ok(Arc::new(tx)) + }) + .collect::, IncomingTxError>>() + }) + .await?; + + let BlockChainContextResponse::Context(context) = blockchain_context_cache + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await? + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + + for tx in txs { + let incoming_tx = IncomingTxBuilder::new(Bytes::copy_from_slice(&tx.tx_blob), tx.tx_hash); + + let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await? + else { + unreachable!() + }; + + // TODO: track double spends to quickly ignore them from their blob hash. + if let Some(tx_hash) = double_spend { + continue; + }; + + // TODO: check blockchain for double spends to prevent a race condition. + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state.clone()) + .with_state_in_db(None) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await? + .call(incoming_tx) + .await?; + } +} diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/binaries/cuprated/src/txpool/manager.rs @@ -0,0 +1 @@ + diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs new file mode 100644 index 000000000..3973e7a03 --- /dev/null +++ b/binaries/cuprated/src/txpool/txs_being_handled.rs @@ -0,0 +1,48 @@ +use dashmap::DashSet; +use sha3::{Digest, Sha3_256}; +use std::sync::Arc; + +pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] { + let mut hasher = Sha3_256::new(); + hasher.update(tx_bytes); + hasher.finalize().into() +} +#[derive(Clone)] +pub struct TxsBeingHandled(Arc>); + +impl TxsBeingHandled { + pub fn local_tracker(&self) -> TxBeingHandledLocally { + TxBeingHandledLocally { + txs_being_handled: self.clone(), + txs: vec![], + } + } +} + +pub struct TxBeingHandledLocally { + txs_being_handled: TxsBeingHandled, + txs: Vec<[u8; 32]>, +} + +impl TxBeingHandledLocally { + pub fn try_add_tx(&mut self, tx_bytes: &[u8]) -> bool { + let mut hasher = Sha3_256::new(); + hasher.update(tx_bytes); + let tx_blob_hash = hasher.finalize().into(); + + if !self.txs_being_handled.0.insert(tx_blob_hash) { + return false; + } + + self.txs.push(tx_blob_hash); + true + } +} + +impl Drop for TxBeingHandledLocally { + fn drop(&mut self) { + for hash in &self.txs { + self.txs_being_handled.0.remove(hash); + } + } +} diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 88702be09..c04dcaea5 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -73,6 +73,12 @@ pub enum TxState { Local, } +impl TxState { + pub const fn state_stem(&self) -> bool { + matches!(self, Self::Local | Self::Stem { .. }) + } +} + /// A request to route a transaction. pub struct DandelionRouteReq { /// The transaction. diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index fc97fc1b6..25dd2420c 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use cuprate_p2p_core::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - NetworkZone, + ConnectionDirection, NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -165,6 +165,16 @@ impl ClientPool { sync_data.cumulative_difficulty() > cumulative_difficulty }) } + + pub fn outbound_client(&self) -> Option> { + let client = self + .clients + .iter() + .find(|element| element.value().info.direction == ConnectionDirection::Outbound)?; + let id = *client.key(); + + Some(self.clients.remove(&id).unwrap().1) + } } mod sealed { diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs index f75d61517..3914f2293 100644 --- a/storage/service/src/service/write.rs +++ b/storage/service/src/service/write.rs @@ -21,7 +21,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); /// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] /// will return an `async`hronous channel that can be `.await`ed upon /// to receive the corresponding response. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseWriteHandle { /// Sender channel to the database write thread-pool. /// diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 450b28d63..22a09cba5 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,6 +1,8 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. + +use std::collections::HashSet; use std::sync::Arc; use cuprate_types::TransactionVerificationData; @@ -9,11 +11,14 @@ use crate::types::TransactionHash; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. +#[derive(Clone)] pub enum TxpoolReadRequest { /// A request for the blob (raw bytes) of a transaction with the given hash. TxBlob(TransactionHash), /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), + + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -25,10 +30,13 @@ pub enum TxpoolReadResponse { TxBlob(Vec), /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), + + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest /// The transaction pool [`tower::Service`] write request type. +#[derive(Clone)] pub enum TxpoolWriteRequest { /// Add a transaction to the pool. ///