From c38eb0c46ff0e1c05932712cab5d15aa55d0e329 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 10:04:32 +0100 Subject: [PATCH] Revert "add waiting received Tx for parent one" This reverts commit 5015b61fa755efbd386067905c99ca80c6a3ade3. --- Cargo.lock | 10 -- crates/node/src/main.rs | 1 - crates/node/src/txvalidation/event.rs | 94 +----------------- crates/node/src/txvalidation/mod.rs | 131 +++++++------------------- crates/node/src/types/transaction.rs | 9 -- 5 files changed, 40 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index caea0ba0..dec66001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1234,7 +1234,6 @@ dependencies = [ "hyper-util", "jsonrpsee", "libsecp256k1", - "lru", "num-bigint", "num-traits", "num_cpus", @@ -2112,15 +2111,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "lru" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" -dependencies = [ - "hashbrown 0.14.3", -] - [[package]] name = "matchers" version = "0.1.0" diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 166232d3..f7b84d2d 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -198,7 +198,6 @@ async fn run(config: Arc) -> Result<()> { http_peer_list.clone(), database.clone(), mempool.clone(), - database.clone(), ) .await?; diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 67bd2fdc..a0cdcac3 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -1,9 +1,6 @@ -use crate::mempool::Storage; use crate::txvalidation::acl::AclWhitelist; use crate::txvalidation::download_manager; use crate::txvalidation::EventProcessError; -use crate::txvalidation::MAX_CACHED_TX_FOR_VERIFICATION; -use crate::types::Hash; use crate::types::{ transaction::{Received, Validated}, Transaction, @@ -11,8 +8,6 @@ use crate::types::{ use crate::Mempool; use futures::future::join_all; use futures_util::TryFutureExt; -use lru::LruCache; -use std::collections::HashMap; use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; @@ -25,9 +20,6 @@ pub struct ReceivedTx; #[derive(Debug, Clone)] pub struct DownloadTx; -#[derive(Debug, Clone)] -pub struct WaitTx; - #[derive(Debug, Clone)] pub struct NewTx; @@ -59,11 +51,11 @@ impl From> for TxEvent { } } -impl From> for TxEvent { +impl From> for TxEvent { fn from(event: TxEvent) -> Self { TxEvent { tx: event.tx, - tx_type: WaitTx, + tx_type: NewTx, } } } @@ -84,15 +76,6 @@ impl From> for Option> { } } -impl From> for TxEvent { - fn from(event: TxEvent) -> Self { - TxEvent { - tx: event.tx, - tx_type: NewTx, - } - } -} - //Processing of event that arrive: SourceTxType. impl TxEvent { pub async fn process_event( @@ -133,7 +116,7 @@ impl TxEvent { self, local_directory_path: &Path, http_peer_list: Vec<(SocketAddr, Option)>, - ) -> Result<(TxEvent, Option>), EventProcessError> { + ) -> Result<(TxEvent, Option>), EventProcessError> { let http_client = reqwest::Client::new(); let asset_file_list = self.tx.get_asset_list().map_err(|err| { EventProcessError::DownloadAssetError(format!( @@ -159,7 +142,7 @@ impl TxEvent { .map_err(|err| { EventProcessError::DownloadAssetError(format!("Execution error:{err}")) })?; - let newtx: TxEvent = self.clone().into(); + let newtx: TxEvent = self.clone().into(); let propagate: Option> = self.into(); Ok((newtx, propagate)) } @@ -191,40 +174,6 @@ impl TxEvent { } } -impl TxEvent { - pub async fn process_event( - self, - cache: &mut TXCache, - storage: &impl Storage, - ) -> Result>, EventProcessError> { - // Verify Tx'x parent is already present. - let new_txs = if let Some(parent) = self.tx.payload.get_parent_tx() { - if cache.is_tx_cached(parent) - || storage - .get(parent) - .await - .map_err(|err| EventProcessError::StorageError(format!("{err}")))? - .is_some() - { - let mut new_txs = cache.remove_children_txs(parent); - new_txs.push(self); - new_txs - } else { - //parent is missing add to waiting list - cache.add_wait_tx(*parent, self); - vec![] - } - } else { - //no parent always new Tx. - vec![self] - }; - - let ret = new_txs.into_iter().map(|tx| tx.into()).collect(); - - Ok(ret) - } -} - impl TxEvent { pub async fn process_event(self, mempool: &mut Mempool) -> Result<(), EventProcessError> { let tx = Transaction { @@ -249,38 +198,3 @@ impl TxEvent { .await } } - -pub struct TXCache { - // List of Tx waiting for parent.let waiting_txs = - waiting_tx: HashMap>>, - // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. - cachedtx_for_verification: LruCache, -} - -impl TXCache { - pub fn new() -> Self { - let cachedtx_for_verification = - LruCache::new(std::num::NonZeroUsize::new(MAX_CACHED_TX_FOR_VERIFICATION).unwrap()); - TXCache { - waiting_tx: HashMap::new(), - cachedtx_for_verification, - } - } - - pub fn is_tx_cached(&self, hash: &Hash) -> bool { - self.cachedtx_for_verification.contains(hash) - } - - pub fn add_cached_tx(&mut self, hash: Hash) { - self.cachedtx_for_verification.put(hash, WaitTx); - } - - pub fn add_wait_tx(&mut self, parent: Hash, tx: TxEvent) { - let waiting_txs = self.waiting_tx.entry(parent).or_insert(vec![]); - waiting_txs.push(tx); - } - - pub fn remove_children_txs(&mut self, parent: &Hash) -> Vec> { - self.waiting_tx.remove(parent).unwrap_or(vec![]) - } -} diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 4fea89de..c3bb117c 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -1,12 +1,9 @@ -use crate::mempool::Storage; -use crate::txvalidation::event::TXCache; use crate::txvalidation::event::{ReceivedTx, TxEvent}; use crate::types::{ transaction::{Created, Received, Validated}, Transaction, }; use crate::Mempool; -use futures::stream::FuturesUnordered; use futures_util::Stream; use futures_util::TryFutureExt; use std::collections::HashMap; @@ -16,21 +13,17 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; -use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_stream::StreamExt; pub mod acl; mod download_manager; mod event; -const MAX_CACHED_TX_FOR_VERIFICATION: usize = 2; - #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] pub enum EventProcessError { @@ -49,8 +42,6 @@ pub enum EventProcessError { DownloadAssetError(String), #[error("Save Tx error: {0}")] SaveTxError(String), - #[error("Storage access error: {0}")] - StorageError(String), #[error("AclWhite list authenticate error: {0}")] AclWhiteListAuthError(#[from] acl::AclWhiteListError), } @@ -133,7 +124,6 @@ pub async fn spawn_event_loop( //New Tx are added to the mempool directly. //Like for the p2p a stream can be use to decouple both process. mempool: Arc>, - storage: Arc, ) -> eyre::Result<( JoinHandle<()>, //channel use to send RcvTx event to the processing @@ -154,99 +144,50 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); - let mut wait_tx_cache = TXCache::new(); - let mut validated_txs_futures = FuturesUnordered::new(); - let mut validation_okresult_futures = FuturesUnordered::new(); - let mut validation_errresult_futures = FuturesUnordered::new(); async move { - loop { - select! { - // Execute Tx verification in a separate task. - Some((tx, callback)) = rcv_tx_event_rx.recv() => { - //create new event with the Tx - let event: TxEvent = tx.into(); - - //process RcvTx(EventTx) event - let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; - - tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); - - //process the receive event - let validate_jh = tokio::spawn({ - let p2p_sender = p2p_sender.clone(); - let local_directory_path = local_directory_path.clone(); - let acl_whitelist = acl_whitelist.clone(); - let mempool = mempool.clone(); - async move { - event - .process_event(acl_whitelist.as_ref()) - .and_then(|download_event| { - download_event.process_event(&local_directory_path, http_peer_list) - }) - .and_then(|(wait_tx, propagate_tx)| async move { - if let Some(propagate_tx) = propagate_tx { - propagate_tx.process_event(&p2p_sender).await?; - } - Ok(wait_tx) - }) - .await - } - }); - let fut = validate_jh - .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) - .and_then(|res| async move {Ok((res,callback))}); - validated_txs_futures.push(fut); - } - // Verify Tx parent and send to mempool all ready Tx. - Some(Ok((wait_tx_res, callback))) = validated_txs_futures.next() => { - match wait_tx_res { - Ok(wait_tx) => { - match wait_tx.process_event(&mut wait_tx_cache, storage.as_ref()).await { - Ok(new_tx_list) => { - // - let jh = tokio::spawn({ - let mempool = mempool.clone(); - async move { - for new_tx in new_tx_list { - if let Err(err) = new_tx.process_event(&mut *(mempool.write().await)).await { - return Err(err); - } - } - Ok(()) - } - }); - let fut = jh - .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) - .and_then(|res| async move {Ok((res,callback))}); - validation_okresult_futures.push(fut); - } - Err(err) => { - validation_errresult_futures.push(futures::future::ready((err, callback))); - } + while let Some((tx, callback)) = rcv_tx_event_rx.recv().await { + //create new event with the Tx + let event: TxEvent = tx.into(); + + //process RcvTx(EventTx) event + let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; + + tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); + + //process the receive event + tokio::spawn({ + let p2p_sender = p2p_sender.clone(); + let local_directory_path = local_directory_path.clone(); + let acl_whitelist = acl_whitelist.clone(); + let mempool = mempool.clone(); + async move { + let res = event + .process_event(acl_whitelist.as_ref()) + .and_then(|download_event| { + download_event.process_event(&local_directory_path, http_peer_list) + }) + .and_then(|(new_tx, propagate_tx)| async move { + if let Some(propagate_tx) = propagate_tx { + propagate_tx.process_event(&p2p_sender).await?; } - - } - Err(err) => { - validation_errresult_futures.push(futures::future::ready((err, callback))); - } + new_tx.process_event(&mut *(mempool.write().await)).await?; + + Ok(()) + }) + .await; + //log the error if any error is return + if let Err(ref err) = res { + tracing::error!("An error occurs during Tx validation: {err}",); } - } - Some(Ok((res, callback))) = validation_okresult_futures.next() => { + //send the execution result back if needed. if let Some(callback) = callback { //forget the result because if the RPC connection is closed the send can fail. let _ = callback.send(res); } - } - Some((res, callback)) = validation_errresult_futures.next() => { - if let Some(callback) = callback { - //forget the result because if the RPC connection is closed the send can fail. - let _ = callback.send(Err(res)); - } - } - - } // End select! - } // End loop + } + }); + } } }); Ok((jh, tx, p2p_stream)) diff --git a/crates/node/src/types/transaction.rs b/crates/node/src/types/transaction.rs index b6994a6a..a0c7f796 100644 --- a/crates/node/src/types/transaction.rs +++ b/crates/node/src/types/transaction.rs @@ -193,15 +193,6 @@ impl std::fmt::Display for Payload { } impl Payload { - //return the parent tx associated to the payloas if any. - pub fn get_parent_tx(&self) -> Option<&Hash> { - match self { - Payload::Proof { parent, .. } => Some(parent), - Payload::ProofKey { parent, .. } => Some(parent), - Payload::Verification { parent, .. } => Some(parent), - _ => None, - } - } pub fn serialize_into(&self, buf: &mut Vec) { match self { Payload::Empty => {}