Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Revert "add waiting received Tx for parent one"
Browse files Browse the repository at this point in the history
This reverts commit 5015b61.
  • Loading branch information
musitdev committed Mar 13, 2024
1 parent 7f06f2c commit c38eb0c
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 205 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ async fn run(config: Arc<Config>) -> Result<()> {
http_peer_list.clone(),
database.clone(),
mempool.clone(),
database.clone(),
)
.await?;

Expand Down
94 changes: 4 additions & 90 deletions crates/node/src/txvalidation/event.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use crate::mempool::Storage;
use crate::txvalidation::acl::AclWhitelist;
use crate::txvalidation::download_manager;
use crate::txvalidation::EventProcessError;
use crate::txvalidation::MAX_CACHED_TX_FOR_VERIFICATION;
use crate::types::Hash;
use crate::types::{
transaction::{Received, Validated},
Transaction,
};
use crate::Mempool;
use futures::future::join_all;
use futures_util::TryFutureExt;
use lru::LruCache;
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
Expand All @@ -25,9 +20,6 @@ pub struct ReceivedTx;
#[derive(Debug, Clone)]
pub struct DownloadTx;

#[derive(Debug, Clone)]
pub struct WaitTx;

#[derive(Debug, Clone)]
pub struct NewTx;

Expand Down Expand Up @@ -59,11 +51,11 @@ impl From<TxEvent<ReceivedTx>> for TxEvent<DownloadTx> {
}
}

impl From<TxEvent<DownloadTx>> for TxEvent<WaitTx> {
impl From<TxEvent<DownloadTx>> for TxEvent<NewTx> {
fn from(event: TxEvent<DownloadTx>) -> Self {
TxEvent {
tx: event.tx,
tx_type: WaitTx,
tx_type: NewTx,
}
}
}
Expand All @@ -84,15 +76,6 @@ impl From<TxEvent<DownloadTx>> for Option<TxEvent<PropagateTx>> {
}
}

impl From<TxEvent<WaitTx>> for TxEvent<NewTx> {
fn from(event: TxEvent<WaitTx>) -> Self {
TxEvent {
tx: event.tx,
tx_type: NewTx,
}
}
}

//Processing of event that arrive: SourceTxType.
impl TxEvent<ReceivedTx> {
pub async fn process_event(
Expand Down Expand Up @@ -133,7 +116,7 @@ impl TxEvent<DownloadTx> {
self,
local_directory_path: &Path,
http_peer_list: Vec<(SocketAddr, Option<u16>)>,
) -> Result<(TxEvent<WaitTx>, Option<TxEvent<PropagateTx>>), EventProcessError> {
) -> Result<(TxEvent<NewTx>, Option<TxEvent<PropagateTx>>), EventProcessError> {
let http_client = reqwest::Client::new();
let asset_file_list = self.tx.get_asset_list().map_err(|err| {
EventProcessError::DownloadAssetError(format!(
Expand All @@ -159,7 +142,7 @@ impl TxEvent<DownloadTx> {
.map_err(|err| {
EventProcessError::DownloadAssetError(format!("Execution error:{err}"))
})?;
let newtx: TxEvent<WaitTx> = self.clone().into();
let newtx: TxEvent<NewTx> = self.clone().into();
let propagate: Option<TxEvent<PropagateTx>> = self.into();
Ok((newtx, propagate))
}
Expand Down Expand Up @@ -191,40 +174,6 @@ impl TxEvent<PropagateTx> {
}
}

impl TxEvent<WaitTx> {
pub async fn process_event(
self,
cache: &mut TXCache,
storage: &impl Storage,
) -> Result<Vec<TxEvent<NewTx>>, EventProcessError> {
// Verify Tx'x parent is already present.
let new_txs = if let Some(parent) = self.tx.payload.get_parent_tx() {
if cache.is_tx_cached(parent)
|| storage
.get(parent)
.await
.map_err(|err| EventProcessError::StorageError(format!("{err}")))?
.is_some()
{
let mut new_txs = cache.remove_children_txs(parent);
new_txs.push(self);
new_txs
} else {
//parent is missing add to waiting list
cache.add_wait_tx(*parent, self);
vec![]
}
} else {
//no parent always new Tx.
vec![self]
};

let ret = new_txs.into_iter().map(|tx| tx.into()).collect();

Ok(ret)
}
}

impl TxEvent<NewTx> {
pub async fn process_event(self, mempool: &mut Mempool) -> Result<(), EventProcessError> {
let tx = Transaction {
Expand All @@ -249,38 +198,3 @@ impl TxEvent<NewTx> {
.await
}
}

pub struct TXCache {
// List of Tx waiting for parent.let waiting_txs =
waiting_tx: HashMap<Hash, Vec<TxEvent<WaitTx>>>,
// Cache of the last saved Tx in the DB. To avoid to query the db for Tx.
cachedtx_for_verification: LruCache<Hash, WaitTx>,
}

impl TXCache {
pub fn new() -> Self {
let cachedtx_for_verification =
LruCache::new(std::num::NonZeroUsize::new(MAX_CACHED_TX_FOR_VERIFICATION).unwrap());
TXCache {
waiting_tx: HashMap::new(),
cachedtx_for_verification,
}
}

pub fn is_tx_cached(&self, hash: &Hash) -> bool {
self.cachedtx_for_verification.contains(hash)
}

pub fn add_cached_tx(&mut self, hash: Hash) {
self.cachedtx_for_verification.put(hash, WaitTx);
}

pub fn add_wait_tx(&mut self, parent: Hash, tx: TxEvent<WaitTx>) {
let waiting_txs = self.waiting_tx.entry(parent).or_insert(vec![]);
waiting_txs.push(tx);
}

pub fn remove_children_txs(&mut self, parent: &Hash) -> Vec<TxEvent<WaitTx>> {
self.waiting_tx.remove(parent).unwrap_or(vec![])
}
}
131 changes: 36 additions & 95 deletions crates/node/src/txvalidation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::mempool::Storage;
use crate::txvalidation::event::TXCache;
use crate::txvalidation::event::{ReceivedTx, TxEvent};
use crate::types::{
transaction::{Created, Received, Validated},
Transaction,
};
use crate::Mempool;
use futures::stream::FuturesUnordered;
use futures_util::Stream;
use futures_util::TryFutureExt;
use std::collections::HashMap;
Expand All @@ -16,21 +13,17 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;

pub mod acl;
mod download_manager;
mod event;

const MAX_CACHED_TX_FOR_VERIFICATION: usize = 2;

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
pub enum EventProcessError {
Expand All @@ -49,8 +42,6 @@ pub enum EventProcessError {
DownloadAssetError(String),
#[error("Save Tx error: {0}")]
SaveTxError(String),
#[error("Storage access error: {0}")]
StorageError(String),
#[error("AclWhite list authenticate error: {0}")]
AclWhiteListAuthError(#[from] acl::AclWhiteListError),
}
Expand Down Expand Up @@ -133,7 +124,6 @@ pub async fn spawn_event_loop(
//New Tx are added to the mempool directly.
//Like for the p2p a stream can be use to decouple both process.
mempool: Arc<RwLock<Mempool>>,
storage: Arc<impl Storage + 'static>,
) -> eyre::Result<(
JoinHandle<()>,
//channel use to send RcvTx event to the processing
Expand All @@ -154,99 +144,50 @@ pub async fn spawn_event_loop(
let p2p_stream = UnboundedReceiverStream::new(p2p_recv);
let jh = tokio::spawn({
let local_directory_path = local_directory_path.clone();
let mut wait_tx_cache = TXCache::new();
let mut validated_txs_futures = FuturesUnordered::new();
let mut validation_okresult_futures = FuturesUnordered::new();
let mut validation_errresult_futures = FuturesUnordered::new();

async move {
loop {
select! {
// Execute Tx verification in a separate task.
Some((tx, callback)) = rcv_tx_event_rx.recv() => {
//create new event with the Tx
let event: TxEvent<ReceivedTx> = tx.into();

//process RcvTx(EventTx<SourceTxType>) event
let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await;

tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string());

//process the receive event
let validate_jh = tokio::spawn({
let p2p_sender = p2p_sender.clone();
let local_directory_path = local_directory_path.clone();
let acl_whitelist = acl_whitelist.clone();
let mempool = mempool.clone();
async move {
event
.process_event(acl_whitelist.as_ref())
.and_then(|download_event| {
download_event.process_event(&local_directory_path, http_peer_list)
})
.and_then(|(wait_tx, propagate_tx)| async move {
if let Some(propagate_tx) = propagate_tx {
propagate_tx.process_event(&p2p_sender).await?;
}
Ok(wait_tx)
})
.await
}
});
let fut = validate_jh
.or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} )
.and_then(|res| async move {Ok((res,callback))});
validated_txs_futures.push(fut);
}
// Verify Tx parent and send to mempool all ready Tx.
Some(Ok((wait_tx_res, callback))) = validated_txs_futures.next() => {
match wait_tx_res {
Ok(wait_tx) => {
match wait_tx.process_event(&mut wait_tx_cache, storage.as_ref()).await {
Ok(new_tx_list) => {
//
let jh = tokio::spawn({
let mempool = mempool.clone();
async move {
for new_tx in new_tx_list {
if let Err(err) = new_tx.process_event(&mut *(mempool.write().await)).await {
return Err(err);
}
}
Ok(())
}
});
let fut = jh
.or_else(|err| async move {Err(EventProcessError::ValidateError(format!("Process execution error:{err}")))} )
.and_then(|res| async move {Ok((res,callback))});
validation_okresult_futures.push(fut);
}
Err(err) => {
validation_errresult_futures.push(futures::future::ready((err, callback)));
}
while let Some((tx, callback)) = rcv_tx_event_rx.recv().await {
//create new event with the Tx
let event: TxEvent<ReceivedTx> = tx.into();

//process RcvTx(EventTx<SourceTxType>) event
let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await;

tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string());

//process the receive event
tokio::spawn({
let p2p_sender = p2p_sender.clone();
let local_directory_path = local_directory_path.clone();
let acl_whitelist = acl_whitelist.clone();
let mempool = mempool.clone();
async move {
let res = event
.process_event(acl_whitelist.as_ref())
.and_then(|download_event| {
download_event.process_event(&local_directory_path, http_peer_list)
})
.and_then(|(new_tx, propagate_tx)| async move {
if let Some(propagate_tx) = propagate_tx {
propagate_tx.process_event(&p2p_sender).await?;
}

}
Err(err) => {
validation_errresult_futures.push(futures::future::ready((err, callback)));
}
new_tx.process_event(&mut *(mempool.write().await)).await?;

Ok(())
})
.await;
//log the error if any error is return
if let Err(ref err) = res {
tracing::error!("An error occurs during Tx validation: {err}",);
}
}
Some(Ok((res, callback))) = validation_okresult_futures.next() => {
//send the execution result back if needed.
if let Some(callback) = callback {
//forget the result because if the RPC connection is closed the send can fail.
let _ = callback.send(res);
}
}
Some((res, callback)) = validation_errresult_futures.next() => {
if let Some(callback) = callback {
//forget the result because if the RPC connection is closed the send can fail.
let _ = callback.send(Err(res));
}
}

} // End select!
} // End loop
}
});
}
}
});
Ok((jh, tx, p2p_stream))
Expand Down
9 changes: 0 additions & 9 deletions crates/node/src/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,6 @@ impl std::fmt::Display for Payload {
}

impl Payload {
//return the parent tx associated to the payloas if any.
pub fn get_parent_tx(&self) -> Option<&Hash> {
match self {
Payload::Proof { parent, .. } => Some(parent),
Payload::ProofKey { parent, .. } => Some(parent),
Payload::Verification { parent, .. } => Some(parent),
_ => None,
}
}
pub fn serialize_into(&self, buf: &mut Vec<u8>) {
match self {
Payload::Empty => {}
Expand Down

0 comments on commit c38eb0c

Please sign in to comment.