Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Extract process_tx functionality from the Client and run it multithreaded #13001

Draft
wants to merge 3 commits into base branch: master
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmarks/transactions-generator/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ unlimit:
jq '.gas_limit=20000000000000000' {{near_genesis_file}} > tmp_genesis.json && mv tmp_genesis.json {{near_genesis_file}}
jq '.view_client_threads=8 \
| .store.load_mem_tries_for_tracked_shards=true \
| .produce_chunk_add_transactions_time_limit={"secs": 0, "nanos": 800000000}' {{near_config_file}} > tmp_config.json \
| .produce_chunk_add_transactions_time_limit={"secs": 0, "nanos": 800000000} \
| .transaction_request_handler_threads = 4' {{near_config_file}} > tmp_config.json \
&& mv tmp_config.json {{near_config_file}}

do-it tps:
Expand Down
31 changes: 28 additions & 3 deletions benchmarks/transactions-generator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@
stats: Arc<std::sync::Mutex<Stats>>,
}

/// Running (online) mean of a stream of integer measurements.
///
/// The original version used a truncating incremental update
/// (`mean += (x - mean) / count`), which discards the fractional
/// remainder on every step and lets rounding error accumulate over a
/// long run. Since the measurements here are per-second transaction
/// counts, an exact `sum / count` cannot realistically overflow `i64`,
/// so we keep the running sum and compute the mean exactly.
#[derive(Debug, Clone)]
struct WelfordMean {
    /// Sum of all measurements added so far.
    sum: i64,
    /// Number of measurements added so far.
    count: i64,
}

impl WelfordMean {
    /// Creates an empty accumulator; `mean()` reports 0 until the
    /// first measurement arrives.
    fn new() -> Self {
        Self { sum: 0, count: 0 }
    }

    /// Folds one measurement into the running mean.
    fn add_measurement(&mut self, x: i64) {
        self.count += 1;
        self.sum += x;
    }

    /// Exact integer mean of all measurements (truncated toward zero),
    /// or 0 when no measurements have been added yet.
    fn mean(&self) -> i64 {
        if self.count == 0 { 0 } else { self.sum / self.count }
    }
}

#[derive(Debug, Clone)]
struct Stats {
pool_accepted: u64,
Expand Down Expand Up @@ -255,6 +276,7 @@
let mut report_interval = tokio::time::interval(Duration::from_secs(1));
tokio::spawn(async move {
let mut stats_prev = runner_state.stats.lock().unwrap().clone();
let mut mean_diff = WelfordMean::new();

Check warning on line 279 in benchmarks/transactions-generator/src/lib.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (Welford)
loop {
report_interval.tick().await;
let stats = {
Expand All @@ -266,10 +288,13 @@
stats.failed = failed;
stats.clone()
};
tracing::info!(target: "transaction-generator", total=format!("{stats:?}"),);
let diff = stats.clone() - stats_prev;
mean_diff.add_measurement(diff.processed as i64);
tracing::info!(target: "transaction-generator",
total=format!("{stats:?}"),);
tracing::info!(target: "transaction-generator",
diff=format!("{:?}", stats.clone() - stats_prev),);
diff=format!("{:?}", diff),
rate_processed=mean_diff.mean(),
);
stats_prev = stats.clone();
}
})
Expand Down
18 changes: 10 additions & 8 deletions chain/client/src/chunk_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use near_store::ShardUId;
use near_store::adapter::chain_store::ChainStoreAdapter;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use time::ext::InstantExt as _;
use tracing::{debug, instrument};

Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct ChunkProducer {
chain: ChainStoreAdapter,
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
pub sharded_tx_pool: ShardedTransactionPool,
pub sharded_tx_pool: Arc<Mutex<ShardedTransactionPool>>,
/// A ReedSolomon instance to encode shard chunks.
reed_solomon_encoder: ReedSolomon,
/// Chunk production timing information. Used only for debug purposes.
Expand Down Expand Up @@ -102,7 +102,10 @@ impl ChunkProducer {
chain: chain_store.clone(),
epoch_manager,
runtime_adapter,
sharded_tx_pool: ShardedTransactionPool::new(rng_seed, transaction_pool_size_limit),
sharded_tx_pool: Arc::new(Mutex::new(ShardedTransactionPool::new(
rng_seed,
transaction_pool_size_limit,
))),
reed_solomon_encoder: ReedSolomon::new(data_parts, parity_parts).unwrap(),
chunk_production_info: lru::LruCache::new(
NonZeroUsize::new(PRODUCTION_TIMES_CACHE_SIZE).unwrap(),
Expand Down Expand Up @@ -370,8 +373,8 @@ impl ChunkProducer {
chain_validate: &dyn Fn(&SignedTransaction) -> bool,
) -> Result<PreparedTransactions, Error> {
let shard_id = shard_uid.shard_id();
let prepared_transactions = if let Some(mut iter) =
self.sharded_tx_pool.get_pool_iterator(shard_uid)
let mut pool_guard = self.sharded_tx_pool.lock().unwrap();
let prepared_transactions = if let Some(mut iter) = pool_guard.get_pool_iterator(shard_uid)
{
let storage_config = RuntimeStorageConfig {
state_root: *chunk_extra.state_root(),
Expand Down Expand Up @@ -408,9 +411,8 @@ impl ChunkProducer {
};
// Reintroduce valid transactions back to the pool. They will be removed when the chunk is
// included into the block.
let reintroduced_count = self
.sharded_tx_pool
.reintroduce_transactions(shard_uid, &prepared_transactions.transactions);
let reintroduced_count =
pool_guard.reintroduce_transactions(shard_uid, &prepared_transactions.transactions);
if reintroduced_count < prepared_transactions.transactions.len() {
debug!(target: "client", reintroduced_count, num_tx = prepared_transactions.transactions.len(), "Reintroduced transactions");
}
Expand Down
23 changes: 11 additions & 12 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,8 @@ impl Client {
// By now the chunk must be in store, otherwise the block would have been orphaned
let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap();
let transactions = chunk.transactions();
self.chunk_producer
.sharded_tx_pool
.remove_transactions(shard_uid, transactions);
let mut pool_guard = self.chunk_producer.sharded_tx_pool.lock().unwrap();
pool_guard.remove_transactions(shard_uid, transactions);
}
}
}
Expand Down Expand Up @@ -442,10 +441,10 @@ impl Client {
) {
// By now the chunk must be in store, otherwise the block would have been orphaned
let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap();
let reintroduced_count = self
.chunk_producer
.sharded_tx_pool
.reintroduce_transactions(shard_uid, &chunk.transactions());
let reintroduced_count = {
let mut pool_guard = self.chunk_producer.sharded_tx_pool.lock().unwrap();
pool_guard.reintroduce_transactions(shard_uid, &chunk.transactions())
};
if reintroduced_count < chunk.transactions().len() {
debug!(target: "client",
reintroduced_count,
Expand Down Expand Up @@ -1530,9 +1529,9 @@ impl Client {
match (old_shard_layout, new_shard_layout) {
(Ok(old_shard_layout), Ok(new_shard_layout)) => {
if old_shard_layout != new_shard_layout {
self.chunk_producer
.sharded_tx_pool
.reshard(&old_shard_layout, &new_shard_layout);
let mut guarded_pool =
self.chunk_producer.sharded_tx_pool.lock().unwrap();
guarded_pool.reshard(&old_shard_layout, &new_shard_layout);
}
}
(old_shard_layout, new_shard_layout) => {
Expand Down Expand Up @@ -2279,8 +2278,8 @@ impl Client {
}
// Transactions only need to be recorded if the node is a validator.
if me.is_some() {
match self.chunk_producer.sharded_tx_pool.insert_transaction(shard_uid, tx.clone())
{
let mut pool = self.chunk_producer.sharded_tx_pool.lock().unwrap();
match pool.insert_transaction(shard_uid, tx.clone()) {
InsertTransactionResult::Success => {
trace!(target: "client", ?shard_uid, tx_hash = ?tx.get_hash(), "Recorded a transaction.");
}
Expand Down
38 changes: 23 additions & 15 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use near_chain::{
use near_chain_configs::{ClientConfig, MutableValidatorSigner, ReshardingHandle};
use near_chain_primitives::error::EpochErrorResultToChainError;
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardsManagerResponse;
use near_chunks::client::{ShardedTransactionPool, ShardsManagerResponse};
use near_client_primitives::types::{
Error, GetClientConfig, GetClientConfigError, GetNetworkInfo, NetworkInfoResponse,
StateSyncStatus, Status, StatusError, StatusSyncInfo, SyncStatus,
Expand Down Expand Up @@ -79,7 +79,7 @@ use near_telemetry::TelemetryEvent;
use rand::seq::SliceRandom;
use rand::{Rng, thread_rng};
use std::fmt;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tracing::{debug, debug_span, error, info, trace, warn};

Expand Down Expand Up @@ -119,6 +119,7 @@ pub struct StartClientResult {
pub client_actor: actix::Addr<ClientActor>,
pub client_arbiter_handle: actix::ArbiterHandle,
pub resharding_handle: ReshardingHandle,
pub tx_pool: Arc<Mutex<ShardedTransactionPool>>,
}

/// Starts client in a separate Arbiter (thread).
Expand Down Expand Up @@ -179,19 +180,21 @@ pub fn start_client(
let sync_jobs_actor = SyncJobsActor::new(client_sender_for_sync_jobs.as_multi_sender());
let sync_jobs_actor_addr = sync_jobs_actor.spawn_actix_actor();

let client_actor_inner = ClientActorInner::new(
clock,
client,
node_id,
network_adapter,
telemetry_sender,
sender,
adv,
config_updater,
sync_jobs_actor_addr.with_auto_span_context().into_multi_sender(),
)
.unwrap();
let tx_pool = client_actor_inner.client.chunk_producer.sharded_tx_pool.clone();

let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |_| {
let client_actor_inner = ClientActorInner::new(
clock,
client,
node_id,
network_adapter,
telemetry_sender,
sender,
adv,
config_updater,
sync_jobs_actor_addr.with_auto_span_context().into_multi_sender(),
)
.unwrap();
ActixWrapper::new(client_actor_inner)
});

Expand All @@ -201,7 +204,12 @@ pub fn start_client(
chain_sender_for_state_sync
.bind(client_addr.clone().with_auto_span_context().into_multi_sender());

StartClientResult { client_actor: client_addr, client_arbiter_handle, resharding_handle }
StartClientResult {
client_actor: client_addr,
client_arbiter_handle,
resharding_handle,
tx_pool,
}
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
Expand Down
2 changes: 2 additions & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use crate::client_actor::NetworkAdversarialMessage;
pub use crate::client_actor::{ClientActor, StartClientResult, start_client};
pub use crate::config_updater::ConfigUpdater;
pub use crate::stateless_validation::chunk_validator::orphan_witness_handling::HandleOrphanWitnessOutcome;
pub use crate::tx_request_handler::{TxRequestHandlerActor, spawn_tx_request_handler_actor};
pub use crate::view_client_actor::{ViewClientActor, ViewClientActorInner};
pub use chunk_producer::ProduceChunkResult;
pub use near_chain::stateless_validation::processing_tracker::{
Expand Down Expand Up @@ -43,4 +44,5 @@ mod stateless_validation;
pub mod sync;
pub mod sync_jobs_actor;
pub mod test_utils;
mod tx_request_handler;
mod view_client_actor;
Loading
Loading