diff --git a/Cargo.lock b/Cargo.lock index 5ba9727b..09f621ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1233,6 +1233,7 @@ dependencies = [ "hyper-util", "jsonrpsee", "libsecp256k1", + "lru", "num-bigint", "num-traits", "num_cpus", @@ -2110,6 +2111,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 904c0614..7f07fd69 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -37,6 +37,7 @@ http-body-util = { version = "0.1", optional = true } hyper = { version = "1", features = ["full"], optional = true } hyper-util = { version = "0.1", features = ["full"], optional = true } num_cpus = { version = "1.4.0", optional = true } +lru = "0.12.3" num-traits = { version = "0.2", optional = true } parking_lot = { version = "0.12", optional = true } pea2pea = { version = "0.48", optional = true } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 70f60a03..b6a4dcfe 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -243,6 +243,7 @@ async fn run(config: Arc) -> Result<()> { database.clone(), rcv_tx_event_rx, new_validated_tx_receiver.clone(), + database.clone(), ) .await?; diff --git a/crates/node/src/txvalidation/download_manager.rs b/crates/node/src/txvalidation/download_manager.rs index 014d390d..17928e4e 100644 --- a/crates/node/src/txvalidation/download_manager.rs +++ b/crates/node/src/txvalidation/download_manager.rs @@ -51,7 +51,8 @@ pub async fn download_asset_file( .filter_map(|(peer, port)| { port.map(|port| { //use parse to create an URL, no new method. - let mut url = reqwest::Url::parse(HTTP_SERVER_SCHEME).unwrap(); //unwrap always succeed + let mut url = + reqwest::Url::parse(&format!("{HTTP_SERVER_SCHEME}localhost")).unwrap(); //unwrap always succeed url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed url.set_port(Some(port)).unwrap(); //unwrap always succeed url.set_path(&file_uri); //unwrap Path always ok diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs index 12c03aa4..a626256d 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/txvalidation/event.rs @@ -1,18 +1,27 @@ +use super::ValidatedTxReceiver; +use crate::mempool::Storage; use crate::txvalidation::acl::AclWhitelist; use crate::txvalidation::download_manager; use crate::txvalidation::EventProcessError; +use crate::types::Hash; use crate::types::{ transaction::{Received, Validated}, Transaction, }; use futures::future::join_all; use futures_util::TryFutureExt; +use lru::LruCache; +use std::collections::HashMap; use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; +use std::time::Duration; +use std::time::Instant; use tokio::sync::mpsc::UnboundedSender; -use super::ValidatedTxReceiver; +const MAX_CACHED_TX_FOR_VERIFICATION: usize = 50; +const MAX_WAITING_TX_FOR_VERIFICATION: usize = 100; +const MAX_WAITING_TIME_IN_MS: u64 = 3600 * 1000; // One hour. //event type. #[derive(Debug, Clone)] @@ -21,6 +30,9 @@ pub struct ReceivedTx; #[derive(Debug, Clone)] pub struct DownloadTx; +#[derive(Debug, Clone)] +pub struct WaitTx; + #[derive(Debug, Clone)] pub struct NewTx; @@ -52,11 +64,11 @@ impl From> for TxEvent { } } -impl From> for TxEvent { +impl From> for TxEvent { fn from(event: TxEvent) -> Self { TxEvent { tx: event.tx, - tx_type: NewTx, + tx_type: WaitTx, } } } @@ -77,6 +89,15 @@ impl From> for Option> { } } +impl From> for TxEvent { + fn from(event: TxEvent) -> Self { + TxEvent { + tx: event.tx, + tx_type: NewTx, + } + } +} + //Processing of event that arrive: SourceTxType. impl TxEvent { pub async fn process_event( @@ -117,7 +138,7 @@ impl TxEvent { self, local_directory_path: &Path, http_peer_list: Vec<(SocketAddr, Option)>, - ) -> Result<(TxEvent, Option>), EventProcessError> { + ) -> Result<(TxEvent, Option>), EventProcessError> { let http_client = reqwest::Client::new(); let asset_file_list = self.tx.get_asset_list().map_err(|err| { EventProcessError::DownloadAssetError(format!( @@ -143,7 +164,7 @@ impl TxEvent { .map_err(|err| { EventProcessError::DownloadAssetError(format!("Execution error:{err}")) })?; - let newtx: TxEvent = self.clone().into(); + let newtx: TxEvent = self.clone().into(); let propagate: Option> = self.into(); Ok((newtx, propagate)) } @@ -175,6 +196,56 @@ impl TxEvent { } } +impl TxEvent { + pub async fn process_event( + self, + cache: &mut TxCache, + storage: &impl Storage, + ) -> Result>, EventProcessError> { + // Verify Tx'x parent is present or not. + let new_tx = if let Some(parent) = self.tx.payload.get_parent_tx() { + if cache.is_tx_cached(parent) + || storage + .get(parent) + .await + .map_err(|err| EventProcessError::StorageError(format!("{err}")))? + .is_some() + { + // Present return the Tx + Some(self) + } else { + // Parent is missing add to waiting list + cache.add_new_waiting_tx(*parent, self); + None + } + } else { + // No parent always new Tx. + Some(self) + }; + + //remove child Tx if any from waiting list. + let new_txs = new_tx + .map(|tx| { + let mut ret = cache.remove_waiting_children_txs(&tx.tx.hash); + // Add the parent Tx first to be processed the first. + ret.insert(0, tx); + ret + }) + .unwrap_or(vec![]); + + // Add new tx to the cache. + let ret = new_txs + .into_iter() + .map(|tx| { + cache.add_cached_tx(tx.tx.hash); + tx.into() + }) + .collect(); + + Ok(ret) + } +} + impl TxEvent { pub async fn process_event( self, @@ -202,3 +273,275 @@ impl TxEvent { .await } } + +// Use to cache New tx and store waiting Tx that have missing parent. +pub struct TxCache { + // List of Tx waiting for parent.let waiting_txs = + waiting_tx: HashMap>, Instant)>, + // Cache of the last saved Tx in the DB. To avoid to query the db for Tx. + cached_tx_for_verification: LruCache, + // Number of Waiting Tx that trigger the Tx eviction process. + max_waiting_tx: usize, + // Max time a Tx can wait in millisecond. + max_waiting_time: Duration, +} + +impl TxCache { + pub fn new() -> Self { + Self::build( + MAX_CACHED_TX_FOR_VERIFICATION, + MAX_WAITING_TX_FOR_VERIFICATION, + MAX_WAITING_TIME_IN_MS, + ) + } + pub fn build(cache_size: usize, max_waiting_tx: usize, max_waiting_time: u64) -> Self { + let cached_tx_for_verification = + LruCache::new(std::num::NonZeroUsize::new(cache_size).unwrap()); + TxCache { + waiting_tx: HashMap::new(), + cached_tx_for_verification, + max_waiting_tx, + max_waiting_time: Duration::from_millis(max_waiting_time), + } + } + + pub fn is_tx_cached(&self, hash: &Hash) -> bool { + self.cached_tx_for_verification.contains(hash) + } + + pub fn add_cached_tx(&mut self, hash: Hash) { + self.cached_tx_for_verification.put(hash, WaitTx); + } + + pub fn add_new_waiting_tx(&mut self, parent: Hash, tx: TxEvent) { + // Try to evict when the max waiting Tx is reach. + if self.waiting_tx.len() >= self.max_waiting_tx { + self.evict_old_waiting_tx(); + } + let (waiting_txs, _) = self + .waiting_tx + .entry(parent) + .or_insert((vec![], Instant::now())); + waiting_txs.push(tx); + } + + pub fn remove_waiting_children_txs(&mut self, parent: &Hash) -> Vec> { + self.waiting_tx + .remove(parent) + .map(|(txs, _)| txs) + .unwrap_or_default() + } + + fn evict_old_waiting_tx(&mut self) { + let now = Instant::now(); + let to_remove_hash: Vec<_> = self + .waiting_tx + .iter() + .filter_map(|(hash, (_, ts))| { + (now.duration_since(*ts) > self.max_waiting_time).then_some(*hash) + }) + .collect(); + if !to_remove_hash.is_empty() { + // Warn if some Tx are evicted because it shouldn't. + tracing::warn!("Tx validation, Evict some Tx from waiting for parent tx list."); + for hash in to_remove_hash { + tracing::warn!("Tx validation, Evict Tx:{hash}."); + self.waiting_tx.remove(&hash); + } + } + } +} + +impl Default for TxCache { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::txvalidation::Created; + use crate::types::transaction::Payload; + use eyre::Result; + use libsecp256k1::SecretKey; + use rand::{rngs::StdRng, SeedableRng}; + use std::collections::VecDeque; + use tokio::sync::Mutex; + + struct TestDb(Mutex>>); + + #[async_trait::async_trait] + impl Storage for TestDb { + async fn get(&self, hash: &Hash) -> Result>> { + Ok(self.0.lock().await.get(hash).cloned()) + } + async fn set(&self, tx: &Transaction) -> Result<()> { + self.0.lock().await.insert(tx.hash, tx.clone()); + Ok(()) + } + async fn fill_deque(&self, deque: &mut VecDeque>) -> Result<()> { + Ok(()) + } + } + + fn new_empty_tx_event() -> TxEvent { + new_tx_event(Payload::Empty) + } + + fn new_proof_tx_event(parent: Hash) -> TxEvent { + let payload = Payload::Proof { + parent, + prover: Hash::default(), + proof: vec![], + files: vec![], + }; + + new_tx_event(payload) + } + + fn new_tx_event(payload: Payload) -> TxEvent { + let rng = &mut StdRng::from_entropy(); + + let tx = Transaction::::new(payload, &SecretKey::random(rng)); + + let tx = Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Received::P2P, + }; + TxEvent { + tx, + tx_type: WaitTx, + } + } + + fn into_receive(tx: Transaction) -> Transaction { + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Validated, + } + } + + #[tokio::test] + async fn test_evict_wait_tx() { + let db = TestDb(Mutex::new(HashMap::new())); + // Set parameters to have Tx eviction + let mut wait_tx_cache = TxCache::build(2, 2, 10); + + // Create the parent Txs that will only be processed at the end. + // Create 2 parents to have 2 waiting child Tx + let parent1_tx_event = new_empty_tx_event(); + let parent1_hash = parent1_tx_event.tx.hash; + let parent2_tx_event = new_empty_tx_event(); + let parent2_hash = parent2_tx_event.tx.hash; + + // New Tx that will wait. + let tx_event = new_proof_tx_event(parent1_hash); + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + + // New Tx that will wait. + let tx_event = new_proof_tx_event(parent2_hash); + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 2); + + tokio::time::sleep(Duration::from_millis(100)).await; + + // Add new Tx to evict the old one. + let tx_event = new_proof_tx_event(parent1_hash); + let tx_hash = tx_event.tx.hash; + let res = tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Evicted but new tx added => len=1. + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + + //Process the parent Tx. Do child Tx return because they have been evicted. + let res = parent1_tx_event + .process_event(&mut wait_tx_cache, &db) + .await; + assert!(res.is_ok()); + let ret_txs = res.unwrap(); + // Return only the parent Tx. The other has been evicted during parent Tx add. + assert_eq!(ret_txs.len(), 2); + assert!(ret_txs[0].tx.hash == parent1_hash || ret_txs[1].tx.hash == parent1_hash); + assert!(ret_txs[0].tx.hash == tx_hash || ret_txs[1].tx.hash == tx_hash); + } + + #[tokio::test] + async fn test_wait_tx_process_event() { + let db = TestDb(Mutex::new(HashMap::new())); + // Set parameters to avoid wait tx eviction. + let mut wait_tx_cache = TxCache::build(2, 10, 1000); + + // Test a new tx without parent. No wait and added to cache. + let tx_event1 = new_empty_tx_event(); + let tx1_hash = tx_event1.tx.hash; + let tx1 = tx_event1.tx.clone(); + let res = tx_event1.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Save the Tx in db to test cache miss. + let _ = db.set(&into_receive(tx1)).await; + // Not in wait cache because no parent. + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx1_hash)); + + // Test a new tx with a present parent. No wait and added to cache. + let tx2_event = new_proof_tx_event(tx1_hash); + let tx2_hash = tx2_event.tx.hash; + let tx2 = tx2_event.tx.clone(); + let res = tx2_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Save the Tx in db to test cache miss. + let _ = db.set(&into_receive(tx2)).await; + // Not cached because no parent. + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx2_hash)); + + // Test a new Tx with a missing parent. Waiting / not cached. + let parent_tx_event = new_empty_tx_event(); + let parent_hash = parent_tx_event.tx.hash; + let parent_tx = parent_tx_event.tx.clone(); + let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash); + let tx3_hash = tx_event3.tx.hash; + let res = tx_event3.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 1); + assert!(!wait_tx_cache.is_tx_cached(&tx3_hash)); + + // Test process parent Tx: Tx3 removed from waiting, Tx3 and parent added to cached. Return Tx3 and parent. + let res = parent_tx_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&parent_hash)); + assert!(wait_tx_cache.is_tx_cached(&tx3_hash)); + let ret_events = res.unwrap(); + assert_eq!(ret_events.len(), 2); + assert!(ret_events[0].tx.hash == parent_hash || ret_events[1].tx.hash == parent_hash); + assert!(ret_events[0].tx.hash == tx3_hash || ret_events[1].tx.hash == tx3_hash); + + //Test a cache miss, get the parent from the DB. No wait + cached + assert!(!wait_tx_cache.is_tx_cached(&tx1_hash)); + let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB + let tx4_hash = tx4_event.tx.hash; + let res = tx4_event.process_event(&mut wait_tx_cache, &db).await; + assert!(res.is_ok()); + // Not cached because parent in db + assert_eq!(wait_tx_cache.waiting_tx.len(), 0); + assert!(wait_tx_cache.is_tx_cached(&tx4_hash)); + } +} diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs index 61d6b1cd..29dc54ba 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/txvalidation/mod.rs @@ -1,8 +1,11 @@ +use crate::mempool::Storage; +use crate::txvalidation::event::TxCache; use crate::txvalidation::event::{ReceivedTx, TxEvent}; use crate::types::{ transaction::{Created, Received, Validated}, Transaction, }; +use futures::stream::FuturesUnordered; use futures_util::Stream; use futures_util::TryFutureExt; use std::collections::HashMap; @@ -12,12 +15,14 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; +use tokio::select; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::StreamExt; pub mod acl; mod download_manager; @@ -48,6 +53,8 @@ pub enum EventProcessError { DownloadAssetError(String), #[error("Save Tx error: {0}")] SaveTxError(String), + #[error("Storage access error: {0}")] + StorageError(String), #[error("AclWhite list authenticate error: {0}")] AclWhiteListAuthError(#[from] acl::AclWhiteListError), } @@ -121,6 +128,7 @@ impl TxEventSender { } //Main event processing loog. +#[allow(clippy::too_many_arguments)] pub async fn spawn_event_loop( local_directory_path: PathBuf, bind_addr: SocketAddr, @@ -130,14 +138,15 @@ pub async fn spawn_event_loop( // Used to receive new transactions that arrive to the node from the outside. mut rcv_tx_event_rx: UnboundedReceiver<(Transaction, Option)>, // Endpoint where validated transactions are sent to. Usually configured with Mempool. - newtx_receiver: Arc>, + new_tx_receiver: Arc>, + storage: Arc, ) -> eyre::Result<( JoinHandle<()>, // Output stream used to propagate transactions. impl Stream>, )> { let local_directory_path = Arc::new(local_directory_path); - //start http download manager + // Start http download manager let download_jh = download_manager::serve_files(bind_addr, http_download_port, local_directory_path.clone()) .await?; @@ -146,52 +155,96 @@ pub async fn spawn_event_loop( let p2p_stream = UnboundedReceiverStream::new(p2p_recv); let jh = tokio::spawn({ let local_directory_path = local_directory_path.clone(); + let mut wait_tx_cache = TxCache::new(); + let mut validated_txs_futures = FuturesUnordered::new(); + let mut validation_okresult_futures = FuturesUnordered::new(); + let mut validation_errresult_futures = FuturesUnordered::new(); async move { - while let Some((tx, callback)) = rcv_tx_event_rx.recv().await { - //create new event with the Tx - let event: TxEvent = tx.into(); - - //process RcvTx(EventTx) event - let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; - - tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); - - //process the receive event - tokio::spawn({ - let p2p_sender = p2p_sender.clone(); - let local_directory_path = local_directory_path.clone(); - let acl_whitelist = acl_whitelist.clone(); - let newtx_receiver = newtx_receiver.clone(); - async move { - let res = event - .process_event(acl_whitelist.as_ref()) - .and_then(|download_event| { - download_event.process_event(&local_directory_path, http_peer_list) - }) - .and_then(|(new_tx, propagate_tx)| async move { - if let Some(propagate_tx) = propagate_tx { - propagate_tx.process_event(&p2p_sender).await?; + loop { + select! { + // Execute Tx verification in a separate task. + Some((tx, callback)) = rcv_tx_event_rx.recv() => { + //create new event with the Tx + let event: TxEvent = tx.into(); + + // Process RcvTx(EventTx) event + let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; + + tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); + + // Process the receive event + let validate_jh = tokio::spawn({ + let p2p_sender = p2p_sender.clone(); + let local_directory_path = local_directory_path.clone(); + let acl_whitelist = acl_whitelist.clone(); + async move { + event + .process_event(acl_whitelist.as_ref()) + .and_then(|download_event| { + download_event.process_event(&local_directory_path, http_peer_list) + }) + .and_then(|(wait_tx, propagate_tx)| async move { + if let Some(propagate_tx) = propagate_tx { + propagate_tx.process_event(&p2p_sender).await?; + } + Ok(wait_tx) + }) + .await + } + }); + let fut = validate_jh + .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) + .and_then(|res| async move {Ok((res,callback))}); + validated_txs_futures.push(fut); + } + // Verify Tx parent and send to mempool all ready Tx. + Some(Ok((wait_tx_res, callback))) = validated_txs_futures.next() => { + match wait_tx_res { + Ok(wait_tx) => { + match wait_tx.process_event(&mut wait_tx_cache, storage.as_ref()).await { + Ok(new_tx_list) => { + // + let jh = tokio::spawn({ + let new_tx_receiver = new_tx_receiver.clone(); + async move { + for new_tx in new_tx_list { + new_tx.process_event(&mut *(new_tx_receiver.write().await)).await?; + } + Ok(()) + } + }); + let fut = jh + .or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} ) + .and_then(|res| async move {Ok((res,callback))}); + validation_okresult_futures.push(fut); + } + Err(err) => { + validation_errresult_futures.push(futures::future::ready((err, callback))); + } } - new_tx - .process_event(&mut *(newtx_receiver.write().await)) - .await?; - - Ok(()) - }) - .await; - //log the error if any error is return - if let Err(ref err) = res { - tracing::error!("An error occurs during Tx validation: {err}",); + + } + Err(err) => { + validation_errresult_futures.push(futures::future::ready((err, callback))); + } } - //send the execution result back if needed. + } + Some(Ok((res, callback))) = validation_okresult_futures.next() => { if let Some(callback) = callback { - //forget the result because if the RPC connection is closed the send can fail. + // Forget the result because if the RPC connection is closed the send can fail. let _ = callback.send(res); } - } - }); - } + } + Some((res, callback)) = validation_errresult_futures.next() => { + if let Some(callback) = callback { + // Forget the result because if the RPC connection is closed the send can fail. + let _ = callback.send(Err(res)); + } + } + + } // End select! + } // End loop } }); Ok((jh, p2p_stream)) diff --git a/crates/node/src/types/transaction.rs b/crates/node/src/types/transaction.rs index a0c7f796..82fb5dfe 100644 --- a/crates/node/src/types/transaction.rs +++ b/crates/node/src/types/transaction.rs @@ -193,6 +193,15 @@ impl std::fmt::Display for Payload { } impl Payload { + // Return the parent tx associated to the payloads, if any. + pub fn get_parent_tx(&self) -> Option<&Hash> { + match self { + Payload::Proof { parent, .. } => Some(parent), + Payload::ProofKey { parent, .. } => Some(parent), + Payload::Verification { parent, .. } => Some(parent), + _ => None, + } + } pub fn serialize_into(&self, buf: &mut Vec) { match self { Payload::Empty => {}