From df21b23e6a5e884473e1276c256ed990cd283b02 Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 10 Apr 2024 12:49:05 +0200 Subject: [PATCH] move tvvalidation in mempool --- .../src/{txvalidation/acl.rs => acl/mod.rs} | 0 crates/node/src/lib.rs | 1 + crates/node/src/main.rs | 57 +++--- .../src/{txvalidation => mempool}/event.rs | 65 ++++--- crates/node/src/mempool/mod.rs | 166 ++++++++++++++++-- .../txvalidation/download_manager.rs | 0 .../src/{ => mempool}/txvalidation/mod.rs | 146 ++------------- crates/node/src/networking/p2p/pea2pea.rs | 25 +-- crates/node/src/rpc_server/mod.rs | 17 +- crates/node/src/scheduler/mod.rs | 10 +- crates/node/src/storage/database/postgres.rs | 4 +- 11 files changed, 261 insertions(+), 230 deletions(-) rename crates/node/src/{txvalidation/acl.rs => acl/mod.rs} (100%) rename crates/node/src/{txvalidation => mempool}/event.rs (93%) rename crates/node/src/{ => mempool}/txvalidation/download_manager.rs (100%) rename crates/node/src/{ => mempool}/txvalidation/mod.rs (60%) diff --git a/crates/node/src/txvalidation/acl.rs b/crates/node/src/acl/mod.rs similarity index 100% rename from crates/node/src/txvalidation/acl.rs rename to crates/node/src/acl/mod.rs diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index c156acf1..93860b05 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -1,3 +1,4 @@ +pub mod acl; pub mod rpc_client; pub mod types; diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index f3027c96..402f353b 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -36,7 +36,6 @@ mod networking; mod rpc_server; mod scheduler; mod storage; -mod txvalidation; mod vmm; mod watchdog; mod workflow; @@ -44,8 +43,8 @@ mod workflow; use mempool::Mempool; use storage::{database::entity, Database}; +use crate::mempool::CallbackSender; use crate::networking::WhitelistSyncer; -use crate::txvalidation::{CallbackSender, ValidatedTxReceiver}; fn start_logger(default_level: LevelFilter) { let filter = match EnvFilter::try_from_default_env() { @@ -148,7 +147,7 @@ fn generate_key(opts: KeyOptions) -> Result<()> { } #[async_trait] -impl txvalidation::ValidateStorage for storage::Database { +impl mempool::ValidateStorage for storage::Database { async fn get_tx(&self, hash: &Hash) -> Result>> { self.find_transaction(hash).await } @@ -181,6 +180,8 @@ impl mempool::Storage for storage::Database { } } +impl crate::mempool::MempoolStorage for storage::Database {} + #[async_trait] impl workflow::TransactionStore for storage::Database { async fn find_transaction(&self, tx_hash: &Hash) -> Result>> { @@ -228,9 +229,22 @@ async fn run(config: Arc) -> Result<()> { let (tx_sender, rcv_tx_event_rx) = mpsc::unbounded_channel::<(Transaction, Option)>(); - //To show to idea. Should use your config definition - let new_validated_tx_receiver: Arc> = if !config.no_execution { - let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?)); + //Start Mempool + let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?)); + let (txevent_loop_jh, p2p_stream) = mempool::Mempool::start_tx_validation_event_loop( + config.data_directory.clone(), + config.p2p_listen_addr, + config.http_download_port, + http_peer_list.clone(), + rcv_tx_event_rx, + mempool.clone(), + database.clone(), + database.clone(), + ) + .await?; + + if !config.no_execution { + //start execution scheduler. let scheduler_watchdog_sender = watchdog::start_healthcheck(config.http_healthcheck_listen_addr).await?; let scheduler = scheduler::start_scheduler( @@ -244,30 +258,7 @@ async fn run(config: Arc) -> Result<()> { // Run Scheduler in its own task. tokio::spawn(async move { scheduler.run(scheduler_watchdog_sender).await }); - mempool - } else { - struct ArchiveMempool(Arc); - #[async_trait] - impl ValidatedTxReceiver for ArchiveMempool { - async fn send_new_tx(&mut self, tx: Transaction) -> eyre::Result<()> { - self.0.as_ref().add_transaction(&tx).await - } - } - Arc::new(RwLock::new(ArchiveMempool(database.clone()))) - }; - - // Start Tx process event loop. - let (txevent_loop_jh, p2p_stream) = txvalidation::spawn_event_loop( - config.data_directory.clone(), - config.p2p_listen_addr, - config.http_download_port, - http_peer_list.clone(), - database.clone(), - rcv_tx_event_rx, - new_validated_tx_receiver.clone(), - database.clone(), - ) - .await?; + } let public_node_key = PublicKey::from_secret_key(&node_key); let node_resources = scheduler::get_configured_resources(&config); @@ -280,7 +271,7 @@ async fn run(config: Arc) -> Result<()> { Some(config.http_download_port), config.p2p_advertised_listen_addr, http_peer_list, - txvalidation::TxEventSender::::build(tx_sender.clone()), + mempool::TxEventSender::::build(tx_sender.clone()), p2p_stream, node_resources, ) @@ -320,7 +311,7 @@ async fn run(config: Arc) -> Result<()> { let rpc_server = rpc_server::RpcServer::run( config.clone(), database.clone(), - txvalidation::TxEventSender::::build(tx_sender), + mempool::TxEventSender::::build(tx_sender), ) .await?; @@ -356,7 +347,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> { None, config.p2p_advertised_listen_addr, http_peer_list, - txvalidation::TxEventSender::::build(tx), + mempool::TxEventSender::::build(tx), p2p_stream, (0, 0, 0), // P2P beacon node's resources aren't really important. ) diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/mempool/event.rs similarity index 93% rename from crates/node/src/txvalidation/event.rs rename to crates/node/src/mempool/event.rs index dd69b465..00cda4bc 100644 --- a/crates/node/src/txvalidation/event.rs +++ b/crates/node/src/mempool/event.rs @@ -1,8 +1,6 @@ use super::ValidatedTxReceiver; -use crate::txvalidation::acl::AclWhitelist; -use crate::txvalidation::download_manager; -use crate::txvalidation::EventProcessError; -use crate::txvalidation::ValidateStorage; +use crate::mempool::txvalidation::download_manager; +use crate::mempool::ValidateStorage; use crate::types::Hash; use crate::types::{ transaction::{Received, Validated}, @@ -10,6 +8,7 @@ use crate::types::{ }; use futures::future::join_all; use futures_util::TryFutureExt; +use gevulot_node::acl; use lru::LruCache; use std::collections::HashMap; use std::fmt::Debug; @@ -19,12 +18,30 @@ use std::net::SocketAddr; use std::path::Path; use std::time::Duration; use std::time::Instant; +use thiserror::Error; use tokio::sync::mpsc::UnboundedSender; const MAX_CACHED_TX_FOR_VERIFICATION: usize = 50; const MAX_WAITING_TX_FOR_VERIFICATION: usize = 100; const MAX_WAITING_TIME_IN_MS: u64 = 3600 * 1000; // One hour. +#[allow(clippy::enum_variant_names)] +#[derive(Error, Debug)] +pub enum EventProcessError { + #[error("Fail to send the Tx on the channel: {0}")] + PropagateTxError(#[from] Box>>), + #[error("validation fail: {0}")] + ValidateError(String), + #[error("Tx asset fail to download because {0}")] + DownloadAssetError(String), + #[error("Save Tx error: {0}")] + SaveTxError(String), + #[error("Storage access error: {0}")] + StorageError(String), + #[error("AclWhite list authenticate error: {0}")] + AclWhiteListAuthError(#[from] acl::AclWhiteListError), +} + //event type. #[derive(Debug, Clone)] pub struct ReceivedTx; @@ -102,9 +119,9 @@ impl From> for TxEvent { //Processing of event that arrive: SourceTxType. impl TxEvent { - pub async fn process_event( + pub async fn verify_tx( self, - acl_whitelist: &impl AclWhitelist, + acl_whitelist: &impl acl::AclWhitelist, ) -> Result, EventProcessError> { match self.validate_tx(acl_whitelist).await { Ok(()) => Ok(self.into()), @@ -117,7 +134,7 @@ impl TxEvent { //Tx validation process. async fn validate_tx( &self, - acl_whitelist: &impl AclWhitelist, + acl_whitelist: &impl acl::AclWhitelist, ) -> Result<(), EventProcessError> { self.tx.validate().map_err(|err| { EventProcessError::ValidateError(format!("Error during transaction validation:{err}",)) @@ -136,7 +153,7 @@ impl TxEvent { //Download Tx processing impl TxEvent { - pub async fn process_event( + pub async fn downlod_tx_assets( self, local_directory_path: &Path, http_peer_list: Vec<(SocketAddr, Option)>, @@ -174,7 +191,7 @@ impl TxEvent { // Propagate Tx processing. impl TxEvent { - pub async fn process_event( + pub async fn propagate_tx( self, p2p_sender: &UnboundedSender>, ) -> Result<(), EventProcessError> { @@ -204,7 +221,7 @@ impl TxEvent { // Manage Run Tx and put to wait depending on progam. // Manage Proof and Verify Tx to wait depending on Run Tx. impl TxEvent { - pub async fn process_event( + pub async fn validate_tx_dep( self, programid_cache: &mut TxCache, parent_cache: &mut TxCache>, @@ -366,7 +383,7 @@ impl TxEvent { } impl TxEvent { - pub async fn process_event( + pub async fn save_tx( self, newtx_receiver: &mut dyn ValidatedTxReceiver, ) -> Result<(), EventProcessError> { @@ -485,7 +502,7 @@ impl Default for TxCache { mod tests { use super::*; - use crate::txvalidation::Created; + use crate::mempool::Created; use crate::types::transaction::Payload; use crate::types::transaction::ProgramMetadata; use crate::types::transaction::{Workflow, WorkflowStep}; @@ -623,7 +640,7 @@ mod tests { // Valide Run Tx. Put in Wait cache let res = run1_tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); // Run Tx is waiting @@ -634,7 +651,7 @@ mod tests { // Deploy1 Tx, only return the deploy Tx. Wait for deploy2 program. let res = deploy1_tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); assert_eq!(wait_progam_cache.waiting_tx.len(), 1); @@ -644,7 +661,7 @@ mod tests { // Deploy2 Tx, return 2 tx, deploy2 + Run. let res = deploy2_tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); //remove from cache @@ -762,7 +779,7 @@ mod tests { // New Tx that will wait. let tx_event = new_proof_tx_event(parent1_hash); let res = tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); assert_eq!(wait_tx_cache.waiting_tx.len(), 1); @@ -770,7 +787,7 @@ mod tests { // New Tx that will wait. let tx_event = new_proof_tx_event(parent2_hash); let res = tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); assert_eq!(wait_tx_cache.waiting_tx.len(), 2); @@ -781,7 +798,7 @@ mod tests { let tx_event = new_proof_tx_event(parent1_hash); let tx_hash = tx_event.tx.hash; let res = tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); // Evicted but new tx added => len=1. @@ -789,7 +806,7 @@ mod tests { //Process the parent Tx. Do child Tx return because they have been evicted. let res = parent1_tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); let ret_txs = res.unwrap(); @@ -814,7 +831,7 @@ mod tests { let tx1_hash = tx_event1.tx.hash; let tx1 = tx_event1.tx.clone(); let res = tx_event1 - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); // Save the Tx in db to test cache miss. @@ -828,7 +845,7 @@ mod tests { let tx2_hash = tx2_event.tx.hash; let tx2 = tx2_event.tx.clone(); let res = tx2_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); // Save the Tx in db to test cache miss. @@ -844,7 +861,7 @@ mod tests { let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash); let tx3_hash = tx_event3.tx.hash; let res = tx_event3 - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); assert_eq!(wait_tx_cache.waiting_tx.len(), 1); @@ -852,7 +869,7 @@ mod tests { // Test process parent Tx: Tx3 removed from waiting, Tx3 and parent added to cached. Return Tx3 and parent. let res = parent_tx_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); assert_eq!(wait_tx_cache.waiting_tx.len(), 0); @@ -868,7 +885,7 @@ mod tests { let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB let tx4_hash = tx4_event.tx.hash; let res = tx4_event - .process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db) + .validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db) .await; assert!(res.is_ok()); // Not cached because parent in db diff --git a/crates/node/src/mempool/mod.rs b/crates/node/src/mempool/mod.rs index fdc62ca6..13c6f770 100644 --- a/crates/node/src/mempool/mod.rs +++ b/crates/node/src/mempool/mod.rs @@ -1,39 +1,180 @@ -use crate::txvalidation::ValidatedTxReceiver; -use crate::types::{transaction::Validated, Hash, Transaction}; +use crate::types::{ + transaction::{Created, Received, Validated}, + Hash, Transaction, +}; use async_trait::async_trait; use eyre::Result; +use futures_util::Stream; +use gevulot_node::acl; +use std::collections::HashMap; use std::collections::VecDeque; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; -#[async_trait] -pub trait Storage: Send + Sync { - async fn get(&self, hash: &Hash) -> Result>>; - async fn set(&self, tx: &Transaction) -> Result<()>; - async fn fill_deque(&self, deque: &mut VecDeque>) -> Result<()>; -} +mod event; +mod txvalidation; #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] pub enum MempoolError { + #[error("Tx validation fail: {0}")] + TxValidationError(String), #[error("permission denied")] PermissionDenied, + #[error("Error during Tx processing")] + EventProcess(#[from] event::EventProcessError), + #[error("Fail to send the Tx on the channel: {0}")] + SendChannelError( + #[from] + tokio::sync::mpsc::error::SendError<(Transaction, Option)>, + ), + #[error("Fail to rcv Tx from the channel: {0}")] + RcvChannelError(#[from] tokio::sync::oneshot::error::RecvError), +} + +pub type CallbackSender = oneshot::Sender>; + +// Sending Tx interface. +// Some marker type to define the sender source. +pub struct RpcSender; +#[derive(Clone)] +pub struct P2pSender; +pub struct TxResultSender; + +// `TxEventSender` holds the received transaction of a specific state together with an optional callback interface. +#[derive(Debug, Clone)] +pub struct TxEventSender { + sender: UnboundedSender<(Transaction, Option)>, + _marker: PhantomData, +} + +//Manage send from the p2p source +impl TxEventSender { + pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub fn send_tx(&self, tx: Transaction) -> Result<(), MempoolError> { + self.sender + .send((tx.into_received(Received::P2P), None)) + .map_err(|err| err.into()) + } +} + +//Manage send from the RPC source +impl TxEventSender { + pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub async fn send_tx(&self, tx: Transaction) -> Result<(), MempoolError> { + let (sender, rx) = oneshot::channel(); + self.sender + .send((tx.into_received(Received::RPC), Some(sender))) + .map_err(MempoolError::from)?; + rx.await? + } +} + +//Manage send from the Tx result execution source +impl TxEventSender { + pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub async fn send_tx(&self, tx: Transaction) -> Result<(), MempoolError> { + let (sender, rx) = oneshot::channel(); + self.sender + .send((tx.into_received(Received::TXRESULT), Some(sender))) + .map_err(MempoolError::from)?; + rx.await? + } +} + +// `ValidatedTxReceiver` provides a simple trait to decouple event based +// Transaction handling from the execution part. +#[async_trait::async_trait] +pub trait ValidatedTxReceiver: Send + Sync { + async fn send_new_tx(&mut self, tx: Transaction) -> eyre::Result<()>; } +#[async_trait] +pub trait ValidateStorage: Send + Sync { + async fn get_tx(&self, hash: &Hash) -> eyre::Result>>; + async fn contains_program(&self, hash: Hash) -> eyre::Result; +} + +#[async_trait] +pub trait Storage: Send + Sync { + async fn get(&self, hash: &Hash) -> Result>>; + async fn set(&self, tx: &Transaction) -> Result<()>; + async fn fill_deque(&self, deque: &mut VecDeque>) -> Result<()>; +} + +#[async_trait] +pub trait MempoolStorage: Storage + acl::AclWhitelist + ValidateStorage {} + #[derive(Clone)] pub struct Mempool { - storage: Arc, + storage: Arc, deque: VecDeque>, } impl Mempool { - pub async fn new(storage: Arc) -> Result { + pub async fn new(storage: Arc) -> Result { let mut deque = VecDeque::new(); storage.fill_deque(&mut deque).await?; - Ok(Self { storage, deque }) } + #[allow(clippy::too_many_arguments)] + pub async fn start_tx_validation_event_loop( + local_directory_path: PathBuf, + bind_addr: SocketAddr, + http_download_port: u16, + http_peer_list: Arc>>>, + // Used to receive new transactions that arrive to the node from the outside. + rcv_tx_event_rx: UnboundedReceiver<(Transaction, Option)>, + mempool: Arc>, + storage: Arc, + acl_whitelist: Arc, + ) -> eyre::Result<( + JoinHandle<()>, + // Output stream used to propagate transactions. + impl Stream>, + )> { + // Start Tx validation event loop. + txvalidation::spawn_event_loop( + local_directory_path, + bind_addr, + http_download_port, + http_peer_list, + rcv_tx_event_rx, + mempool, + storage, + acl_whitelist, + ) + .await + } + pub fn next(&mut self) -> Option> { // TODO(tuommaki): Should storage reflect the POP in state? self.deque.pop_front() @@ -46,6 +187,9 @@ impl Mempool { pub async fn add(&mut self, tx: Transaction) -> Result<()> { self.storage.set(&tx).await?; self.deque.push_back(tx); + + tracing::trace!("mempool add Tx done"); + Ok(()) } diff --git a/crates/node/src/txvalidation/download_manager.rs b/crates/node/src/mempool/txvalidation/download_manager.rs similarity index 100% rename from crates/node/src/txvalidation/download_manager.rs rename to crates/node/src/mempool/txvalidation/download_manager.rs diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/mempool/txvalidation/mod.rs similarity index 60% rename from crates/node/src/txvalidation/mod.rs rename to crates/node/src/mempool/txvalidation/mod.rs index 8e0962ca..8ec5cf59 100644 --- a/crates/node/src/txvalidation/mod.rs +++ b/crates/node/src/mempool/txvalidation/mod.rs @@ -1,137 +1,29 @@ -use crate::txvalidation::event::TxCache; -use crate::txvalidation::event::{ReceivedTx, TxEvent}; +use crate::mempool::acl; +use crate::mempool::event::EventProcessError; +use crate::mempool::event::TxCache; +use crate::mempool::event::{ReceivedTx, TxEvent}; +use crate::mempool::CallbackSender; +use crate::mempool::ValidateStorage; +use crate::mempool::ValidatedTxReceiver; use crate::types::{ - transaction::{Created, Received, Validated}, - Hash, Program, Transaction, + transaction::{Received, Validated}, + Program, Transaction, }; -use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures_util::Stream; use futures_util::TryFutureExt; use std::collections::HashMap; -use std::fmt::Debug; -use std::marker::PhantomData; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -use thiserror::Error; use tokio::select; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, UnboundedReceiver}; -use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::StreamExt; -pub mod acl; -mod download_manager; -mod event; - -// `ValidatedTxReceiver` provides a simple trait to decouple event based -// Transaction handling from the execution part. -#[async_trait::async_trait] -pub trait ValidatedTxReceiver: Send + Sync { - async fn send_new_tx(&mut self, tx: Transaction) -> eyre::Result<()>; -} - -#[async_trait] -pub trait ValidateStorage: Send + Sync { - async fn get_tx(&self, hash: &Hash) -> eyre::Result>>; - async fn contains_program(&self, hash: Hash) -> eyre::Result; -} - -#[allow(clippy::enum_variant_names)] -#[derive(Error, Debug)] -pub enum EventProcessError { - #[error("Fail to rcv Tx from the channel: {0}")] - RcvChannelError(#[from] tokio::sync::oneshot::error::RecvError), - #[error("Fail to send the Tx on the channel: {0}")] - SendChannelError( - #[from] - tokio::sync::mpsc::error::SendError<(Transaction, Option)>, - ), - #[error("Fail to send the Tx on the channel: {0}")] - PropagateTxError(#[from] Box>>), - #[error("validation fail: {0}")] - ValidateError(String), - #[error("Tx asset fail to download because {0}")] - DownloadAssetError(String), - #[error("Save Tx error: {0}")] - SaveTxError(String), - #[error("Storage access error: {0}")] - StorageError(String), - #[error("AclWhite list authenticate error: {0}")] - AclWhiteListAuthError(#[from] acl::AclWhiteListError), -} - -pub type CallbackSender = oneshot::Sender>; - -// Sending Tx interface. -// Some marker type to define the sender source. -pub struct RpcSender; -#[derive(Clone)] -pub struct P2pSender; -pub struct TxResultSender; - -// `TxEventSender` holds the received transaction of a specific state together with an optional callback interface. -#[derive(Debug, Clone)] -pub struct TxEventSender { - sender: UnboundedSender<(Transaction, Option)>, - _marker: PhantomData, -} - -//Manage send from the p2p source -impl TxEventSender { - pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { - TxEventSender { - sender, - _marker: PhantomData, - } - } - - pub fn send_tx(&self, tx: Transaction) -> Result<(), EventProcessError> { - self.sender - .send((tx.into_received(Received::P2P), None)) - .map_err(|err| err.into()) - } -} - -//Manage send from the RPC source -impl TxEventSender { - pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { - TxEventSender { - sender, - _marker: PhantomData, - } - } - - pub async fn send_tx(&self, tx: Transaction) -> Result<(), EventProcessError> { - let (sender, rx) = oneshot::channel(); - self.sender - .send((tx.into_received(Received::RPC), Some(sender))) - .map_err(EventProcessError::from)?; - rx.await? - } -} - -//Manage send from the Tx result execution source -impl TxEventSender { - pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { - TxEventSender { - sender, - _marker: PhantomData, - } - } - - pub async fn send_tx(&self, tx: Transaction) -> Result<(), EventProcessError> { - let (sender, rx) = oneshot::channel(); - self.sender - .send((tx.into_received(Received::TXRESULT), Some(sender))) - .map_err(EventProcessError::from)?; - rx.await? - } -} +pub mod download_manager; //Main event processing loog. #[allow(clippy::too_many_arguments)] @@ -140,12 +32,12 @@ pub async fn spawn_event_loop( bind_addr: SocketAddr, http_download_port: u16, http_peer_list: Arc>>>, - acl_whitelist: Arc, // Used to receive new transactions that arrive to the node from the outside. mut rcv_tx_event_rx: UnboundedReceiver<(Transaction, Option)>, // Endpoint where validated transactions are sent to. Usually configured with Mempool. new_tx_receiver: Arc>, storage: Arc, + acl_whitelist: Arc, ) -> eyre::Result<( JoinHandle<()>, // Output stream used to propagate transactions. @@ -185,13 +77,13 @@ pub async fn spawn_event_loop( let acl_whitelist = acl_whitelist.clone(); async move { event - .process_event(acl_whitelist.as_ref()) + .verify_tx(acl_whitelist.as_ref()) .and_then(|download_event| { - download_event.process_event(&local_directory_path, http_peer_list) + download_event.downlod_tx_assets(&local_directory_path, http_peer_list) }) .and_then(|(wait_tx, propagate_tx)| async move { if let Some(propagate_tx) = propagate_tx { - propagate_tx.process_event(&p2p_sender).await?; + propagate_tx.propagate_tx(&p2p_sender).await?; } Ok(wait_tx) }) @@ -207,7 +99,7 @@ pub async fn spawn_event_loop( Some(Ok((wait_tx_res, callback))) = validated_txs_futures.next() => { match wait_tx_res { Ok(wait_tx) => { - match wait_tx.process_event(&mut program_wait_tx_cache, &mut parent_wait_tx_cache, storage.as_ref()).await { + match wait_tx.validate_tx_dep(&mut program_wait_tx_cache, &mut parent_wait_tx_cache, storage.as_ref()).await { Ok(new_tx_list) => { // Process new Tx in the main loop // to avoid when there's a lot of waiting Txs free by one Tx, @@ -216,21 +108,21 @@ pub async fn spawn_event_loop( // The unbounded channel will buffer the Tx waiting. let mut res = Ok(()); for new_tx in new_tx_list { - if let Err(err) = new_tx.process_event(&mut *(new_tx_receiver.write().await)).await { + if let Err(err) = new_tx.save_tx(&mut *(new_tx_receiver.write().await)).await { tracing::error!("Error during validate save tx process_event :{err}"); res = Err(err); } } if let Some(callback) = callback { // Forget the result because if the RPC connection is closed the send can fail. - let _ = callback.send(res); + let _ = callback.send(res.map_err(|err|err.into())); } } Err(err) => { tracing::error!("Error during Tx dependency verification :{err}"); if let Some(callback) = callback { // Forget the result because if the RPC connection is closed the send can fail. - let _ = callback.send(Err(err)); + let _ = callback.send(Err(err.into())); } } } @@ -240,7 +132,7 @@ pub async fn spawn_event_loop( tracing::error!("Error during verify tx process_event :{err}"); if let Some(callback) = callback { // Forget the result because if the RPC connection is closed the send can fail. - let _ = callback.send(Err(err)); + let _ = callback.send(Err(err.into())); } } } diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index f9347071..910b0057 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -1,6 +1,6 @@ +use crate::mempool::P2pSender; +use crate::mempool::TxEventSender; use crate::metrics; -use crate::txvalidation::P2pSender; -use crate::txvalidation::TxEventSender; use futures_util::Stream; use libsecp256k1::PublicKey; use std::{ @@ -514,10 +514,8 @@ impl OnDisconnect for P2P { #[cfg(test)] mod tests { use super::*; - use crate::txvalidation; - use crate::txvalidation::CallbackSender; - use crate::txvalidation::EventProcessError; - use eyre::Result; + use crate::mempool; + use crate::mempool::CallbackSender; use gevulot_node::types::transaction::Payload; use gevulot_node::types::transaction::Received; use libsecp256k1::SecretKey; @@ -525,7 +523,6 @@ mod tests { use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self}; - use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -535,10 +532,7 @@ mod tests { ) -> ( P2P, UnboundedSender>, - UnboundedReceiver<( - Transaction, - Option>>, - )>, + UnboundedReceiver<(Transaction, Option)>, ) { let http_peer_list1: Arc>>> = Default::default(); @@ -546,7 +540,7 @@ mod tests { let p2p_stream1 = UnboundedReceiverStream::new(p2p_recv1); let (sendtx1, txreceiver1) = mpsc::unbounded_channel::<(Transaction, Option)>(); - let txsender1 = txvalidation::TxEventSender::::build(sendtx1); + let txsender1 = mempool::TxEventSender::::build(sendtx1); let peer = P2P::new( name, "127.0.0.1:0".parse().unwrap(), @@ -568,10 +562,7 @@ mod tests { ) -> ( P2P, UnboundedSender>, - UnboundedReceiver<( - Transaction, - Option>>, - )>, + UnboundedReceiver<(Transaction, Option)>, ) { let http_peer_list1: Arc>>> = Default::default(); @@ -579,7 +570,7 @@ mod tests { let p2p_stream1 = UnboundedReceiverStream::new(p2p_recv1); let (sendtx1, txreceiver1) = mpsc::unbounded_channel::<(Transaction, Option)>(); - let txsender1 = txvalidation::TxEventSender::::build(sendtx1); + let txsender1 = mempool::TxEventSender::::build(sendtx1); let peer = P2P::new( name, "127.0.0.1:0".parse().unwrap(), diff --git a/crates/node/src/rpc_server/mod.rs b/crates/node/src/rpc_server/mod.rs index 82c784c3..11f40423 100644 --- a/crates/node/src/rpc_server/mod.rs +++ b/crates/node/src/rpc_server/mod.rs @@ -1,6 +1,6 @@ +use crate::mempool::RpcSender; +use crate::mempool::TxEventSender; use crate::metrics; -use crate::txvalidation::RpcSender; -use crate::txvalidation::TxEventSender; use crate::types::rpc::RpcTransaction; use crate::{ cli::Config, @@ -249,9 +249,8 @@ fn build_tx_tree(hash: &Hash, txs: Vec<(Hash, Option)>) -> Rc ( RpcServer, - UnboundedReceiver<( - Transaction, - Option>>, - )>, + UnboundedReceiver<(Transaction, Option)>, ) { let cfg = Arc::new(Config { acl_whitelist_url: None, @@ -506,7 +501,7 @@ mod tests { let (sendtx, txreceiver) = mpsc::unbounded_channel::<(Transaction, Option)>(); - let txsender = txvalidation::TxEventSender::::build(sendtx); + let txsender = mempool::TxEventSender::::build(sendtx); ( RpcServer::run(cfg, db, txsender) diff --git a/crates/node/src/scheduler/mod.rs b/crates/node/src/scheduler/mod.rs index 3467670c..69637070 100644 --- a/crates/node/src/scheduler/mod.rs +++ b/crates/node/src/scheduler/mod.rs @@ -2,12 +2,12 @@ mod program_manager; mod resource_manager; use crate::cli::Config; +use crate::mempool; +use crate::mempool::CallbackSender; +use crate::mempool::TxEventSender; +use crate::mempool::TxResultSender; use crate::metrics; use crate::storage::Database; -use crate::txvalidation; -use crate::txvalidation::CallbackSender; -use crate::txvalidation::TxEventSender; -use crate::txvalidation::TxResultSender; use crate::types::file::{move_vmfile, Output, TaskVmFile, TxFile, VmOutput}; use crate::types::TaskState; use crate::vmm::qemu::Qemu; @@ -147,7 +147,7 @@ pub async fn start_scheduler( node_key, config.data_directory.clone(), download_url_prefix, - txvalidation::TxEventSender::::build(tx_sender.clone()), + mempool::TxEventSender::::build(tx_sender.clone()), )); let vm_server = VMServer::new(scheduler.clone(), provider, config.data_directory.clone()); diff --git a/crates/node/src/storage/database/postgres.rs b/crates/node/src/storage/database/postgres.rs index 0d233982..51efe5fc 100644 --- a/crates/node/src/storage/database/postgres.rs +++ b/crates/node/src/storage/database/postgres.rs @@ -1,6 +1,4 @@ use super::entity::{self}; -use crate::txvalidation::acl::AclWhiteListError; -use crate::txvalidation::acl::AclWhitelist; use crate::types::file::DbFile; use crate::types::{ self, @@ -8,6 +6,8 @@ use crate::types::{ Hash, Program, }; use eyre::Result; +use gevulot_node::acl::AclWhiteListError; +use gevulot_node::acl::AclWhitelist; use gevulot_node::types::program::ResourceRequest; use libsecp256k1::PublicKey; use sqlx::{postgres::PgPoolOptions, FromRow, Row};