diff --git a/chain/chain/src/block_processing_utils.rs b/chain/chain/src/block_processing_utils.rs
index 5b4a3ab37e2..3d416de308e 100644
--- a/chain/chain/src/block_processing_utils.rs
+++ b/chain/chain/src/block_processing_utils.rs
@@ -6,6 +6,7 @@ use near_async::time::Instant;
 use near_primitives::block::Block;
 use near_primitives::challenge::{ChallengeBody, ChallengesResult};
 use near_primitives::hash::CryptoHash;
+use near_primitives::optimistic_block::{BlockToApply, OptimisticBlock};
 use near_primitives::sharding::{ReceiptProof, ShardChunkHeader, StateSyncInfo};
 use near_primitives::types::ShardId;
 use std::collections::HashMap;
@@ -40,10 +41,20 @@ pub(crate) struct BlockPreprocessInfo {
     pub(crate) block_start_processing_time: Instant,
 }
 
+pub(crate) struct OptimisticBlockInfo {
+    /// Used to get notified when applying chunks of a block finishes.
+    #[allow(unused)]
+    pub(crate) apply_chunks_done_waiter: ApplyChunksDoneWaiter,
+    /// This is used to calculate the block processing time metric.
+    #[allow(unused)]
+    pub(crate) block_start_processing_time: Instant,
+}
+
 /// Blocks which finished pre-processing and are now being applied asynchronously
 pub(crate) struct BlocksInProcessing {
     // A map that stores all blocks in processing
     preprocessed_blocks: HashMap<CryptoHash, (Block, BlockPreprocessInfo)>,
+    optimistic_blocks: HashMap<CryptoHash, (OptimisticBlock, OptimisticBlockInfo)>,
 }
 
 #[derive(Debug)]
@@ -81,11 +92,14 @@ pub struct BlockNotInPoolError;
 impl BlocksInProcessing {
     pub(crate) fn new() -> Self {
-        BlocksInProcessing { preprocessed_blocks: HashMap::new() }
+        BlocksInProcessing {
+            preprocessed_blocks: HashMap::new(),
+            optimistic_blocks: HashMap::new(),
+        }
     }
 
     pub(crate) fn len(&self) -> usize {
-        self.preprocessed_blocks.len()
+        self.preprocessed_blocks.len() + self.optimistic_blocks.len()
     }
 
     /// Add a preprocessed block to the pool. Return Error::ExceedingPoolSize if the pool already
@@ -95,14 +109,28 @@ impl BlocksInProcessing {
         block: Block,
         preprocess_info: BlockPreprocessInfo,
     ) -> Result<(), AddError> {
-        self.add_dry_run(block.hash())?;
+        self.add_dry_run(&BlockToApply::Normal(*block.hash()))?;
 
         self.preprocessed_blocks.insert(*block.hash(), (block, preprocess_info));
         Ok(())
     }
 
-    pub(crate) fn contains(&self, block_hash: &CryptoHash) -> bool {
-        self.preprocessed_blocks.contains_key(block_hash)
+    pub(crate) fn add_optimistic(
+        &mut self,
+        block: OptimisticBlock,
+        preprocess_info: OptimisticBlockInfo,
+    ) -> Result<(), AddError> {
+        self.add_dry_run(&BlockToApply::Optimistic(*block.hash()))?;
+
+        self.optimistic_blocks.insert(*block.hash(), (block, preprocess_info));
+        Ok(())
+    }
+
+    pub(crate) fn contains(&self, block_to_apply: &BlockToApply) -> bool {
+        match block_to_apply {
+            BlockToApply::Normal(block_hash) => self.preprocessed_blocks.contains_key(block_hash),
+            BlockToApply::Optimistic(block_hash) => self.optimistic_blocks.contains_key(block_hash),
+        }
     }
 
     pub(crate) fn remove(
@@ -112,15 +140,22 @@ impl BlocksInProcessing {
         self.preprocessed_blocks.remove(block_hash)
     }
 
+    pub(crate) fn remove_optimistic(
+        &mut self,
+        optimistic_block_hash: &CryptoHash,
+    ) -> Option<(OptimisticBlock, OptimisticBlockInfo)> {
+        self.optimistic_blocks.remove(optimistic_block_hash)
+    }
+
     /// This function does NOT add the block, it simply checks if the block can be added
-    pub(crate) fn add_dry_run(&self, block_hash: &CryptoHash) -> Result<(), AddError> {
+    pub(crate) fn add_dry_run(&self, block_to_apply: &BlockToApply) -> Result<(), AddError> {
         // We set a limit to the max number of blocks that we will be processing at the same time.
        // Since processing a block requires that its previous block is processed, this limit
        // is likely never hit, unless there are many forks in the chain.
        // In this case, we will simply drop the block.
-        if self.preprocessed_blocks.len() >= MAX_PROCESSING_BLOCKS {
+        if self.len() >= MAX_PROCESSING_BLOCKS {
             Err(AddError::ExceedingPoolSize)
-        } else if self.preprocessed_blocks.contains_key(block_hash) {
+        } else if self.contains(block_to_apply) {
             Err(AddError::BlockAlreadyInPool)
         } else {
             Ok(())
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 03557418456..9ef79c67b98 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -1,14 +1,14 @@
 use crate::approval_verification::verify_approval_with_approvers_info;
 use crate::block_processing_utils::{
     ApplyChunksDoneWaiter, ApplyChunksStillApplying, BlockPreprocessInfo, BlockProcessingArtifact,
-    BlocksInProcessing,
+    BlocksInProcessing, OptimisticBlockInfo,
 };
 use crate::blocks_delay_tracker::BlocksDelayTracker;
 use crate::chain_update::ChainUpdate;
 use crate::crypto_hash_timer::CryptoHashTimer;
 use crate::lightclient::get_epoch_block_producers_view;
 use crate::migrations::check_if_block_is_first_with_chunk_of_version;
-use crate::missing_chunks::MissingChunksPool;
+use crate::missing_chunks::{MissingChunksPool, OptimisticBlockChunksPool};
 use crate::orphan::{Orphan, OrphanBlockPool};
 use crate::rayon_spawner::RayonAsyncComputationSpawner;
 use crate::resharding::manager::ReshardingManager;
@@ -68,6 +68,9 @@ use near_primitives::epoch_block_info::BlockInfo;
 use near_primitives::errors::EpochError;
 use near_primitives::hash::{hash, CryptoHash};
 use near_primitives::merkle::PartialMerkleTree;
+use near_primitives::optimistic_block::{
+    BlockToApply, CachedShardUpdateKey, OptimisticBlock, OptimisticBlockKeySource,
+};
 use near_primitives::receipt::Receipt;
 use near_primitives::sandbox::state_patch::SandboxStatePatch;
 use near_primitives::shard_layout::{ShardLayout, ShardUId};
@@ -102,6 +105,7 @@ use near_store::get_genesis_state_roots;
 use near_store::DBCol;
 use node_runtime::bootstrap_congestion_info;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use std::cell::Cell;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::fmt::{Debug, Formatter};
 use std::num::NonZeroUsize;
@@ -109,6 +113,8 @@ use std::sync::Arc;
 use time::ext::InstantExt as _;
 use tracing::{debug, debug_span, error, info, warn, Span};
 
+pub const APPLY_CHUNK_RESULTS_CACHE_SIZE: usize = 100;
+
 /// The size of the invalid_blocks in-memory pool
 pub const INVALID_CHUNKS_POOL_SIZE: usize = 5000;
 
@@ -210,7 +216,7 @@ pub fn check_known(
     if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash {
         return Ok(Err(BlockKnownError::KnownInHead));
     }
-    if chain.blocks_in_processing.contains(block_hash) {
+    if chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash)) {
         return Ok(Err(BlockKnownError::KnownInProcessing));
     }
     // Check if this block is in the set of known orphans.
@@ -226,7 +232,52 @@ pub fn check_known(
     check_known_store(chain, block_hash)
}
 
-type BlockApplyChunksResult = (CryptoHash, Vec<(ShardId, Result<ShardUpdateResult, Error>)>);
+pub struct ApplyChunksResultCache {
+    cache: LruCache<CachedShardUpdateKey, ShardUpdateResult>,
+    /// We use Cell to record access statistics even if we don't have
+    /// mutability.
+    hits: Cell<usize>,
+    misses: Cell<usize>,
+}
+
+impl ApplyChunksResultCache {
+    pub fn new(size: usize) -> Self {
+        Self {
+            cache: LruCache::new(NonZeroUsize::new(size).unwrap()),
+            hits: Cell::new(0),
+            misses: Cell::new(0),
+        }
+    }
+
+    pub fn peek(&self, key: &CachedShardUpdateKey) -> Option<&ShardUpdateResult> {
+        if let Some(result) = self.cache.peek(key) {
+            self.hits.set(self.hits.get() + 1);
+            return Some(result);
+        }
+
+        self.misses.set(self.misses.get() + 1);
+        None
+    }
+
+    pub fn push(&mut self, key: CachedShardUpdateKey, result: ShardUpdateResult) {
+        self.cache.put(key, result);
+    }
+
+    pub fn hits(&self) -> usize {
+        self.hits.get()
+    }
+
+    pub fn misses(&self) -> usize {
+        self.misses.get()
+    }
+
+    pub fn len(&self) -> usize {
+        self.cache.len()
+    }
+}
+
+type BlockApplyChunksResult =
+    (BlockToApply, Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>);
 
 /// Facade to the blockchain block processing and storage.
 /// Provides current view on the state according to the chain state.
@@ -239,6 +290,7 @@ pub struct Chain {
     pub state_sync_adapter: ChainStateSyncAdapter,
     pub(crate) orphans: OrphanBlockPool,
     pub blocks_with_missing_chunks: MissingChunksPool<Block>,
+    pub optimistic_block_chunks: OptimisticBlockChunksPool,
     genesis: Block,
     pub epoch_length: BlockHeightDelta,
     /// Block economics, relevant to changes when new block must be produced.
@@ -256,6 +308,7 @@ pub struct Chain {
     apply_chunks_receiver: Receiver<BlockApplyChunksResult>,
     /// Used to spawn the apply chunks jobs.
     apply_chunks_spawner: Arc<dyn AsyncComputationSpawner>,
+    pub apply_chunk_results_cache: ApplyChunksResultCache,
     /// Time when head was updated most recently.
     last_time_head_updated: Instant,
     /// Prevents re-application of known-to-be-invalid blocks, so that in case of a
@@ -291,8 +344,11 @@ impl Drop for Chain {
 
 /// UpdateShardJob is a closure that is responsible for updating a shard for a single block.
 /// Execution context (latest blocks/chunks details) are already captured within.
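+/// With the apply-results cache, each job additionally carries the
+/// `CachedShardUpdateKey` it was built for. Illustrative shape of one job
+/// (editor's sketch, not part of this change):
+///
+///     let (shard_id, cached_shard_update_key, task) = job;
+///     // Re-applies the chunk, or just returns a cached `ShardUpdateResult`.
+///     let result = task(&parent_span)?;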
-type UpdateShardJob =
-    (ShardId, Box<dyn FnOnce(&Span) -> Result<ShardUpdateResult, Error> + Send + Sync + 'static>);
+type UpdateShardJob = (
+    ShardId,
+    CachedShardUpdateKey,
+    Box<dyn FnOnce(&Span) -> Result<ShardUpdateResult, Error> + Send + Sync + 'static>,
+);
 
 /// PreprocessBlockResult is a tuple where the first element is a vector of jobs
 /// to update shards, the second element is BlockPreprocessInfo, and the third element shall be
@@ -389,6 +445,7 @@ impl Chain {
             state_sync_adapter,
             orphans: OrphanBlockPool::new(),
             blocks_with_missing_chunks: MissingChunksPool::new(),
+            optimistic_block_chunks: OptimisticBlockChunksPool::new(),
             blocks_in_processing: BlocksInProcessing::new(),
             genesis,
             epoch_length: chain_genesis.epoch_length,
@@ -398,6 +455,7 @@ impl Chain {
             apply_chunks_sender: sc,
             apply_chunks_receiver: rc,
             apply_chunks_spawner: Arc::new(RayonAsyncComputationSpawner),
+            apply_chunk_results_cache: ApplyChunksResultCache::new(APPLY_CHUNK_RESULTS_CACHE_SIZE),
             last_time_head_updated: clock.now(),
             invalid_blocks: LruCache::new(NonZeroUsize::new(INVALID_CHUNKS_POOL_SIZE).unwrap()),
             pending_state_patch: Default::default(),
@@ -582,6 +640,7 @@ impl Chain {
             state_sync_adapter,
             orphans: OrphanBlockPool::new(),
             blocks_with_missing_chunks: MissingChunksPool::new(),
+            optimistic_block_chunks: OptimisticBlockChunksPool::new(),
             blocks_in_processing: BlocksInProcessing::new(),
             invalid_blocks: LruCache::new(NonZeroUsize::new(INVALID_CHUNKS_POOL_SIZE).unwrap()),
             genesis: genesis.clone(),
@@ -592,6 +651,7 @@ impl Chain {
             apply_chunks_sender: sc,
             apply_chunks_receiver: rc,
             apply_chunks_spawner,
+            apply_chunk_results_cache: ApplyChunksResultCache::new(APPLY_CHUNK_RESULTS_CACHE_SIZE),
             last_time_head_updated: clock.now(),
             pending_state_patch: Default::default(),
             snapshot_callbacks,
@@ -1467,6 +1527,107 @@ impl Chain {
         res
     }
 
+    pub fn process_optimistic_block(
+        &mut self,
+        me: &Option<AccountId>,
+        block: OptimisticBlock,
+        chunk_headers: Vec<ShardChunkHeader>,
+        apply_chunks_done_sender: near_async::messaging::Sender<ApplyChunksDoneMessage>,
+    ) -> Result<(), Error> {
+        let _span = debug_span!(
+            target: "chain",
+            "process_optimistic_block",
+            hash = ?block.hash(),
+            height = ?block.height()
+        )
+        .entered();
+
+        let optimistic_block_hash = *block.hash();
+        let block_height = block.height();
+        let prev_block_hash = *block.prev_block_hash();
+        let prev_block = self.get_block(&prev_block_hash)?;
+        let prev_chunk_headers =
+            Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), &prev_block)?;
+
+        let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?;
+        let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
+        let chunks = Chunks::from_chunk_headers(&chunk_headers, block_height);
+        let incoming_receipts = self.collect_incoming_receipts_from_chunks(
+            me,
+            &chunks,
+            &prev_block_hash,
+            &prev_block_hash,
+        )?;
+
+        let mut maybe_jobs = vec![];
+        for (shard_index, prev_chunk_header) in prev_chunk_headers.iter().enumerate() {
+            let shard_id = shard_layout.get_shard_id(shard_index)?;
+            let block_context = ApplyChunkBlockContext {
+                height: block_height,
+                // TODO: consider removing this field completely to avoid
+                // confusion with real block hash.
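+                // The final block hash is unknown for an optimistic block;
+                // identity for caching comes from `OptimisticBlockKeySource`
+                // (height, prev hash, timestamp, random seed) instead.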
+                block_hash: CryptoHash::default(),
+                prev_block_hash: *block.prev_block_hash(),
+                block_timestamp: block.block_timestamp(),
+                gas_price: prev_block.header().next_gas_price(),
+                challenges_result: ChallengesResult::default(),
+                random_seed: *block.random_value(),
+                congestion_info: chunks.block_congestion_info(),
+                bandwidth_requests: chunks.block_bandwidth_requests(),
+            };
+            let incoming_receipts = incoming_receipts.get(&shard_id);
+            let storage_context = StorageContext {
+                storage_data_source: StorageDataSource::Db,
+                state_patch: SandboxStatePatch::default(),
+            };
+
+            let cached_shard_update_key =
+                Self::get_cached_shard_update_key(&block_context, &chunks, shard_id)?;
+            let job = self.get_update_shard_job(
+                me,
+                cached_shard_update_key,
+                block_context,
+                &chunks,
+                shard_index,
+                &prev_block,
+                prev_chunk_header,
+                ApplyChunksMode::IsCaughtUp,
+                incoming_receipts,
+                storage_context,
+            );
+            maybe_jobs.push(job);
+        }
+
+        let mut jobs = vec![];
+        for job in maybe_jobs {
+            match job {
+                Ok(Some(processor)) => jobs.push(processor),
+                Ok(None) => {}
+                Err(err) => return Err(err),
+            }
+        }
+
+        let (apply_chunks_done_waiter, apply_chunks_still_applying) = ApplyChunksDoneWaiter::new();
+        self.blocks_in_processing.add_optimistic(
+            block,
+            OptimisticBlockInfo {
+                apply_chunks_done_waiter,
+                block_start_processing_time: self.clock.now(),
+            },
+        )?;
+
+        // Schedule apply chunks; the jobs will be executed in the rayon thread pool.
+        self.schedule_apply_chunks(
+            BlockToApply::Optimistic(optimistic_block_hash),
+            block_height,
+            jobs,
+            apply_chunks_still_applying,
+            Some(apply_chunks_done_sender),
+        );
+
+        Ok(())
+    }
+
     /// Checks if any block has finished applying chunks and postprocesses these blocks to complete
     /// their processing. Return a list of blocks that have finished processing.
     /// If there are no blocks that are ready to be postprocessed, it returns immediately
@@ -1481,19 +1642,27 @@ impl Chain {
         let _span = debug_span!(target: "chain", "postprocess_ready_blocks_chain").entered();
         let mut accepted_blocks = vec![];
         let mut errors = HashMap::new();
-        while let Ok((block_hash, apply_result)) = self.apply_chunks_receiver.try_recv() {
-            match self.postprocess_ready_block(
-                me,
-                block_hash,
-                apply_result,
-                block_processing_artifacts,
-                apply_chunks_done_sender.clone(),
-            ) {
-                Err(e) => {
-                    errors.insert(block_hash, e);
+        while let Ok((block, apply_result)) = self.apply_chunks_receiver.try_recv() {
+            match block {
+                BlockToApply::Normal(block_hash) => {
+                    let apply_result = apply_result.into_iter().map(|res| (res.0, res.2)).collect();
+                    match self.postprocess_ready_block(
+                        me,
+                        block_hash,
+                        apply_result,
+                        block_processing_artifacts,
+                        apply_chunks_done_sender.clone(),
+                    ) {
+                        Err(e) => {
+                            errors.insert(block_hash, e);
+                        }
+                        Ok(accepted_block) => {
+                            accepted_blocks.push(accepted_block);
+                        }
+                    }
                 }
-                Ok(accepted_block) => {
-                    accepted_blocks.push(accepted_block);
+                BlockToApply::Optimistic(optimistic_block_hash) => {
+                    self.postprocess_optimistic_block(optimistic_block_hash, apply_result);
                 }
             }
         }
@@ -1823,7 +1992,7 @@ impl Chain {
         // 3) schedule apply chunks, which will be executed in the rayon thread pool.
         self.schedule_apply_chunks(
-            block_hash,
+            BlockToApply::Normal(block_hash),
             block_height,
             apply_chunk_work,
             apply_chunks_still_applying,
@@ -1838,7 +2007,7 @@ impl Chain {
     /// `apply_chunks_done_sender`: a sender to send an `ApplyChunksDoneMessage` message once applying chunks is finished
     fn schedule_apply_chunks(
         &self,
-        block_hash: CryptoHash,
+        block: BlockToApply,
         block_height: BlockHeight,
         work: Vec<UpdateShardJob>,
         apply_chunks_still_applying: ApplyChunksStillApplying,
@@ -1847,10 +2016,10 @@ impl Chain {
         let sc = self.apply_chunks_sender.clone();
         self.apply_chunks_spawner.spawn("apply_chunks", move || {
             // do_apply_chunks runs `work` in parallel, but still waits for all of them to finish
-            let res = do_apply_chunks(block_hash, block_height, work);
+            let res = do_apply_chunks(block.clone(), block_height, work);
             // If we encounter error here, that means the receiver is deallocated and the client
             // thread is already shut down. The node is already crashed, so we can unwrap here
-            sc.send((block_hash, res)).unwrap();
+            sc.send((block, res)).unwrap();
             drop(apply_chunks_still_applying);
             if let Some(sender) = apply_chunks_done_sender {
                 sender.send(ApplyChunksDoneMessage {});
@@ -1936,6 +2105,8 @@ impl Chain {
             Ok(new_head) => new_head,
         };
 
+        self.update_optimistic_blocks_pool(&block)?;
+
         let epoch_id = block.header().epoch_id();
         let mut shards_cares_this_or_next_epoch = vec![];
         for shard_id in self.epoch_manager.shard_ids(epoch_id)? {
@@ -2045,6 +2216,42 @@ impl Chain {
         Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance })
     }
 
+    fn postprocess_optimistic_block(
+        &mut self,
+        optimistic_block_hash: CryptoHash,
+        apply_result: Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>,
+    ) {
+        let (optimistic_block, _) = self.blocks_in_processing.remove_optimistic(&optimistic_block_hash).unwrap_or_else(|| {
+            panic!(
+                "optimistic block {:?} finished applying chunks but not in blocks_in_processing pool",
+                optimistic_block_hash
+            )
+        });
+
+        let prev_block_hash = optimistic_block.prev_block_hash();
+        let block_height = optimistic_block.height();
+        for (shard_id, cached_shard_update_key, apply_result) in apply_result.into_iter() {
+            match apply_result {
+                Ok(result) => {
+                    info!(
+                        target: "chain", ?prev_block_hash, block_height,
+                        ?shard_id, ?cached_shard_update_key,
+                        "Caching ShardUpdate result from OptimisticBlock"
+                    );
+                    self.apply_chunk_results_cache.push(cached_shard_update_key, result);
+                }
+                Err(e) => {
+                    warn!(
+                        target: "chain", ?e, ?optimistic_block_hash,
+                        ?prev_block_hash, block_height, ?shard_id,
+                        ?cached_shard_update_key,
+                        "Error applying chunk for OptimisticBlock"
+                    );
+                }
+            }
+        }
+    }
+
     fn check_if_upgrade_needed(&self, block_hash: &CryptoHash) {
         if let Ok(next_epoch_protocol_version) =
             self.epoch_manager.get_next_epoch_protocol_version(block_hash)
@@ -2141,6 +2348,20 @@ impl Chain {
         Ok(())
     }
 
+    /// If `block` is committed, the `last_final_block(block)` is final.
+    /// Thus it is enough to keep only chunks which are built on top of a block
+    /// with `height(last_final_block(block))` or higher.
+    fn update_optimistic_blocks_pool(&mut self, block: &Block) -> Result<(), Error> {
+        let final_block = block.header().last_final_block();
+        if final_block == &CryptoHash::default() {
+            return Ok(());
+        }
+
+        let final_block_height = self.chain_store.get_block_header(final_block)?.height();
+        self.optimistic_block_chunks.update_minimal_base_height(final_block_height);
+        Ok(())
+    }
+
     /// Preprocess a block before applying chunks, verify that we have the necessary information
     /// to process the block and the block is valid.
     /// Note that this function does NOT introduce any changes to chain state.
@@ -2157,7 +2378,7 @@ impl Chain {
         let header = block.header();
 
         // see if the block is already in processing or if there are too many blocks being processed
-        self.blocks_in_processing.add_dry_run(block.hash())?;
+        self.blocks_in_processing.add_dry_run(&BlockToApply::Normal(*block.hash()))?;
 
         debug!(target: "chain", height=header.height(), num_approvals = header.num_approvals(), "preprocess_block");
 
@@ -3056,8 +3277,11 @@ impl Chain {
             let storage_context =
                 StorageContext { storage_data_source: StorageDataSource::Db, state_patch };
-            let stateful_job = self.get_update_shard_job(
+            let cached_shard_update_key =
+                Self::get_cached_shard_update_key(&block_context, chunk_headers, shard_id)?;
+            let job = self.get_update_shard_job(
                 me,
+                cached_shard_update_key,
                 block_context,
                 chunk_headers,
                 shard_index,
@@ -3067,7 +3291,7 @@ impl Chain {
                 incoming_receipts,
                 storage_context,
             );
-            maybe_jobs.push((shard_id, stateful_job));
+            maybe_jobs.push((shard_id, job));
         }
 
         let mut jobs = vec![];
@@ -3137,10 +3361,37 @@ impl Chain {
         Ok(ShardContext { shard_uid, should_apply_chunk })
     }
 
+    /// Get a key which can uniquely define the result of applying a chunk,
+    /// based on the block execution context and the other chunks.
+    fn get_cached_shard_update_key(
+        block_context: &ApplyChunkBlockContext,
+        chunk_headers: &Chunks,
+        shard_id: ShardId,
+    ) -> Result<CachedShardUpdateKey, Error> {
+        const BYTES_LEN: usize =
+            size_of::<CryptoHash>() + size_of::<CryptoHash>() + size_of::<u64>();
+
+        let mut bytes: Vec<u8> = Vec::with_capacity(BYTES_LEN);
+        let block = OptimisticBlockKeySource {
+            height: block_context.height,
+            prev_block_hash: block_context.prev_block_hash,
+            block_timestamp: block_context.block_timestamp,
+            random_seed: block_context.random_seed,
+        };
+        bytes.extend_from_slice(&hash(&borsh::to_vec(&block)?).0);
+
+        let chunks_key_source: Vec<_> = chunk_headers.iter_raw().map(|c| c.chunk_hash()).collect();
+        bytes.extend_from_slice(&hash(&borsh::to_vec(&chunks_key_source)?).0);
+        bytes.extend_from_slice(&shard_id.to_le_bytes());
+
+        Ok(CachedShardUpdateKey::new(hash(&bytes)))
+    }
+
     /// This method returns the closure that is responsible for updating a shard.
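+    /// If a result for `cached_shard_update_key` is already cached (e.g. it
+    /// was just computed for the matching optimistic block), the returned
+    /// closure simply yields the cached `ShardUpdateResult` instead of
+    /// re-applying the chunk.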
     fn get_update_shard_job(
         &self,
         me: &Option<AccountId>,
+        cached_shard_update_key: CachedShardUpdateKey,
         block: ApplyChunkBlockContext,
         chunk_headers: &Chunks,
         shard_index: ShardIndex,
@@ -3150,8 +3401,12 @@
         incoming_receipts: Option<&Vec<ReceiptProof>>,
         storage_context: StorageContext,
     ) -> Result<Option<UpdateShardJob>, Error> {
-        let _span = tracing::debug_span!(target: "chain", "get_update_shard_job").entered();
         let prev_hash = prev_block.hash();
+        let block_height = block.height;
+        let _span =
+            tracing::debug_span!(target: "chain", "get_update_shard_job", ?prev_hash, block_height)
+                .entered();
+
         let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_hash)?;
         let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
         let shard_id = shard_layout.get_shard_id(shard_index)?;
@@ -3162,9 +3417,19 @@
         }
 
         let chunk_header = chunk_headers.get(shard_index).ok_or(Error::InvalidShardId(shard_id))?;
-        let block_height = block.height;
         let is_new_chunk = chunk_header.is_new_chunk(block_height);
 
+        if let Some(result) = self.apply_chunk_results_cache.peek(&cached_shard_update_key) {
+            info!(target: "chain", ?shard_id, ?cached_shard_update_key, "Using cached ShardUpdate result");
+            let result = result.clone();
+            return Ok(Some((
+                shard_id,
+                cached_shard_update_key,
+                Box::new(move |_| -> Result<ShardUpdateResult, Error> { Ok(result) }),
+            )));
+        }
+        info!(target: "chain", ?shard_id, ?cached_shard_update_key, "Creating ShardUpdate job");
+
         let shard_update_reason = if is_new_chunk {
             // Validate new chunk and collect incoming receipts for it.
             let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_context.shard_uid)?;
@@ -3186,8 +3451,6 @@
                     warn!(
                         target: "chain",
                         ?err,
-                        prev_block_hash=?prev_hash,
-                        block_height,
                         ?shard_id,
                         prev_chunk_height_included,
                         ?prev_chunk_extra,
@@ -3257,6 +3520,7 @@
         let runtime = self.runtime_adapter.clone();
         Ok(Some((
             shard_id,
+            cached_shard_update_key,
             Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
                 Ok(process_shard_update(
                     parent_span,
@@ -3731,7 +3995,7 @@ impl Chain {
     /// Check if hash is for a block that is being processed
     #[inline]
     pub fn is_in_processing(&self, hash: &CryptoHash) -> bool {
-        self.blocks_in_processing.contains(hash)
+        self.blocks_in_processing.contains(&BlockToApply::Normal(*hash))
     }
 
     #[inline]
@@ -3886,18 +4150,17 @@ impl Chain {
 }
 
 pub fn do_apply_chunks(
-    block_hash: CryptoHash,
+    block: BlockToApply,
     block_height: BlockHeight,
     work: Vec<UpdateShardJob>,
-) -> Vec<(ShardId, Result<ShardUpdateResult, Error>)> {
+) -> Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)> {
     let parent_span =
-        tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, %block_hash)
-            .entered();
+        tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, ?block).entered();
     work.into_par_iter()
-        .map(|(shard_id, task)| {
+        .map(|(shard_id, cached_shard_update_key, task)| {
             // As chunks can be processed in parallel, make sure they are all tracked as children of
             // a single span.
-            (shard_id, task(&parent_span))
+            (shard_id, cached_shard_update_key, task(&parent_span))
         })
         .collect()
 }
diff --git a/chain/chain/src/chain_update.rs b/chain/chain/src/chain_update.rs
index b5d6b36fcf0..0a591a5babe 100644
--- a/chain/chain/src/chain_update.rs
+++ b/chain/chain/src/chain_update.rs
@@ -125,7 +125,7 @@ impl<'a> ChainUpdate<'a> {
         )?;
         self.chain_store_update.merge(store_update.into());
 
-        self.chain_store_update.save_trie_changes(apply_result.trie_changes);
+        self.chain_store_update.save_trie_changes(*block_hash, apply_result.trie_changes);
         self.chain_store_update.save_outgoing_receipt(
             block_hash,
             shard_id,
@@ -167,7 +167,7 @@ impl<'a> ChainUpdate<'a> {
         self.chain_store_update.merge(store_update.into());
 
         self.chain_store_update.save_chunk_extra(block_hash, &shard_uid, new_extra);
-        self.chain_store_update.save_trie_changes(apply_result.trie_changes);
+        self.chain_store_update.save_trie_changes(*block_hash, apply_result.trie_changes);
         if should_save_state_transition_data {
             self.chain_store_update.save_state_transition_data(
                 *block_hash,
@@ -580,7 +580,7 @@ impl<'a> ChainUpdate<'a> {
         )?;
         self.chain_store_update.merge(store_update.into());
 
-        self.chain_store_update.save_trie_changes(apply_result.trie_changes);
+        self.chain_store_update.save_trie_changes(*block_header.hash(), apply_result.trie_changes);
 
         let chunk_extra = ChunkExtra::new(
             protocol_version,
@@ -683,7 +683,7 @@ impl<'a> ChainUpdate<'a> {
             apply_result.trie_changes.state_changes(),
         )?;
         self.chain_store_update.merge(store_update.into());
-        self.chain_store_update.save_trie_changes(apply_result.trie_changes);
+        self.chain_store_update.save_trie_changes(*block_header.hash(), apply_result.trie_changes);
 
         // The chunk is missing but some fields may need to be updated
         // anyway. Prepare a chunk extra as a copy of the old chunk
diff --git a/chain/chain/src/missing_chunks.rs b/chain/chain/src/missing_chunks.rs
index d0cf1c958ec..829a1e46729 100644
--- a/chain/chain/src/missing_chunks.rs
+++ b/chain/chain/src/missing_chunks.rs
@@ -1,5 +1,7 @@
 use near_primitives::hash::CryptoHash;
-use near_primitives::sharding::ChunkHash;
+use near_primitives::optimistic_block::OptimisticBlock;
+use near_primitives::shard_layout::ShardLayout;
+use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
 use near_primitives::types::BlockHeight;
 use std::cmp::Ordering;
 use std::collections::{
@@ -188,8 +190,203 @@ impl<Block: BlockLike> MissingChunksPool<Block> {
     }
 }
 
+/// Stores chunks for some optimistic block received so far.
+#[derive(Debug, Default)]
+struct OptimisticBlockChunks {
+    /// Number of remaining chunks to be received. Stored to avoid naive
+    /// recomputation.
+    remaining_chunks: usize,
+    /// Height of the previous block.
+    prev_block_height: BlockHeight,
+    /// Stores chunks for each shard, if received.
+    chunks: Vec<Option<ShardChunkHeader>>,
+}
+
+impl OptimisticBlockChunks {
+    pub fn new(prev_block_height: BlockHeight, num_shards: usize) -> Self {
+        Self { remaining_chunks: num_shards, prev_block_height, chunks: vec![None; num_shards] }
+    }
+}
+
+/// Stores optimistic blocks and chunks which are waiting to be processed.
+///
+/// Once a block and all chunks on top of the same previous block are received,
+/// it can provide the optimistic block to be processed.
+#[derive(Debug, Default)]
+pub struct OptimisticBlockChunksPool {
+    /// All blocks and chunks built on top of blocks below this height
+    /// can be discarded. Needed to garbage collect old data in case of forks
+    /// which can never get finalized.
+    minimal_base_height: BlockHeight,
+    /// Optimistic blocks with heights *not greater* than this height are discarded.
+    /// Needed on top of `minimal_base_height` as well to ensure that only one
+    /// optimistic block at each height can be processed.
+    block_height_threshold: BlockHeight,
+    /// Maps previous block hash to the received optimistic block with the
+    /// highest height on top of it.
+    blocks: HashMap<CryptoHash, OptimisticBlock>,
+    /// Maps previous block hash to the vector of chunk headers corresponding
+    /// to complete chunks.
+    chunks: HashMap<CryptoHash, OptimisticBlockChunks>,
+    /// Optimistic block with the largest height which is ready to process.
+    /// Block and chunks must correspond to the same previous block hash.
+    latest_ready_block: Option<(OptimisticBlock, Vec<ShardChunkHeader>)>,
+}
+
+impl OptimisticBlockChunksPool {
+    pub fn new() -> Self {
+        Self {
+            minimal_base_height: 0,
+            block_height_threshold: 0,
+            blocks: Default::default(),
+            chunks: Default::default(),
+            latest_ready_block: None,
+        }
+    }
+
+    pub fn num_blocks(&self) -> usize {
+        self.blocks.len()
+    }
+
+    pub fn num_chunks(&self) -> usize {
+        self.chunks.len()
+    }
+
+    pub fn add_block(&mut self, block: OptimisticBlock) {
+        if block.height() <= self.block_height_threshold {
+            return;
+        }
+
+        let prev_block_hash = *block.prev_block_hash();
+        self.blocks.insert(prev_block_hash, block);
+        self.update_latest_ready_block(&prev_block_hash);
+    }
+
+    pub fn add_chunk(&mut self, shard_layout: &ShardLayout, chunk_header: ShardChunkHeader) {
+        // We assume that `chunk_header.height_created() = prev_block_height + 1`.
+        let prev_block_height = chunk_header.height_created().saturating_sub(1);
+        if prev_block_height < self.minimal_base_height {
+            return;
+        }
+
+        let prev_block_hash = *chunk_header.prev_block_hash();
+        let entry = self.chunks.entry(prev_block_hash).or_insert_with(|| {
+            OptimisticBlockChunks::new(prev_block_height, shard_layout.num_shards() as usize)
+        });
+
+        let shard_index = shard_layout.get_shard_index(chunk_header.shard_id()).unwrap();
+        let chunk_entry = entry.chunks.get_mut(shard_index).unwrap();
+        let chunk_hash = chunk_header.chunk_hash();
+        if let Some(chunk) = &chunk_entry {
+            let existing_chunk_hash = chunk.chunk_hash();
+            tracing::info!(target: "chunks", ?prev_block_hash, ?chunk_hash, ?existing_chunk_hash, "Chunk already found for OptimisticBlock");
+            return;
+        }
+
+        *chunk_entry = Some(chunk_header);
+        entry.remaining_chunks -= 1;
+        tracing::debug!(
+            target: "chunks",
+            ?prev_block_hash,
+            ?chunk_hash,
+            remaining_chunks = entry.remaining_chunks,
+            "New chunk found for OptimisticBlock"
+        );
+
+        if entry.remaining_chunks == 0 {
+            tracing::debug!(
+                target: "chunks",
+                ?prev_block_hash,
+                "All chunks received for OptimisticBlock"
+            );
+            self.update_latest_ready_block(&prev_block_hash);
+        }
+    }
+
+    /// Takes the latest optimistic block and chunks which are ready to
+    /// be processed.
+    pub fn take_latest_ready_block(&mut self) -> Option<(OptimisticBlock, Vec<ShardChunkHeader>)> {
+        self.latest_ready_block.take()
+    }
+
+    /// If the optimistic block on top of `prev_block_hash` is ready to
+    /// process, sets the latest ready block to it.
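+    /// Also bumps the block height threshold to that block's height, so that
+    /// older optimistic blocks can no longer become ready.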
+    fn update_latest_ready_block(&mut self, prev_block_hash: &CryptoHash) {
+        let Some(chunks) = self.chunks.get(prev_block_hash) else {
+            return;
+        };
+        if chunks.remaining_chunks != 0 {
+            return;
+        }
+        let Some(block) = self.blocks.remove(prev_block_hash) else {
+            return;
+        };
+        if block.height() <= self.block_height_threshold {
+            return;
+        }
+
+        tracing::info!(
+            target: "chunks",
+            ?prev_block_hash,
+            optimistic_block_hash = ?block.hash(),
+            block_height = block.height(),
+            "OptimisticBlock is ready"
+        );
+        let chunks = chunks
+            .chunks
+            .iter()
+            .map(|c| {
+                let mut chunk = c.clone().unwrap();
+                // Mark the chunk as included at the optimistic block's height,
+                // mirroring what inclusion in the full block at this height
+                // would do. Debatable but probably ok.
+                *chunk.height_included_mut() = block.height();
+                chunk
+            })
+            .collect();
+        self.update_block_height_threshold(block.height());
+        self.latest_ready_block = Some((block, chunks));
+    }
+
+    /// Updates the block height threshold and cleans up old blocks.
+    pub fn update_block_height_threshold(&mut self, height: BlockHeight) {
+        self.block_height_threshold = std::cmp::max(self.block_height_threshold, height);
+        let hashes_to_remove: Vec<_> = self
+            .blocks
+            .iter()
+            .filter(|(_, h)| h.height() <= self.block_height_threshold)
+            .map(|(h, _)| *h)
+            .collect();
+        for h in hashes_to_remove {
+            self.blocks.remove(&h);
+        }
+
+        let Some((block, _)) = &self.latest_ready_block else {
+            return;
+        };
+        if block.height() <= self.block_height_threshold {
+            self.latest_ready_block = None;
+        }
+    }
+
+    /// Updates the minimal base height and cleans up old blocks and chunks.
+    pub fn update_minimal_base_height(&mut self, height: BlockHeight) {
+        // If *new* chunks must be built on top of *at least* this height,
+        // optimistic block height must be *strictly greater* than this height.
+        self.update_block_height_threshold(height);
+
+        self.minimal_base_height = std::cmp::max(self.minimal_base_height, height);
+        let hashes_to_remove: Vec<_> = self
+            .chunks
+            .iter()
+            .filter(|(_, h)| h.prev_block_height < self.minimal_base_height)
+            .map(|(h, _)| *h)
+            .collect();
+        for h in hashes_to_remove {
+            self.chunks.remove(&h);
+        }
+    }
+}
+
 #[cfg(test)]
-mod test {
+mod missing_chunks_test {
     use super::{BlockHash, BlockLike, MissingChunksPool, MAX_BLOCKS_MISSING_CHUNKS};
     use near_primitives::hash::{hash, CryptoHash};
     use near_primitives::sharding::ChunkHash;
@@ -294,3 +491,81 @@
         assert!(pool.contains(&later_block_hash));
     }
 }
+
+// TODO: tests for adding multiple blocks and reusing chunks.
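+//
+// Expected usage (editor's sketch; `chain`, `me` and `sender` are
+// illustrative):
+//
+//     pool.add_block(optimistic_block);
+//     for chunk_header in completed_chunks {
+//         pool.add_chunk(&shard_layout, chunk_header);
+//     }
+//     if let Some((block, chunks)) = pool.take_latest_ready_block() {
+//         chain.process_optimistic_block(&me, block, chunks, sender)?;
+//     }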
+#[cfg(test)]
+mod optimistic_block_chunks_pool_test {
+    use super::{OptimisticBlock, OptimisticBlockChunksPool, ShardChunkHeader};
+    use itertools::Itertools;
+    use near_primitives::hash::CryptoHash;
+    use near_primitives::shard_layout::ShardLayout;
+    use near_primitives::types::ShardId;
+
+    #[test]
+    fn test_add_block() {
+        let mut pool = OptimisticBlockChunksPool::new();
+        let prev_hash = CryptoHash::default();
+        let block = OptimisticBlock::new_dummy(1, prev_hash);
+
+        pool.add_block(block);
+        assert_eq!(pool.num_blocks(), 1);
+        assert!(pool.blocks.contains_key(&prev_hash));
+    }
+
+    #[test]
+    fn test_add_chunk() {
+        let mut pool = OptimisticBlockChunksPool::new();
+        let prev_hash = CryptoHash::default();
+        let shard_layout = ShardLayout::single_shard();
+        let chunk_header = ShardChunkHeader::new_dummy(0, ShardId::new(0), prev_hash);
+
+        pool.add_chunk(&shard_layout, chunk_header);
+        assert_eq!(pool.num_chunks(), 1);
+        assert!(pool.chunks.contains_key(&prev_hash));
+    }
+
+    #[test]
+    fn test_ready_block() {
+        let mut pool = OptimisticBlockChunksPool::new();
+        let prev_hash = CryptoHash::default();
+        let block = OptimisticBlock::new_dummy(1, prev_hash);
+        let shard_layout = ShardLayout::multi_shard(2, 3);
+        let shard_ids = shard_layout.shard_ids().collect_vec();
+
+        let chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[0], prev_hash);
+        pool.add_block(block);
+        pool.add_chunk(&shard_layout, chunk_header);
+
+        pool.update_latest_ready_block(&prev_hash);
+        assert!(pool.take_latest_ready_block().is_none());
+
+        let new_chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[1], prev_hash);
+        pool.add_chunk(&shard_layout, new_chunk_header);
+
+        assert!(pool.take_latest_ready_block().is_some());
+        assert!(pool.take_latest_ready_block().is_none());
+    }
+
+    #[test]
+    fn test_thresholds() {
+        let mut pool = OptimisticBlockChunksPool::new();
+        pool.block_height_threshold = 10;
+        pool.minimal_base_height = 5;
+
+        let prev_hash = CryptoHash::default();
+        let block_at_threshold = OptimisticBlock::new_dummy(10, prev_hash);
+        pool.add_block(block_at_threshold);
+        assert_eq!(pool.num_blocks(), 0, "Block at or below the height threshold should not be added");
+
+        let shard_layout = ShardLayout::single_shard();
+        let chunk_header_below_threshold =
+            ShardChunkHeader::new_dummy(5, ShardId::new(0), prev_hash);
+        pool.add_chunk(&shard_layout, chunk_header_below_threshold);
+        assert_eq!(pool.num_chunks(), 0, "Chunk strictly below threshold should not be added");
+
+        let chunk_header_passing_threshold =
+            ShardChunkHeader::new_dummy(6, ShardId::new(0), prev_hash);
+        pool.add_chunk(&shard_layout, chunk_header_passing_threshold);
+        assert_eq!(pool.num_chunks(), 1);
+    }
+}
diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs
index 430aee3ae2e..9bf0885c13f 100644
--- a/chain/chain/src/runtime/mod.rs
+++ b/chain/chain/src/runtime/mod.rs
@@ -369,7 +369,6 @@ impl NightshadeRuntime {
                 shard_uid,
                 apply_result.trie_changes,
                 apply_result.state_changes,
-                block_hash,
                 apply_state.block_height,
             ),
             new_root: apply_result.state_root,
diff --git a/chain/chain/src/runtime/tests.rs b/chain/chain/src/runtime/tests.rs
index 43334b2284f..112714e2d39 100644
--- a/chain/chain/src/runtime/tests.rs
+++ b/chain/chain/src/runtime/tests.rs
@@ -299,7 +299,9 @@ impl TestEnv {
         let flat_state_changes =
             FlatStateChanges::from_state_changes(&apply_result.trie_changes.state_changes());
         apply_result.trie_changes.insertions_into(&mut store_update.trie_store_update());
-        apply_result.trie_changes.state_changes_into(&mut store_update.trie_store_update());
+        apply_result
+            .trie_changes
+            .state_changes_into(&new_block_hash, &mut store_update.trie_store_update());
 
         let prev_block_hash = self.head.last_block_hash;
         let epoch_id =
diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs
index a3b74be1d0a..e4f9860e3e9 100644
--- a/chain/chain/src/store/mod.rs
+++ b/chain/chain/src/store/mod.rs
@@ -1104,7 +1104,7 @@ pub struct ChainStoreUpdate<'a> {
     header_head: Option<Tip>,
     final_head: Option<Tip>,
     largest_target_height: Option<BlockHeight>,
-    trie_changes: Vec<WrappedTrieChanges>,
+    trie_changes: Vec<(CryptoHash, WrappedTrieChanges)>,
     state_transition_data: HashMap<(CryptoHash, ShardId), StoredChunkStateTransitionData>,
     add_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
     // A pair (prev_hash, hash) to be removed from blocks to catchup
@@ -1718,8 +1718,8 @@ impl<'a> ChainStoreUpdate<'a> {
         self.chain_store_cache_update.outcome_ids.insert((*block_hash, shard_id), outcome_ids);
     }
 
-    pub fn save_trie_changes(&mut self, trie_changes: WrappedTrieChanges) {
-        self.trie_changes.push(trie_changes);
+    pub fn save_trie_changes(&mut self, block_hash: CryptoHash, trie_changes: WrappedTrieChanges) {
+        self.trie_changes.push((block_hash, trie_changes));
     }
 
     pub fn save_state_transition_data(
@@ -2082,14 +2082,16 @@ impl<'a> ChainStoreUpdate<'a> {
         {
             let _span = tracing::trace_span!(target: "store", "write_trie_changes").entered();
             let mut deletions_store_update = self.store().trie_store().store_update();
-            for mut wrapped_trie_changes in self.trie_changes.drain(..) {
+            for (block_hash, mut wrapped_trie_changes) in self.trie_changes.drain(..) {
                 wrapped_trie_changes.apply_mem_changes();
                 wrapped_trie_changes.insertions_into(&mut store_update.trie_store_update());
                 wrapped_trie_changes.deletions_into(&mut deletions_store_update);
-                wrapped_trie_changes.state_changes_into(&mut store_update.trie_store_update());
+                wrapped_trie_changes
+                    .state_changes_into(&block_hash, &mut store_update.trie_store_update());
 
                 if self.chain_store.save_trie_changes {
-                    wrapped_trie_changes.trie_changes_into(&mut store_update.trie_store_update());
+                    wrapped_trie_changes
+                        .trie_changes_into(&block_hash, &mut store_update.trie_store_update());
                 }
             }
diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs
index 1088637deca..74c199ce8ff 100644
--- a/chain/chain/src/test_utils.rs
+++ b/chain/chain/src/test_utils.rs
@@ -19,6 +19,7 @@ use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::{EpochManager, EpochManagerHandle};
 use near_primitives::block::Block;
 use near_primitives::hash::CryptoHash;
+use near_primitives::optimistic_block::BlockToApply;
 use near_primitives::stateless_validation::ChunkProductionKey;
 use near_primitives::test_utils::create_test_signer;
 use near_primitives::types::{AccountId, NumBlocks, NumShards};
@@ -90,7 +91,7 @@ pub fn wait_for_all_blocks_in_processing(chain: &Chain) -> bool {
 }
 
 pub fn is_block_in_processing(chain: &Chain, block_hash: &CryptoHash) -> bool {
-    chain.blocks_in_processing.contains(block_hash)
+    chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash))
 }
 
 pub fn wait_for_block_in_processing(
diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs
index edd6b4d0287..f220b141f58 100644
--- a/chain/chain/src/test_utils/kv_runtime.rs
+++ b/chain/chain/src/test_utils/kv_runtime.rs
@@ -1175,7 +1175,6 @@ impl RuntimeAdapter for KeyValueRuntime {
                 ShardUId::new(0, shard_id),
                 TrieChanges::empty(state_root),
                 Default::default(),
-                block.block_hash,
                 block.height,
             ),
             new_root: state_root,
diff --git a/chain/chain/src/tests/garbage_collection.rs b/chain/chain/src/tests/garbage_collection.rs
index 2da84c321ca..1001fea2f8a 100644
--- a/chain/chain/src/tests/garbage_collection.rs
+++ b/chain/chain/src/tests/garbage_collection.rs
@@ -123,10 +123,9 @@ fn do_fork(
                 shard_uid,
                 trie_changes,
                 Default::default(),
-                *block.hash(),
                 block.header().height(),
             );
-            store_update.save_trie_changes(wrapped_trie_changes);
+            store_update.save_trie_changes(*block.hash(), wrapped_trie_changes);
 
             prev_state_roots[shard_id as usize] = new_root;
             trie_changes_shards.push(trie_changes_data);
diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs
index 3cb383653da..9ea87be87fb 100644
--- a/chain/chain/src/types.rs
+++ b/chain/chain/src/types.rs
@@ -87,7 +87,7 @@ pub struct AcceptedBlock {
     pub provenance: Provenance,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ApplyChunkResult {
     pub trie_changes: WrappedTrieChanges,
     pub new_root: StateRoot,
diff --git a/chain/chain/src/update_shard.rs b/chain/chain/src/update_shard.rs
index 05550deedcd..8f0c71abdd4 100644
--- a/chain/chain/src/update_shard.rs
+++ b/chain/chain/src/update_shard.rs
@@ -17,7 +17,7 @@ use node_runtime::SignedValidPeriodTransactions;
 
 /// Result of updating a shard for some block when it has a new chunk for this
 /// shard.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NewChunkResult {
     pub shard_uid: ShardUId,
     pub gas_limit: Gas,
@@ -26,7 +26,7 @@
 
 /// Result of updating a shard for some block when it doesn't have a new chunk
 /// for this shard, so previous chunk header is copied.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct OldChunkResult {
     pub shard_uid: ShardUId,
     /// Note that despite the naming, no transactions are applied in this case.
@@ -35,7 +35,7 @@
 }
 
 /// Result for a shard update for a single block.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum ShardUpdateResult {
     NewChunk(NewChunkResult),
     OldChunk(OldChunkResult),
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index f08ff6f3f36..b6d8a33ec82 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -5,6 +5,7 @@
 use crate::chunk_distribution_network::{ChunkDistributionClient, ChunkDistributionNetwork};
 use crate::chunk_inclusion_tracker::ChunkInclusionTracker;
 use crate::chunk_producer::ChunkProducer;
+use crate::client_actor::ClientSenderForClient;
 use crate::debug::BlockProductionTracker;
 use crate::metrics;
 use crate::stateless_validation::chunk_endorsement::ChunkEndorsementTracker;
@@ -143,6 +144,8 @@ pub struct Client {
     state_sync_future_spawner: Arc<dyn FutureSpawner>,
     /// Sender for catchup state sync requests.
     chain_sender_for_state_sync: ChainSenderForStateSync,
+    // Sender to be able to send a message to myself.
+    pub myself_sender: ClientSenderForClient,
     /// List of currently accumulated challenges.
     pub challenges: HashMap<CryptoHash, Challenge>,
     /// Blocks that have been re-broadcast recently. They should not be broadcast again.
@@ -222,6 +225,7 @@ impl Client {
         resharding_sender: ReshardingSender,
         state_sync_future_spawner: Arc<dyn FutureSpawner>,
         chain_sender_for_state_sync: ChainSenderForStateSync,
+        myself_sender: ClientSenderForClient,
         upgrade_schedule: ProtocolUpgradeVotingSchedule,
     ) -> Result<Self, Error> {
         let doomslug_threshold_mode = if enable_doomslug {
@@ -348,6 +352,7 @@ impl Client {
             catchup_state_syncs: HashMap::new(),
             state_sync_future_spawner,
             chain_sender_for_state_sync,
+            myself_sender,
             challenges: Default::default(),
             rebroadcasted_blocks: lru::LruCache::new(
                 NonZeroUsize::new(NUM_REBROADCAST_BLOCKS).unwrap(),
@@ -1026,6 +1031,13 @@ impl Client {
         res
     }
 
+    #[allow(unused)]
+    pub fn receive_optimistic_block(&mut self, block: OptimisticBlock) {
+        let _span = debug_span!(target: "client", "receive_optimistic_block").entered();
+        self.chain.optimistic_block_chunks.add_block(block);
+        self.maybe_process_optimistic_block();
+    }
+
     /// To protect ourselves from spamming, we do some pre-check on block height before we do any
     /// processing. This function returns true if the block height is valid.
     fn check_block_height(
@@ -1307,8 +1319,10 @@ impl Client {
             .expect("Could not persist chunk");
         // We're marking chunk as accepted.
         self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash());
+        self.chain.optimistic_block_chunks.add_chunk(&shard_layout, chunk_header);
         // If this was the last chunk that was missing for a block, it will be processed now.
         self.process_blocks_with_missing_chunks(apply_chunks_done_sender, &signer);
+        self.maybe_process_optimistic_block();
     }
 
     /// Called asynchronously when the ShardsManager finishes processing a chunk but the chunk
@@ -1822,6 +1836,42 @@ impl Client {
         self.process_block_processing_artifact(blocks_processing_artifacts, signer);
     }
 
+    pub fn maybe_process_optimistic_block(&mut self) {
+        let Some((block, chunks)) = self.chain.optimistic_block_chunks.take_latest_ready_block()
+        else {
+            return;
+        };
+
+        let signer = self.validator_signer.get();
+        let me = signer.as_ref().map(|signer| signer.validator_id());
+        let prev_block_hash = *block.prev_block_hash();
+        let block_hash = *block.hash();
+        let block_height = block.height();
+        let apply_chunks_done_sender = self.myself_sender.apply_chunks_done.clone();
+        match self.chain.process_optimistic_block(
+            &me.map(|x| x.clone()),
+            block,
+            chunks,
+            apply_chunks_done_sender,
+        ) {
+            Ok(()) => {
+                info!(
+                    target: "chain", prev_block_hash = ?prev_block_hash,
+                    hash = ?block_hash, height = block_height,
+                    "Processed optimistic block"
+                );
+            }
+            Err(err) => {
+                warn!(
+                    target: "chain", err = ?err,
+                    prev_block_hash = ?prev_block_hash,
+                    hash = ?block_hash, height = block_height,
+                    "Failed to process optimistic block"
+                );
+            }
+        }
+    }
+
     pub fn is_validator(&self, epoch_id: &EpochId, signer: &Option<Arc<ValidatorSigner>>) -> bool {
         match signer {
             None => false,
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index 5c3144853fd..dce338521ff 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -149,6 +149,7 @@ pub fn start_client(
     wait_until_genesis(&chain_genesis.time);
 
     let chain_sender_for_state_sync = LateBoundSender::<ChainSenderForStateSync>::new();
+    let client_sender_for_client = LateBoundSender::<ClientSenderForClient>::new();
     let client = Client::new(
         clock.clone(),
         client_config,
@@ -167,6 +168,7 @@ pub fn start_client(
         resharding_sender,
         state_sync_future_spawner,
         chain_sender_for_state_sync.as_multi_sender(),
+        client_sender_for_client.as_multi_sender(),
         PROTOCOL_UPGRADE_SCHEDULE.clone(),
     )
     .unwrap();
@@ -176,13 +178,10 @@ pub fn start_client(
     let sync_jobs_actor = SyncJobsActor::new(client_sender_for_sync_jobs.as_multi_sender());
     let sync_jobs_actor_addr = sync_jobs_actor.spawn_actix_actor();
 
-    let client_sender_for_client = LateBoundSender::<ClientSenderForClient>::new();
-    let client_sender_for_client_clone = client_sender_for_client.clone();
     let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |_| {
         let client_actor_inner = ClientActorInner::new(
             clock,
             client,
-            client_sender_for_client_clone.as_multi_sender(),
             node_id,
             network_adapter,
             telemetry_sender,
@@ -225,8 +224,6 @@ pub struct ClientActorInner {
     /// Adversarial controls
     pub adv: crate::adversarial::Controls,
 
-    // Sender to be able to send a message to myself.
-    myself_sender: ClientSenderForClient,
     pub client: Client,
     network_adapter: PeerManagerAdapter,
     network_info: NetworkInfo,
@@ -332,7 +329,6 @@ impl ClientActorInner {
     pub fn new(
         clock: Clock,
         client: Client,
-        myself_sender: ClientSenderForClient,
         node_id: PeerId,
         network_adapter: PeerManagerAdapter,
         telemetry_sender: Sender<TelemetryEvent>,
@@ -351,7 +347,6 @@ impl ClientActorInner {
         Ok(ClientActorInner {
             clock,
             adv,
-            myself_sender,
             client,
             network_adapter,
             node_id,
@@ -523,7 +518,7 @@ impl Handler<BlockResponse> for ClientActorInner {
                 block,
                 peer_id,
                 was_requested,
-                Some(self.myself_sender.apply_chunks_done.clone()),
+                Some(self.client.myself_sender.apply_chunks_done.clone()),
                 &signer,
             );
         } else {
@@ -1276,7 +1271,7 @@ impl ClientActorInner {
     fn try_process_unfinished_blocks(&mut self, signer: &Option<Arc<ValidatorSigner>>) {
         let _span = debug_span!(target: "client", "try_process_unfinished_blocks").entered();
         let (accepted_blocks, errors) = self.client.postprocess_ready_blocks(
-            Some(self.myself_sender.apply_chunks_done.clone()),
+            Some(self.client.myself_sender.apply_chunks_done.clone()),
             true,
             signer,
         );
@@ -1348,7 +1343,7 @@ impl ClientActorInner {
         let res = self.client.start_process_block(
             block,
             Provenance::PRODUCED,
-            Some(self.myself_sender.apply_chunks_done.clone()),
+            Some(self.client.myself_sender.apply_chunks_done.clone()),
             signer,
         );
         let Err(error) = res else {
@@ -1392,6 +1387,8 @@ impl ClientActorInner {
         // We’ve produced the optimistic block, mark it as done so we don't produce it again.
         self.client.save_optimistic_block(&optimistic_block);
+        self.client.chain.optimistic_block_chunks.add_block(optimistic_block);
+        self.client.maybe_process_optimistic_block();
 
         Ok(())
     }
 
@@ -1572,7 +1569,7 @@ impl ClientActorInner {
         if let Err(err) = self.client.run_catchup(
             &self.network_info.highest_height_peers,
             &self.sync_jobs_sender.block_catch_up,
-            Some(self.myself_sender.apply_chunks_done.clone()),
+            Some(self.client.myself_sender.apply_chunks_done.clone()),
             &validator_signer,
         ) {
             error!(target: "client", "{:?} Error occurred during catchup for the next epoch: {:?}", validator_signer.as_ref().map(|vs| vs.validator_id()), err);
@@ -1686,7 +1683,7 @@ impl ClientActorInner {
             highest_height,
             &self.network_info.highest_height_peers,
             signer,
-            Some(self.myself_sender.apply_chunks_done.clone()),
+            Some(self.client.myself_sender.apply_chunks_done.clone()),
         );
         let Some(sync_step_result) = sync_step_result else {
             return;
@@ -1866,7 +1863,7 @@ impl ClientActorInner {
                 let _ = self.client.start_process_block(
                     block.into(),
                     Provenance::PRODUCED,
-                    Some(self.myself_sender.apply_chunks_done.clone()),
+                    Some(self.client.myself_sender.apply_chunks_done.clone()),
                     &signer,
                 );
                 blocks_produced += 1;
@@ -1903,7 +1900,7 @@ impl Handler<ShardsManagerResponse> for ClientActorInner {
                 self.client.on_chunk_completed(
                     partial_chunk,
                     shard_chunk,
-                    Some(self.myself_sender.apply_chunks_done.clone()),
+                    Some(self.client.myself_sender.apply_chunks_done.clone()),
                     &signer,
                 );
             }
diff --git a/chain/client/src/sync_jobs_actor.rs b/chain/client/src/sync_jobs_actor.rs
index 8aebe3d551c..77e95f47cb7 100644
--- a/chain/client/src/sync_jobs_actor.rs
+++ b/chain/client/src/sync_jobs_actor.rs
@@ -4,6 +4,7 @@ use near_async::messaging::{self, CanSend, Handler, Sender};
 use near_async::{MultiSend, MultiSenderFrom};
 use near_chain::chain::{do_apply_chunks, BlockCatchUpRequest, BlockCatchUpResponse};
 use near_performance_metrics_macros::perf;
+use near_primitives::optimistic_block::BlockToApply;
 
 // Set the mailbox capacity for the SyncJobsActor from default 16 to 100.
 const MAILBOX_CAPACITY: usize = 100;
@@ -43,7 +44,11 @@ impl SyncJobsActor {
     pub fn handle_block_catch_up_request(&mut self, msg: BlockCatchUpRequest) {
         tracing::debug!(target: "sync", ?msg);
 
-        let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work);
+        let results =
+            do_apply_chunks(BlockToApply::Normal(msg.block_hash), msg.block_height, msg.work)
+                .into_iter()
+                .map(|res| (res.0, res.2))
+                .collect();
 
         self.client_sender.send(BlockCatchUpResponse {
             sync_hash: msg.sync_hash,
diff --git a/chain/client/src/test_utils/client.rs b/chain/client/src/test_utils/client.rs
index fcd4df145c9..e4524163a1c 100644
--- a/chain/client/src/test_utils/client.rs
+++ b/chain/client/src/test_utils/client.rs
@@ -19,6 +19,7 @@ use near_network::types::HighestHeightPeerInfo;
 use near_primitives::block::Block;
 use near_primitives::hash::CryptoHash;
 use near_primitives::merkle::{merklize, PartialMerkleTree};
+use near_primitives::optimistic_block::BlockToApply;
 use near_primitives::sharding::{EncodedShardChunk, ShardChunk};
 use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
 use near_primitives::transaction::SignedTransaction;
@@ -313,10 +314,11 @@ pub fn run_catchup(
     client.run_catchup(highest_height_peers, &block_catch_up, None, &signer)?;
     let mut catchup_done = true;
     for msg in block_messages.write().unwrap().drain(..) {
-        let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work)
-            .into_iter()
-            .map(|res| res.1)
-            .collect_vec();
+        let results =
+            do_apply_chunks(BlockToApply::Normal(msg.block_hash), msg.block_height, msg.work)
+                .into_iter()
+                .map(|res| res.2)
+                .collect_vec();
         if let Some(CatchupState { catchup, .. }) =
             client.catchup_state_syncs.get_mut(&msg.sync_hash)
         {
diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs
index b3c51e696f5..5aa79d4f9b4 100644
--- a/chain/client/src/test_utils/setup.rs
+++ b/chain/client/src/test_utils/setup.rs
@@ -1086,6 +1086,7 @@ pub fn setup_client_with_runtime(
         resharding_sender,
         Arc::new(ActixFutureSpawner),
         noop().into_multi_sender(), // state sync ignored for these tests
+        noop().into_multi_sender(), // apply chunks ping not necessary for these tests
         PROTOCOL_UPGRADE_SCHEDULE.clone(),
     )
     .unwrap();
diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs
index 92dfde6904f..47636bcb5ec 100644
--- a/chain/client/src/test_utils/test_env.rs
+++ b/chain/client/src/test_utils/test_env.rs
@@ -181,7 +181,7 @@ impl TestEnv {
         capture.set_callback(move |msg| {
             if msg.starts_with("do_apply_chunks") {
                 let cell = paused_blocks.lock().unwrap().iter().find_map(|(block_hash, cell)| {
-                    if msg.contains(&format!("block_hash={block_hash}")) {
+                    if msg.contains(&format!("block=Normal({block_hash})")) {
                         Some(Arc::clone(cell))
                     } else {
                         None
diff --git a/core/primitives-core/src/code.rs b/core/primitives-core/src/code.rs
index dc2e4691723..3f08c0424b0 100644
--- a/core/primitives-core/src/code.rs
+++ b/core/primitives-core/src/code.rs
@@ -2,6 +2,7 @@ use std::fmt::{Debug, Formatter};
 
 use crate::hash::{hash as sha256, CryptoHash};
 
+#[derive(Clone)]
 pub struct ContractCode {
     code: Vec<u8>,
     hash: CryptoHash,
diff --git a/core/primitives/src/block.rs b/core/primitives/src/block.rs
index a0b5193a95f..0bafccf352a 100644
--- a/core/primitives/src/block.rs
+++ b/core/primitives/src/block.rs
@@ -348,6 +348,7 @@ impl Block {
         let (time, vrf_value, vrf_proof, random_value) = optimistic_block
             .as_ref()
             .map(|ob| {
+                tracing::debug!(target: "client", "Taking metadata from optimistic block");
                 (
                     ob.inner.block_timestamp,
                     ob.inner.vrf_value,
                     ob.inner.vrf_proof,
                     ob.inner.random_value,
                 )
             })
@@ -793,6 +794,13 @@ impl<'a> Chunks<'a> {
         Self { chunks, block_height: block.header().height() }
     }
 
+    pub fn from_chunk_headers(
+        chunk_headers: &'a [ShardChunkHeader],
+        block_height: BlockHeight,
+    ) -> Self {
+        Self { chunks: ChunksCollection::V2(chunk_headers), block_height }
+    }
+
     pub fn len(&self) -> usize {
         match &self.chunks {
             ChunksCollection::V1(chunks) => chunks.len(),
diff --git a/core/primitives/src/optimistic_block.rs b/core/primitives/src/optimistic_block.rs
index a102ae4e361..b16deed1b19 100644
--- a/core/primitives/src/optimistic_block.rs
+++ b/core/primitives/src/optimistic_block.rs
@@ -1,10 +1,12 @@
-use borsh::{BorshDeserialize, BorshSerialize};
-use near_crypto::Signature;
-use near_schema_checker_lib::ProtocolSchema;
-
 use crate::block::BlockHeader;
 use crate::hash::{hash, CryptoHash};
 use crate::types::{BlockHeight, SignatureDifferentiator};
+use borsh::{BorshDeserialize, BorshSerialize};
+use near_crypto::{InMemorySigner, Signature};
+use near_primitives_core::types::AccountId;
+use near_schema_checker_lib::ProtocolSchema;
+use std::fmt::Debug;
+use std::str::FromStr;
 
 #[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, ProtocolSchema)]
 pub struct OptimisticBlockInner {
@@ -66,4 +68,73 @@ impl OptimisticBlock {
     pub fn init(&mut self) {
         self.hash = hash(&borsh::to_vec(&self.inner).expect("Failed to serialize"));
     }
+
+    pub fn height(&self) -> BlockHeight {
+        self.inner.block_height
+    }
+
+    pub fn hash(&self) -> &CryptoHash {
+        &self.hash
+    }
+
+    pub fn prev_block_hash(&self) -> &CryptoHash {
+        &self.inner.prev_block_hash
+    }
+
+    pub fn block_timestamp(&self) -> u64 {
+        self.inner.block_timestamp
+    }
+
+    pub fn random_value(&self) -> &CryptoHash {
+        &self.inner.random_value
+    }
+
+    pub fn new_dummy(height: BlockHeight, prev_hash: CryptoHash) -> Self {
+        let signer = InMemorySigner::test_signer(&AccountId::from_str("test").unwrap());
+        let (vrf_value, vrf_proof) = signer.compute_vrf_with_proof(Default::default());
+        Self {
+            inner: OptimisticBlockInner {
+                block_height: height,
+                prev_block_hash: prev_hash,
+                block_timestamp: 0,
+                random_value: Default::default(),
+                vrf_value,
+                vrf_proof,
+                signature_differentiator: "test".to_string(),
+            },
+            signature: Default::default(),
+            hash: Default::default(),
+        }
+    }
+}
+
+/// Optimistic block fields which are enough to define unique context for
+/// applying chunks in that block. Thus hash of this struct can be used to
+/// cache *valid* optimistic blocks.
+///
+/// This struct is created just so that we can conveniently derive and use
+/// `borsh` serialization for it.
+#[derive(BorshSerialize)]
+pub struct OptimisticBlockKeySource {
+    pub height: BlockHeight,
+    pub prev_block_hash: CryptoHash,
+    pub block_timestamp: u64,
+    pub random_seed: CryptoHash,
+}
+
+#[derive(Debug, Clone)]
+pub enum BlockToApply {
+    Normal(CryptoHash),
+    Optimistic(CryptoHash),
+}
+
+#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
+pub struct CachedShardUpdateKey(CryptoHash);
+
+impl CachedShardUpdateKey {
+    /// Explicit constructor to minimize the risk of using hashes of other
+    /// entities accidentally.
diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs
index 76825f85991..1471511531e 100644
--- a/core/primitives/src/sharding.rs
+++ b/core/primitives/src/sharding.rs
@@ -6,11 +6,12 @@ use crate::receipt::Receipt;
 use crate::transaction::SignedTransaction;
 use crate::types::validator_stake::{ValidatorStake, ValidatorStakeIter, ValidatorStakeV1};
 use crate::types::{Balance, BlockHeight, Gas, MerkleHash, ShardId, StateRoot};
-use crate::validator_signer::ValidatorSigner;
+use crate::validator_signer::{EmptyValidatorSigner, ValidatorSigner};
 use crate::version::{ProtocolFeature, ProtocolVersion, SHARD_CHUNK_HEADER_UPGRADE_VERSION};
 use borsh::{BorshDeserialize, BorshSerialize};
 use near_crypto::Signature;
 use near_fmt::AbbrBytes;
+use near_primitives_core::version::PROTOCOL_VERSION;
 use near_schema_checker_lib::ProtocolSchema;
 use shard_chunk_header_inner::ShardChunkHeaderInnerV4;
 use std::cmp::Ordering;
@@ -344,6 +345,32 @@ pub enum ShardChunkHeader {
 }
 
 impl ShardChunkHeader {
+    pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self {
+        let congestion_info = ProtocolFeature::CongestionControl
+            .enabled(PROTOCOL_VERSION)
+            .then_some(CongestionInfo::default());
+
+        ShardChunkHeader::V3(ShardChunkHeaderV3::new(
+            PROTOCOL_VERSION,
+            prev_block_hash,
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            height,
+            shard_id,
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            Default::default(),
+            congestion_info,
+            BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION),
+            &EmptyValidatorSigner::default().into(),
+        ))
+    }
+
     #[inline]
     pub fn take_inner(self) -> ShardChunkHeaderInner {
         match self {
@@ -362,6 +389,10 @@ impl ShardChunkHeader {
         hash(&inner_bytes.expect("Failed to serialize"))
     }
 
+    /// Height at which the chunk was created.
+    /// TODO: this is always `height(prev_block_hash) + 1`. Consider using
+    /// `prev_block_height` instead, as it is more explicit; `height_created`
+    /// also conflicts with `height_included`.
     #[inline]
     pub fn height_created(&self) -> BlockHeight {
         match self {
diff --git a/core/primitives/src/stateless_validation/contract_distribution.rs b/core/primitives/src/stateless_validation/contract_distribution.rs
index 6e4b5f8a170..6fa8e3672bb 100644
--- a/core/primitives/src/stateless_validation/contract_distribution.rs
+++ b/core/primitives/src/stateless_validation/contract_distribution.rs
@@ -370,7 +370,7 @@ impl Into<Vec<u8>> for CodeBytes {
 }
 
 /// Contains the accesses and changes (e.g. deployments) to the contracts while applying a chunk.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct ContractUpdates {
     /// Code-hashes of the contracts accessed (called) while applying the chunk.
     pub contract_accesses: HashSet<CodeHash>,
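`ShardChunkHeader::new_dummy` above gates the congestion-control field on the protocol version via `bool::then_some`. A tiny self-contained illustration of that gating pattern, with a stub in place of the real `CongestionInfo`:

// Stand-in type; the point is the bool::then_some gating used above.
#[derive(Debug, Default, PartialEq)]
struct CongestionInfoStub;

fn congestion_info(feature_enabled: bool) -> Option<CongestionInfoStub> {
    // Some(default) only when the feature is active at this protocol version.
    feature_enabled.then_some(CongestionInfoStub::default())
}

fn main() {
    assert_eq!(congestion_info(true), Some(CongestionInfoStub));
    assert_eq!(congestion_info(false), None);
}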
diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs
index 227c54c7555..eefe9bedc2a 100644
--- a/core/primitives/src/stateless_validation/state_witness.rs
+++ b/core/primitives/src/stateless_validation/state_witness.rs
@@ -1,22 +1,17 @@
-use std::collections::HashMap;
 use std::fmt::Debug;
+use {crate::state::PartialState, std::collections::HashMap};
 
 use super::ChunkProductionKey;
-use crate::bandwidth_scheduler::BandwidthRequests;
-use crate::congestion_info::CongestionInfo;
 #[cfg(feature = "solomon")]
 use crate::reed_solomon::{ReedSolomonEncoderDeserialize, ReedSolomonEncoderSerialize};
-use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader, ShardChunkHeaderV3};
-use crate::state::PartialState;
+use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader};
 use crate::transaction::SignedTransaction;
 use crate::types::{EpochId, SignatureDifferentiator};
 use crate::utils::compression::CompressedData;
-use crate::validator_signer::EmptyValidatorSigner;
 use borsh::{BorshDeserialize, BorshSerialize};
 use bytesize::ByteSize;
 use near_primitives_core::hash::CryptoHash;
 use near_primitives_core::types::{AccountId, BlockHeight, ShardId};
-use near_primitives_core::version::{ProtocolFeature, PROTOCOL_VERSION};
 use near_schema_checker_lib::ProtocolSchema;
 
 /// Represents max allowed size of the raw (not compressed) state witness,
@@ -199,29 +194,7 @@ impl ChunkStateWitness {
     }
 
     pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self {
-        let congestion_info = ProtocolFeature::CongestionControl
-            .enabled(PROTOCOL_VERSION)
-            .then_some(CongestionInfo::default());
-
-        let header = ShardChunkHeader::V3(ShardChunkHeaderV3::new(
-            PROTOCOL_VERSION,
-            prev_block_hash,
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            height,
-            shard_id,
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            Default::default(),
-            congestion_info,
-            BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION),
-            &EmptyValidatorSigner::default().into(),
-        ));
+        let header = ShardChunkHeader::new_dummy(height, shard_id, prev_block_hash);
         Self::new(
             "alice.near".parse().unwrap(),
             EpochId::default(),
diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs
index 5a007a33f91..01450f5c72a 100644
--- a/core/store/src/trie/shard_tries.rs
+++ b/core/store/src/trie/shard_tries.rs
@@ -606,12 +606,12 @@ impl ShardTries {
     }
 }
 
+#[derive(Clone)]
 pub struct WrappedTrieChanges {
     tries: ShardTries,
     shard_uid: ShardUId,
     trie_changes: TrieChanges,
     state_changes: Vec<RawStateChangesWithTrieKey>,
-    block_hash: CryptoHash,
     block_height: BlockHeight,
 }
 
@@ -624,7 +624,6 @@ impl std::fmt::Debug for WrappedTrieChanges {
             .field("shard_uid", &self.shard_uid)
             .field("trie_changes", &"")
             .field("state_changes", &"")
-            .field("block_hash", &self.block_hash)
             .field("block_height", &self.block_height)
             .finish()
     }
@@ -636,17 +635,9 @@ impl WrappedTrieChanges {
         shard_uid: ShardUId,
         trie_changes: TrieChanges,
         state_changes: Vec<RawStateChangesWithTrieKey>,
-        block_hash: CryptoHash,
         block_height: BlockHeight,
     ) -> Self {
-        WrappedTrieChanges {
-            tries,
-            shard_uid,
-            trie_changes,
-            state_changes,
-            block_hash,
-            block_height,
-        }
+        WrappedTrieChanges { tries, shard_uid, trie_changes, state_changes, block_height }
     }
 
     pub fn state_changes(&self) -> &[RawStateChangesWithTrieKey] {
@@ -677,7 +668,11 @@ impl WrappedTrieChanges {
        fields(num_state_changes = self.state_changes.len(), shard_id = ?self.shard_uid.shard_id()),
        skip_all,
    )]
-    pub fn state_changes_into(&mut self, store_update: &mut TrieStoreUpdateAdapter) {
+    pub fn state_changes_into(
+        &mut self,
+        block_hash: &CryptoHash,
+        store_update: &mut TrieStoreUpdateAdapter,
+    ) {
         for mut change_with_trie_key in self.state_changes.drain(..) {
             assert!(
                 !change_with_trie_key.changes.iter().any(|RawStateChange { cause, .. }| matches!(
@@ -699,15 +694,12 @@ impl WrappedTrieChanges {
             let storage_key = match change_with_trie_key.trie_key.get_account_id() {
                 // If a TrieKey itself doesn't identify the Shard, then we need to add shard id to the row key.
                 None => KeyForStateChanges::delayed_receipt_key_from_trie_key(
-                    &self.block_hash,
+                    block_hash,
                     &change_with_trie_key.trie_key,
                     &self.shard_uid,
                 ),
                 // TrieKey has enough information to identify the shard it comes from.
-                _ => KeyForStateChanges::from_trie_key(
-                    &self.block_hash,
-                    &change_with_trie_key.trie_key,
-                ),
+                _ => KeyForStateChanges::from_trie_key(block_hash, &change_with_trie_key.trie_key),
             };
 
             store_update.set_state_changes(storage_key, &change_with_trie_key);
@@ -720,8 +712,12 @@ impl WrappedTrieChanges {
         "ShardTries::trie_changes_into",
         skip_all
     )]
-    pub fn trie_changes_into(&mut self, store_update: &mut TrieStoreUpdateAdapter) {
-        store_update.set_trie_changes(self.shard_uid, &self.block_hash, &self.trie_changes)
+    pub fn trie_changes_into(
+        &mut self,
+        block_hash: &CryptoHash,
+        store_update: &mut TrieStoreUpdateAdapter,
+    ) {
+        store_update.set_trie_changes(self.shard_uid, block_hash, &self.trie_changes)
+    }
 }
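The `WrappedTrieChanges` change above is the heart of the reuse story: the struct is now `Clone` and no longer pins a block hash, so results computed while applying an optimistic block can later be committed under the hash of the block that actually materializes. A simplified sketch of the idea, with toy types rather than the real store API:

// Toy model: trie changes are computed once, then committed under whichever
// block hash the caller supplies at write time.
type BlockHash = [u8; 32];

#[derive(Clone)]
struct PendingTrieChanges {
    ops: Vec<(Vec<u8>, Vec<u8>)>,
}

#[derive(Default)]
struct StoreUpdate {
    rows: Vec<((BlockHash, Vec<u8>), Vec<u8>)>,
}

impl PendingTrieChanges {
    // block_hash is a parameter, not a field, mirroring the new
    // `trie_changes_into(&mut self, block_hash, store_update)` shape.
    fn commit_into(&self, block_hash: BlockHash, update: &mut StoreUpdate) {
        for (key, value) in &self.ops {
            update.rows.push(((block_hash, key.clone()), value.clone()));
        }
    }
}

fn main() {
    let changes = PendingTrieChanges { ops: vec![(b"k".to_vec(), b"v".to_vec())] };
    let mut update = StoreUpdate::default();
    // Cloneable changes can be re-keyed for the block that is finally chosen.
    changes.clone().commit_into([7; 32], &mut update);
    assert_eq!(update.rows.len(), 1);
}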
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index 41f4b74953a..fb5b4dea4c4 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -650,6 +650,7 @@ impl TestLoopBuilder {
             resharding_sender.as_multi_sender(),
             Arc::new(self.test_loop.future_spawner()),
             client_adapter.as_multi_sender(),
+            client_adapter.as_multi_sender(),
             self.upgrade_schedule.clone(),
         )
         .unwrap();
@@ -713,7 +714,6 @@ impl TestLoopBuilder {
         let client_actor = ClientActorInner::new(
             self.test_loop.clock(),
             client,
-            client_adapter.as_multi_sender(),
             peer_id.clone(),
             network_adapter.as_multi_sender(),
             noop().into_sender(),
diff --git a/integration-tests/src/test_loop/tests/mod.rs b/integration-tests/src/test_loop/tests/mod.rs
index 4fda42f66d7..998f4fc1750 100644
--- a/integration-tests/src/test_loop/tests/mod.rs
+++ b/integration-tests/src/test_loop/tests/mod.rs
@@ -14,6 +14,7 @@ mod in_memory_tries;
 mod max_receipt_size;
 mod multinode_stateless_validators;
 mod multinode_test_loop_example;
+mod optimistic_block;
 mod protocol_upgrade;
 mod reject_outdated_blocks;
 mod resharding_v3;
diff --git a/integration-tests/src/test_loop/tests/optimistic_block.rs b/integration-tests/src/test_loop/tests/optimistic_block.rs
new file mode 100644
index 00000000000..5a77707c3ed
--- /dev/null
+++ b/integration-tests/src/test_loop/tests/optimistic_block.rs
@@ -0,0 +1,70 @@
+use itertools::Itertools;
+use near_async::time::Duration;
+use near_chain_configs::test_genesis::{
+    build_genesis_and_epoch_config_store, GenesisAndEpochConfigParams, ValidatorsSpec,
+};
+use near_o11y::testonly::init_test_logger;
+use near_primitives::shard_layout::ShardLayout;
+use near_primitives::types::AccountId;
+use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
+
+use crate::test_loop::builder::TestLoopBuilder;
+
+#[test]
+fn test_optimistic_block() {
+    if !ProtocolFeature::ProduceOptimisticBlock.enabled(PROTOCOL_VERSION) {
+        return;
+    }
+
+    init_test_logger();
+    let builder = TestLoopBuilder::new();
+
+    let epoch_length = 100;
+    let accounts =
+        (0..3).map(|i| format!("account{}", i).parse().unwrap()).collect::<Vec<AccountId>>();
+    let clients = accounts.iter().cloned().collect_vec();
+    let validators_spec = ValidatorsSpec::desired_roles(
+        &accounts.iter().map(|account_id| account_id.as_str()).collect_vec(),
+        &[],
+    );
+    let shard_layout = ShardLayout::multi_shard(3, 1);
+    let num_shards = shard_layout.num_shards() as usize;
+
+    let (genesis, epoch_config_store) = build_genesis_and_epoch_config_store(
+        GenesisAndEpochConfigParams {
+            epoch_length,
+            protocol_version: PROTOCOL_VERSION,
+            shard_layout,
+            validators_spec,
+            accounts: &accounts,
+        },
+        |genesis_builder| genesis_builder,
+        |epoch_config_builder| epoch_config_builder,
+    );
+    let mut env =
+        builder.genesis(genesis).epoch_config_store(epoch_config_store).clients(clients).build();
+
+    env.test_loop.run_for(Duration::seconds(10));
+
+    {
+        let chain =
+            &env.test_loop.data.get(&env.datas[0].client_sender.actor_handle()).client.chain;
+        // Under normal block processing, there can be only one optimistic
+        // block waiting to be processed.
+        assert!(chain.optimistic_block_chunks.num_blocks() <= 1);
+        // Under normal block processing, the number of waiting chunks can't
+        // exceed the delta between the highest block height and the final
+        // block height (normally 3), multiplied by the number of shards.
+        assert!(chain.optimistic_block_chunks.num_chunks() <= 3 * num_shards);
+        // There should be at least one optimistic block result in the cache.
+        assert!(chain.apply_chunk_results_cache.len() > 0);
+        // The optimistic block result should be used at least once.
+        assert!(chain.apply_chunk_results_cache.hits() > 0);
+        // Because there is no optimistic block distribution yet, there should
+        // be at least one miss for each shard.
+        // TODO: after distribution is implemented, this may change.
+        assert!(chain.apply_chunk_results_cache.misses() > 0);
+    }
+
+    env.shutdown_and_drain_remaining_events(Duration::seconds(20));
+}
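The assertions in the test above read counters off the apply-chunk results cache through a shared reference. A toy stand-in showing how a cache can record hits and misses without `&mut self` by keeping the counters in `Cell`s; this illustrates the pattern only and is not nearcore's `ApplyChunksResultCache`:

use std::cell::Cell;
use std::collections::HashMap;

// Toy result cache with interior-mutability counters.
struct ResultCache<K, V> {
    map: HashMap<K, V>,
    hits: Cell<usize>,
    misses: Cell<usize>,
}

impl<K: std::hash::Hash + Eq, V> ResultCache<K, V> {
    fn new() -> Self {
        Self { map: HashMap::new(), hits: Cell::new(0), misses: Cell::new(0) }
    }

    // Lookups go through &self, yet still update the statistics.
    fn get(&self, key: &K) -> Option<&V> {
        let result = self.map.get(key);
        if result.is_some() {
            self.hits.set(self.hits.get() + 1);
        } else {
            self.misses.set(self.misses.get() + 1);
        }
        result
    }

    fn insert(&mut self, key: K, value: V) {
        self.map.insert(key, value);
    }

    fn hits(&self) -> usize {
        self.hits.get()
    }

    fn misses(&self) -> usize {
        self.misses.get()
    }
}

fn main() {
    let mut cache = ResultCache::new();
    cache.insert("shard-0-update", 42u64);
    assert!(cache.get(&"shard-0-update").is_some());
    assert!(cache.get(&"missing").is_none());
    assert_eq!((cache.hits(), cache.misses()), (1, 1));
}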
diff --git a/integration-tests/src/test_loop/tests/simple_test_loop_example.rs b/integration-tests/src/test_loop/tests/simple_test_loop_example.rs
index b4a2667dd84..12bec8da04f 100644
--- a/integration-tests/src/test_loop/tests/simple_test_loop_example.rs
+++ b/integration-tests/src/test_loop/tests/simple_test_loop_example.rs
@@ -97,6 +97,7 @@ fn test_client_with_simple_test_loop() {
         noop().into_multi_sender(),
         Arc::new(test_loop.future_spawner()),
         noop().into_multi_sender(),
+        client_adapter.as_multi_sender(),
         PROTOCOL_UPGRADE_SCHEDULE.clone(),
     )
     .unwrap();
@@ -118,7 +119,6 @@ fn test_client_with_simple_test_loop() {
     let client_actor = ClientActorInner::new(
         test_loop.clock(),
         client,
-        client_adapter.as_multi_sender(),
         PeerId::random(),
         noop().into_multi_sender(),
         noop().into_sender(),
diff --git a/tools/state-viewer/src/apply_chain_range.rs b/tools/state-viewer/src/apply_chain_range.rs
index 0efc6a3a617..170adce6efc 100644
--- a/tools/state-viewer/src/apply_chain_range.rs
+++ b/tools/state-viewer/src/apply_chain_range.rs
@@ -343,6 +343,7 @@ fn apply_block_from_range(
     if let Err(err) = maybe_save_trie_changes(
         write_store,
         &genesis.config,
+        block_hash,
         apply_result,
         height,
         shard_id,
diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs
index 06dcc1da538..6c0fe06593e 100644
--- a/tools/state-viewer/src/commands.rs
+++ b/tools/state-viewer/src/commands.rs
@@ -206,6 +206,7 @@ pub(crate) fn apply_block_at_height(
     let result = maybe_save_trie_changes(
         write_store.clone(),
         &near_config.genesis.config,
+        block_hash,
         apply_result,
         height,
         shard_id,
@@ -1285,6 +1286,7 @@ pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: Near
 pub(crate) fn maybe_save_trie_changes(
     store: Option<Store>,
     genesis_config: &near_chain_configs::GenesisConfig,
+    block_hash: CryptoHash,
     apply_result: ApplyChunkResult,
     block_height: u64,
     shard_id: ShardId,
@@ -1293,7 +1295,7 @@ pub(crate) fn maybe_save_trie_changes(
         let mut chain_store =
             ChainStore::new(store, false, genesis_config.transaction_validity_period);
         let mut chain_store_update = chain_store.store_update();
-        chain_store_update.save_trie_changes(apply_result.trie_changes);
+        chain_store_update.save_trie_changes(block_hash, apply_result.trie_changes);
         chain_store_update.commit()?;
         tracing::debug!("Trie changes persisted for block {block_height}, shard {shard_id}");
     }
diff --git a/tools/state-viewer/src/state_changes.rs b/tools/state-viewer/src/state_changes.rs
index 52245493953..a809d5497dd 100644
--- a/tools/state-viewer/src/state_changes.rs
+++ b/tools/state-viewer/src/state_changes.rs
@@ -200,11 +200,10 @@ fn apply_state_changes(
                 shard_uid,
                 trie_update,
                 state_changes,
-                *block_hash,
                 block_height,
             );
             let mut store_update = chain_store.store_update();
-            store_update.save_trie_changes(wrapped_trie_changes);
+            store_update.save_trie_changes(*block_hash, wrapped_trie_changes);
             store_update.commit().unwrap();
         }
     }
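The state-viewer changes follow directly from the shard_tries.rs refactor: since trie changes no longer remember their block, every persistence helper now takes the block hash explicitly. A compact sketch of the new call shape, with stub types standing in for the real store API (the actual helpers carry more parameters):

// Stand-in types; the caller now owns the choice of block-hash key.
type CryptoHash = [u8; 32];

struct TrieChangesStub;

#[derive(Default)]
struct ChainStoreUpdateStub {
    saved: Vec<CryptoHash>,
}

impl ChainStoreUpdateStub {
    // Mirrors the new `save_trie_changes(block_hash, trie_changes)` shape.
    fn save_trie_changes(&mut self, block_hash: CryptoHash, _changes: TrieChangesStub) {
        self.saved.push(block_hash);
    }
}

fn main() {
    let mut update = ChainStoreUpdateStub::default();
    update.save_trie_changes([1; 32], TrieChangesStub);
    assert_eq!(update.saved.len(), 1);
}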