From 7d9dfa038dc9f8b14c7f0c55ecc44a5a83360098 Mon Sep 17 00:00:00 2001 From: Slava Savenko Date: Mon, 24 Feb 2025 16:22:10 +0100 Subject: [PATCH 1/3] Extract process_tx functionality from the Client and run it on 4 threads - put a mutex on the transaction pool - switch the chain operations to the "chain_for_view_client" - run the TxProcessor on 4 threads - connect the JsonRpc to it for TxRequest processing. --- benchmarks/transactions-generator/justfile | 3 +- benchmarks/transactions-generator/src/lib.rs | 31 +- chain/client/src/chunk_producer.rs | 18 +- chain/client/src/client.rs | 19 +- chain/client/src/client_actor.rs | 38 +- chain/client/src/lib.rs | 2 + chain/client/src/tx_request_handler.rs | 426 +++++++++++++++++++ chain/jsonrpc/jsonrpc-tests/src/lib.rs | 1 + chain/jsonrpc/src/lib.rs | 17 +- core/chain-configs/src/client_config.rs | 2 + integration-tests/src/env/setup.rs | 40 +- nearcore/src/config.rs | 3 + nearcore/src/lib.rs | 65 +-- 13 files changed, 596 insertions(+), 69 deletions(-) create mode 100644 chain/client/src/tx_request_handler.rs diff --git a/benchmarks/transactions-generator/justfile b/benchmarks/transactions-generator/justfile index 1c8c42d015c..ea1c915e5a2 100644 --- a/benchmarks/transactions-generator/justfile +++ b/benchmarks/transactions-generator/justfile @@ -39,7 +39,8 @@ unlimit: jq '.gas_limit=20000000000000000' {{near_genesis_file}} > tmp_genesis.json && mv tmp_genesis.json {{near_genesis_file}} jq '.view_client_threads=8 \ | .store.load_mem_tries_for_tracked_shards=true \ - | .produce_chunk_add_transactions_time_limit={"secs": 0, "nanos": 800000000}' {{near_config_file}} > tmp_config.json \ + | .produce_chunk_add_transactions_time_limit={"secs": 0, "nanos": 800000000} \ + | .transaction_request_handler_threads = 4' {{near_config_file}} > tmp_config.json \ && mv tmp_config.json {{near_config_file}} do-it tps: diff --git a/benchmarks/transactions-generator/src/lib.rs b/benchmarks/transactions-generator/src/lib.rs index 02013619523..1edc8cd0359 100644 --- a/benchmarks/transactions-generator/src/lib.rs +++ b/benchmarks/transactions-generator/src/lib.rs @@ -55,6 +55,27 @@ struct RunnerState { stats: Arc>, } +#[derive(Debug, Clone)] +struct WelfordMean { + mean: i64, + count: i64, +} + +impl WelfordMean { + fn new() -> Self { + Self { mean: 0, count: 0 } + } + + fn add_measurement(&mut self, x: i64) { + self.count += 1; + self.mean += (x - self.mean) / self.count; + } + + fn mean(&self) -> i64 { + self.mean + } +} + #[derive(Debug, Clone)] struct Stats { pool_accepted: u64, @@ -255,6 +276,7 @@ impl TxGenerator { let mut report_interval = tokio::time::interval(Duration::from_secs(1)); tokio::spawn(async move { let mut stats_prev = runner_state.stats.lock().unwrap().clone(); + let mut mean_diff = WelfordMean::new(); loop { report_interval.tick().await; let stats = { @@ -266,10 +288,13 @@ impl TxGenerator { stats.failed = failed; stats.clone() }; + tracing::info!(target: "transaction-generator", total=format!("{stats:?}"),); + let diff = stats.clone() - stats_prev; + mean_diff.add_measurement(diff.processed as i64); tracing::info!(target: "transaction-generator", - total=format!("{stats:?}"),); - tracing::info!(target: "transaction-generator", - diff=format!("{:?}", stats.clone() - stats_prev),); + diff=format!("{:?}", diff), + rate_processed=mean_diff.mean(), + ); stats_prev = stats.clone(); } }) diff --git a/chain/client/src/chunk_producer.rs b/chain/client/src/chunk_producer.rs index 445cb7c30e3..69c274dc08e 100644 --- 
a/chain/client/src/chunk_producer.rs +++ b/chain/client/src/chunk_producer.rs @@ -28,7 +28,7 @@ use near_store::ShardUId; use near_store::adapter::chain_store::ChainStoreAdapter; use reed_solomon_erasure::galois_8::ReedSolomon; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use time::ext::InstantExt as _; use tracing::{debug, instrument}; @@ -70,7 +70,7 @@ pub struct ChunkProducer { chain: ChainStoreAdapter, epoch_manager: Arc, runtime_adapter: Arc, - pub sharded_tx_pool: ShardedTransactionPool, + pub sharded_tx_pool: Arc>, /// A ReedSolomon instance to encode shard chunks. reed_solomon_encoder: ReedSolomon, /// Chunk production timing information. Used only for debug purposes. @@ -102,7 +102,10 @@ impl ChunkProducer { chain: chain_store.clone(), epoch_manager, runtime_adapter, - sharded_tx_pool: ShardedTransactionPool::new(rng_seed, transaction_pool_size_limit), + sharded_tx_pool: Arc::new(Mutex::new(ShardedTransactionPool::new( + rng_seed, + transaction_pool_size_limit, + ))), reed_solomon_encoder: ReedSolomon::new(data_parts, parity_parts).unwrap(), chunk_production_info: lru::LruCache::new( NonZeroUsize::new(PRODUCTION_TIMES_CACHE_SIZE).unwrap(), @@ -370,8 +373,8 @@ impl ChunkProducer { chain_validate: &dyn Fn(&SignedTransaction) -> bool, ) -> Result { let shard_id = shard_uid.shard_id(); - let prepared_transactions = if let Some(mut iter) = - self.sharded_tx_pool.get_pool_iterator(shard_uid) + let mut pool_guard = self.sharded_tx_pool.lock().unwrap(); + let prepared_transactions = if let Some(mut iter) = pool_guard.get_pool_iterator(shard_uid) { let storage_config = RuntimeStorageConfig { state_root: *chunk_extra.state_root(), @@ -408,9 +411,8 @@ impl ChunkProducer { }; // Reintroduce valid transactions back to the pool. They will be removed when the chunk is // included into the block. 
- let reintroduced_count = self - .sharded_tx_pool - .reintroduce_transactions(shard_uid, &prepared_transactions.transactions); + let reintroduced_count = + pool_guard.reintroduce_transactions(shard_uid, &prepared_transactions.transactions); if reintroduced_count < prepared_transactions.transactions.len() { debug!(target: "client", reintroduced_count, num_tx = prepared_transactions.transactions.len(), "Reintroduced transactions"); } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index b390d9ed124..2de548c1eff 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -408,9 +408,8 @@ impl Client { // By now the chunk must be in store, otherwise the block would have been orphaned let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap(); let transactions = chunk.transactions(); - self.chunk_producer - .sharded_tx_pool - .remove_transactions(shard_uid, transactions); + let mut pool_guard = self.chunk_producer.sharded_tx_pool.lock().unwrap(); + pool_guard.remove_transactions(shard_uid, transactions); } } } @@ -442,10 +441,10 @@ impl Client { ) { // By now the chunk must be in store, otherwise the block would have been orphaned let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap(); - let reintroduced_count = self - .chunk_producer - .sharded_tx_pool - .reintroduce_transactions(shard_uid, &chunk.transactions()); + let reintroduced_count = { + let mut pool_guard = self.chunk_producer.sharded_tx_pool.lock().unwrap(); + pool_guard.reintroduce_transactions(shard_uid, &chunk.transactions()) + }; if reintroduced_count < chunk.transactions().len() { debug!(target: "client", reintroduced_count, @@ -1530,9 +1529,9 @@ impl Client { match (old_shard_layout, new_shard_layout) { (Ok(old_shard_layout), Ok(new_shard_layout)) => { if old_shard_layout != new_shard_layout { - self.chunk_producer - .sharded_tx_pool - .reshard(&old_shard_layout, &new_shard_layout); + let mut guarded_pool = + self.chunk_producer.sharded_tx_pool.lock().unwrap(); + guarded_pool.reshard(&old_shard_layout, &new_shard_layout); } } (old_shard_layout, new_shard_layout) => { diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index b79b4620c3f..7768801a0f5 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -44,7 +44,7 @@ use near_chain::{ use near_chain_configs::{ClientConfig, MutableValidatorSigner, ReshardingHandle}; use near_chain_primitives::error::EpochErrorResultToChainError; use near_chunks::adapter::ShardsManagerRequestFromClient; -use near_chunks::client::ShardsManagerResponse; +use near_chunks::client::{ShardedTransactionPool, ShardsManagerResponse}; use near_client_primitives::types::{ Error, GetClientConfig, GetClientConfigError, GetNetworkInfo, NetworkInfoResponse, StateSyncStatus, Status, StatusError, StatusSyncInfo, SyncStatus, @@ -79,7 +79,7 @@ use near_telemetry::TelemetryEvent; use rand::seq::SliceRandom; use rand::{Rng, thread_rng}; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use tracing::{debug, debug_span, error, info, trace, warn}; @@ -119,6 +119,7 @@ pub struct StartClientResult { pub client_actor: actix::Addr, pub client_arbiter_handle: actix::ArbiterHandle, pub resharding_handle: ReshardingHandle, + pub tx_pool: Arc>, } /// Starts client in a separate Arbiter (thread). 
@@ -179,19 +180,21 @@ pub fn start_client( let sync_jobs_actor = SyncJobsActor::new(client_sender_for_sync_jobs.as_multi_sender()); let sync_jobs_actor_addr = sync_jobs_actor.spawn_actix_actor(); + let client_actor_inner = ClientActorInner::new( + clock, + client, + node_id, + network_adapter, + telemetry_sender, + sender, + adv, + config_updater, + sync_jobs_actor_addr.with_auto_span_context().into_multi_sender(), + ) + .unwrap(); + let tx_pool = client_actor_inner.client.chunk_producer.sharded_tx_pool.clone(); + let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |_| { - let client_actor_inner = ClientActorInner::new( - clock, - client, - node_id, - network_adapter, - telemetry_sender, - sender, - adv, - config_updater, - sync_jobs_actor_addr.with_auto_span_context().into_multi_sender(), - ) - .unwrap(); ActixWrapper::new(client_actor_inner) }); @@ -201,7 +204,12 @@ pub fn start_client( chain_sender_for_state_sync .bind(client_addr.clone().with_auto_span_context().into_multi_sender()); - StartClientResult { client_actor: client_addr, client_arbiter_handle, resharding_handle } + StartClientResult { + client_actor: client_addr, + client_arbiter_handle, + resharding_handle, + tx_pool, + } } #[derive(Clone, MultiSend, MultiSenderFrom)] diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index 5f5e11940df..cf6d4602735 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -14,6 +14,7 @@ pub use crate::client_actor::NetworkAdversarialMessage; pub use crate::client_actor::{ClientActor, StartClientResult, start_client}; pub use crate::config_updater::ConfigUpdater; pub use crate::stateless_validation::chunk_validator::orphan_witness_handling::HandleOrphanWitnessOutcome; +pub use crate::tx_request_handler::{TxRequestHandlerActor, spawn_tx_request_handler_actor}; pub use crate::view_client_actor::{ViewClientActor, ViewClientActorInner}; pub use chunk_producer::ProduceChunkResult; pub use near_chain::stateless_validation::processing_tracker::{ @@ -43,4 +44,5 @@ mod stateless_validation; pub mod sync; pub mod sync_jobs_actor; pub mod test_utils; +mod tx_request_handler; mod view_client_actor; diff --git a/chain/client/src/tx_request_handler.rs b/chain/client/src/tx_request_handler.rs new file mode 100644 index 00000000000..4e43cee0555 --- /dev/null +++ b/chain/client/src/tx_request_handler.rs @@ -0,0 +1,426 @@ +use near_async::actix_wrapper::SyncActixWrapper; +use near_async::messaging::CanSend; +use near_async::messaging::Handler; +use near_async::time::Clock; +use near_chain::Chain; +use near_chain::ChainGenesis; +use near_chain::types::RuntimeAdapter; +use near_chain::types::Tip; +use near_chain_configs::ClientConfig; +use near_chain_configs::MutableValidatorSigner; +use near_chunks::client::ShardedTransactionPool; +use near_epoch_manager::EpochManagerAdapter; +use near_epoch_manager::shard_assignment::account_id_to_shard_id; +use near_epoch_manager::shard_assignment::shard_id_to_uid; +use near_epoch_manager::shard_tracker::ShardTracker; +use near_network::client::ProcessTxRequest; +use near_network::client::ProcessTxResponse; +use near_network::types::NetworkRequests; +use near_network::types::PeerManagerAdapter; +use near_network::types::PeerManagerMessageRequest; +use near_pool::InsertTransactionResult; +use near_primitives::stateless_validation::ChunkProductionKey; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::EpochId; +use near_primitives::types::ShardId; +use near_primitives::unwrap_or_return; +use 
near_primitives::validator_signer::ValidatorSigner; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::Mutex; + +use crate::metrics; + +pub type TxRequestHandlerActor = SyncActixWrapper; + +impl Handler for TxRequestHandler { + fn handle(&mut self, msg: ProcessTxRequest) -> ProcessTxResponse { + let ProcessTxRequest { transaction, is_forwarded, check_only } = msg; + self.process_tx(transaction, is_forwarded, check_only) + } +} + +pub fn spawn_tx_request_handler_actor( + clock: Clock, + config: ClientConfig, + tx_pool: Arc>, + epoch_manager: Arc, + shard_tracker: ShardTracker, + validator_signer: MutableValidatorSigner, + runtime: Arc, + chain_genesis: ChainGenesis, + network_adapter: PeerManagerAdapter, +) -> actix::Addr { + actix::SyncArbiter::start(config.transaction_request_handler_threads, move || { + let view_client_actor = TxRequestHandler::new( + clock.clone(), + config.clone(), + tx_pool.clone(), + epoch_manager.clone(), + shard_tracker.clone(), + validator_signer.clone(), + runtime.clone(), + chain_genesis.clone(), + network_adapter.clone(), + ) + .unwrap(); + SyncActixWrapper::new(view_client_actor) + }) +} + +#[derive(Clone)] +struct TxRequestHandlerConfig { + tx_routing_height_horizon: u64, + epoch_length: u64, +} + +/// Accepts `process_tx` requests. Pushes the incoming transactions to the pool. +pub struct TxRequestHandler { + config: TxRequestHandlerConfig, + tx_pool: Arc>, + chain: Chain, + epoch_manager: Arc, + shard_tracker: ShardTracker, + validator_signer: MutableValidatorSigner, + runtime: Arc, + network_adapter: PeerManagerAdapter, +} + +impl TxRequestHandler { + pub fn new( + clock: Clock, + config: ClientConfig, + tx_pool: Arc>, + epoch_manager: Arc, + shard_tracker: ShardTracker, + validator_signer: MutableValidatorSigner, + runtime: Arc, + chain_genesis: ChainGenesis, + network_adapter: PeerManagerAdapter, + ) -> Result { + let my_config = TxRequestHandlerConfig { + tx_routing_height_horizon: config.tx_routing_height_horizon, + epoch_length: config.epoch_length, + }; + + let chain = Chain::new_for_view_client( + clock.clone(), + epoch_manager.clone(), + shard_tracker.clone(), + runtime.clone(), + &chain_genesis, + near_chain::DoomslugThresholdMode::TwoThirds, + config.save_trie_changes, + )?; + + Ok(Self { + config: my_config, + tx_pool, + validator_signer, + chain, + epoch_manager, + runtime, + shard_tracker, + network_adapter, + }) + } + + /// Submits the transaction for future inclusion into the chain. + /// + /// If accepted, it will be added to the transaction pool and possibly forwarded to another validator. + #[must_use] + pub fn process_tx( + &mut self, + tx: SignedTransaction, + is_forwarded: bool, + check_only: bool, + ) -> ProcessTxResponse { + let signer = self.validator_signer.get(); + unwrap_or_return!(self.process_tx_internal(&tx, is_forwarded, check_only, &signer), { + let me = signer.as_ref().map(|signer| signer.validator_id()); + tracing::warn!(target: "client", ?me, ?tx, "Dropping tx"); + ProcessTxResponse::NoResponse + }) + } + + /// Process transaction and either add it to the mempool or return to redirect to another validator. + fn process_tx_internal( + &mut self, + tx: &SignedTransaction, + is_forwarded: bool, + check_only: bool, + signer: &Option>, + ) -> Result { + let head = self.chain.head()?; + let me = signer.as_ref().map(|vs| vs.validator_id()); + let cur_block = self.chain.get_head_block()?; + let cur_block_header = cur_block.header(); + // here it is fine to use `cur_block_header` as it is a best effort estimate. 
If the transaction + // were to be included, the block that the chunk points to will have height >= height of + // `cur_block_header`. + if let Err(e) = self + .chain + .chain_store() + .check_transaction_validity_period(&cur_block_header, tx.transaction.block_hash()) + { + tracing::debug!(target: "client", ?tx, "Invalid tx: expired or from a different fork"); + return Ok(ProcessTxResponse::InvalidTx(e)); + } + let gas_price = cur_block_header.next_gas_price(); + let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?; + let shard_layout = self.runtime.get_shard_layout(&epoch_id)?; + let receiver_shard = account_id_to_shard_id( + self.epoch_manager.as_ref(), + tx.transaction.receiver_id(), + &epoch_id, + )?; + let receiver_congestion_info = + cur_block.block_congestion_info().get(&receiver_shard).copied(); + let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; + + if let Err(err) = self.runtime.validate_tx( + gas_price, + None, + &shard_layout, + tx, + true, + protocol_version, + receiver_congestion_info, + ) { + tracing::debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation"); + return Ok(ProcessTxResponse::InvalidTx(err)); + } + + let shard_id = account_id_to_shard_id( + self.epoch_manager.as_ref(), + tx.transaction.signer_id(), + &epoch_id, + )?; + let care_about_shard = + self.shard_tracker.care_about_shard(me, &head.last_block_hash, shard_id, true); + let will_care_about_shard = + self.shard_tracker.will_care_about_shard(me, &head.last_block_hash, shard_id, true); + if care_about_shard || will_care_about_shard { + let shard_uid = shard_id_to_uid(self.epoch_manager.as_ref(), shard_id, &epoch_id)?; + let state_root = match self.chain.get_chunk_extra(&head.last_block_hash, &shard_uid) { + Ok(chunk_extra) => *chunk_extra.state_root(), + Err(_) => { + // Not being able to fetch a state root most likely implies that we haven't + // caught up with the next epoch yet. + if is_forwarded { + return Err(near_client_primitives::types::Error::Other( + "Node has not caught up yet".to_string(), + )); + } else { + self.forward_tx(&epoch_id, tx, signer)?; + return Ok(ProcessTxResponse::RequestRouted); + } + } + }; + if let Err(err) = self.runtime.validate_tx( + gas_price, + Some(state_root), + &shard_layout, + tx, + false, + protocol_version, + receiver_congestion_info, + ) { + tracing::debug!(target: "client", ?err, "Invalid tx"); + Ok(ProcessTxResponse::InvalidTx(err)) + } else if check_only { + Ok(ProcessTxResponse::ValidTx) + } else { + // Transactions only need to be recorded if the node is a validator. 
+ if me.is_some() { + let mut pool_guarded = self.tx_pool.lock().unwrap(); + match pool_guarded.insert_transaction(shard_uid, tx.clone()) { + InsertTransactionResult::Success => { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); + } + InsertTransactionResult::Duplicate => { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Duplicate transaction, not forwarding it."); + return Ok(ProcessTxResponse::ValidTx); + } + InsertTransactionResult::NoSpaceLeft => { + if is_forwarded { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, dropping the transaction."); + } else { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, trying to forward the transaction."); + } + } + } + } + + // Active validator: + // possibly forward to next epoch validators + // Not active validator: + // forward to current epoch validators, + // possibly forward to next epoch validators + if self.active_validator(shard_id, signer)? { + tracing::trace!(target: "client", account = ?me, ?shard_id, tx_hash = ?tx.get_hash(), is_forwarded, "Recording a transaction."); + metrics::TRANSACTION_RECEIVED_VALIDATOR.inc(); + + if !is_forwarded { + self.possibly_forward_tx_to_next_epoch(tx, signer)?; + } + Ok(ProcessTxResponse::ValidTx) + } else if !is_forwarded { + tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Forwarding a transaction."); + metrics::TRANSACTION_RECEIVED_NON_VALIDATOR.inc(); + self.forward_tx(&epoch_id, tx, signer)?; + Ok(ProcessTxResponse::RequestRouted) + } else { + tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Non-validator received a forwarded transaction, dropping it."); + metrics::TRANSACTION_RECEIVED_NON_VALIDATOR_FORWARDED.inc(); + Ok(ProcessTxResponse::NoResponse) + } + } + } else if check_only { + Ok(ProcessTxResponse::DoesNotTrackShard) + } else if is_forwarded { + // Received forwarded transaction but we are not tracking the shard + tracing::debug!(target: "client", ?me, ?shard_id, tx_hash = ?tx.get_hash(), "Received forwarded transaction but no tracking shard"); + Ok(ProcessTxResponse::NoResponse) + } else { + // We are not tracking this shard, so there is no way to validate this tx. Just rerouting. + self.forward_tx(&epoch_id, tx, signer)?; + Ok(ProcessTxResponse::RequestRouted) + } + } + + /// Forwards given transaction to upcoming validators. + fn forward_tx( + &self, + epoch_id: &EpochId, + tx: &SignedTransaction, + signer: &Option>, + ) -> Result<(), near_client_primitives::types::Error> { + let shard_id = account_id_to_shard_id( + self.epoch_manager.as_ref(), + tx.transaction.signer_id(), + epoch_id, + )?; + // Use the header head to make sure the list of validators is as + // up-to-date as possible. + let head = self.chain.header_head()?; + let maybe_next_epoch_id = self.get_next_epoch_id_if_at_boundary(&head)?; + + let mut validators = HashSet::new(); + for horizon in (2..=self.config.tx_routing_height_horizon) + .chain(vec![self.config.tx_routing_height_horizon * 2].into_iter()) + { + let target_height = head.height + horizon - 1; + let validator = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { + epoch_id: *epoch_id, + height_created: target_height, + shard_id, + })? 
+ .take_account_id(); + validators.insert(validator); + if let Some(next_epoch_id) = &maybe_next_epoch_id { + let next_shard_id = account_id_to_shard_id( + self.epoch_manager.as_ref(), + tx.transaction.signer_id(), + next_epoch_id, + )?; + let validator = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { + epoch_id: *next_epoch_id, + height_created: target_height, + shard_id: next_shard_id, + })? + .take_account_id(); + validators.insert(validator); + } + } + + if let Some(account_id) = signer.as_ref().map(|bp| bp.validator_id()) { + validators.remove(account_id); + } + for validator in validators { + let tx_hash = tx.get_hash(); + tracing::trace!(target: "client", me = ?signer.as_ref().map(|bp| bp.validator_id()), ?tx_hash, ?validator, ?shard_id, "Routing a transaction"); + + // Send message to network to actually forward transaction. + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ForwardTx(validator, tx.clone()), + )); + } + + Ok(()) + } + + /// Determine if I am a validator in next few blocks for specified shard, assuming epoch doesn't change. + fn active_validator( + &self, + shard_id: ShardId, + signer: &Option>, + ) -> Result { + let head = self.chain.head()?; + let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?; + + let account_id = if let Some(vs) = signer.as_ref() { + vs.validator_id() + } else { + return Ok(false); + }; + + for i in 1..=self.config.tx_routing_height_horizon { + let chunk_producer = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { + epoch_id, + height_created: head.height + i, + shard_id, + })? + .take_account_id(); + if &chunk_producer == account_id { + return Ok(true); + } + } + Ok(false) + } + + /// If we're a validator in one of the next few chunks, but epoch switch could happen soon, + /// we forward to a validator from next epoch. + fn possibly_forward_tx_to_next_epoch( + &mut self, + tx: &SignedTransaction, + signer: &Option>, + ) -> Result<(), near_client_primitives::types::Error> { + let head = self.chain.head()?; + if let Some(next_epoch_id) = self.get_next_epoch_id_if_at_boundary(&head)? { + self.forward_tx(&next_epoch_id, tx, signer)?; + } else { + self.forward_tx(&head.epoch_id, tx, signer)?; + } + Ok(()) + } + + /// If we are close to epoch boundary, return next epoch id, otherwise return None. + fn get_next_epoch_id_if_at_boundary( + &self, + head: &Tip, + ) -> Result, near_client_primitives::types::Error> { + let next_epoch_started = + self.epoch_manager.is_next_block_epoch_start(&head.last_block_hash)?; + if next_epoch_started { + return Ok(None); + } + let next_epoch_estimated_height = + self.epoch_manager.get_epoch_start_height(&head.last_block_hash)? 
+ + self.config.epoch_length; + + let epoch_boundary_possible = + head.height + self.config.tx_routing_height_horizon >= next_epoch_estimated_height; + if epoch_boundary_possible { + Ok(Some(self.epoch_manager.get_next_epoch_id_from_prev_block(&head.last_block_hash)?)) + } else { + Ok(None) + } + } +} diff --git a/chain/jsonrpc/jsonrpc-tests/src/lib.rs b/chain/jsonrpc/jsonrpc-tests/src/lib.rs index ba39359a0f2..0e630e7484c 100644 --- a/chain/jsonrpc/jsonrpc-tests/src/lib.rs +++ b/chain/jsonrpc/jsonrpc-tests/src/lib.rs @@ -58,6 +58,7 @@ pub fn start_all_with_validity_period( TEST_GENESIS_CONFIG.clone(), actor_handles.client_actor.clone().with_auto_span_context().into_multi_sender(), actor_handles.view_client_actor.clone().with_auto_span_context().into_multi_sender(), + actor_handles.tx_processor_actor.clone().with_auto_span_context().into_multi_sender(), noop().into_multi_sender(), #[cfg(feature = "test_features")] noop().into_multi_sender(), diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index dbb2e4cc71f..21ab5527f18 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -222,14 +222,20 @@ fn process_query_response( } } +#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)] +pub struct ProcessTxSenderForRpc( + AsyncSender>, + Sender, +); + #[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)] pub struct ClientSenderForRpc( AsyncSender>, AsyncSender>, AsyncSender>, - AsyncSender>, + // AsyncSender>, AsyncSender>, - Sender, + // Sender, #[cfg(feature = "test_features")] Sender, #[cfg(feature = "test_features")] AsyncSender< @@ -279,6 +285,7 @@ pub struct PeerManagerSenderForRpc(AsyncSender CryptoHash { let tx = request_data.signed_transaction; let hash = tx.get_hash(); - self.client_sender.send(ProcessTxRequest { + self.process_tx_sender.send(ProcessTxRequest { transaction: tx, is_forwarded: false, check_only: false, // if we set true here it will not actually send the transaction @@ -665,7 +672,7 @@ impl JsonRpcHandler { let tx_hash = tx.get_hash(); let signer_account_id = tx.transaction.signer_id().clone(); let response = self - .client_sender + .process_tx_sender .send_async(ProcessTxRequest { transaction: tx, is_forwarded: false, check_only }) .await .map_err(RpcFrom::rpc_from)?; @@ -1649,6 +1656,7 @@ pub fn start_http( genesis_config: GenesisConfig, client_sender: ClientSenderForRpc, view_client_sender: ViewClientSenderForRpc, + process_tx_sender: ProcessTxSenderForRpc, peer_manager_sender: PeerManagerSenderForRpc, #[cfg(feature = "test_features")] gc_sender: GCSenderForRpc, entity_debug_handler: Arc, @@ -1672,6 +1680,7 @@ pub fn start_http( .app_data(web::Data::new(JsonRpcHandler { client_sender: client_sender.clone(), view_client_sender: view_client_sender.clone(), + process_tx_sender: process_tx_sender.clone(), peer_manager_sender: peer_manager_sender.clone(), polling_config, genesis_config: genesis_config.clone(), diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 5aefadb2a66..27cee637b8e 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -569,6 +569,7 @@ pub struct ClientConfig { /// which can cause extra load on the database. This option is not recommended for production use, /// as a large number of incoming witnesses could cause denial of service. 
pub save_latest_witnesses: bool, + pub transaction_request_handler_threads: usize, } impl ClientConfig { @@ -658,6 +659,7 @@ impl ClientConfig { orphan_state_witness_pool_size: default_orphan_state_witness_pool_size(), orphan_state_witness_max_size: default_orphan_state_witness_max_size(), save_latest_witnesses: false, + transaction_request_handler_threads: 4, } } } diff --git a/integration-tests/src/env/setup.rs b/integration-tests/src/env/setup.rs index 65033df1fbc..6879666ca36 100644 --- a/integration-tests/src/env/setup.rs +++ b/integration-tests/src/env/setup.rs @@ -22,7 +22,7 @@ use near_chain_configs::{ ChunkDistributionNetworkConfig, ClientConfig, MutableConfigValue, ReshardingConfig, }; use near_chunks::adapter::ShardsManagerRequestFromClient; -use near_chunks::client::ShardsManagerResponse; +use near_chunks::client::{ShardedTransactionPool, ShardsManagerResponse}; use near_chunks::shards_manager_actor::{ShardsManagerActor, start_shards_manager}; use near_chunks::test_utils::SynchronousShardsManagerAdapter; use near_client::adversarial::Controls; @@ -31,6 +31,7 @@ use near_client::{ Client, ClientActor, PartialWitnessActor, PartialWitnessSenderForClient, StartClientResult, SyncStatus, ViewClientActor, ViewClientActorInner, start_client, }; +use near_client::{TxRequestHandlerActor, spawn_tx_request_handler_actor}; use near_crypto::{KeyType, PublicKey}; use near_epoch_manager::EpochManagerAdapter; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; @@ -100,6 +101,7 @@ pub fn setup( ) -> ( Addr, Addr, + Addr, ShardsManagerAdapterForTest, PartialWitnessSenderForNetwork, ) { @@ -154,6 +156,22 @@ pub fn setup( adv.clone(), ); + let mut rng_seed: RngSeed = [0; 32]; + rand::thread_rng().fill(&mut rng_seed); + let dummy_tx_pool = + Arc::new(std::sync::Mutex::new(ShardedTransactionPool::new(rng_seed, Some(60000)))); + + let tx_processor_addr = spawn_tx_request_handler_actor( + clock.clone(), + config.clone(), + dummy_tx_pool.clone(), + epoch_manager.clone(), + shard_tracker.clone(), + signer.clone(), + runtime.clone(), + chain_genesis.clone(), + network_adapter.clone(), + ); let client_adapter_for_partial_witness_actor = LateBoundSender::new(); let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new( clock.clone(), @@ -213,6 +231,7 @@ pub fn setup( ( client_actor, view_client_addr, + tx_processor_addr, shards_manager_adapter.into_multi_sender(), partial_witness_adapter.into_multi_sender(), ) @@ -350,7 +369,13 @@ pub fn setup_mock_with_validity_period( ) -> ActorHandlesForTesting { let network_adapter = LateBoundSender::new(); let vs = ValidatorSchedule::new().block_producers_per_epoch(vec![validators]); - let (client_addr, view_client_addr, shards_manager_adapter, partial_witness_sender) = setup( + let ( + client_addr, + view_client_addr, + tx_request_handler_addr, + shards_manager_adapter, + partial_witness_sender, + ) = setup( clock.clone(), vs, 10, @@ -377,6 +402,7 @@ pub fn setup_mock_with_validity_period( ActorHandlesForTesting { client_actor: client_addr, view_client_actor: view_client_addr, + tx_processor_actor: tx_request_handler_addr, shards_manager_adapter, partial_witness_sender, } @@ -386,6 +412,7 @@ pub fn setup_mock_with_validity_period( pub struct ActorHandlesForTesting { pub client_actor: Addr, pub view_client_actor: Addr, + pub tx_processor_actor: Addr, pub shards_manager_adapter: ShardsManagerAdapterForTest, pub partial_witness_sender: PartialWitnessSenderForNetwork, } @@ -971,7 +998,13 @@ pub fn setup_mock_all_validators( }) .start(); 
- let (client_addr, view_client_addr, shards_manager_adapter, partial_witness_sender) = setup( + let ( + client_addr, + view_client_addr, + tx_processor_addr, + shards_manager_adapter, + partial_witness_sender, + ) = setup( clock.clone(), vs, epoch_length, @@ -993,6 +1026,7 @@ pub fn setup_mock_all_validators( ret.push(ActorHandlesForTesting { client_actor: client_addr, view_client_actor: view_client_addr, + tx_processor_actor: tx_processor_addr, shards_manager_adapter, partial_witness_sender, }); diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index ec80db38d71..54c62737916 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -341,6 +341,7 @@ pub struct Config { /// which can cause extra load on the database. This option is not recommended for production use, /// as a large number of incoming witnesses could cause denial of service. pub save_latest_witnesses: bool, + pub transaction_request_handler_threads: usize, } fn is_false(value: &bool) -> bool { @@ -394,6 +395,7 @@ impl Default for Config { orphan_state_witness_max_size: default_orphan_state_witness_max_size(), max_loaded_contracts: 256, save_latest_witnesses: false, + transaction_request_handler_threads: 4, } } } @@ -578,6 +580,7 @@ impl NearConfig { orphan_state_witness_pool_size: config.orphan_state_witness_pool_size, orphan_state_witness_max_size: config.orphan_state_witness_max_size, save_latest_witnesses: config.save_latest_witnesses, + transaction_request_handler_threads: config.transaction_request_handler_threads, }, #[cfg(feature = "tx_generator")] tx_generator: config.tx_generator, diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 569b3ffdf31..27da170a13b 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -29,7 +29,7 @@ use near_client::adapter::client_sender_for_network; use near_client::gc_actor::GCActor; use near_client::{ ClientActor, ConfigUpdater, PartialWitnessActor, StartClientResult, ViewClientActor, - ViewClientActorInner, start_client, + ViewClientActorInner, spawn_tx_request_handler_actor, start_client, }; use near_epoch_manager::EpochManager; use near_epoch_manager::EpochManagerAdapter; @@ -342,7 +342,7 @@ pub fn start_with_config_and_synchronization( config.validator_signer.clone(), chain_genesis.clone(), view_epoch_manager.clone(), - view_shard_tracker, + view_shard_tracker.clone(), view_runtime.clone(), network_adapter.as_multi_sender(), config.client_config.clone(), @@ -397,28 +397,29 @@ pub fn start_with_config_and_synchronization( Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); let state_sync_spawner = Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())); - let StartClientResult { client_actor, client_arbiter_handle, resharding_handle } = start_client( - Clock::real(), - config.client_config.clone(), - chain_genesis.clone(), - epoch_manager.clone(), - shard_tracker.clone(), - runtime.clone(), - node_id, - state_sync_spawner.clone(), - network_adapter.as_multi_sender(), - shards_manager_adapter.as_sender(), - config.validator_signer.clone(), - telemetry.with_auto_span_context().into_sender(), - Some(snapshot_callbacks), - shutdown_signal, - adv, - config_updater, - partial_witness_actor.clone().with_auto_span_context().into_multi_sender(), - true, - None, - resharding_sender.into_multi_sender(), - ); + let StartClientResult { client_actor, client_arbiter_handle, resharding_handle, tx_pool } = + start_client( + Clock::real(), + config.client_config.clone(), + chain_genesis.clone(), + epoch_manager.clone(), + 
shard_tracker.clone(), + runtime.clone(), + node_id, + state_sync_spawner.clone(), + network_adapter.as_multi_sender(), + shards_manager_adapter.as_sender(), + config.validator_signer.clone(), + telemetry.with_auto_span_context().into_sender(), + Some(snapshot_callbacks), + shutdown_signal, + adv, + config_updater, + partial_witness_actor.clone().with_auto_span_context().into_multi_sender(), + true, + None, + resharding_sender.into_multi_sender(), + ); client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context()); client_adapter_for_partial_witness_actor.bind(client_actor.clone().with_auto_span_context()); let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager( @@ -433,6 +434,18 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context()); + let tx_processor_addr = spawn_tx_request_handler_actor( + Clock::real(), + config.client_config.clone(), + tx_pool, + view_epoch_manager.clone(), + view_shard_tracker.clone(), + config.validator_signer.clone(), + view_runtime.clone(), + chain_genesis.clone(), + network_adapter.as_multi_sender(), + ); + let mut state_sync_dumper = StateSyncDumper { clock: Clock::real(), client_config: config.client_config.clone(), @@ -475,6 +488,7 @@ pub fn start_with_config_and_synchronization( config.genesis.config.clone(), client_actor.clone().with_auto_span_context().into_multi_sender(), view_client_addr.clone().with_auto_span_context().into_multi_sender(), + tx_processor_addr.clone().with_auto_span_context().into_multi_sender(), network_actor.into_multi_sender(), #[cfg(feature = "test_features")] _gc_actor.with_auto_span_context().into_multi_sender(), @@ -515,7 +529,8 @@ pub fn start_with_config_and_synchronization( #[cfg(feature = "tx_generator")] let tx_generator = near_transactions_generator::actix_actor::start_tx_generator( config.tx_generator.unwrap_or_default(), - client_actor.clone().with_auto_span_context().into_multi_sender(), + tx_processor_addr.clone().with_auto_span_context().into_multi_sender(), + // client_actor.clone().with_auto_span_context().into_multi_sender(), view_client_addr.clone().with_auto_span_context().into_multi_sender(), ); From 23c5ac871f582b4befb55e3976ec36038e0a3bf8 Mon Sep 17 00:00:00 2001 From: Slava Savenko Date: Tue, 4 Mar 2025 11:59:50 +0100 Subject: [PATCH 2/3] deconflict --- chain/client/src/client.rs | 3 +- chain/client/src/tx_request_handler.rs | 124 ++++++++++++------------- 2 files changed, 62 insertions(+), 65 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 2de548c1eff..9ae45338c8c 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2278,7 +2278,8 @@ impl Client { } // Transactions only need to be recorded if the node is a validator. 
if me.is_some() { - match self.chunk_producer.sharded_tx_pool.insert_transaction(shard_uid, tx.clone()) + let mut pool = self.chunk_producer.sharded_tx_pool.lock().unwrap(); + match pool.insert_transaction(shard_uid, tx.clone()) { InsertTransactionResult::Success => { trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); diff --git a/chain/client/src/tx_request_handler.rs b/chain/client/src/tx_request_handler.rs index 4e43cee0555..5898c69946c 100644 --- a/chain/client/src/tx_request_handler.rs +++ b/chain/client/src/tx_request_handler.rs @@ -179,11 +179,8 @@ impl TxRequestHandler { let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; if let Err(err) = self.runtime.validate_tx( - gas_price, - None, &shard_layout, tx, - true, protocol_version, receiver_congestion_info, ) { @@ -196,11 +193,12 @@ impl TxRequestHandler { tx.transaction.signer_id(), &epoch_id, )?; - let care_about_shard = - self.shard_tracker.care_about_shard(me, &head.last_block_hash, shard_id, true); + let cares_about_shard = + self.shard_tracker.cares_about_shard(me, &head.last_block_hash, shard_id, true); let will_care_about_shard = self.shard_tracker.will_care_about_shard(me, &head.last_block_hash, shard_id, true); - if care_about_shard || will_care_about_shard { + + if cares_about_shard || will_care_about_shard { let shard_uid = shard_id_to_uid(self.epoch_manager.as_ref(), shard_id, &epoch_id)?; let state_root = match self.chain.get_chunk_extra(&head.last_block_hash, &shard_uid) { Ok(chunk_extra) => *chunk_extra.state_root(), @@ -208,85 +206,83 @@ impl TxRequestHandler { // Not being able to fetch a state root most likely implies that we haven't // caught up with the next epoch yet. if is_forwarded { - return Err(near_client_primitives::types::Error::Other( - "Node has not caught up yet".to_string(), - )); + return Err(near_client_primitives::types::Error::Other("Node has not caught up yet".to_string())); } else { self.forward_tx(&epoch_id, tx, signer)?; return Ok(ProcessTxResponse::RequestRouted); } } }; - if let Err(err) = self.runtime.validate_tx( - gas_price, - Some(state_root), + if let Err(err) = self.runtime.can_verify_and_charge_tx( &shard_layout, + gas_price, + state_root, tx, - false, protocol_version, - receiver_congestion_info, ) { tracing::debug!(target: "client", ?err, "Invalid tx"); - Ok(ProcessTxResponse::InvalidTx(err)) - } else if check_only { - Ok(ProcessTxResponse::ValidTx) - } else { - // Transactions only need to be recorded if the node is a validator. 
- if me.is_some() { - let mut pool_guarded = self.tx_pool.lock().unwrap(); - match pool_guarded.insert_transaction(shard_uid, tx.clone()) { - InsertTransactionResult::Success => { - tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); - } - InsertTransactionResult::Duplicate => { - tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Duplicate transaction, not forwarding it."); - return Ok(ProcessTxResponse::ValidTx); - } - InsertTransactionResult::NoSpaceLeft => { - if is_forwarded { - tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, dropping the transaction."); - } else { - tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, trying to forward the transaction."); - } + return Ok(ProcessTxResponse::InvalidTx(err)); + } + if check_only { + return Ok(ProcessTxResponse::ValidTx); + } + // Transactions only need to be recorded if the node is a validator. + if me.is_some() { + let mut pool = self.tx_pool.lock().unwrap(); + match pool.insert_transaction(shard_uid, tx.clone()) + { + InsertTransactionResult::Success => { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); + } + InsertTransactionResult::Duplicate => { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Duplicate transaction, not forwarding it."); + return Ok(ProcessTxResponse::ValidTx); + } + InsertTransactionResult::NoSpaceLeft => { + if is_forwarded { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, dropping the transaction."); + } else { + tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Transaction pool is full, trying to forward the transaction."); } } } + } - // Active validator: - // possibly forward to next epoch validators - // Not active validator: - // forward to current epoch validators, - // possibly forward to next epoch validators - if self.active_validator(shard_id, signer)? { - tracing::trace!(target: "client", account = ?me, ?shard_id, tx_hash = ?tx.get_hash(), is_forwarded, "Recording a transaction."); - metrics::TRANSACTION_RECEIVED_VALIDATOR.inc(); + // Active validator: + // possibly forward to next epoch validators + // Not active validator: + // forward to current epoch validators, + // possibly forward to next epoch validators + if self.active_validator(shard_id, signer)? 
{ + tracing::trace!(target: "client", account = ?me, ?shard_id, tx_hash = ?tx.get_hash(), is_forwarded, "Recording a transaction."); + metrics::TRANSACTION_RECEIVED_VALIDATOR.inc(); - if !is_forwarded { - self.possibly_forward_tx_to_next_epoch(tx, signer)?; - } - Ok(ProcessTxResponse::ValidTx) - } else if !is_forwarded { - tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Forwarding a transaction."); - metrics::TRANSACTION_RECEIVED_NON_VALIDATOR.inc(); - self.forward_tx(&epoch_id, tx, signer)?; - Ok(ProcessTxResponse::RequestRouted) - } else { - tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Non-validator received a forwarded transaction, dropping it."); - metrics::TRANSACTION_RECEIVED_NON_VALIDATOR_FORWARDED.inc(); - Ok(ProcessTxResponse::NoResponse) + if !is_forwarded { + self.possibly_forward_tx_to_next_epoch(tx, signer)?; } + return Ok(ProcessTxResponse::ValidTx); + } + if !is_forwarded { + tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Forwarding a transaction."); + metrics::TRANSACTION_RECEIVED_NON_VALIDATOR.inc(); + self.forward_tx(&epoch_id, tx, signer)?; + return Ok(ProcessTxResponse::RequestRouted); } - } else if check_only { - Ok(ProcessTxResponse::DoesNotTrackShard) - } else if is_forwarded { + tracing::trace!(target: "client", ?shard_id, tx_hash = ?tx.get_hash(), "Non-validator received a forwarded transaction, dropping it."); + metrics::TRANSACTION_RECEIVED_NON_VALIDATOR_FORWARDED.inc(); + return Ok(ProcessTxResponse::NoResponse); + } + + if check_only { + return Ok(ProcessTxResponse::DoesNotTrackShard); + } + if is_forwarded { // Received forwarded transaction but we are not tracking the shard tracing::debug!(target: "client", ?me, ?shard_id, tx_hash = ?tx.get_hash(), "Received forwarded transaction but no tracking shard"); - Ok(ProcessTxResponse::NoResponse) - } else { - // We are not tracking this shard, so there is no way to validate this tx. Just rerouting. - self.forward_tx(&epoch_id, tx, signer)?; - Ok(ProcessTxResponse::RequestRouted) + return Ok(ProcessTxResponse::NoResponse); } + // We are not tracking this shard, so there is no way to validate this tx. Just rerouting. + self.forward_tx(&epoch_id, tx, signer).map(|()| ProcessTxResponse::RequestRouted) } /// Forwards given transaction to upcoming validators. From 95146fd6a1a8b353389bb1929e69039c4605a316 Mon Sep 17 00:00:00 2001 From: Slava Savenko Date: Tue, 4 Mar 2025 12:22:25 +0100 Subject: [PATCH 3/3] cosmetics --- chain/client/src/client.rs | 3 +- chain/client/src/tx_request_handler.rs | 16 +++---- chain/jsonrpc/src/lib.rs | 2 - integration-tests/src/env/setup.rs | 63 +++++++++++++------------- 4 files changed, 40 insertions(+), 44 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 9ae45338c8c..9f57c7f4215 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2279,8 +2279,7 @@ impl Client { // Transactions only need to be recorded if the node is a validator. 
if me.is_some() { let mut pool = self.chunk_producer.sharded_tx_pool.lock().unwrap(); - match pool.insert_transaction(shard_uid, tx.clone()) - { + match pool.insert_transaction(shard_uid, tx.clone()) { InsertTransactionResult::Success => { trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); } diff --git a/chain/client/src/tx_request_handler.rs b/chain/client/src/tx_request_handler.rs index 5898c69946c..1b155dad332 100644 --- a/chain/client/src/tx_request_handler.rs +++ b/chain/client/src/tx_request_handler.rs @@ -178,12 +178,9 @@ impl TxRequestHandler { cur_block.block_congestion_info().get(&receiver_shard).copied(); let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; - if let Err(err) = self.runtime.validate_tx( - &shard_layout, - tx, - protocol_version, - receiver_congestion_info, - ) { + if let Err(err) = + self.runtime.validate_tx(&shard_layout, tx, protocol_version, receiver_congestion_info) + { tracing::debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation"); return Ok(ProcessTxResponse::InvalidTx(err)); } @@ -206,7 +203,9 @@ impl TxRequestHandler { // Not being able to fetch a state root most likely implies that we haven't // caught up with the next epoch yet. if is_forwarded { - return Err(near_client_primitives::types::Error::Other("Node has not caught up yet".to_string())); + return Err(near_client_primitives::types::Error::Other( + "Node has not caught up yet".to_string(), + )); } else { self.forward_tx(&epoch_id, tx, signer)?; return Ok(ProcessTxResponse::RequestRouted); @@ -229,8 +228,7 @@ impl TxRequestHandler { // Transactions only need to be recorded if the node is a validator. if me.is_some() { let mut pool = self.tx_pool.lock().unwrap(); - match pool.insert_transaction(shard_uid, tx.clone()) - { + match pool.insert_transaction(shard_uid, tx.clone()) { InsertTransactionResult::Success => { tracing::trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction."); } diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index 21ab5527f18..d9fffa258e6 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -233,9 +233,7 @@ pub struct ClientSenderForRpc( AsyncSender>, AsyncSender>, AsyncSender>, - // AsyncSender>, AsyncSender>, - // Sender, #[cfg(feature = "test_features")] Sender, #[cfg(feature = "test_features")] AsyncSender< diff --git a/integration-tests/src/env/setup.rs b/integration-tests/src/env/setup.rs index 6879666ca36..6551f527c87 100644 --- a/integration-tests/src/env/setup.rs +++ b/integration-tests/src/env/setup.rs @@ -22,7 +22,7 @@ use near_chain_configs::{ ChunkDistributionNetworkConfig, ClientConfig, MutableConfigValue, ReshardingConfig, }; use near_chunks::adapter::ShardsManagerRequestFromClient; -use near_chunks::client::{ShardedTransactionPool, ShardsManagerResponse}; +use near_chunks::client::ShardsManagerResponse; use near_chunks::shards_manager_actor::{ShardsManagerActor, start_shards_manager}; use near_chunks::test_utils::SynchronousShardsManagerAdapter; use near_client::adversarial::Controls; @@ -158,20 +158,7 @@ pub fn setup( let mut rng_seed: RngSeed = [0; 32]; rand::thread_rng().fill(&mut rng_seed); - let dummy_tx_pool = - Arc::new(std::sync::Mutex::new(ShardedTransactionPool::new(rng_seed, Some(60000)))); - let tx_processor_addr = spawn_tx_request_handler_actor( - clock.clone(), - config.clone(), - dummy_tx_pool.clone(), - epoch_manager.clone(), - shard_tracker.clone(), - signer.clone(), - 
runtime.clone(), - chain_genesis.clone(), - network_adapter.clone(), - ); let client_adapter_for_partial_witness_actor = LateBoundSender::new(); let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new( clock.clone(), @@ -190,28 +177,42 @@ pub fn setup( let resharding_sender = resharding_sender_addr.with_auto_span_context(); let shards_manager_adapter_for_client = LateBoundSender::new(); - let StartClientResult { client_actor, .. } = start_client( - clock, + let StartClientResult { client_actor, client_arbiter_handle: _, resharding_handle: _, tx_pool } = + start_client( + clock.clone(), + config.clone(), + chain_genesis.clone(), + epoch_manager.clone(), + shard_tracker.clone(), + runtime.clone(), + PeerId::new(PublicKey::empty(KeyType::ED25519)), + Arc::new(ActixFutureSpawner), + network_adapter.clone(), + shards_manager_adapter_for_client.as_sender(), + signer.clone(), + telemetry.with_auto_span_context().into_sender(), + None, + None, + adv, + None, + partial_witness_adapter.clone().into_multi_sender(), + enable_doomslug, + Some(TEST_SEED), + resharding_sender.into_multi_sender(), + ); + + let tx_processor_addr = spawn_tx_request_handler_actor( + clock.clone(), config.clone(), - chain_genesis, + tx_pool.clone(), epoch_manager.clone(), shard_tracker.clone(), - runtime, - PeerId::new(PublicKey::empty(KeyType::ED25519)), - Arc::new(ActixFutureSpawner), + signer.clone(), + runtime.clone(), + chain_genesis.clone(), network_adapter.clone(), - shards_manager_adapter_for_client.as_sender(), - signer, - telemetry.with_auto_span_context().into_sender(), - None, - None, - adv, - None, - partial_witness_adapter.clone().into_multi_sender(), - enable_doomslug, - Some(TEST_SEED), - resharding_sender.into_multi_sender(), ); + let validator_signer = Some(Arc::new(EmptyValidatorSigner::new(account_id))); let (shards_manager_addr, _) = start_shards_manager( epoch_manager.clone(),