From b6d94cf78017109d55f670ddc198f925947da75d Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 01:22:20 +0100 Subject: [PATCH] finish incoming tx handler --- binaries/cuprated/Cargo.toml | 2 +- binaries/cuprated/src/txpool.rs | 1 - binaries/cuprated/src/txpool/dandelion.rs | 31 +- .../src/txpool/dandelion/diffuse_service.rs | 21 +- .../src/txpool/dandelion/stem_service.rs | 41 ++- .../cuprated/src/txpool/dandelion/tx_store.rs | 59 ++- binaries/cuprated/src/txpool/incoming_tx.rs | 337 ++++++++++-------- binaries/cuprated/src/txpool/manager.rs | 1 - .../cuprated/src/txpool/txs_being_handled.rs | 7 +- p2p/dandelion-tower/src/router.rs | 2 +- storage/service/src/service/write.rs | 10 +- storage/txpool/src/service/interface.rs | 27 +- storage/txpool/src/service/read.rs | 1 + storage/txpool/src/types.rs | 3 + 14 files changed, 316 insertions(+), 227 deletions(-) delete mode 100644 binaries/cuprated/src/txpool/manager.rs diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 62a10322b..c8a4247fc 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -20,7 +20,7 @@ cuprate-levin = { path = "../../net/levin" } cuprate-wire = { path = "../../net/wire" } cuprate-p2p = { path = "../../p2p/p2p" } cuprate-p2p-core = { path = "../../p2p/p2p-core" } -cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } +cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower", features = ["txpool"] } cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-address-book = { path = "../../p2p/address-book" } cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index af5a4205c..79731023e 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -4,5 +4,4 @@ mod dandelion; mod incoming_tx; -mod manager; mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs index 27f1f79da..07fd4792e 100644 --- a/binaries/cuprated/src/txpool/dandelion.rs +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -1,6 +1,8 @@ +use std::time::Duration; + use bytes::Bytes; use cuprate_dandelion_tower::pool::DandelionPoolService; -use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter}; +use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph}; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; @@ -11,10 +13,17 @@ mod stem_service; mod tx_store; #[derive(Clone)] -struct DandelionTx(Bytes); +pub struct DandelionTx(Bytes); type TxId = [u8; 32]; +const DANDELION_CONFIG: DandelionConfig = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(10 * 60), + fluff_probability: 0.12, + graph: Graph::FourRegular, +}; + type ConcreteDandelionRouter = DandelionRouter< stem_service::OutboundPeerStream, diffuse_service::DiffuseService, @@ -35,12 +44,7 @@ pub fn start_dandelion_pool_manager( txpool_read_handle, txpool_write_handle, }, - DandelionConfig { - time_between_hop: Default::default(), - epoch_duration: Default::default(), - fluff_probability: 0.0, - graph: Default::default(), - }, + DANDELION_CONFIG, ) } @@ -49,14 +53,7 @@ pub fn dandelion_router(clear_net: NetworkInterface) -> ConcreteDandel diffuse_service::DiffuseService { clear_net_broadcast_service: 
clear_net.broadcast_svc(), }, - stem_service::OutboundPeerStream { - clear_net: clear_net.clone(), - }, - DandelionConfig { - time_between_hop: Default::default(), - epoch_duration: Default::default(), - fluff_probability: 0.0, - graph: Default::default(), - }, + stem_service::OutboundPeerStream { clear_net }, + DANDELION_CONFIG, ) } diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs index 57b7d2923..115799d05 100644 --- a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -1,19 +1,26 @@ -use std::task::{Context, Poll}; +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; + +use futures::FutureExt; use tower::Service; -use crate::txpool::dandelion::DandelionTx; use cuprate_dandelion_tower::traits::DiffuseRequest; use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; use cuprate_p2p_core::ClearNet; +use super::DandelionTx; + +/// The dandelion diffusion service. pub struct DiffuseService { pub clear_net_broadcast_service: BroadcastSvc, } impl Service> for DiffuseService { - type Response = BroadcastSvc::Response; + type Response = (); type Error = tower::BoxError; - type Future = BroadcastSvc::Future; + type Future = Ready>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.clear_net_broadcast_service @@ -22,11 +29,17 @@ impl Service> for DiffuseService { } fn call(&mut self, req: DiffuseRequest) -> Self::Future { + // TODO: Call `into_inner` when 1.82.0 stabilizes self.clear_net_broadcast_service .call(BroadcastRequest::Transaction { tx_bytes: req.0 .0, direction: None, received_from: None, }) + .now_or_never() + .unwrap() + .expect("Broadcast service is Infallible"); + + ready(Ok(())) } } diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs index 6970ebfc8..330c88430 100644 --- a/binaries/cuprated/src/txpool/dandelion/stem_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -1,17 +1,23 @@ -use super::DandelionTx; -use bytes::Bytes; -use cuprate_dandelion_tower::traits::StemRequest; -use cuprate_dandelion_tower::OutboundPeer; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; use cuprate_p2p::NetworkInterface; -use cuprate_p2p_core::client::Client; -use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest}; -use cuprate_wire::protocol::NewTransactions; -use cuprate_wire::NetworkAddress; +use cuprate_p2p_core::{ + client::{Client, InternalPeerID}, + ClearNet, NetworkZone, PeerRequest, ProtocolRequest, +}; +use cuprate_wire::{protocol::NewTransactions, NetworkAddress}; + +use bytes::Bytes; use futures::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; use tower::Service; +use super::DandelionTx; + +/// The dandelion outbound peer stream. pub struct OutboundPeerStream { pub clear_net: NetworkInterface, } @@ -20,22 +26,29 @@ impl Stream for OutboundPeerStream { type Item = Result>, tower::BoxError>; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // TODO: make the outbound peer choice random. 
Poll::Ready(Some(Ok(self .clear_net .client_pool() .outbound_client() .map_or(OutboundPeer::Exhausted, |client| { - OutboundPeer::Peer(client.info.id.into(), StemPeerService(client)) + let addr = match client.info.id { + InternalPeerID::KnownAddr(addr) => addr, + InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"), + }; + + OutboundPeer::Peer(addr.into(), StemPeerService(client)) })))) } } -pub struct StemPeerService(Client); +/// The stem service, used to send stem txs. +pub struct StemPeerService(Client); impl Service> for StemPeerService { - type Response = (); + type Response = as Service>::Response; type Error = tower::BoxError; - type Future = Client::Future; + type Future = as Service>::Future; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index 29f719e59..f13a64602 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -1,15 +1,27 @@ -use crate::txpool::dandelion::{DandelionTx, TxId}; +use std::{ + f32::consts::E, + task::{Context, Poll}, +}; + use bytes::Bytes; -use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse}; +use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; +use tower::{util::Oneshot, Service, ServiceExt}; + +use cuprate_dandelion_tower::{ + traits::{TxStoreRequest, TxStoreResponse}, + State, +}; use cuprate_database::RuntimeError; -use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; -use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt}; -use std::task::{Context, Poll}; -use tower::util::Oneshot; -use tower::{Service, ServiceExt}; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; +use super::{DandelionTx, TxId}; + +/// The dandelion tx-store service. +/// +/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides. 
 pub struct TxStoreService {
     pub txpool_read_handle: TxpoolReadHandle,
     pub txpool_write_handle: TxpoolWriteHandle,
 }
@@ -31,17 +43,35 @@ impl Service<TxStoreRequest<TxId>> for TxStoreService {
                 .clone()
                 .oneshot(TxpoolReadRequest::TxBlob(tx_id))
                 .map(|res| match res {
-                    Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some(
-                        (DandelionTx(Bytes::from(blob)), todo!()),
-                    ))),
+                    Ok(TxpoolReadResponse::TxBlob {
+                        tx_blob,
+                        state_stem,
+                    }) => {
+                        let state = if state_stem {
+                            State::Stem
+                        } else {
+                            State::Fluff
+                        };
+
+                        Ok(TxStoreResponse::Transaction(Some((
+                            DandelionTx(Bytes::from(tx_blob)),
+                            state,
+                        ))))
+                    }
                     Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
                     Err(e) => Err(e.into()),
                     Ok(_) => unreachable!(),
                 })
                 .boxed(),
-            TxStoreRequest::Promote(tx_id) => {
-                todo!()
-            }
+            TxStoreRequest::Promote(tx_id) => self
+                .txpool_write_handle
+                .clone()
+                .oneshot(TxpoolWriteRequest::Promote(tx_id))
+                .map(|res| match res {
+                    Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
+                    Err(e) => Err(e.into()),
+                })
+                .boxed(),
         }
     }
 }
diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs
index 37202b06f..abac69ddc 100644
--- a/binaries/cuprated/src/txpool/incoming_tx.rs
+++ b/binaries/cuprated/src/txpool/incoming_tx.rs
@@ -1,58 +1,76 @@
-use std::collections::HashSet;
-use std::future::ready;
-use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::{
+    collections::HashSet,
+    future::ready,
+    sync::Arc,
+    task::{Context, Poll},
+};
 
-use crate::blockchain::ConcreteTxVerifierService;
-use crate::txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled};
 use bytes::Bytes;
-use cuprate_consensus::transactions::new_tx_verification_data;
+use dashmap::DashSet;
+use futures::{future::BoxFuture, FutureExt};
+use monero_serai::transaction::Transaction;
+use sha3::{Digest, Sha3_256};
+use tower::{Service, ServiceExt};
+
 use cuprate_consensus::{
-    BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
-    ExtendedConsensusError, TxVerifierService, VerifyTxRequest, VerifyTxResponse,
+    transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
+    BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest,
+    VerifyTxResponse,
+};
+use cuprate_dandelion_tower::{
+    pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
+    TxState,
 };
-use cuprate_dandelion_tower::pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder};
-use cuprate_dandelion_tower::TxState;
 use cuprate_helper::asynch::rayon_spawn_async;
-use cuprate_txpool::service::interface::{
-    TxpoolReadRequest, TxpoolWriteRequest, TxpoolWriteResponse,
+use cuprate_txpool::service::{
+    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
+    TxpoolReadHandle, TxpoolWriteHandle,
 };
-use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
+use cuprate_types::TransactionVerificationData;
 use cuprate_wire::NetworkAddress;
-use dashmap::DashSet;
-use futures::future::BoxFuture;
-use futures::FutureExt;
-use monero_serai::transaction::Transaction;
-use sha3::{Digest, Sha3_256};
-use tower::{Service, ServiceExt};
+use crate::{
+    blockchain::ConcreteTxVerifierService,
+    constants::PANIC_CRITICAL_SERVICE_ERROR,
+    signals::REORG_LOCK,
+    txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled},
+};
+
+/// An error that can occur while handling an incoming tx.
pub enum IncomingTxError { Parse(std::io::Error), Consensus(ExtendedConsensusError), DuplicateTransaction, } -pub enum IncomingTxs { - Bytes { - txs: Vec, - state: TxState, - }, +/// Incoming transactions. +pub struct IncomingTxs { + pub txs: Vec, + pub state: TxState, } +/// The transaction type used for dandelion++. +#[derive(Clone)] struct DandelionTx(Bytes); +/// A transaction ID/hash. type TxId = [u8; 32]; +/// The service than handles incoming transaction pool transactions. +/// +/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. pub struct IncomingTxHandler { - txs_being_added: Arc, - + /// A store of txs currently being handled in incoming tx requests. + txs_being_handled: TxsBeingHandled, + /// The blockchain context cache. blockchain_context_cache: BlockChainContextService, - + /// The dandelion txpool manager. dandelion_pool_manager: DandelionPoolService, + /// The transaction verifier service. tx_verifier_service: ConcreteTxVerifierService, - + /// The txpool write handle. txpool_write_handle: TxpoolWriteHandle, - + /// The txpool read handle. txpool_read_handle: TxpoolReadHandle, } @@ -66,70 +84,18 @@ impl Service for IncomingTxHandler { } fn call(&mut self, req: IncomingTxs) -> Self::Future { - let IncomingTxs::Bytes { mut txs, state } = req; - - let mut local_tracker = self.txs_being_added.local_tracker(); - - txs.retain(|bytes| local_tracker.try_add_tx(bytes.as_ref())); - - if txs.is_empty() { - return ready(Ok(())).boxed(); - } - - let mut blockchain_context_cache = self.blockchain_context_cache.clone(); - let mut tx_verifier_service = self.tx_verifier_service.clone(); - let mut txpool_write_handle = self.txpool_write_handle.clone(); - - async move { - let txs = rayon_spawn_async(move || { - txs.into_iter() - .map(|bytes| { - let tx = Transaction::read(&mut bytes.as_ref()) - .map_err(IncomingTxError::Parse)?; - - let tx = new_tx_verification_data(tx) - .map_err(|e| IncomingTxError::Consensus(e.into()))?; - - Ok(Arc::new(tx)) - }) - .collect::, IncomingTxError>>() - }) - .await?; - - let BlockChainContextResponse::Context(context) = blockchain_context_cache - .ready() - .await? - .call(BlockChainContextRequest::GetContext) - .await? - else { - unreachable!() - }; - - let context = context.unchecked_blockchain_context(); - - tx_verifier_service - .ready() - .await? - .call(VerifyTxRequest::Prepped { - txs: txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) - .await?; - - txpool_write_handle - .ready() - .await? 
-                .call(TxpoolWriteRequest::AddTransaction {
-                    tx,
-                    state_stem: state.state_stem(),
-                })
-                .await;
-
-            todo!()
-        }
+        let IncomingTxs { txs, state } = req;
+
+        handle_incoming_txs(
+            txs,
+            state,
+            self.txs_being_handled.clone(),
+            self.blockchain_context_cache.clone(),
+            self.tx_verifier_service.clone(),
+            self.txpool_write_handle.clone(),
+            self.txpool_read_handle.clone(),
+            self.dandelion_pool_manager.clone(),
+        )
         .boxed()
     }
 }
@@ -137,38 +103,106 @@ impl Service<IncomingTxs> for IncomingTxHandler {
 async fn handle_incoming_txs(
     txs: Vec<Bytes>,
     state: TxState,
-    tx_being_handled_locally: TxBeingHandledLocally,
+    txs_being_handled: TxsBeingHandled,
     mut blockchain_context_cache: BlockChainContextService,
     mut tx_verifier_service: ConcreteTxVerifierService,
     mut txpool_write_handle: TxpoolWriteHandle,
     mut txpool_read_handle: TxpoolReadHandle,
     mut dandelion_pool_manager: DandelionPoolService,
 ) -> Result<(), IncomingTxError> {
+    let reorg_guard = REORG_LOCK.read().await;
+
+    let (txs, txs_being_handled_guard) =
+        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
+
+    let BlockChainContextResponse::Context(context) = blockchain_context_cache
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(BlockChainContextRequest::GetContext)
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+    else {
+        unreachable!()
+    };
+
+    let context = context.unchecked_blockchain_context();
+
+    tx_verifier_service
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(VerifyTxRequest::Prepped {
+            txs: txs.clone(),
+            current_chain_height: context.chain_height,
+            top_hash: context.top_hash,
+            time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
+            hf: context.current_hf,
+        })
+        .await
+        .map_err(IncomingTxError::Consensus)?;
+
+    for tx in txs {
+        handle_valid_tx(
+            tx,
+            state.clone(),
+            &mut txpool_write_handle,
+            &mut dandelion_pool_manager,
+        )
+        .await
+    }
+
+    Ok(())
+}
+
+/// Prepares the incoming transactions for verification.
+///
+/// This will filter out all transactions already in the pool or txs already being handled in another request.
+async fn prepare_incoming_txs(
+    tx_blobs: Vec<Bytes>,
+    txs_being_handled: TxsBeingHandled,
+    txpool_read_handle: &mut TxpoolReadHandle,
+) -> Result<(Vec<Arc<TransactionVerificationData>>, TxBeingHandledLocally), IncomingTxError> {
     let mut tx_blob_hashes = HashSet::new();
+    let mut txs_being_handled_locally = txs_being_handled.local_tracker();
 
-    let txs = txs
+    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
+    let txs = tx_blobs
         .into_iter()
-        .map(|tx_blob| {
+        .filter_map(|tx_blob| {
             let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
+
+            // If there is a duplicate here, the incoming tx batch contained the same tx twice.
             if !tx_blob_hashes.insert(tx_blob_hash) {
-                return Err(IncomingTxError::DuplicateTransaction);
+                return Some(Err(IncomingTxError::DuplicateTransaction));
+            }
+
+            // If there is a duplicate here, the tx is already being handled in another batch.
+            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
+                return None;
             }
 
-            Ok((tx_blob_hash, tx_blob))
+            Some(Ok((tx_blob_hash, tx_blob)))
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle
+    // Filter out the txs already in the txpool.
+    // This will leave the txs already in the pool in [`TxBeingHandledLocally`], but that shouldn't be an issue.
+    let TxpoolReadResponse::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle
        .ready()
-        .await?
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
         .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
-        .await?
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
     else {
        unreachable!()
    };
 
-    let txs = rayon_spawn_async(move || {
-        txs.into_iter()
+    // Now prepare the txs for verification.
+    rayon_spawn_async(move || {
+        let txs = txs
+            .into_iter()
             .filter_map(|(tx_blob_hash, tx_blob)| {
                 if tx_blob_hashes.contains(&tx_blob_hash) {
                     Some(tx_blob)
                 } else {
                     None
                 }
             })
+            .map(|tx_blob| {
+                let tx = Transaction::read(&mut tx_blob.as_ref())
+                    .map_err(IncomingTxError::Parse)?;
+
+                let tx = new_tx_verification_data(tx)
+                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;
 
                 Ok(Arc::new(tx))
             })
-            .collect::<Result<Vec<_>, IncomingTxError>>()
+            .collect::<Result<Vec<_>, IncomingTxError>>()?;
+
+        Ok((txs, txs_being_handled_locally))
     })
-    .await?;
+    .await
+}
 
-    let BlockChainContextResponse::Context(context) = blockchain_context_cache
+async fn handle_valid_tx(
+    tx: Arc<TransactionVerificationData>,
+    state: TxState,
+    txpool_write_handle: &mut TxpoolWriteHandle,
+    dandelion_pool_manager: &mut DandelionPoolService,
+) {
+    let incoming_tx =
+        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
+
+    let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
         .ready()
-        .await?
-        .call(BlockChainContextRequest::GetContext)
-        .await?
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(TxpoolWriteRequest::AddTransaction {
+            tx,
+            state_stem: state.state_stem(),
+        })
+        .await
+        .expect("TODO")
     else {
         unreachable!()
     };
 
-    let context = context.unchecked_blockchain_context();
+    // TODO: track double spends to quickly ignore them from their blob hash.
+    if let Some(tx_hash) = double_spend {
+        return;
+    };
 
-    tx_verifier_service
-        .ready()
-        .await?
-        .call(VerifyTxRequest::Prepped {
-            txs: txs.clone(),
-            current_chain_height: context.chain_height,
-            top_hash: context.top_hash,
-            time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
-            hf: context.current_hf,
-        })
-        .await?;
+    // TODO: check blockchain for double spends to prevent a race condition.
 
-    for tx in txs {
-        let incoming_tx = IncomingTxBuilder::new(Bytes::copy_from_slice(&tx.tx_blob), tx.tx_hash);
-
-        let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
-            .ready()
-            .await?
-            .call(TxpoolWriteRequest::AddTransaction {
-                tx,
-                state_stem: state.state_stem(),
-            })
-            .await?
-        else {
-            unreachable!()
-        };
-
-        // TODO: track double spends to quickly ignore them from their blob hash.
-        if let Some(tx_hash) = double_spend {
-            continue;
-        };
-
-        // TODO: check blockchain for double spends to prevent a race condition.
-
-        // TODO: fill this in properly.
+ let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(None) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); } diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs deleted file mode 100644 index 8b1378917..000000000 --- a/binaries/cuprated/src/txpool/manager.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs index 3973e7a03..8b30821d4 100644 --- a/binaries/cuprated/src/txpool/txs_being_handled.rs +++ b/binaries/cuprated/src/txpool/txs_being_handled.rs @@ -7,6 +7,7 @@ pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] { hasher.update(tx_bytes); hasher.finalize().into() } + #[derive(Clone)] pub struct TxsBeingHandled(Arc>); @@ -25,11 +26,7 @@ pub struct TxBeingHandledLocally { } impl TxBeingHandledLocally { - pub fn try_add_tx(&mut self, tx_bytes: &[u8]) -> bool { - let mut hasher = Sha3_256::new(); - hasher.update(tx_bytes); - let tx_blob_hash = hasher.finalize().into(); - + pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool { if !self.txs_being_handled.0.insert(tx_blob_hash) { return false; } diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index c04dcaea5..899b1235a 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -74,7 +74,7 @@ pub enum TxState { } impl TxState { - pub const fn state_stem(&self) -> bool { + pub fn state_stem(&self) -> bool { matches!(self, Self::Local | Self::Stem { .. }) } } diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs index 3914f2293..607c4aa60 100644 --- a/storage/service/src/service/write.rs +++ b/storage/service/src/service/write.rs @@ -21,7 +21,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); /// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] /// will return an `async`hronous channel that can be `.await`ed upon /// to receive the corresponding response. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct DatabaseWriteHandle { /// Sender channel to the database write thread-pool. /// @@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle { crossbeam::channel::Sender<(Req, oneshot::Sender>)>, } +impl Clone for DatabaseWriteHandle { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + } + } +} + impl DatabaseWriteHandle where Req: Send + 'static, diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 22a09cba5..cfcd8da80 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,13 +1,11 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. - -use std::collections::HashSet; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use cuprate_types::TransactionVerificationData; -use crate::types::TransactionHash; +use crate::types::{TransactionBlobHash, TransactionHash}; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. 
@@ -17,8 +15,10 @@ pub enum TxpoolReadRequest { TxBlob(TransactionHash), /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), - - FilterKnownTxBlobHashes(HashSet), + /// A request to filter (remove) all **known** transactions from the set. + /// + /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -26,12 +26,14 @@ pub enum TxpoolReadRequest { #[expect(clippy::large_enum_variant)] pub enum TxpoolReadResponse { /// A response containing the raw bytes of a transaction. - // TODO: use bytes::Bytes. - TxBlob(Vec), + TxBlob { + tx_blob: Vec, + state_stem: bool, + }, /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), - - FilterKnownTxBlobHashes(HashSet), + /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest @@ -53,6 +55,11 @@ pub enum TxpoolWriteRequest { /// /// Returns [`TxpoolWriteResponse::Ok`]. RemoveTransaction(TransactionHash), + /// Promote a transaction from the stem pool to the fluff pool. + /// If the tx is already in the fluff pool this does nothing. + /// + /// Returns [`TxpoolWriteResponse::Ok`]. + Promote(TransactionHash), } //---------------------------------------------------------------------------------------------------- TxpoolWriteResponse diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index f0068130f..20a4bcdc6 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -58,6 +58,7 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), + _ => todo!(), } } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 4da2d0fed..83d9e01ab 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -17,6 +17,9 @@ pub type KeyImage = [u8; 32]; /// A transaction hash. pub type TransactionHash = [u8; 32]; +/// A transaction blob hash. +pub type TransactionBlobHash = [u8; 32]; + bitflags::bitflags! { /// Flags representing the state of the transaction in the pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]
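Note (not part of the patch above): the de-duplication guard that `txs_being_handled.rs` implements can be sketched standalone as below, assuming only the `dashmap` and `sha3` crates the patch already depends on. The `remove_tx` method and the `main` demo are illustrative only; the patch's own `TxBeingHandledLocally` tracker presumably releases its hashes once the batch is done, rather than exposing an explicit remove call.

use std::sync::Arc;

use dashmap::DashSet;
use sha3::{Digest, Sha3_256};

/// Hash of a serialized tx blob (not the tx hash), mirroring `tx_blob_hash` in the patch.
fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] {
    let mut hasher = Sha3_256::new();
    hasher.update(tx_bytes);
    hasher.finalize().into()
}

/// Set of blob hashes currently being handled, shared between incoming tx batches.
#[derive(Clone, Default)]
struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);

impl TxsBeingHandled {
    /// Try to claim a blob hash; returns `false` if another batch already claimed it.
    fn try_add_tx(&self, hash: [u8; 32]) -> bool {
        self.0.insert(hash)
    }

    /// Release a claim once the batch has finished handling the tx (hypothetical helper).
    fn remove_tx(&self, hash: &[u8; 32]) {
        self.0.remove(hash);
    }
}

fn main() {
    let being_handled = TxsBeingHandled::default();
    let blob = b"example tx blob".to_vec();
    let hash = tx_blob_hash(&blob);

    assert!(being_handled.try_add_tx(hash)); // first batch claims the tx
    assert!(!being_handled.try_add_tx(hash)); // a concurrent batch sees the claim and skips it
    being_handled.remove_tx(&hash); // released when handling finishes
    assert!(being_handled.try_add_tx(hash)); // the same blob can be handled again later
}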