Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Move tx_validation in mempool (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev authored Apr 10, 2024
1 parent e3e5289 commit fb6dbd1
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 230 deletions.
File renamed without changes.
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod acl;
pub mod rpc_client;
pub mod types;

Expand Down
57 changes: 24 additions & 33 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,15 @@ mod networking;
mod rpc_server;
mod scheduler;
mod storage;
mod txvalidation;
mod vmm;
mod watchdog;
mod workflow;

use mempool::Mempool;
use storage::{database::entity, Database};

use crate::mempool::CallbackSender;
use crate::networking::WhitelistSyncer;
use crate::txvalidation::{CallbackSender, ValidatedTxReceiver};

fn start_logger(default_level: LevelFilter) {
let filter = match EnvFilter::try_from_default_env() {
Expand Down Expand Up @@ -157,7 +156,7 @@ fn generate_key(opts: KeyOptions) -> Result<()> {
}

#[async_trait]
impl txvalidation::ValidateStorage for storage::Database {
impl mempool::ValidateStorage for storage::Database {
async fn get_tx(&self, hash: &Hash) -> Result<Option<Transaction<Validated>>> {
self.find_transaction(hash).await
}
Expand Down Expand Up @@ -190,6 +189,8 @@ impl mempool::Storage for storage::Database {
}
}

impl crate::mempool::MempoolStorage for storage::Database {}

#[async_trait]
impl workflow::TransactionStore for storage::Database {
async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction<Validated>>> {
Expand Down Expand Up @@ -237,9 +238,22 @@ async fn run(config: Arc<Config>) -> Result<()> {
let (tx_sender, rcv_tx_event_rx) =
mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>();

//To show to idea. Should use your config definition
let new_validated_tx_receiver: Arc<RwLock<dyn ValidatedTxReceiver>> = if !config.no_execution {
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));
//Start Mempool
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));
let (txevent_loop_jh, p2p_stream) = mempool::Mempool::start_tx_validation_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
rcv_tx_event_rx,
mempool.clone(),
database.clone(),
database.clone(),
)
.await?;

if !config.no_execution {
//start execution scheduler.
let scheduler_watchdog_sender =
watchdog::start_healthcheck(config.http_healthcheck_listen_addr).await?;
let scheduler = scheduler::start_scheduler(
Expand All @@ -253,30 +267,7 @@ async fn run(config: Arc<Config>) -> Result<()> {

// Run Scheduler in its own task.
tokio::spawn(async move { scheduler.run(scheduler_watchdog_sender).await });
mempool
} else {
struct ArchiveMempool(Arc<Database>);
#[async_trait]
impl ValidatedTxReceiver for ArchiveMempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.0.as_ref().add_transaction(&tx).await
}
}
Arc::new(RwLock::new(ArchiveMempool(database.clone())))
};

// Start Tx process event loop.
let (txevent_loop_jh, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
rcv_tx_event_rx,
new_validated_tx_receiver.clone(),
database.clone(),
)
.await?;
}

let public_node_key = PublicKey::from_secret_key(&node_key);
let node_resources = scheduler::get_configured_resources(&config);
Expand All @@ -289,7 +280,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
Some(config.http_download_port),
config.p2p_advertised_listen_addr,
http_peer_list,
txvalidation::TxEventSender::<txvalidation::P2pSender>::build(tx_sender.clone()),
mempool::TxEventSender::<mempool::P2pSender>::build(tx_sender.clone()),
p2p_stream,
node_resources,
)
Expand Down Expand Up @@ -329,7 +320,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
let rpc_server = rpc_server::RpcServer::run(
config.clone(),
database.clone(),
txvalidation::TxEventSender::<txvalidation::RpcSender>::build(tx_sender),
mempool::TxEventSender::<mempool::RpcSender>::build(tx_sender),
)
.await?;

Expand Down Expand Up @@ -365,7 +356,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> {
None,
config.p2p_advertised_listen_addr,
http_peer_list,
txvalidation::TxEventSender::<txvalidation::P2pSender>::build(tx),
mempool::TxEventSender::<mempool::P2pSender>::build(tx),
p2p_stream,
(0, 0, 0), // P2P beacon node's resources aren't really important.
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use super::ValidatedTxReceiver;
use crate::txvalidation::acl::AclWhitelist;
use crate::txvalidation::download_manager;
use crate::txvalidation::EventProcessError;
use crate::txvalidation::ValidateStorage;
use crate::mempool::txvalidation::download_manager;
use crate::mempool::ValidateStorage;
use crate::types::Hash;
use crate::types::{
transaction::{Received, Validated},
Program, Transaction,
};
use futures::future::join_all;
use futures_util::TryFutureExt;
use gevulot_node::acl;
use lru::LruCache;
use std::collections::HashMap;
use std::fmt::Debug;
Expand All @@ -19,12 +18,30 @@ use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;

const MAX_CACHED_TX_FOR_VERIFICATION: usize = 50;
const MAX_WAITING_TX_FOR_VERIFICATION: usize = 100;
const MAX_WAITING_TIME_IN_MS: u64 = 3600 * 1000; // One hour.

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
pub enum EventProcessError {
#[error("Fail to send the Tx on the channel: {0}")]
PropagateTxError(#[from] Box<tokio::sync::mpsc::error::SendError<Transaction<Validated>>>),
#[error("validation fail: {0}")]
ValidateError(String),
#[error("Tx asset fail to download because {0}")]
DownloadAssetError(String),
#[error("Save Tx error: {0}")]
SaveTxError(String),
#[error("Storage access error: {0}")]
StorageError(String),
#[error("AclWhite list authenticate error: {0}")]
AclWhiteListAuthError(#[from] acl::AclWhiteListError),
}

//event type.
#[derive(Debug, Clone)]
pub struct ReceivedTx;
Expand Down Expand Up @@ -102,9 +119,9 @@ impl From<TxEvent<WaitTx>> for TxEvent<NewTx> {

//Processing of event that arrive: SourceTxType.
impl TxEvent<ReceivedTx> {
pub async fn process_event(
pub async fn verify_tx(
self,
acl_whitelist: &impl AclWhitelist,
acl_whitelist: &impl acl::AclWhitelist,
) -> Result<TxEvent<DownloadTx>, EventProcessError> {
match self.validate_tx(acl_whitelist).await {
Ok(()) => Ok(self.into()),
Expand All @@ -117,7 +134,7 @@ impl TxEvent<ReceivedTx> {
//Tx validation process.
async fn validate_tx(
&self,
acl_whitelist: &impl AclWhitelist,
acl_whitelist: &impl acl::AclWhitelist,
) -> Result<(), EventProcessError> {
self.tx.validate().map_err(|err| {
EventProcessError::ValidateError(format!("Error during transaction validation:{err}",))
Expand All @@ -136,7 +153,7 @@ impl TxEvent<ReceivedTx> {

//Download Tx processing
impl TxEvent<DownloadTx> {
pub async fn process_event(
pub async fn downlod_tx_assets(
self,
local_directory_path: &Path,
http_peer_list: Vec<(SocketAddr, Option<u16>)>,
Expand Down Expand Up @@ -174,7 +191,7 @@ impl TxEvent<DownloadTx> {

// Propagate Tx processing.
impl TxEvent<PropagateTx> {
pub async fn process_event(
pub async fn propagate_tx(
self,
p2p_sender: &UnboundedSender<Transaction<Validated>>,
) -> Result<(), EventProcessError> {
Expand Down Expand Up @@ -204,7 +221,7 @@ impl TxEvent<PropagateTx> {
// Manage Run Tx and put to wait depending on progam.
// Manage Proof and Verify Tx to wait depending on Run Tx.
impl TxEvent<WaitTx> {
pub async fn process_event(
pub async fn validate_tx_dep(
self,
programid_cache: &mut TxCache<Program>,
parent_cache: &mut TxCache<Transaction<Validated>>,
Expand Down Expand Up @@ -366,7 +383,7 @@ impl TxEvent<WaitTx> {
}

impl TxEvent<NewTx> {
pub async fn process_event(
pub async fn save_tx(
self,
newtx_receiver: &mut dyn ValidatedTxReceiver,
) -> Result<(), EventProcessError> {
Expand Down Expand Up @@ -485,7 +502,7 @@ impl<Kind> Default for TxCache<Kind> {
mod tests {

use super::*;
use crate::txvalidation::Created;
use crate::mempool::Created;
use crate::types::transaction::Payload;
use crate::types::transaction::ProgramMetadata;
use crate::types::transaction::{Workflow, WorkflowStep};
Expand Down Expand Up @@ -623,7 +640,7 @@ mod tests {

// Valide Run Tx. Put in Wait cache
let res = run1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Run Tx is waiting
Expand All @@ -634,7 +651,7 @@ mod tests {

// Deploy1 Tx, only return the deploy Tx. Wait for deploy2 program.
let res = deploy1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_progam_cache.waiting_tx.len(), 1);
Expand All @@ -644,7 +661,7 @@ mod tests {

// Deploy2 Tx, return 2 tx, deploy2 + Run.
let res = deploy2_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
//remove from cache
Expand Down Expand Up @@ -762,15 +779,15 @@ mod tests {
// New Tx that will wait.
let tx_event = new_proof_tx_event(parent1_hash);
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);

// New Tx that will wait.
let tx_event = new_proof_tx_event(parent2_hash);
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 2);
Expand All @@ -781,15 +798,15 @@ mod tests {
let tx_event = new_proof_tx_event(parent1_hash);
let tx_hash = tx_event.tx.hash;
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Evicted but new tx added => len=1.
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);

//Process the parent Tx. Do child Tx return because they have been evicted.
let res = parent1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
let ret_txs = res.unwrap();
Expand All @@ -814,7 +831,7 @@ mod tests {
let tx1_hash = tx_event1.tx.hash;
let tx1 = tx_event1.tx.clone();
let res = tx_event1
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Save the Tx in db to test cache miss.
Expand All @@ -828,7 +845,7 @@ mod tests {
let tx2_hash = tx2_event.tx.hash;
let tx2 = tx2_event.tx.clone();
let res = tx2_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Save the Tx in db to test cache miss.
Expand All @@ -844,15 +861,15 @@ mod tests {
let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash);
let tx3_hash = tx_event3.tx.hash;
let res = tx_event3
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);
assert!(!wait_tx_cache.is_tx_cached(&tx3_hash));

// Test process parent Tx: Tx3 removed from waiting, Tx3 and parent added to cached. Return Tx3 and parent.
let res = parent_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 0);
Expand All @@ -868,7 +885,7 @@ mod tests {
let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB
let tx4_hash = tx4_event.tx.hash;
let res = tx4_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Not cached because parent in db
Expand Down
Loading

0 comments on commit fb6dbd1

Please sign in to comment.