From c7beac62c3882ce68f2365f02cfdbffb0ffff755 Mon Sep 17 00:00:00 2001 From: musitdev Date: Mon, 11 Mar 2024 09:35:31 +0100 Subject: [PATCH 01/13] correct download url --- crates/node/src/txvalidation/download_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/node/src/txvalidation/download_manager.rs b/crates/node/src/txvalidation/download_manager.rs index 014d390d..9a1b1dfa 100644 --- a/crates/node/src/txvalidation/download_manager.rs +++ b/crates/node/src/txvalidation/download_manager.rs @@ -51,7 +51,8 @@ pub async fn download_asset_file( .filter_map(|(peer, port)| { port.map(|port| { //use parse to create an URL, no new method. - let mut url = reqwest::Url::parse(HTTP_SERVER_SCHEME).unwrap(); //unwrap always succeed + let mut url = + reqwest::Url::parse(format!("{HTTP_SERVER_SCHEME}localhost")).unwrap(); //unwrap always succeed url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed url.set_port(Some(port)).unwrap(); //unwrap always succeed url.set_path(&file_uri); //unwrap Path always ok From 80624fe90fc9596526f2e40c6398dd55e39b5abe Mon Sep 17 00:00:00 2001 From: musitdev Date: Mon, 11 Mar 2024 09:37:14 +0100 Subject: [PATCH 02/13] correct download url --- crates/node/src/txvalidation/download_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/node/src/txvalidation/download_manager.rs b/crates/node/src/txvalidation/download_manager.rs index 9a1b1dfa..17928e4e 100644 --- a/crates/node/src/txvalidation/download_manager.rs +++ b/crates/node/src/txvalidation/download_manager.rs @@ -52,7 +52,7 @@ pub async fn download_asset_file( port.map(|port| { //use parse to create an URL, no new method. let mut url = - reqwest::Url::parse(format!("{HTTP_SERVER_SCHEME}localhost")).unwrap(); //unwrap always succeed + reqwest::Url::parse(&format!("{HTTP_SERVER_SCHEME}localhost")).unwrap(); //unwrap always succeed url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed url.set_port(Some(port)).unwrap(); //unwrap always succeed url.set_path(&file_uri); //unwrap Path always ok From 397e61b59fd18c24862ef88995c9616103351377 Mon Sep 17 00:00:00 2001 From: musitdev Date: Tue, 12 Mar 2024 10:14:55 +0100 Subject: [PATCH 03/13] add waiting received Tx for parent one --- Cargo.lock | 10 ++ crates/node/Cargo.toml | 1 + crates/node/src/main.rs | 1 + crates/node/src/txvalidation/event.rs | 94 +++++++++++++++++- crates/node/src/txvalidation/mod.rs | 132 ++++++++++++++++++-------- crates/node/src/types/transaction.rs | 9 ++ 6 files changed, 205 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ba9727b..09f621ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1233,6 +1233,7 @@ dependencies = [ "hyper-util", "jsonrpsee", "libsecp256k1", + "lru", "num-bigint", "num-traits", "num_cpus", @@ -2110,6 +2111,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 904c0614..7f07fd69 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -37,6 +37,7 @@ http-body-util = { version = "0.1", optional = true } hyper = { version = "1", features = ["full"], optional = true } hyper-util = { version = "0.1", features = ["full"], optional = true } num_cpus = { version = "1.4.0", optional = true } +lru = "0.12.3" num-traits = { version = "0.2", optional = true } parking_lot = { version = "0.12", optional = true } pea2pea = { version = "0.48", optional = true } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 70f60a03..b6a4dcfe 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -243,6 +243,7 @@ async fn run(config: Arc) -> Result<()> { database.clone(), rcv_tx_event_rx, new_validated_tx_receiver.clone(), + database.clone(), ) .await?; diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 12c03aa4..00797ae1 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -1,12 +1,17 @@ +use crate::mempool::Storage; use crate::txvalidation::acl::AclWhitelist; use crate::txvalidation::download_manager; use crate::txvalidation::EventProcessError; +use crate::txvalidation::MAX_CACHED_TX_FOR_VERIFICATION; +use crate::types::Hash; use crate::types::{ transaction::{Received, Validated}, Transaction, }; use futures::future::join_all; use futures_util::TryFutureExt; +use lru::LruCache; +use std::collections::HashMap; use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; @@ -21,6 +26,9 @@ pub struct ReceivedTx; #[derive(Debug, Clone)] pub struct DownloadTx; +#[derive(Debug, Clone)] +pub struct WaitTx; + #[derive(Debug, Clone)] pub struct NewTx; @@ -52,11 +60,11 @@ impl From> for TxEvent { } } -impl From> for TxEvent { +impl From> for TxEvent { fn from(event: TxEvent) -> Self { TxEvent { tx: event.tx, - tx_type: NewTx, + tx_type: WaitTx, } } } @@ -77,6 +85,15 @@ impl From> for Option> { } } +impl From> for TxEvent { + fn from(event: TxEvent) -> Self { + TxEvent { + tx: event.tx, + tx_type: NewTx, + } + } +} + //Processing of event that arrive: SourceTxType. impl TxEvent { pub async fn process_event( @@ -117,7 +134,7 @@ impl TxEvent { self, local_directory_path: &Path, http_peer_list: Vec<(SocketAddr, Option)>, - ) -> Result<(TxEvent, Option>), EventProcessError> { + ) -> Result<(TxEvent, Option>), EventProcessError> { let http_client = reqwest::Client::new(); let asset_file_list = self.tx.get_asset_list().map_err(|err| { EventProcessError::DownloadAssetError(format!( @@ -143,7 +160,7 @@ impl TxEvent { .map_err(|err| { EventProcessError::DownloadAssetError(format!("Execution error:{err}")) })?; - let newtx: TxEvent = self.clone().into(); + let newtx: TxEvent = self.clone().into(); let propagate: Option> = self.into(); Ok((newtx, propagate)) } @@ -175,6 +192,40 @@ impl TxEvent { } } +impl TxEvent { + pub async fn process_event( + self, + cache: &mut TXCache, + storage: &impl Storage, + ) -> Result>, EventProcessError> { + // Verify Tx'x parent is already present. + let new_txs = if let Some(parent) = self.tx.payload.get_parent_tx() { + if cache.is_tx_cached(parent) + || storage + .get(parent) + .await + .map_err(|err| EventProcessError::StorageError(format!("{err}")))? + .is_some() + { + let mut new_txs = cache.remove_children_txs(parent); + new_txs.push(self); + new_txs + } else { + //parent is missing add to waiting list + cache.add_wait_tx(*parent, self); + vec![] + } + } else { + //no parent always new Tx. + vec![self] + }; + + let ret = new_txs.into_iter().map(|tx| tx.into()).collect(); + + Ok(ret) + } +} + impl TxEvent { pub async fn process_event( self, @@ -202,3 +253,38 @@ impl TxEvent { .await } } + +pub struct TXCache { + // List of Tx waiting for parent.let waiting_txs = + waiting_tx: HashMap>>, + // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. + cachedtx_for_verification: LruCache, +} + +impl TXCache { + pub fn new() -> Self { + let cachedtx_for_verification = + LruCache::new(std::num::NonZeroUsize::new(MAX_CACHED_TX_FOR_VERIFICATION).unwrap()); + TXCache { + waiting_tx: HashMap::new(), + cachedtx_for_verification, + } + } + + pub fn is_tx_cached(&self, hash: &Hash) -> bool { + self.cachedtx_for_verification.contains(hash) + } + + pub fn add_cached_tx(&mut self, hash: Hash) { + self.cachedtx_for_verification.put(hash, WaitTx); + } + + pub fn add_wait_tx(&mut self, parent: Hash, tx: TxEvent) { + let waiting_txs = self.waiting_tx.entry(parent).or_insert(vec![]); + waiting_txs.push(tx); + } + + pub fn remove_children_txs(&mut self, parent: &Hash) -> Vec> { + self.waiting_tx.remove(parent).unwrap_or(vec![]) + } +} diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 61d6b1cd..7a8a8d03 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -1,8 +1,11 @@ +use crate::mempool::Storage; +use crate::txvalidation::event::TXCache; use crate::txvalidation::event::{ReceivedTx, TxEvent}; use crate::types::{ transaction::{Created, Received, Validated}, Transaction, }; +use futures::stream::FuturesUnordered; use futures_util::Stream; use futures_util::TryFutureExt; use std::collections::HashMap; @@ -12,12 +15,14 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; +use tokio::select; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::StreamExt; pub mod acl; mod download_manager; @@ -30,6 +35,8 @@ pub trait ValidatedTxReceiver: Send + Sync { async fn send_new_tx(&mut self, tx: Transaction) -> eyre::Result<()>; } +const MAX_CACHED_TX_FOR_VERIFICATION: usize = 2; + #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] pub enum EventProcessError { @@ -48,6 +55,8 @@ pub enum EventProcessError { DownloadAssetError(String), #[error("Save Tx error: {0}")] SaveTxError(String), + #[error("Storage access error: {0}")] + StorageError(String), #[error("AclWhite list authenticate error: {0}")] AclWhiteListAuthError(#[from] acl::AclWhiteListError), } @@ -131,6 +140,7 @@ pub async fn spawn_event_loop( mut rcv_tx_event_rx: UnboundedReceiver<(Transaction, Option)>, // Endpoint where validated transactions are sent to. Usually configured with Mempool. newtx_receiver: Arc>, + storage: Arc, ) -> eyre::Result<( JoinHandle<()>, // Output stream used to propagate transactions. @@ -146,52 +156,98 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); + let mut wait_tx_cache = TXCache::new(); + let mut validated_txs_futures = FuturesUnordered::new(); + let mut validation_okresult_futures = FuturesUnordered::new(); + let mut validation_errresult_futures = FuturesUnordered::new(); async move { - while let Some((tx, callback)) = rcv_tx_event_rx.recv().await { - //create new event with the Tx - let event: TxEvent = tx.into(); - - //process RcvTx(EventTx) event - let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; - - tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); - - //process the receive event - tokio::spawn({ - let p2p_sender = p2p_sender.clone(); - let local_directory_path = local_directory_path.clone(); - let acl_whitelist = acl_whitelist.clone(); - let newtx_receiver = newtx_receiver.clone(); - async move { - let res = event - .process_event(acl_whitelist.as_ref()) - .and_then(|download_event| { - download_event.process_event(&local_directory_path, http_peer_list) - }) - .and_then(|(new_tx, propagate_tx)| async move { - if let Some(propagate_tx) = propagate_tx { - propagate_tx.process_event(&p2p_sender).await?; + loop { + select! { + // Execute Tx verification in a separate task. + Some((tx, callback)) = rcv_tx_event_rx.recv() => { + //create new event with the Tx + let event: TxEvent = tx.into(); + + //process RcvTx(EventTx) event + let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; + + tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); + + //process the receive event + let validate_jh = tokio::spawn({ + let p2p_sender = p2p_sender.clone(); + let local_directory_path = local_directory_path.clone(); + let acl_whitelist = acl_whitelist.clone(); + async move { + event + .process_event(acl_whitelist.as_ref()) + .and_then(|download_event| { + download_event.process_event(&local_directory_path, http_peer_list) + }) + .and_then(|(wait_tx, propagate_tx)| async move { + if let Some(propagate_tx) = propagate_tx { + propagate_tx.process_event(&p2p_sender).await?; + } + Ok(wait_tx) + }) + .await + } + }); + let fut = validate_jh + .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) + .and_then(|res| async move {Ok((res,callback))}); + validated_txs_futures.push(fut); + } + // Verify Tx parent and send to mempool all ready Tx. + Some(Ok((wait_tx_res, callback))) = validated_txs_futures.next() => { + match wait_tx_res { + Ok(wait_tx) => { + match wait_tx.process_event(&mut wait_tx_cache, storage.as_ref()).await { + Ok(new_tx_list) => { + // + let jh = tokio::spawn({ + let newtx_receiver = newtx_receiver.clone(); + async move { + for new_tx in new_tx_list { + if let Err(err) = new_tx.process_event(&mut *(newtx_receiver.write().await)).await { + return Err(err); + } + } + Ok(()) + } + }); + let fut = jh + .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) + .and_then(|res| async move {Ok((res,callback))}); + validation_okresult_futures.push(fut); + } + Err(err) => { + validation_errresult_futures.push(futures::future::ready((err, callback))); + } } - new_tx - .process_event(&mut *(newtx_receiver.write().await)) - .await?; - - Ok(()) - }) - .await; - //log the error if any error is return - if let Err(ref err) = res { - tracing::error!("An error occurs during Tx validation: {err}",); + + } + Err(err) => { + validation_errresult_futures.push(futures::future::ready((err, callback))); + } } - //send the execution result back if needed. + } + Some(Ok((res, callback))) = validation_okresult_futures.next() => { if let Some(callback) = callback { //forget the result because if the RPC connection is closed the send can fail. let _ = callback.send(res); } - } - }); - } + } + Some((res, callback)) = validation_errresult_futures.next() => { + if let Some(callback) = callback { + //forget the result because if the RPC connection is closed the send can fail. + let _ = callback.send(Err(res)); + } + } + + } // End select! + } // End loop } }); Ok((jh, p2p_stream)) diff --git a/crates/node/src/types/transaction.rs b/crates/node/src/types/transaction.rs index a0c7f796..b6994a6a 100644 --- a/crates/node/src/types/transaction.rs +++ b/crates/node/src/types/transaction.rs @@ -193,6 +193,15 @@ impl std::fmt::Display for Payload { } impl Payload { + //return the parent tx associated to the payloas if any. + pub fn get_parent_tx(&self) -> Option<&Hash> { + match self { + Payload::Proof { parent, .. } => Some(parent), + Payload::ProofKey { parent, .. } => Some(parent), + Payload::Verification { parent, .. } => Some(parent), + _ => None, + } + } pub fn serialize_into(&self, buf: &mut Vec) { match self { Payload::Empty => {} From 7a4a69ed87d9f85924b0e21f1115347ad3cba04b Mon Sep 17 00:00:00 2001 From: musitdev Date: Tue, 12 Mar 2024 14:03:23 +0100 Subject: [PATCH 04/13] add first test for wait logic --- crates/node/src/txvalidation/event.rs | 119 ++++++++++++++++++++++++-- crates/node/src/txvalidation/mod.rs | 2 +- 2 files changed, 112 insertions(+), 9 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 00797ae1..27980519 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -2,7 +2,6 @@ use crate::mempool::Storage; use crate::txvalidation::acl::AclWhitelist; use crate::txvalidation::download_manager; use crate::txvalidation::EventProcessError; -use crate::txvalidation::MAX_CACHED_TX_FOR_VERIFICATION; use crate::types::Hash; use crate::types::{ transaction::{Received, Validated}, @@ -207,12 +206,12 @@ impl TxEvent { .map_err(|err| EventProcessError::StorageError(format!("{err}")))? .is_some() { - let mut new_txs = cache.remove_children_txs(parent); + let mut new_txs = cache.remove_waiting_children_txs(parent); new_txs.push(self); new_txs } else { //parent is missing add to waiting list - cache.add_wait_tx(*parent, self); + cache.add_new_waiting_tx(*parent, self); vec![] } } else { @@ -220,7 +219,13 @@ impl TxEvent { vec![self] }; - let ret = new_txs.into_iter().map(|tx| tx.into()).collect(); + let ret = new_txs + .into_iter() + .map(|tx| { + cache.add_cached_tx(tx.tx.hash); + tx.into() + }) + .collect(); Ok(ret) } @@ -262,9 +267,9 @@ pub struct TXCache { } impl TXCache { - pub fn new() -> Self { + pub fn new(cache_size: usize) -> Self { let cachedtx_for_verification = - LruCache::new(std::num::NonZeroUsize::new(MAX_CACHED_TX_FOR_VERIFICATION).unwrap()); + LruCache::new(std::num::NonZeroUsize::new(cache_size).unwrap()); TXCache { waiting_tx: HashMap::new(), cachedtx_for_verification, @@ -279,12 +284,110 @@ impl TXCache { self.cachedtx_for_verification.put(hash, WaitTx); } - pub fn add_wait_tx(&mut self, parent: Hash, tx: TxEvent) { + pub fn add_new_waiting_tx(&mut self, parent: Hash, tx: TxEvent) { let waiting_txs = self.waiting_tx.entry(parent).or_insert(vec![]); waiting_txs.push(tx); } - pub fn remove_children_txs(&mut self, parent: &Hash) -> Vec> { + pub fn remove_waiting_children_txs(&mut self, parent: &Hash) -> Vec> { self.waiting_tx.remove(parent).unwrap_or(vec![]) } } + +#[cfg(test)] +mod tests { + + use super::*; + use crate::txvalidation::Created; + use crate::types::transaction::Payload; + use eyre::Result; + use libsecp256k1::SecretKey; + use rand::{rngs::StdRng, SeedableRng}; + use std::collections::VecDeque; + use tokio::sync::Mutex; + + struct TestDb(Mutex>>); + + #[async_trait::async_trait] + impl Storage for TestDb { + async fn get(&self, hash: &Hash) -> Result>> { + Ok(self.0.lock().await.get(hash).cloned()) + } + async fn set(&self, tx: &Transaction) -> Result<()> { + self.0.lock().await.insert(tx.hash, tx.clone()); + Ok(()) + } + async fn fill_deque(&self, deque: &mut VecDeque>) -> Result<()> { + Ok(()) + } + } + + fn new_empty_tx() -> Transaction { + new_tx(Payload::Empty) + } + + fn new_proof_tx(parent: Hash) -> Transaction { + let payload = Payload::Proof { + parent: parent, + prover: Hash::default(), + proof: vec![], + files: vec![], + }; + + new_tx(payload) + } + + fn new_tx(payload: Payload) -> Transaction { + let rng = &mut StdRng::from_entropy(); + + let tx = Transaction::::new(Payload::Empty, &SecretKey::random(rng)); + + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Received::P2P, + } + } + + fn into_receive(tx: Transaction) -> Transaction { + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Validated, + } + } + + #[tokio::test] + async fn test_waittx_process_event() { + let db = TestDb(Mutex::new(HashMap::new())); + let mut wait_tx_cache = TXCache::new(2); + let new_tx1 = new_empty_tx(); + let tx1_hash = new_tx1.hash; + let tx1_event = TxEvent { + tx: new_tx1.clone(), + tx_type: WaitTx, + }; + // Test a new tx without parent. No wait and added to cache. + let res = tx1_event.process_event(&mut wait_tx_cache, &db).await; + // Save the Tx in db to test cache miss. + let _ = db.set(&into_receive(new_tx1)).await; + assert!(res.is_ok()); + // Not cached because no parent. + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx1_hash)); + + let new_tx2 = new_proof_tx(tx1_hash); + + let tx2_hash = new_tx2.hash; + } +} diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 7a8a8d03..5e56ed1f 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -156,7 +156,7 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); - let mut wait_tx_cache = TXCache::new(); + let mut wait_tx_cache = TXCache::new(MAX_CACHED_TX_FOR_VERIFICATION); let mut validated_txs_futures = FuturesUnordered::new(); let mut validation_okresult_futures = FuturesUnordered::new(); let mut validation_errresult_futures = FuturesUnordered::new(); From dbf99250024938f603591d7af5a3b9f8ccce7cd9 Mon Sep 17 00:00:00 2001 From: musitdev Date: Tue, 12 Mar 2024 16:10:15 +0100 Subject: [PATCH 05/13] add some more test for wait tx logic --- crates/node/src/txvalidation/event.rs | 106 +++++++++++++++++++------- 1 file changed, 80 insertions(+), 26 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 27980519..37a4258a 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -197,8 +197,8 @@ impl TxEvent { cache: &mut TXCache, storage: &impl Storage, ) -> Result>, EventProcessError> { - // Verify Tx'x parent is already present. - let new_txs = if let Some(parent) = self.tx.payload.get_parent_tx() { + // Verify Tx'x parent is present or not. + let new_tx = if let Some(parent) = self.tx.payload.get_parent_tx() { if cache.is_tx_cached(parent) || storage .get(parent) @@ -206,19 +206,28 @@ impl TxEvent { .map_err(|err| EventProcessError::StorageError(format!("{err}")))? .is_some() { - let mut new_txs = cache.remove_waiting_children_txs(parent); - new_txs.push(self); - new_txs + // Present return the Tx + Some(self) } else { //parent is missing add to waiting list cache.add_new_waiting_tx(*parent, self); - vec![] + None } } else { //no parent always new Tx. - vec![self] + Some(self) }; + //remove child Tx if any from waiting list. + let new_txs = new_tx + .map(|tx| { + let mut ret = cache.remove_waiting_children_txs(&tx.tx.hash); + ret.push(tx); + ret + }) + .unwrap_or(vec![]); + + // Add new tx to the cache. let ret = new_txs .into_iter() .map(|tx| { @@ -259,6 +268,7 @@ impl TxEvent { } } +// Use to cache New tx and store waiting Tx that have missing parent. pub struct TXCache { // List of Tx waiting for parent.let waiting_txs = waiting_tx: HashMap>>, @@ -322,11 +332,11 @@ mod tests { } } - fn new_empty_tx() -> Transaction { - new_tx(Payload::Empty) + fn new_empty_tx_event() -> TxEvent { + new_tx_event(Payload::Empty) } - fn new_proof_tx(parent: Hash) -> Transaction { + fn new_proof_tx_event(parent: Hash) -> TxEvent { let payload = Payload::Proof { parent: parent, prover: Hash::default(), @@ -334,15 +344,15 @@ mod tests { files: vec![], }; - new_tx(payload) + new_tx_event(payload) } - fn new_tx(payload: Payload) -> Transaction { + fn new_tx_event(payload: Payload) -> TxEvent { let rng = &mut StdRng::from_entropy(); - let tx = Transaction::::new(Payload::Empty, &SecretKey::random(rng)); + let tx = Transaction::::new(payload, &SecretKey::random(rng)); - Transaction { + let tx = Transaction { author: tx.author, hash: tx.hash, payload: tx.payload, @@ -351,6 +361,10 @@ mod tests { propagated: tx.executed, executed: tx.executed, state: Received::P2P, + }; + TxEvent { + tx: tx, + tx_type: WaitTx, } } @@ -371,23 +385,63 @@ mod tests { async fn test_waittx_process_event() { let db = TestDb(Mutex::new(HashMap::new())); let mut wait_tx_cache = TXCache::new(2); - let new_tx1 = new_empty_tx(); - let tx1_hash = new_tx1.hash; - let tx1_event = TxEvent { - tx: new_tx1.clone(), - tx_type: WaitTx, - }; + // Test a new tx without parent. No wait and added to cache. - let res = tx1_event.process_event(&mut wait_tx_cache, &db).await; - // Save the Tx in db to test cache miss. - let _ = db.set(&into_receive(new_tx1)).await; + let tx_event1 = new_empty_tx_event(); + let tx1_hash = tx_event1.tx.hash; + let tx1 = tx_event1.tx.clone(); + let res = tx_event1.process_event(&mut wait_tx_cache, &db).await; assert!(res.is_ok()); - // Not cached because no parent. + // Save the Tx in db to test cache miss. + let _ = db.set(&into_receive(tx1)).await; + // Not in wait cache because no parent. assert_eq!(wait_tx_cache.waiting_tx.len(), 0); assert!(wait_tx_cache.is_tx_cached(&tx1_hash)); - let new_tx2 = new_proof_tx(tx1_hash); + // Test a new tx with a present parent. No wait and added to cache. + let tx2_event = new_proof_tx_event(tx1_hash); + let tx2_hash = tx2_event.tx.hash; + let tx2 = tx2_event.tx.clone(); + let res = tx2_event.process_event(&mut wait_tx_cache, &db).await; + // Save the Tx in db to test cache miss. + assert!(res.is_ok()); + let _ = db.set(&into_receive(tx2)).await; + // Not cached because no parent. + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx2_hash)); + + // Test a new Tx with a missing parent. Waiting / not cached. + let parent_tx_event = new_empty_tx_event(); + let parent_hash = parent_tx_event.tx.hash; + let parent_tx = parent_tx_event.tx.clone(); + let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash); + let tx3_hash = tx_event3.tx.hash; + let tx3 = tx_event3.tx.clone(); + let res = tx_event3.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + assert!(!wait_tx_cache.is_tx_cached(&tx3_hash)); - let tx2_hash = new_tx2.hash; + // Test process parent Tx: Tx3 removed from waiting, Tx3 and parent added to cached. Return Tx3 and parent. + let res = parent_tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&parent_hash)); + assert!(wait_tx_cache.is_tx_cached(&tx3_hash)); + let ret_events = res.unwrap(); + assert_eq!(ret_events.len(), 2); + assert!(ret_events[0].tx.hash == parent_hash || ret_events[1].tx.hash == parent_hash); + assert!(ret_events[0].tx.hash == tx3_hash || ret_events[1].tx.hash == tx3_hash); + + //Test a cache miss, get the parent from the DB. No wait + cached + assert!(!wait_tx_cache.is_tx_cached(&tx1_hash)); + let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB + let tx4_hash = tx4_event.tx.hash; + let tx4 = tx4_event.tx.clone(); + let res = tx4_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Not cached because parent in db + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx4_hash)); } } From 416d86c88d0dc708634231b353353a638c952bcf Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 10:46:47 +0100 Subject: [PATCH 06/13] Set the parent TX to be the first in the list --- crates/node/src/txvalidation/event.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 37a4258a..6d57c899 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -222,7 +222,8 @@ impl TxEvent { let new_txs = new_tx .map(|tx| { let mut ret = cache.remove_waiting_children_txs(&tx.tx.hash); - ret.push(tx); + // Add the parent Tx first to be processed the first. + ret.insert(0, tx); ret }) .unwrap_or(vec![]); From 44680fd31b77265bfaa3178dab976ce3aa2f7603 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 10:56:25 +0100 Subject: [PATCH 07/13] correct some comments --- crates/node/src/txvalidation/event.rs | 4 ++-- crates/node/src/txvalidation/mod.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 6d57c899..a53a5700 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -209,12 +209,12 @@ impl TxEvent { // Present return the Tx Some(self) } else { - //parent is missing add to waiting list + // Parent is missing add to waiting list cache.add_new_waiting_tx(*parent, self); None } } else { - //no parent always new Tx. + // No parent always new Tx. Some(self) }; diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 5e56ed1f..83da0674 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -147,7 +147,7 @@ pub async fn spawn_event_loop( impl Stream>, )> { let local_directory_path = Arc::new(local_directory_path); - //start http download manager + // Start http download manager let download_jh = download_manager::serve_files(bind_addr, http_download_port, local_directory_path.clone()) .await?; @@ -169,12 +169,12 @@ pub async fn spawn_event_loop( //create new event with the Tx let event: TxEvent = tx.into(); - //process RcvTx(EventTx) event + // Process RcvTx(EventTx) event let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); - //process the receive event + // Process the receive event let validate_jh = tokio::spawn({ let p2p_sender = p2p_sender.clone(); let local_directory_path = local_directory_path.clone(); @@ -235,13 +235,13 @@ pub async fn spawn_event_loop( } Some(Ok((res, callback))) = validation_okresult_futures.next() => { if let Some(callback) = callback { - //forget the result because if the RPC connection is closed the send can fail. + // Forget the result because if the RPC connection is closed the send can fail. let _ = callback.send(res); } } Some((res, callback)) = validation_errresult_futures.next() => { if let Some(callback) = callback { - //forget the result because if the RPC connection is closed the send can fail. + // Forget the result because if the RPC connection is closed the send can fail. let _ = callback.send(Err(res)); } } From 0b1c0190189d73b005ba064e4c7a7a1fe8483e30 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 11:01:11 +0100 Subject: [PATCH 08/13] pass clippy --- crates/node/src/txvalidation/event.rs | 8 ++++---- crates/node/src/txvalidation/mod.rs | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index a53a5700..4ca85d02 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -296,12 +296,12 @@ impl TXCache { } pub fn add_new_waiting_tx(&mut self, parent: Hash, tx: TxEvent) { - let waiting_txs = self.waiting_tx.entry(parent).or_insert(vec![]); + let waiting_txs = self.waiting_tx.entry(parent).or_default(); waiting_txs.push(tx); } pub fn remove_waiting_children_txs(&mut self, parent: &Hash) -> Vec> { - self.waiting_tx.remove(parent).unwrap_or(vec![]) + self.waiting_tx.remove(parent).unwrap_or_default() } } @@ -339,7 +339,7 @@ mod tests { fn new_proof_tx_event(parent: Hash) -> TxEvent { let payload = Payload::Proof { - parent: parent, + parent, prover: Hash::default(), proof: vec![], files: vec![], @@ -364,7 +364,7 @@ mod tests { state: Received::P2P, }; TxEvent { - tx: tx, + tx, tx_type: WaitTx, } } diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 83da0674..2b7b6d18 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -130,6 +130,7 @@ impl TxEventSender { } //Main event processing loog. +#[allow(clippy::too_many_arguments)] pub async fn spawn_event_loop( local_directory_path: PathBuf, bind_addr: SocketAddr, @@ -210,9 +211,7 @@ pub async fn spawn_event_loop( let newtx_receiver = newtx_receiver.clone(); async move { for new_tx in new_tx_list { - if let Err(err) = new_tx.process_event(&mut *(newtx_receiver.write().await)).await { - return Err(err); - } + new_tx.process_event(&mut *(newtx_receiver.write().await)).await?; } Ok(()) } From ad097c1f37ba0e1356ca4e8bfc64ee10536a2e8e Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 16:10:31 +0100 Subject: [PATCH 09/13] add Wait Tx eviction for old one --- crates/node/src/txvalidation/event.rs | 117 ++++++++++++++++++++++++-- crates/node/src/txvalidation/mod.rs | 4 +- 2 files changed, 109 insertions(+), 12 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 4ca85d02..12be8cbb 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -1,3 +1,4 @@ +use super::ValidatedTxReceiver; use crate::mempool::Storage; use crate::txvalidation::acl::AclWhitelist; use crate::txvalidation::download_manager; @@ -14,9 +15,13 @@ use std::collections::HashMap; use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; +use std::time::Duration; +use std::time::Instant; use tokio::sync::mpsc::UnboundedSender; -use super::ValidatedTxReceiver; +const MAX_CACHED_TX_FOR_VERIFICATION: usize = 50; +const MAX_WAITING_TX_FOR_VERIFICATION: usize = 100; +const MAX_WAITING_TIME_IN_MS: u64 = 3600 * 1000; // One hour. //event type. #[derive(Debug, Clone)] @@ -272,18 +277,31 @@ impl TxEvent { // Use to cache New tx and store waiting Tx that have missing parent. pub struct TXCache { // List of Tx waiting for parent.let waiting_txs = - waiting_tx: HashMap>>, + waiting_tx: HashMap>, Instant)>, // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. cachedtx_for_verification: LruCache, + // Number of Waiting Tx that trigger the Tx eviction process. + max_waiting_tx: usize, + // Max time a Tx can wait in millisecond. + max_waiting_time: Duration, } impl TXCache { - pub fn new(cache_size: usize) -> Self { + pub fn new() -> Self { + Self::build( + MAX_CACHED_TX_FOR_VERIFICATION, + MAX_WAITING_TX_FOR_VERIFICATION, + MAX_WAITING_TIME_IN_MS, + ) + } + pub fn build(cache_size: usize, max_waiting_tx: usize, max_waiting_time: u64) -> Self { let cachedtx_for_verification = LruCache::new(std::num::NonZeroUsize::new(cache_size).unwrap()); TXCache { waiting_tx: HashMap::new(), cachedtx_for_verification, + max_waiting_tx, + max_waiting_time: Duration::from_millis(max_waiting_time), } } @@ -296,12 +314,47 @@ impl TXCache { } pub fn add_new_waiting_tx(&mut self, parent: Hash, tx: TxEvent) { - let waiting_txs = self.waiting_tx.entry(parent).or_default(); + // Try to evict when the max waiting Tx is reach. + if self.waiting_tx.len() >= self.max_waiting_tx { + self.evict_old_waiting_tx(); + } + let (waiting_txs, _) = self + .waiting_tx + .entry(parent) + .or_insert((vec![], Instant::now())); waiting_txs.push(tx); } pub fn remove_waiting_children_txs(&mut self, parent: &Hash) -> Vec> { - self.waiting_tx.remove(parent).unwrap_or_default() + self.waiting_tx + .remove(parent) + .map(|(txs, _)| txs) + .unwrap_or_default() + } + + fn evict_old_waiting_tx(&mut self) { + let now = Instant::now(); + let to_remove_hash: Vec<_> = self + .waiting_tx + .iter() + .filter_map(|(hash, (_, ts))| { + (now.duration_since(*ts) > self.max_waiting_time).then_some(*hash) + }) + .collect(); + if !to_remove_hash.is_empty() { + // Warn if some Tx are evicted because it shouldn't. + tracing::warn!("Tx validation, Evict some Tx from waiting for parent tx list."); + for hash in to_remove_hash { + tracing::warn!("Tx validation, Evict Tx:{hash}."); + self.waiting_tx.remove(&hash); + } + } + } +} + +impl Default for TXCache { + fn default() -> Self { + Self::new() } } @@ -382,10 +435,58 @@ mod tests { } } + #[tokio::test] + async fn test_evict_wait_tx() { + let db = TestDb(Mutex::new(HashMap::new())); + // Set parameters to have Tx eviction + let mut wait_tx_cache = TXCache::build(2, 2, 10); + + // Create the parent Txs that will only be processed at the end. + // Create 2 parents to have 2 waiting child Tx + let parent1_tx_event = new_empty_tx_event(); + let parent1_hash = parent1_tx_event.tx.hash; + let parent2_tx_event = new_empty_tx_event(); + let parent2_hash = parent2_tx_event.tx.hash; + + // New Tx that will wait. + let tx_event = new_proof_tx_event(parent1_hash); + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + + // New Tx that will wait. + let tx_event = new_proof_tx_event(parent2_hash); + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 2); + + tokio::time::sleep(Duration::from_millis(100)).await; + + // Add new Tx to evict the old one. + let tx_event = new_proof_tx_event(parent1_hash); + let tx_hash = tx_event.tx.hash; + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Evicted but new ket added. + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + + //Process the parent Tx. Do child Tx return because they have been evicted. + let res = parent1_tx_event + .process_event(&mut wait_tx_cache, &db) + .await; + assert!(res.is_ok()); + let ret_txs = res.unwrap(); + // Return only the parent Tx. The other has been evicted during parent Tx add. + assert_eq!(ret_txs.len(), 2); + assert!(ret_txs[0].tx.hash == parent1_hash || ret_txs[1].tx.hash == parent1_hash); + assert!(ret_txs[0].tx.hash == tx_hash || ret_txs[1].tx.hash == tx_hash); + } + #[tokio::test] async fn test_waittx_process_event() { let db = TestDb(Mutex::new(HashMap::new())); - let mut wait_tx_cache = TXCache::new(2); + // Set parameters to avoid wait tx eviction. + let mut wait_tx_cache = TXCache::build(2, 10, 1000); // Test a new tx without parent. No wait and added to cache. let tx_event1 = new_empty_tx_event(); @@ -404,8 +505,8 @@ mod tests { let tx2_hash = tx2_event.tx.hash; let tx2 = tx2_event.tx.clone(); let res = tx2_event.process_event(&mut wait_tx_cache, &db).await; - // Save the Tx in db to test cache miss. assert!(res.is_ok()); + // Save the Tx in db to test cache miss. let _ = db.set(&into_receive(tx2)).await; // Not cached because no parent. assert_eq!(wait_tx_cache.waiting_tx.len(), 0); @@ -417,7 +518,6 @@ mod tests { let parent_tx = parent_tx_event.tx.clone(); let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash); let tx3_hash = tx_event3.tx.hash; - let tx3 = tx_event3.tx.clone(); let res = tx_event3.process_event(&mut wait_tx_cache, &db).await; assert!(res.is_ok()); assert_eq!(wait_tx_cache.waiting_tx.len(), 1); @@ -438,7 +538,6 @@ mod tests { assert!(!wait_tx_cache.is_tx_cached(&tx1_hash)); let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB let tx4_hash = tx4_event.tx.hash; - let tx4 = tx4_event.tx.clone(); let res = tx4_event.process_event(&mut wait_tx_cache, &db).await; assert!(res.is_ok()); // Not cached because parent in db diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 2b7b6d18..d3496469 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -35,8 +35,6 @@ pub trait ValidatedTxReceiver: Send + Sync { async fn send_new_tx(&mut self, tx: Transaction) -> eyre::Result<()>; } -const MAX_CACHED_TX_FOR_VERIFICATION: usize = 2; - #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] pub enum EventProcessError { @@ -157,7 +155,7 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); - let mut wait_tx_cache = TXCache::new(MAX_CACHED_TX_FOR_VERIFICATION); + let mut wait_tx_cache = TXCache::new(); let mut validated_txs_futures = FuturesUnordered::new(); let mut validation_okresult_futures = FuturesUnordered::new(); let mut validation_errresult_futures = FuturesUnordered::new(); From fc7077c565ac2453dbb8590b1cc10db5b5f0f4a5 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 20:54:37 +0100 Subject: [PATCH 10/13] correct from PR remarks --- crates/node/src/txvalidation/event.rs | 16 ++++++++-------- crates/node/src/txvalidation/mod.rs | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 12be8cbb..828520e3 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -199,7 +199,7 @@ impl TxEvent { impl TxEvent { pub async fn process_event( self, - cache: &mut TXCache, + cache: &mut TxCache, storage: &impl Storage, ) -> Result>, EventProcessError> { // Verify Tx'x parent is present or not. @@ -275,7 +275,7 @@ impl TxEvent { } // Use to cache New tx and store waiting Tx that have missing parent. -pub struct TXCache { +pub struct TxCache { // List of Tx waiting for parent.let waiting_txs = waiting_tx: HashMap>, Instant)>, // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. @@ -286,7 +286,7 @@ pub struct TXCache { max_waiting_time: Duration, } -impl TXCache { +impl TxCache { pub fn new() -> Self { Self::build( MAX_CACHED_TX_FOR_VERIFICATION, @@ -297,7 +297,7 @@ impl TXCache { pub fn build(cache_size: usize, max_waiting_tx: usize, max_waiting_time: u64) -> Self { let cachedtx_for_verification = LruCache::new(std::num::NonZeroUsize::new(cache_size).unwrap()); - TXCache { + TxCache { waiting_tx: HashMap::new(), cachedtx_for_verification, max_waiting_tx, @@ -352,7 +352,7 @@ impl TXCache { } } -impl Default for TXCache { +impl Default for TxCache { fn default() -> Self { Self::new() } @@ -439,7 +439,7 @@ mod tests { async fn test_evict_wait_tx() { let db = TestDb(Mutex::new(HashMap::new())); // Set parameters to have Tx eviction - let mut wait_tx_cache = TXCache::build(2, 2, 10); + let mut wait_tx_cache = TxCache::build(2, 2, 10); // Create the parent Txs that will only be processed at the end. // Create 2 parents to have 2 waiting child Tx @@ -467,7 +467,7 @@ mod tests { let tx_hash = tx_event.tx.hash; let res = tx_event.process_event(&mut wait_tx_cache, &db).await; assert!(res.is_ok()); - // Evicted but new ket added. + // Evicted but new tx added => len=1. assert_eq!(wait_tx_cache.waiting_tx.len(), 1); //Process the parent Tx. Do child Tx return because they have been evicted. @@ -486,7 +486,7 @@ mod tests { async fn test_waittx_process_event() { let db = TestDb(Mutex::new(HashMap::new())); // Set parameters to avoid wait tx eviction. - let mut wait_tx_cache = TXCache::build(2, 10, 1000); + let mut wait_tx_cache = TxCache::build(2, 10, 1000); // Test a new tx without parent. No wait and added to cache. let tx_event1 = new_empty_tx_event(); diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index d3496469..29dc54ba 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -1,5 +1,5 @@ use crate::mempool::Storage; -use crate::txvalidation::event::TXCache; +use crate::txvalidation::event::TxCache; use crate::txvalidation::event::{ReceivedTx, TxEvent}; use crate::types::{ transaction::{Created, Received, Validated}, @@ -138,7 +138,7 @@ pub async fn spawn_event_loop( // Used to receive new transactions that arrive to the node from the outside. mut rcv_tx_event_rx: UnboundedReceiver<(Transaction, Option)>, // Endpoint where validated transactions are sent to. Usually configured with Mempool. - newtx_receiver: Arc>, + new_tx_receiver: Arc>, storage: Arc, ) -> eyre::Result<( JoinHandle<()>, @@ -155,7 +155,7 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); - let mut wait_tx_cache = TXCache::new(); + let mut wait_tx_cache = TxCache::new(); let mut validated_txs_futures = FuturesUnordered::new(); let mut validation_okresult_futures = FuturesUnordered::new(); let mut validation_errresult_futures = FuturesUnordered::new(); @@ -206,10 +206,10 @@ pub async fn spawn_event_loop( Ok(new_tx_list) => { // let jh = tokio::spawn({ - let newtx_receiver = newtx_receiver.clone(); + let new_tx_receiver = new_tx_receiver.clone(); async move { for new_tx in new_tx_list { - new_tx.process_event(&mut *(newtx_receiver.write().await)).await?; + new_tx.process_event(&mut *(new_tx_receiver.write().await)).await?; } Ok(()) } From 2dee2d64e37173f13a4433c06a94403971845e94 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 13 Mar 2024 20:57:51 +0100 Subject: [PATCH 11/13] correct from PR remarks --- crates/node/src/txvalidation/event.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 828520e3..ed4abd12 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -279,7 +279,7 @@ pub struct TxCache { // List of Tx waiting for parent.let waiting_txs = waiting_tx: HashMap>, Instant)>, // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. - cachedtx_for_verification: LruCache, + cached_tx_for_verification: LruCache, // Number of Waiting Tx that trigger the Tx eviction process. max_waiting_tx: usize, // Max time a Tx can wait in millisecond. @@ -295,22 +295,22 @@ impl TxCache { ) } pub fn build(cache_size: usize, max_waiting_tx: usize, max_waiting_time: u64) -> Self { - let cachedtx_for_verification = + let cached_tx_for_verification = LruCache::new(std::num::NonZeroUsize::new(cache_size).unwrap()); TxCache { waiting_tx: HashMap::new(), - cachedtx_for_verification, + cached_tx_for_verification, max_waiting_tx, max_waiting_time: Duration::from_millis(max_waiting_time), } } pub fn is_tx_cached(&self, hash: &Hash) -> bool { - self.cachedtx_for_verification.contains(hash) + self.cached_tx_for_verification.contains(hash) } pub fn add_cached_tx(&mut self, hash: Hash) { - self.cachedtx_for_verification.put(hash, WaitTx); + self.cached_tx_for_verification.put(hash, WaitTx); } pub fn add_new_waiting_tx(&mut self, parent: Hash, tx: TxEvent) { From f8d6cb664ab66da28b2d448768d1575955c8fcf7 Mon Sep 17 00:00:00 2001 From: Philippe Delrieu Date: Wed, 13 Mar 2024 20:59:23 +0100 Subject: [PATCH 12/13] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> --- crates/node/src/types/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/node/src/types/transaction.rs b/crates/node/src/types/transaction.rs index b6994a6a..82fb5dfe 100644 --- a/crates/node/src/types/transaction.rs +++ b/crates/node/src/types/transaction.rs @@ -193,7 +193,7 @@ impl std::fmt::Display for Payload { } impl Payload { - //return the parent tx associated to the payloas if any. + // Return the parent tx associated to the payloads, if any. pub fn get_parent_tx(&self) -> Option<&Hash> { match self { Payload::Proof { parent, .. } => Some(parent), From 6dbb41d4b5b8089450e22998599fb2b00788b2ff Mon Sep 17 00:00:00 2001 From: Philippe Delrieu Date: Wed, 13 Mar 2024 20:59:53 +0100 Subject: [PATCH 13/13] Update crates/node/src/txvalidation/event.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> --- crates/node/src/txvalidation/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index ed4abd12..a626256d 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -483,7 +483,7 @@ mod tests { } #[tokio::test] - async fn test_waittx_process_event() { + async fn test_wait_tx_process_event() { let db = TestDb(Mutex::new(HashMap::new())); // Set parameters to avoid wait tx eviction. let mut wait_tx_cache = TxCache::build(2, 10, 1000);