From 0a487547c7a12a79ae64a8d99f757f8e6469435b Mon Sep 17 00:00:00 2001 From: Shreyan Gupta Date: Sat, 14 Dec 2024 17:43:44 +0530 Subject: [PATCH 1/8] [cleanup] Push account_id_to_shard_uid function in shard_layout (#12622) `account_id_to_shard_uid` belongs on `ShardLayout` rather than being a standalone function. --- chain/chain/src/resharding/resharding_v2.rs | 4 ++-- chain/chunks/src/client.rs | 8 ++++---- chain/epoch-manager/src/tests/mod.rs | 10 +++++----- core/primitives/src/shard_layout.rs | 17 ++++++++--------- core/store/src/config.rs | 4 ++-- .../src/test_loop/tests/resharding_v3.rs | 4 ++-- .../src/tests/client/resharding_v2.rs | 5 ++--- 7 files changed, 25 insertions(+), 27 deletions(-) diff --git a/chain/chain/src/resharding/resharding_v2.rs b/chain/chain/src/resharding/resharding_v2.rs index e2e56bde68e..be853b33293 100644 --- a/chain/chain/src/resharding/resharding_v2.rs +++ b/chain/chain/src/resharding/resharding_v2.rs @@ -7,7 +7,7 @@ use crate::Chain; use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle}; use near_chain_primitives::error::Error; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout}; +use near_primitives::shard_layout::ShardLayout; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{AccountId, ShardId, StateRoot}; use near_store::flat::FlatStorageError; @@ -82,7 +82,7 @@ fn get_checked_account_id_to_shard_uid_fn( ) -> impl Fn(&AccountId) -> ShardUId { let split_shard_ids: HashSet<_> = new_shards.into_iter().collect(); move |account_id: &AccountId| { - let new_shard_uid = account_id_to_shard_uid(account_id, &next_epoch_shard_layout); + let new_shard_uid = next_epoch_shard_layout.account_id_to_shard_uid(account_id); // check that all accounts in the shard are mapped the shards that this shard will split // to according to shard layout assert!( diff --git a/chain/chunks/src/client.rs b/chain/chunks/src/client.rs index 45af5126a62..0a334e89cf2 100644 --- a/chain/chunks/src/client.rs +++ b/chain/chunks/src/client.rs @@ -5,7 +5,7 @@ use itertools::Itertools; use near_pool::types::TransactionGroupIterator; use near_pool::{InsertTransactionResult, PoolIteratorWrapper, TransactionPool}; -use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout, ShardUId}; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::{ epoch_info::RngSeed, sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader}, @@ -146,7 +146,7 @@ impl ShardedTransactionPool { for tx in transactions { let signer_id = tx.transaction.signer_id(); - let new_shard_uid = account_id_to_shard_uid(&signer_id, new_shard_layout); + let new_shard_uid = new_shard_layout.account_id_to_shard_uid(&signer_id); self.insert_transaction(new_shard_uid, tx); } } @@ -161,7 +161,7 @@ mod tests { use near_primitives::{ epoch_info::RngSeed, hash::CryptoHash, - shard_layout::{account_id_to_shard_uid, ShardLayout}, + shard_layout::ShardLayout, transaction::SignedTransaction, types::{AccountId, ShardId}, }; @@ -267,7 +267,7 @@ while let Some(tx) = group.next() { total += 1; let account_id = tx.transaction.signer_id(); - let tx_shard_uid = account_id_to_shard_uid(account_id, &new_shard_layout); + let tx_shard_uid = new_shard_layout.account_id_to_shard_uid(account_id); tracing::debug!("checking {account_id:?}:{tx_shard_uid} in {shard_uid}"); assert_eq!(shard_uid, tx_shard_uid); } diff --git a/chain/epoch-manager/src/tests/mod.rs
b/chain/epoch-manager/src/tests/mod.rs index 6c11fe7e0ea..a9dfb2f79a2 100644 --- a/chain/epoch-manager/src/tests/mod.rs +++ b/chain/epoch-manager/src/tests/mod.rs @@ -20,7 +20,7 @@ use near_primitives::congestion_info::CongestionInfo; use near_primitives::epoch_block_info::BlockInfoV3; use near_primitives::epoch_manager::EpochConfig; use near_primitives::hash::hash; -use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout}; +use near_primitives::shard_layout::ShardLayout; use near_primitives::sharding::{ShardChunkHeader, ShardChunkHeaderV3}; use near_primitives::stateless_validation::chunk_endorsements_bitmap::ChunkEndorsementsBitmap; use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness; @@ -3799,7 +3799,7 @@ fn test_get_shard_uids_pending_resharding_single() { let shard_layout_0 = ShardLayout::multi_shard_custom(vec![a.clone()], version); let shard_layout_1 = ShardLayout::derive_shard_layout(&shard_layout_0, b); - let s1 = account_id_to_shard_uid(&a, &shard_layout_0); + let s1 = shard_layout_0.account_id_to_shard_uid(&a); let shard_uids = test_get_shard_uids_pending_resharding_base(&[shard_layout_0, shard_layout_1]); assert_eq!(shard_uids, vec![s1].into_iter().collect::>()); @@ -3822,8 +3822,8 @@ fn test_get_shard_uids_pending_resharding_double_different() { let shard_layout_1 = ShardLayout::derive_shard_layout(&shard_layout_0, a.clone()); let shard_layout_2 = ShardLayout::derive_shard_layout(&shard_layout_0, c); - let s0 = account_id_to_shard_uid(&a, &shard_layout_0); - let s1 = account_id_to_shard_uid(&b, &shard_layout_0); + let s0 = shard_layout_0.account_id_to_shard_uid(&a); + let s1 = shard_layout_0.account_id_to_shard_uid(&b); let shard_uids = test_get_shard_uids_pending_resharding_base(&[ shard_layout_0, @@ -3850,7 +3850,7 @@ fn test_get_shard_uids_pending_resharding_double_same() { let shard_layout_1 = ShardLayout::derive_shard_layout(&shard_layout_0, b); let shard_layout_2 = ShardLayout::derive_shard_layout(&shard_layout_0, c); - let s1 = account_id_to_shard_uid(&a, &shard_layout_0); + let s1 = shard_layout_0.account_id_to_shard_uid(&a); let shard_uids = test_get_shard_uids_pending_resharding_base(&[ shard_layout_0, diff --git a/core/primitives/src/shard_layout.rs b/core/primitives/src/shard_layout.rs index 03fa6fb0df9..c96f4d70fb2 100644 --- a/core/primitives/src/shard_layout.rs +++ b/core/primitives/src/shard_layout.rs @@ -582,7 +582,7 @@ impl ShardLayout { ) } - /// Maps an account to the shard that it belongs to given a shard layout + /// Maps an account to the shard_id that it belongs to in this shard_layout /// For V0, maps according to hash of account id /// For V1 and V2, accounts are divided to ranges, each range of account is mapped to a shard. pub fn account_id_to_shard_id(&self, account_id: &AccountId) -> ShardId { @@ -598,8 +598,15 @@ impl ShardLayout { } } + /// Maps an account to the shard_uid that it belongs to in this shard_layout + #[inline] + pub fn account_id_to_shard_uid(&self, account_id: &AccountId) -> ShardUId { + ShardUId::from_shard_id_and_layout(self.account_id_to_shard_id(account_id), self) + } + /// Given a parent shard id, return the shard uids for the shards in the current shard layout that /// are split from this parent shard. 
If this shard layout has no parent shard layout, return None + #[inline] pub fn get_children_shards_uids(&self, parent_shard_id: ShardId) -> Option> { self.get_children_shards_ids(parent_shard_id).map(|shards| { shards.into_iter().map(|id| ShardUId::from_shard_id_and_layout(id, self)).collect() @@ -852,14 +859,6 @@ fn validate_and_derive_shard_parent_map_v2( shards_parent_map } -/// Maps an account to the shard that it belongs to given a shard_layout -pub fn account_id_to_shard_uid(account_id: &AccountId, shard_layout: &ShardLayout) -> ShardUId { - ShardUId::from_shard_id_and_layout( - shard_layout.account_id_to_shard_id(account_id), - shard_layout, - ) -} - /// `ShardUId` is a unique representation for shards from different shard layouts. /// /// Comparing to `ShardId`, which is just an ordinal number ranging from 0 to NUM_SHARDS-1, diff --git a/core/store/src/config.rs b/core/store/src/config.rs index ff0798bd1ae..6aa06864c03 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -4,7 +4,7 @@ use crate::trie::{ use crate::DBCol; use near_primitives::chains::MAINNET; use near_primitives::epoch_manager::EpochConfigStore; -use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout, ShardUId}; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::types::AccountId; use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; use near_time::Duration; @@ -212,7 +212,7 @@ impl StoreConfig { let account_id = AccountId::from_str(account_id) .expect("the hardcoded account id should guarantee to be valid"); for shard_layout in shard_layouts.iter() { - let shard_uid = account_id_to_shard_uid(&account_id, &shard_layout); + let shard_uid = shard_layout.account_id_to_shard_uid(&account_id); per_shard_max_bytes.insert(shard_uid, *bytes); } } diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 0c8c6dd9744..0bda17d80bf 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -5,7 +5,7 @@ use near_chain_configs::test_genesis::{TestGenesisBuilder, ValidatorsSpec}; use near_chain_configs::DEFAULT_GC_NUM_EPOCHS_TO_KEEP; use near_o11y::testonly::init_test_logger; use near_primitives::epoch_manager::EpochConfigStore; -use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout}; +use near_primitives::shard_layout::ShardLayout; use near_primitives::types::{AccountId, BlockHeightDelta, Gas, ShardId, ShardIndex}; use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; use rand::seq::SliceRandom; @@ -534,7 +534,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { base_epoch_config.shard_layout = base_shard_layout.clone(); let new_boundary_account = "account6".parse().unwrap(); - let parent_shard_uid = account_id_to_shard_uid(&new_boundary_account, &base_shard_layout); + let parent_shard_uid = base_shard_layout.account_id_to_shard_uid(&new_boundary_account); let mut epoch_config = base_epoch_config.clone(); epoch_config.shard_layout = ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account); diff --git a/integration-tests/src/tests/client/resharding_v2.rs b/integration-tests/src/tests/client/resharding_v2.rs index 044ab25be12..c6782846c8d 100644 --- a/integration-tests/src/tests/client/resharding_v2.rs +++ b/integration-tests/src/tests/client/resharding_v2.rs @@ -12,7 +12,6 @@ use near_primitives::block::{Block, Tip}; use 
near_primitives::epoch_manager::{AllEpochConfig, AllEpochConfigTestOverrides, EpochConfig}; use near_primitives::hash::CryptoHash; use near_primitives::serialize::to_base64; -use near_primitives::shard_layout::account_id_to_shard_uid; use near_primitives::stateless_validation::ChunkProductionKey; use near_primitives::transaction::{ Action, DeployContractAction, FunctionCallAction, SignedTransaction, @@ -444,7 +443,7 @@ impl TestReshardingEnv { let id = &tx.get_hash(); let signer_account_id = tx.transaction.signer_id(); - let shard_uid = account_id_to_shard_uid(signer_account_id, &shard_layout); + let shard_uid = shard_layout.account_id_to_shard_uid(signer_account_id); tracing::trace!(target: "test", tx=?id, ?signer_account_id, ?shard_uid, "checking tx"); @@ -522,7 +521,7 @@ fn check_account(env: &TestEnv, account_id: &AccountId, block: &Block) { let prev_hash = block.header().prev_hash(); let shard_layout = env.clients[0].epoch_manager.get_shard_layout_from_prev_block(prev_hash).unwrap(); - let shard_uid = account_id_to_shard_uid(account_id, &shard_layout); + let shard_uid = shard_layout.account_id_to_shard_uid(account_id); let shard_id = shard_uid.shard_id(); let shard_index = shard_layout.get_shard_index(shard_id).unwrap(); for (i, me) in env.validators.iter().enumerate() { From 8cae8ccdb95c1b26577cd7a8865acdfffe5666aa Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Sat, 14 Dec 2024 19:40:35 -0500 Subject: [PATCH 2/8] fix(resharding): wait until child flat storages are split to take snapshots (#12589) `test_resharding_v3_shard_shuffling_slower_post_processing_tasks` exposes a bug that can be triggered if child flat storages are not split after a resharding by the time we want to take a state snapshot. In that case the state snapshot code fails because the flat storage is not ready, and it does not retry. To fix it, we add a `want_snapshot` field that will be set when we decide to take a state snapshot. We also add a `split_in_progress` field to the `FlatStorageManager` that will be set to `true` when a resharding is started, and back to `false` when it's finished and the catchup code has progressed to a height close to the desired snapshot height. The state snapshot code will wait until `split_in_progress` is `false` to proceed, and the flat storage catchup code will wait until `want_snapshot` is cleared if it has already advanced to the desired snapshot hash, so that we don't advance past the point that was wanted by the state snapshot. The first one is what actually causes the test failure, but the second one is also required. We implement this waiting by rescheduling the message sends to run again in the future. A Condvar would be a very natural choice, but it unfortunately doesn't seem to work in testloop, since actors that normally run on different threads are put on the same thread, and a thread blocked on a Condvar won't be woken up. Here we also change the behavior of the old `set_flat_state_updates_mode()`, which used to refuse to proceed if the update mode was already set to the same value. That check appears to be an artifact of how state snapshots were implemented in https://github.com/near/nearcore/pull/9090: the extra logic was added because there was another user of this function (`inline_flat_state_values()`, added in https://github.com/near/nearcore/pull/9037), but that function has since been deleted, so the state snapshot code is now the only user of `set_flat_state_updates_mode()`.
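As an illustration only (a minimal, self-contained sketch, not the nearcore implementation; `SnapshotCoordination` and `may_advance_to` are made-up names, while `want_snapshot()` and `snapshot_taken()` mirror the FlatStorageManager methods added in this patch), the coordination idea looks roughly like this: the snapshot side records the minimum chunk prev height it needs, and the resharding catchup side checks that value and postpones instead of advancing past it:

    use std::sync::Mutex;

    type BlockHeight = u64;

    #[derive(Default)]
    struct SnapshotCoordination {
        // Some(height) while a state snapshot at `height` is wanted (illustrative stand-in
        // for the `want_snapshot` field added to FlatStorageManager in this patch).
        want_snapshot: Mutex<Option<BlockHeight>>,
    }

    impl SnapshotCoordination {
        fn want_snapshot(&self, min_chunk_prev_height: BlockHeight) {
            *self.want_snapshot.lock().unwrap() = Some(min_chunk_prev_height);
        }
        fn snapshot_taken(&self) {
            *self.want_snapshot.lock().unwrap() = None;
        }
        // Catchup asks this before applying deltas up to `height`; if a snapshot is wanted
        // at or below that height, it returns false and the caller postpones (in the real
        // code the actor reschedules its message with run_later instead of blocking).
        fn may_advance_to(&self, height: BlockHeight) -> bool {
            match *self.want_snapshot.lock().unwrap() {
                Some(snapshot_height) => height < snapshot_height,
                None => true,
            }
        }
    }

    fn main() {
        let coord = SnapshotCoordination::default();
        coord.want_snapshot(100);
        assert!(coord.may_advance_to(99));
        assert!(!coord.may_advance_to(100)); // postponed until the snapshot is taken
        coord.snapshot_taken();
        assert!(coord.may_advance_to(100));
    }

In the actual patch the check happens in `coordinate_snapshot()` during `shard_catchup_apply_deltas()`, and the postponement is done by the resharding actor re-sending the catchup message after a delay.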
--- Cargo.lock | 1 + chain/chain/Cargo.toml | 1 + chain/chain/src/chain.rs | 27 ++- chain/chain/src/flat_storage_resharder.rs | 172 +++++++++--------- .../chain/src/resharding/resharding_actor.rs | 64 ++++--- chain/chain/src/resharding/types.rs | 2 - chain/chain/src/state_snapshot_actor.rs | 132 +++++++++++--- .../client/src/test_utils/test_env_builder.rs | 2 +- core/store/src/adapter/chunk_store.rs | 4 +- core/store/src/adapter/flat_store.rs | 4 +- core/store/src/adapter/mod.rs | 6 +- core/store/src/adapter/trie_store.rs | 4 +- core/store/src/flat/chunk_view.rs | 2 + core/store/src/flat/manager.rs | 172 ++++++++++++++---- core/store/src/flat/storage.rs | 9 +- core/store/src/lib.rs | 4 +- core/store/src/trie/state_snapshot.rs | 2 +- .../src/test_loop/tests/resharding_v3.rs | 16 +- 18 files changed, 430 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc140563ee6..ebc33b1821a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4164,6 +4164,7 @@ name = "near-chain" version = "0.0.0" dependencies = [ "actix", + "anyhow", "assert_matches", "borsh", "bytesize", diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index 5c956a6a27a..1b544c5f313 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] actix.workspace = true +anyhow.workspace = true borsh.workspace = true bytesize.workspace = true chrono.workspace = true diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index d31d848cde4..b8029caa135 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3838,6 +3838,24 @@ impl Chain { ))) } + fn min_chunk_prev_height(&self, block: &Block) -> Result { + let mut ret = None; + for chunk in block.chunks().iter_raw() { + let prev_height = if chunk.prev_block_hash() == &CryptoHash::default() { + 0 + } else { + let prev_header = self.get_block_header(chunk.prev_block_hash())?; + prev_header.height() + }; + if let Some(min_height) = ret { + ret = Some(std::cmp::min(min_height, prev_height)); + } else { + ret = Some(prev_height); + } + } + Ok(ret.unwrap_or(0)) + } + /// Function to create or delete a snapshot if necessary. 
/// TODO: this function calls head() inside of start_process_block_impl(), consider moving this to be called right after HEAD gets updated fn process_snapshot(&mut self) -> Result<(), Error> { @@ -3847,6 +3865,7 @@ impl Chain { SnapshotAction::MakeSnapshot(prev_hash) => { let prev_block = self.get_block(&prev_hash)?; let prev_prev_hash = prev_block.header().prev_hash(); + let min_chunk_prev_height = self.min_chunk_prev_height(&prev_block)?; let epoch_height = self.epoch_manager.get_epoch_height_from_prev_block(prev_prev_hash)?; let shard_layout = @@ -3854,7 +3873,13 @@ impl Chain { let shard_uids = shard_layout.shard_uids().enumerate().collect(); let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; - make_snapshot_callback(*prev_prev_hash, epoch_height, shard_uids, prev_block); + make_snapshot_callback( + *prev_prev_hash, + min_chunk_prev_height, + epoch_height, + shard_uids, + prev_block, + ); } SnapshotAction::DeleteSnapshot => { let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 272c81199d9..83bd481f3fa 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -27,9 +27,9 @@ use near_primitives::trie_key::trie_key_parsers::{ parse_account_id_from_contract_code_key, parse_account_id_from_contract_data_key, parse_account_id_from_received_data_key, parse_account_id_from_trie_key_with_separator, }; -use near_primitives::types::AccountId; #[cfg(feature = "test_features")] use near_primitives::types::BlockHeightDelta; +use near_primitives::types::{AccountId, BlockHeight}; use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; use near_store::adapter::StoreAdapter; use near_store::flat::{ @@ -62,8 +62,7 @@ use std::iter; /// [FlatStorageResharderController]. /// - In the case of event `Split` the state of flat storage will go back to what it was /// previously. -/// - Children shard catchup is a consequence of splitting a shard, not a resharding event on -/// its own. As such, it can't be manually cancelled. +/// - Children shard catchup can be cancelled and will resume from the point where it left. /// - Resilience to chain forks. /// - Resharding events will perform changes on the state only after their resharding block /// becomes final. @@ -155,16 +154,12 @@ impl FlatStorageResharder { self.clean_children_shards(&status)?; self.schedule_split_shard(parent_shard_uid, &status); } - FlatStorageReshardingStatus::CatchingUp(block_hash) => { + FlatStorageReshardingStatus::CatchingUp(_) => { info!(target: "resharding", ?shard_uid, ?status, "resuming flat storage shard catchup"); // Send a request to schedule the execution of `shard_catchup_task` for this shard. - self.sender.flat_storage_shard_catchup_sender.send( - FlatStorageShardCatchupRequest { - resharder: self.clone(), - shard_uid, - flat_head_block_hash: *block_hash, - }, - ); + self.sender + .flat_storage_shard_catchup_sender + .send(FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid }); } } Ok(()) @@ -316,10 +311,7 @@ impl FlatStorageResharder { /// /// Conceptually it simply copies each key-value pair from the parent shard to the correct /// child. This task may get cancelled or postponed. 
- pub fn split_shard_task( - &self, - chain_store: &ChainStore, - ) -> FlatStorageReshardingSchedulableTaskResult { + pub fn split_shard_task(&self, chain_store: &ChainStore) -> FlatStorageReshardingTaskResult { info!(target: "resharding", "flat storage shard split task execution"); // Make sure that the resharding block is final. @@ -336,11 +328,11 @@ impl FlatStorageResharder { self.cancel_scheduled_event(); error!(target: "resharding", "flat storage shard split task failed during scheduling!"); // TODO(resharding): return failed only if scheduling of all resharding blocks have failed. - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } FlatStorageReshardingTaskSchedulingStatus::Postponed => { info!(target: "resharding", "flat storage shard split task has been postponed"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } }; @@ -348,7 +340,7 @@ impl FlatStorageResharder { { if self.adv_should_delay_task(&resharding_hash, chain_store) { info!(target: "resharding", "flat storage shard split task has been artificially postponed!"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } } @@ -376,12 +368,12 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: &ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - ) -> FlatStorageReshardingSchedulableTaskResult { + ) -> FlatStorageReshardingTaskResult { self.set_resharding_event_execution_status(TaskExecutionStatus::Started); // Exit early if the task has already been cancelled. if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Determines after how many bytes worth of key-values the process stops to commit changes @@ -403,7 +395,7 @@ impl FlatStorageResharder { Ok(iter) => iter, Err(err) => { error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } }; @@ -434,12 +426,12 @@ impl FlatStorageResharder { &split_params, ) { error!(target: "resharding", ?err, "failed to handle flat storage key"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } } Some(FlatStorageAndDeltaIterItem::Entry(Err(err))) => { error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } None => { iter_exhausted = true; @@ -450,7 +442,7 @@ impl FlatStorageResharder { // Make a pause to commit and check if the routine should stop. if let Err(err) = store_update.commit() { error!(target: "resharding", ?err, "failed to commit store update"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } num_batches_done += 1; @@ -459,10 +451,10 @@ impl FlatStorageResharder { // If `iter`` is exhausted we can exit after the store commit. 
if iter_exhausted { - return FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done }; + return FlatStorageReshardingTaskResult::Successful { num_batches_done }; } if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Sleep between batches in order to throttle resharding and leave some resource for the @@ -484,7 +476,7 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - task_status: FlatStorageReshardingSchedulableTaskResult, + task_status: FlatStorageReshardingTaskResult, ) { let ParentSplitParameters { left_child_shard, @@ -498,7 +490,7 @@ impl FlatStorageResharder { let mut store_update = flat_store.store_update(); match task_status { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // Split shard completed successfully. // Parent flat storage can be deleted from the FlatStoreManager. // If FlatStoreManager has no reference to the shard, delete it manually. @@ -524,13 +516,12 @@ impl FlatStorageResharder { FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid: child_shard, - flat_head_block_hash: resharding_hash, }, ); } } - FlatStorageReshardingSchedulableTaskResult::Failed - | FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Failed + | FlatStorageReshardingTaskResult::Cancelled => { // We got an error or a cancellation request. // Reset parent. store_update.set_flat_storage_status( @@ -542,7 +533,7 @@ impl FlatStorageResharder { store_update.remove_flat_storage(child_shard); } } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { panic!("can't finalize processing of a postponed split task!"); } } @@ -612,18 +603,23 @@ impl FlatStorageResharder { pub fn shard_catchup_task( &self, shard_uid: ShardUId, - flat_head_block_hash: CryptoHash, chain_store: &ChainStore, ) -> FlatStorageReshardingTaskResult { - info!(target: "resharding", ?shard_uid, ?flat_head_block_hash, "flat storage shard catchup task started"); + // Exit early if the task has already been cancelled. + if self.controller.is_cancelled() { + return FlatStorageReshardingTaskResult::Cancelled; + } + info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started"); let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid); // Apply deltas and then create the flat storage. - let apply_result = - self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store, &metrics); - let Ok((num_batches_done, flat_head)) = apply_result else { + let apply_result = self.shard_catchup_apply_deltas(shard_uid, chain_store, &metrics); + let Ok(res) = apply_result else { error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!"); return FlatStorageReshardingTaskResult::Failed; }; + let Some((num_batches_done, flat_head)) = res else { + return FlatStorageReshardingTaskResult::Postponed; + }; match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) { Ok(_) => { let task_status = FlatStorageReshardingTaskResult::Successful { num_batches_done }; @@ -639,16 +635,26 @@ impl FlatStorageResharder { } } + /// checks whether there's a snapshot in progress. 
Returns true if we've already applied all deltas up + /// to the desired snapshot height, and should no longer continue to give the state snapshot + /// code a chance to finish first. + fn coordinate_snapshot(&self, height: BlockHeight) -> bool { + let manager = self.runtime.get_flat_storage_manager(); + let Some(min_chunk_prev_height) = manager.snapshot_wanted() else { + return false; + }; + height >= min_chunk_prev_height + } + /// Applies flat storage deltas in batches on a shard that is in catchup status. /// /// Returns the number of delta batches applied and the final tip of the flat storage. fn shard_catchup_apply_deltas( &self, shard_uid: ShardUId, - mut flat_head_block_hash: CryptoHash, chain_store: &ChainStore, metrics: &FlatStorageReshardingShardCatchUpMetrics, - ) -> Result<(usize, Tip), Error> { + ) -> Result, Error> { // How many block heights of deltas are applied in a single commit. let catch_up_blocks = self.resharding_config.get().catch_up_blocks; // Delay between every batch. @@ -658,7 +664,24 @@ impl FlatStorageResharder { let mut num_batches_done: usize = 0; + let status = self + .runtime + .store() + .flat_store() + .get_flat_storage_status(shard_uid) + .map_err(|e| Into::::into(e))?; + let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + mut flat_head_block_hash, + )) = status + else { + return Err(Error::Other(format!( + "unexpected resharding catchup flat storage status for {}: {:?}", + shard_uid, &status + ))); + }; + loop { + // TODO:(resharding): check self.controller.is_cancelled() here as well. let _span = tracing::debug_span!( target: "resharding", "shard_catchup_apply_deltas/batch", @@ -670,15 +693,16 @@ impl FlatStorageResharder { // If we reached the desired new flat head, we can terminate the delta application step. if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { - return Ok(( + return Ok(Some(( num_batches_done, Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?), - )); + ))); } let mut merged_changes = FlatStateChanges::default(); let store = self.runtime.store().flat_store(); let mut store_update = store.store_update(); + let mut postpone = false; // Merge deltas from the next blocks until we reach chain final head. for _ in 0..catch_up_blocks { @@ -691,6 +715,10 @@ impl FlatStorageResharder { if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { break; } + if self.coordinate_snapshot(height) { + postpone = true; + break; + } flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?; if let Some(changes) = store .get_delta(shard_uid, flat_head_block_hash) @@ -715,6 +743,9 @@ impl FlatStorageResharder { num_batches_done += 1; metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?); + if postpone { + return Ok(None); + } // Sleep between batches in order to throttle resharding and leave some resource for the // regular node operation. std::thread::sleep(batch_delay); @@ -1081,19 +1112,11 @@ pub enum TaskExecutionStatus { NotStarted, } -/// Result of a simple flat storage resharding task. +/// Result of a schedulable flat storage resharding task. #[derive(Clone, Debug, Copy, Eq, PartialEq)] pub enum FlatStorageReshardingTaskResult { Successful { num_batches_done: usize }, Failed, -} - -/// Result of a schedulable flat storage resharding task. Extends [FlatStorageReshardingTaskResult] -/// with the option to cancel or postpone the task. 
-#[derive(Clone, Debug, Copy, Eq, PartialEq)] -pub enum FlatStorageReshardingSchedulableTaskResult { - Successful { num_batches_done: usize }, - Failed, Cancelled, Postponed, } @@ -1207,11 +1230,7 @@ mod tests { impl CanSend for SimpleSender { fn send(&self, msg: FlatStorageShardCatchupRequest) { - msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ); + msg.resharder.shard_catchup_task(msg.shard_uid, &self.chain_store.lock().unwrap()); } } @@ -1240,7 +1259,7 @@ mod tests { } impl DelayedSender { - fn call_split_shard_task(&self) -> FlatStorageReshardingSchedulableTaskResult { + fn call_split_shard_task(&self) -> FlatStorageReshardingTaskResult { let request = self.split_shard_request.lock().unwrap(); request.as_ref().unwrap().resharder.split_shard_task(&self.chain_store.lock().unwrap()) } @@ -1251,11 +1270,9 @@ mod tests { .unwrap() .iter() .map(|request| { - request.resharder.shard_catchup_task( - request.shard_uid, - request.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ) + request + .resharder + .shard_catchup_task(request.shard_uid, &self.chain_store.lock().unwrap()) }) .collect() } @@ -1645,7 +1662,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); // Check that more than one batch has been processed. - let FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done } = + let FlatStorageReshardingTaskResult::Successful { num_batches_done } = sender.call_split_shard_task() else { assert!(false); @@ -1868,7 +1885,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); // Validate integrity of children shards. @@ -2412,10 +2429,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Move the chain final head to the resharding block height (2). @@ -2429,7 +2443,7 @@ mod tests { // Trigger resharding again and now it should split the parent shard. assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); } @@ -2461,10 +2475,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two blocks on top of the first block (simulate a fork). 
@@ -2481,10 +2492,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two additional blocks on the fork to make the resharding block (height 1) final. @@ -2498,7 +2506,7 @@ mod tests { // Now the second resharding event should take place. assert_matches!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { .. } + FlatStorageReshardingTaskResult::Successful { .. } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); @@ -2526,10 +2534,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); let ParentSplitParameters { flat_head, .. } = split_params; - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); // Fork the chain before the resharding block and make it final, but don't update the // resharding block hash. @@ -2541,10 +2546,7 @@ mod tests { ); // Scheduling of the shard split should fail. - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Failed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Failed); assert!(resharder.resharding_event().is_none()); let flat_store = resharder.runtime.store().flat_store(); assert_eq!( diff --git a/chain/chain/src/resharding/resharding_actor.rs b/chain/chain/src/resharding/resharding_actor.rs index bc4a371dfe7..e92672b76d0 100644 --- a/chain/chain/src/resharding/resharding_actor.rs +++ b/chain/chain/src/resharding/resharding_actor.rs @@ -1,13 +1,11 @@ use super::types::{ FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest, }; -use crate::flat_storage_resharder::{ - FlatStorageResharder, FlatStorageReshardingSchedulableTaskResult, - FlatStorageReshardingTaskResult, -}; +use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskResult}; use crate::ChainStore; use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; use near_async::messaging::{self, Handler, HandlerWithContext}; +use near_primitives::shard_layout::ShardUId; use near_primitives::types::BlockHeight; use near_store::Store; use time::Duration; @@ -29,20 +27,13 @@ impl HandlerWithContext for ReshardingActor { } } -impl Handler for ReshardingActor { - fn handle(&mut self, msg: FlatStorageShardCatchupRequest) { - match msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store, - ) { - FlatStorageReshardingTaskResult::Successful { .. } => { - // All good. - } - FlatStorageReshardingTaskResult::Failed => { - panic!("impossible to recover from a flat storage shard catchup failure!") - } - } +impl HandlerWithContext for ReshardingActor { + fn handle( + &mut self, + msg: FlatStorageShardCatchupRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + self.handle_flat_storage_catchup(msg.resharder, msg.shard_uid, ctx); } } @@ -66,16 +57,16 @@ impl ReshardingActor { // becomes final. 
If the resharding block is not yet final, the task will exit early with // `Postponed` status and it must be rescheduled. match resharder.split_shard_task(&self.chain_store) { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // All good. } - FlatStorageReshardingSchedulableTaskResult::Failed => { + FlatStorageReshardingTaskResult::Failed => { panic!("impossible to recover from a flat storage split shard failure!") } - FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Cancelled => { // The task has been cancelled. Nothing else to do. } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { // The task must be retried later. ctx.run_later( "ReshardingActor FlatStorageSplitShard", @@ -87,4 +78,33 @@ impl ReshardingActor { } } } + + fn handle_flat_storage_catchup( + &self, + resharder: FlatStorageResharder, + shard_uid: ShardUId, + ctx: &mut dyn DelayedActionRunner, + ) { + match resharder.shard_catchup_task(shard_uid, &self.chain_store) { + FlatStorageReshardingTaskResult::Successful { .. } => { + // All good. + } + FlatStorageReshardingTaskResult::Failed => { + panic!("impossible to recover from a flat storage shard catchup failure!") + } + FlatStorageReshardingTaskResult::Cancelled => { + // The task has been cancelled. Nothing else to do. + } + FlatStorageReshardingTaskResult::Postponed => { + // The task must be retried later. + ctx.run_later( + "ReshardingActor FlatStorageCatchup", + Duration::milliseconds(1000), + move |act, ctx| { + act.handle_flat_storage_catchup(resharder, shard_uid, ctx); + }, + ); + } + } + } } diff --git a/chain/chain/src/resharding/types.rs b/chain/chain/src/resharding/types.rs index b042f3bab56..a6fae87d991 100644 --- a/chain/chain/src/resharding/types.rs +++ b/chain/chain/src/resharding/types.rs @@ -1,6 +1,5 @@ use crate::flat_storage_resharder::FlatStorageResharder; use near_async::messaging::Sender; -use near_primitives::hash::CryptoHash; use near_store::ShardUId; /// Represents a request to start the split of a parent shard flat storage into two children flat @@ -17,7 +16,6 @@ pub struct FlatStorageSplitShardRequest { pub struct FlatStorageShardCatchupRequest { pub resharder: FlatStorageResharder, pub shard_uid: ShardUId, - pub flat_head_block_hash: CryptoHash, } /// Represents a request to reload a Mem Trie for a shard after its Flat Storage resharding is diff --git a/chain/chain/src/state_snapshot_actor.rs b/chain/chain/src/state_snapshot_actor.rs index 62cb894c53c..02db1c085cd 100644 --- a/chain/chain/src/state_snapshot_actor.rs +++ b/chain/chain/src/state_snapshot_actor.rs @@ -1,11 +1,13 @@ -use near_async::messaging::{Actor, CanSend, Handler, Sender}; +use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; +use near_async::messaging::{Actor, CanSend, Handler, HandlerWithContext, Sender}; +use near_async::time::Duration; use near_async::{MultiSend, MultiSenderFrom}; use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; use near_performance_metrics_macros::perf; use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; -use near_primitives::types::{EpochHeight, ShardIndex}; +use near_primitives::types::{BlockHeight, EpochHeight, ShardIndex}; use near_store::flat::FlatStorageManager; use near_store::ShardTries; use std::sync::Arc; @@ -44,13 +46,15 @@ pub struct 
DeleteAndMaybeCreateSnapshotRequest { #[derive(actix::Message)] #[rtype(result = "()")] pub struct CreateSnapshotRequest { - /// prev_hash of the last processed block. + /// equal to self.block.header().prev_hash() prev_block_hash: CryptoHash, + /// Min height of chunk.prev_block_hash() for each chunk in `block` + min_chunk_prev_height: BlockHeight, /// epoch height associated with prev_block_hash epoch_height: EpochHeight, /// Shards that need to be present in the snapshot. shard_indexes_and_uids: Vec<(ShardIndex, ShardUId)>, - /// Last block of the prev epoch. + /// prev block of the "sync_hash" block. block: Block, } @@ -59,6 +63,7 @@ impl std::fmt::Debug for CreateSnapshotRequest { f.debug_struct("CreateSnapshotRequest") .field("block_hash", self.block.hash()) .field("prev_block_hash", &self.prev_block_hash) + .field("min_chunk_prev_height", &self.min_chunk_prev_height) .field("epoch_height", &self.epoch_height) .field( "shard_uids", @@ -85,19 +90,78 @@ impl StateSnapshotActor { } } - pub fn handle_create_snapshot_request(&mut self, msg: CreateSnapshotRequest) { - tracing::debug!(target: "state_snapshot", ?msg); + /// Returns true if we shouldn't yet try to create a snapshot because a flat storage resharding + /// is in progress. + fn should_wait_for_resharding_split( + &self, + min_chunk_prev_height: BlockHeight, + shard_indexes_and_uids: &[(ShardIndex, ShardUId)], + ) -> anyhow::Result { + let shard_uids = shard_indexes_and_uids.iter().map(|(_idx, uid)| *uid); + let Some(min_height) = + self.flat_storage_manager.resharding_catchup_height_reached(shard_uids)? + else { + // No flat storage split + catchup is in progress, ok to proceed + return Ok(false); + }; + let Some(min_height) = min_height else { + // storage split + catchup is in progress and not all shards have reached the catchup phase yet. Can't proceed + return Ok(true); + }; + // Proceed if the catchup code is already reasonably close to being finished. This is not a correctness issue, + // as this line of code could just be replaced with Ok(false), and things would work. But in that case, if there are for + // some reason lots of deltas to apply (e.g. the sync hash is 1000s of blocks past the start of the epoch because of missed + // chunks), then we'll duplicate a lot of work that's being done by the resharding catchup code. So we might as well just + // come back later after most of that work has already been done. + Ok(min_height + 10 < min_chunk_prev_height) + } + + pub fn handle_create_snapshot_request( + &mut self, + msg: CreateSnapshotRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + let should_wait = match self.should_wait_for_resharding_split( + msg.min_chunk_prev_height, + &msg.shard_indexes_and_uids, + ) { + Ok(s) => s, + Err(err) => { + tracing::error!(target: "state_snapshot", ?err, "State Snapshot Actor failed to check resharding status. Not making snapshot"); + return; + } + }; + // TODO: instead of resending the same message over and over, wait on a Condvar. 
+ // This would require making testloop work with Condvars that normally are meant to be woken up by another thread + if should_wait { + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Postpone CreateSnapshotRequest"); + ctx.run_later( + "ReshardingActor FlatStorageSplitShard", + Duration::seconds(1), + move |act, ctx| { + act.handle_create_snapshot_request(msg, ctx); + }, + ); + return; + } - let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block } = - msg; + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Handle CreateSnapshotRequest"); + let CreateSnapshotRequest { + prev_block_hash, + epoch_height, + shard_indexes_and_uids, + block, + .. + } = msg; let res = self.tries.create_state_snapshot(prev_block_hash, &shard_indexes_and_uids, &block); // Unlocking flat state head can be done asynchronously in state_snapshot_actor. // The next flat storage update will bring flat storage to latest head. - if !self.flat_storage_manager.set_flat_state_updates_mode(true) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to unlock flat state updates"); - } + // TODO(resharding): check what happens if two calls to want_snapshot() are made before this point, + // which can happen with short epochs if a state snapshot takes longer than the rest of the epoch to complete. + // TODO(resharding): this can actually be called sooner, just after the rocksdb checkpoint is made. + self.flat_storage_manager.snapshot_taken(); match res { Ok(res_shard_uids) => { let Some(res_shard_uids) = res_shard_uids else { @@ -126,10 +190,10 @@ impl Handler for StateSnapshotActor { } } -impl Handler for StateSnapshotActor { +impl HandlerWithContext for StateSnapshotActor { #[perf] - fn handle(&mut self, msg: CreateSnapshotRequest) { - self.handle_create_snapshot_request(msg) + fn handle(&mut self, msg: CreateSnapshotRequest, ctx: &mut dyn DelayedActionRunner) { + self.handle_create_snapshot_request(msg, ctx) } } @@ -142,7 +206,7 @@ pub struct StateSnapshotSenderForStateSnapshot { pub struct StateSnapshotSenderForClient(Sender); type MakeSnapshotCallback = Arc< - dyn Fn(CryptoHash, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + dyn Fn(CryptoHash, BlockHeight, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + Send + Sync + 'static, @@ -156,28 +220,38 @@ pub struct SnapshotCallbacks { } /// Sends a request to make a state snapshot. +// TODO: remove the `prev_block_hash` argument. It's just block.header().prev_hash() pub fn get_make_snapshot_callback( sender: StateSnapshotSenderForClient, flat_storage_manager: FlatStorageManager, ) -> MakeSnapshotCallback { - Arc::new(move |prev_block_hash, epoch_height, shard_indexes_and_uids, block| { - tracing::info!( + Arc::new( + move |prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block| { + tracing::info!( target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "make_snapshot_callback sends `DeleteAndMaybeCreateSnapshotRequest` to state_snapshot_addr"); - // We need to stop flat head updates synchronously in the client thread. 
- // Async update in state_snapshot_actor and potentially lead to flat head progressing beyond prev_block_hash - if !flat_storage_manager.set_flat_state_updates_mode(false) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to lock flat state updates"); - return; - } - let create_snapshot_request = - CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block }; - sender.send(DeleteAndMaybeCreateSnapshotRequest { - create_snapshot_request: Some(create_snapshot_request), - }); - }) + // We need to stop flat head updates synchronously in the client thread. + // Async update in state_snapshot_actor can potentially lead to flat head progressing beyond prev_block_hash + // This also prevents post-resharding flat storage catchup from advancing past `prev_block_hash` + flat_storage_manager.want_snapshot(min_chunk_prev_height); + let create_snapshot_request = CreateSnapshotRequest { + prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block, + }; + sender.send(DeleteAndMaybeCreateSnapshotRequest { + create_snapshot_request: Some(create_snapshot_request), + }); + }, + ) } /// Sends a request to delete a state snapshot. diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index cbf0adc50c6..351b4c4a15c 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -588,7 +588,7 @@ impl TestEnvBuilder { None => TEST_SEED, }; let tries = runtime.get_tries(); - let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { + let make_snapshot_callback = Arc::new(move |prev_block_hash, _min_chunk_prev_height, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback"); tries.delete_state_snapshot(); tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap(); diff --git a/core/store/src/adapter/chunk_store.rs b/core/store/src/adapter/chunk_store.rs index e3b8a3cf294..f1d84bd11cc 100644 --- a/core/store/src/adapter/chunk_store.rs +++ b/core/store/src/adapter/chunk_store.rs @@ -13,8 +13,8 @@ pub struct ChunkStoreAdapter { } impl StoreAdapter for ChunkStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs index 322f4dd09cc..d5200e038f2 100644 --- a/core/store/src/adapter/flat_store.rs +++ b/core/store/src/adapter/flat_store.rs @@ -20,8 +20,8 @@ pub struct FlatStoreAdapter { } impl StoreAdapter for FlatStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/mod.rs b/core/store/src/adapter/mod.rs index ba31d691775..fe975dc11c9 100644 --- a/core/store/src/adapter/mod.rs +++ b/core/store/src/adapter/mod.rs @@ -85,7 +85,11 @@ impl Into for StoreUpdateHolder<'static> { /// Simple adapter wrapper on top of Store to provide a more ergonomic interface for different store types. /// We provide simple inter-convertibility between different store types like FlatStoreAdapter and TrieStoreAdapter. 
pub trait StoreAdapter { - fn store(&self) -> Store; + fn store_ref(&self) -> &Store; + + fn store(&self) -> Store { + self.store_ref().clone() + } fn trie_store(&self) -> trie_store::TrieStoreAdapter { trie_store::TrieStoreAdapter::new(self.store()) diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index e01a4232e4b..bedec4578ea 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -18,8 +18,8 @@ pub struct TrieStoreAdapter { } impl StoreAdapter for TrieStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/flat/chunk_view.rs b/core/store/src/flat/chunk_view.rs index d704e2a013c..edff03ba670 100644 --- a/core/store/src/flat/chunk_view.rs +++ b/core/store/src/flat/chunk_view.rs @@ -47,6 +47,8 @@ impl FlatStorageChunkView { self.flat_storage.contains_key(&self.block_hash, key) } + // TODO: this should be changed to check the values that haven't yet been applied, like in get_value() and contains_key(), + // because otherwise we're iterating over old state that might have been updated by `self.block_hash` pub fn iter_range(&self, from: Option<&[u8]>, to: Option<&[u8]>) -> FlatStateIterator { self.store.iter_range(self.flat_storage.shard_uid(), from, to) } diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 47168512acb..31cb9a93e39 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -1,5 +1,10 @@ use crate::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; -use crate::flat::{BlockInfo, FlatStorageReadyStatus, FlatStorageStatus, POISONED_LOCK_ERR}; +use crate::flat::{ + BlockInfo, FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, + POISONED_LOCK_ERR, +}; +use crate::{DBCol, StoreAdapter}; +use near_primitives::block_header::BlockHeader; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; @@ -30,11 +35,18 @@ pub struct FlatStorageManagerInner { /// this epoch can share the same `head` and `tail`, similar for shards for the next epoch, /// but such overhead is negligible comparing the delta sizes, so we think it's ok. flat_storages: Mutex>, + /// Set to Some() when there's a state snapshot in progress. Used to signal to the resharding flat + /// storage catchup code that it shouldn't advance past this block height + want_snapshot: Mutex>, } impl FlatStorageManager { pub fn new(store: FlatStoreAdapter) -> Self { - Self(Arc::new(FlatStorageManagerInner { store, flat_storages: Default::default() })) + Self(Arc::new(FlatStorageManagerInner { + store, + flat_storages: Default::default(), + want_snapshot: Default::default(), + })) } /// When a node starts from an empty database, this function must be called to ensure @@ -66,8 +78,14 @@ impl FlatStorageManager { /// and resharding. 
pub fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> { tracing::debug!(target: "store", ?shard_uid, "Creating flat storage for shard"); + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + let disable_updates = want_snapshot.is_some(); + let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); let flat_storage = FlatStorage::new(self.0.store.clone(), shard_uid)?; + if disable_updates { + flat_storage.set_flat_head_update_mode(false); + } let original_value = flat_storages.insert(shard_uid, flat_storage); if original_value.is_some() { // Generally speaking this shouldn't happen. It may only happen when @@ -81,6 +99,67 @@ impl FlatStorageManager { Ok(()) } + fn read_block_info(&self, hash: &CryptoHash) -> Result { + let header = self + .0 + .store + .store_ref() + .get_ser::(DBCol::BlockHeader, hash.as_ref()) + .map_err(|e| { + StorageError::StorageInconsistentState(format!( + "could not read block header {}: {:?}", + hash, e + )) + })? + .ok_or_else(|| { + StorageError::StorageInconsistentState(format!("block header {} not found", hash)) + })?; + Ok(BlockInfo { + hash: *header.hash(), + prev_hash: *header.prev_hash(), + height: header.height(), + }) + } + + /// Sets the status to `Ready` if it's currently `Resharding(CatchingUp)` + fn mark_flat_storage_ready(&self, shard_uid: ShardUId) -> Result<(), StorageError> { + // Don't use Self::get_flat_storage_status() because there's no need to panic if this fails, since this is used + // during state snapshotting where an error isn't critical to node operation. + let status = self.0.store.get_flat_storage_status(shard_uid)?; + let catchup_flat_head = match status { + FlatStorageStatus::Ready(_) => return Ok(()), + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(flat_head)) => { + flat_head + } + _ => { + return Err(StorageError::StorageInconsistentState(format!( + "Unexpected flat storage status: {:?}", + &status + ))) + } + }; + let flat_head = self.read_block_info(&catchup_flat_head)?; + let mut store_update = self.0.store.store_update(); + store_update.set_flat_storage_status( + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), + ); + // TODO: Consider adding a StorageError::IO variant? + store_update.commit().map_err(|_| StorageError::StorageInternalError)?; + Ok(()) + } + + // If the flat storage status is Resharding(CatchingUp), sets it to Ready(), and then calls create_flat_storage_for_shard() + // This is used in creating state snapshots when this might be a flat storage that is in the middle of catchup, and that + // should now be considered `Ready` in the state snapshot, even if not in the main DB. + pub fn mark_ready_and_create_flat_storage( + &self, + shard_uid: ShardUId, + ) -> Result<(), StorageError> { + self.mark_flat_storage_ready(shard_uid)?; + self.create_flat_storage_for_shard(shard_uid) + } + /// Update flat storage for given processed or caught up block, which includes: /// - merge deltas from current flat storage head to new one given in /// `new_flat_head`; @@ -220,39 +299,66 @@ impl FlatStorageManager { } } - /// Updates `move_head_enabled` for all shards and returns whether it succeeded. - /// If at least one of the shards fails to update move_head_enabled, then that operation is rolled back for all shards. 
- /// - /// Rollbacks should work, because we assume that this function is the only - /// entry point to locking/unlocking flat head updates in a system with - /// multiple FlatStorages running in parallel. - pub fn set_flat_state_updates_mode(&self, enabled: bool) -> bool { + /// Returns None if there's no resharding flat storage split in progress + /// If there is, returns Some(None) if there's at least one child shard that hasn't been split and had its + /// status set to `CatchingUp`. If they've all been split already and are in the catchup phase, + /// returns the lowest height among all shards that resharding catchup has advanced to. + pub fn resharding_catchup_height_reached( + &self, + shard_uids: impl Iterator, + ) -> Result>, StorageError> { + let mut ret = None; + for shard_uid in shard_uids { + match self.0.store.get_flat_storage_status(shard_uid)? { + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + catchup_flat_head, + )) => { + let flat_head = self.read_block_info(&catchup_flat_head)?; + if let Some(Some(min_height)) = ret { + ret = Some(Some(std::cmp::min(min_height, flat_head.height))); + } else { + ret = Some(Some(flat_head.height)); + } + } + FlatStorageStatus::Resharding(_) => return Ok(Some(None)), + _ => {} + }; + } + Ok(ret) + } + + /// Should be called when we want to take a state snapshot. Disallows flat head updates, and signals to any resharding + /// flat storage code that it should not advance beyond this hash + pub fn want_snapshot(&self, min_chunk_prev_height: BlockHeight) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = Some(min_chunk_prev_height); + } let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); - let mut all_updated = true; - let mut updated_flat_storages = vec![]; - let mut updated_shard_uids = vec![]; - for (shard_uid, flat_storage) in flat_storages.iter() { - if flat_storage.set_flat_head_update_mode(enabled) { - updated_flat_storages.push(flat_storage); - updated_shard_uids.push(shard_uid); - } else { - all_updated = false; - tracing::error!(target: "store", rolling_back_shards = ?updated_shard_uids, enabled, ?shard_uid, "Locking/Unlocking of flat head updates failed for shard. Reverting."); - break; - } + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(false); } - if all_updated { - tracing::debug!(target: "store", enabled, "Locking/Unlocking of flat head updates succeeded"); - true - } else { - // Do rollback. - // It does allow for a data race if somebody updates move_head_enabled on individual shards. - // The assumption is that all shards get locked/unlocked at the same time. - for flat_storage in updated_flat_storages { - flat_storage.set_flat_head_update_mode(!enabled); - } - tracing::error!(target: "store", enabled, "Locking/Unlocking of flat head updates failed"); - false + tracing::debug!(target: "store", "Locked flat head updates"); + } + + /// Should be called when we're done taking a state snapshot. Allows flat head updates, and signals to any resharding + /// flat storage code that it can advance now. 
+ pub fn snapshot_taken(&self) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = None; } + let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(true); + } + tracing::debug!(target: "store", "Unlocked flat head updates"); + } + + // Returns Some() if a state snapshot should be taken, and therefore any resharding flat storage code should not advance + // past the given hash + pub fn snapshot_wanted(&self) -> Option { + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index c24917782d6..8db1b72ea6f 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -492,14 +492,9 @@ impl FlatStorage { } /// Updates `move_head_enabled` and returns whether the change was done. - pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) -> bool { + pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) { let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR); - if enabled != guard.move_head_enabled { - guard.move_head_enabled = enabled; - true - } else { - false - } + guard.move_head_enabled = enabled; } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 7c1c288ba19..10579acda8a 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -117,8 +117,8 @@ pub struct Store { } impl StoreAdapter for Store { - fn store(&self) -> Store { - self.clone() + fn store_ref(&self) -> &Store { + self } } diff --git a/core/store/src/trie/state_snapshot.rs b/core/store/src/trie/state_snapshot.rs index edbff71b09a..77ff792aca0 100644 --- a/core/store/src/trie/state_snapshot.rs +++ b/core/store/src/trie/state_snapshot.rs @@ -90,7 +90,7 @@ impl StateSnapshot { tracing::debug!(target: "state_snapshot", ?shard_indexes_and_uids, ?prev_block_hash, "new StateSnapshot"); let mut included_shard_uids = vec![]; for &(shard_index, shard_uid) in shard_indexes_and_uids { - if let Err(err) = flat_storage_manager.create_flat_storage_for_shard(shard_uid) { + if let Err(err) = flat_storage_manager.mark_ready_and_create_flat_storage(shard_uid) { tracing::warn!(target: "state_snapshot", ?err, ?shard_uid, "Failed to create a flat storage for snapshot shard"); continue; } diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 0bda17d80bf..c04d89ba630 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -87,7 +87,13 @@ impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { let num_accounts = self.num_accounts.unwrap_or(8); let num_clients = self.num_clients.unwrap_or(3); - let epoch_length = self.epoch_length.unwrap_or(6); + // When there's a resharding task delay and single-shard tracking, the delay might be pushed out + // even further because the resharding task might have to wait for the state snapshot to be made + // before it can proceed, which might mean that flat storage won't be ready for the child shard for a whole epoch. + // So we extend the epoch length a bit in this case. + let epoch_length = self + .epoch_length + .unwrap_or_else(|| self.delay_flat_state_resharding.map_or(6, |delay| delay + 7)); // #12195 prevents number of BPs bigger than `epoch_length`. 
assert!(num_clients > 0 && num_clients <= epoch_length); @@ -968,9 +974,11 @@ fn test_resharding_v3_slower_post_processing_tasks() { } #[test] -// TODO(resharding): fix nearcore and change the ignore condition -// #[cfg_attr(not(feature = "test_features"), ignore)] -#[ignore] +// TODO(resharding): fix the fact that this test fails if the epoch length is set to 10, (and state sync +// is made to run before shard catchup) because set_state_finalize() sets flat storage state to +// ready before child catchup is done. Also fix the failure in +// check_state_shard_uid_mapping_after_resharding() if the epoch length is set to 11 +#[cfg_attr(not(feature = "test_features"), ignore)] fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() { let params = TestReshardingParametersBuilder::default() .shuffle_shard_assignment_for_chunk_producers(true) From 4fff0bb500b63a89f5987076593506832f9fb5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Sun, 15 Dec 2024 12:46:25 +0100 Subject: [PATCH 3/8] fix(resharding): balance checker for ignored delayed receipts (#12619) Tinkering with resharding testloop in another PR, I got some errors when creating / removing an account. One of these errors is balance checker failing. For delayed receipt test, we use `call_burn_gas_contract` that does not attach deposit, so balance checker did not fail. But if a delayed receipt has deposit and it is skipped (because it belongs to the sibling shard), then balance checker fails, because it calculated total deposit independently of what delayed receipts were actually processed. --------- Co-authored-by: Andrea --- .../src/test_loop/tests/resharding_v3.rs | 4 ++-- runtime/runtime/src/balance_checker.rs | 15 ++++++++++----- runtime/runtime/src/lib.rs | 9 +++++---- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index c04d89ba630..631af6bdc98 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -331,7 +331,7 @@ fn call_burn_gas_contract( signer_id.clone(), receiver_id.clone(), &signer, - 0, + 1, method_name, args, gas_burnt_per_call + 10 * TGAS, @@ -401,7 +401,7 @@ fn call_promise_yield( signer_id.clone(), receiver_id.clone(), &signer, - 0, + 1, "call_yield_resume_read_data_id_from_storage".to_string(), yield_payload.clone(), 300 * TGAS, diff --git a/runtime/runtime/src/balance_checker.rs b/runtime/runtime/src/balance_checker.rs index 6f761702294..3a622df947b 100644 --- a/runtime/runtime/src/balance_checker.rs +++ b/runtime/runtime/src/balance_checker.rs @@ -275,6 +275,7 @@ pub(crate) fn check_balance( final_state: &TrieUpdate, validator_accounts_update: &Option, incoming_receipts: &[Receipt], + processed_delayed_receipts: &[Receipt], yield_timeout_receipts: &[Receipt], transactions: &[SignedTransaction], outgoing_receipts: &[Receipt], @@ -288,11 +289,6 @@ pub(crate) fn check_balance( let final_delayed_receipt_indices: DelayedReceiptIndices = get(final_state, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default(); - // Previously delayed receipts that were processed this time. - let processed_delayed_receipts = get_delayed_receipts( - initial_state, - initial_delayed_receipt_indices.first_index..final_delayed_receipt_indices.first_index, - )?; // Receipts that were not processed this time and are delayed now. 
let new_delayed_receipts = get_delayed_receipts( final_state, @@ -427,6 +423,7 @@ mod tests { &[], &[], &[], + &[], &ApplyStats::default(), ) .unwrap(); @@ -445,6 +442,7 @@ mod tests { &[], &[], &[], + &[], &ApplyStats::default(), ) .unwrap_err(); @@ -509,6 +507,7 @@ mod tests { &[], &[], &[], + &[], &ApplyStats::default(), ) .unwrap(); @@ -555,6 +554,7 @@ mod tests { &None, &[], &[], + &[], &[tx], &[receipt], &ApplyStats { @@ -627,6 +627,7 @@ mod tests { &None, &[receipt], &[], + &[], &[tx], &[], &ApplyStats::default(), @@ -669,6 +670,7 @@ mod tests { &None, &[receipt], &[], + &[], &[tx], &[], &ApplyStats::default(), @@ -748,6 +750,7 @@ mod tests { &None, &[], &[], + &[], &[tx], &[], &ApplyStats { @@ -819,6 +822,7 @@ mod tests { &[], &[], &[], + &[], &outgoing_receipts, &ApplyStats::default(), ) @@ -882,6 +886,7 @@ mod tests { &[], &[], &[], + &[], &outgoing_receipts, &ApplyStats::default(), ); diff --git a/runtime/runtime/src/lib.rs b/runtime/runtime/src/lib.rs index ba72923a402..1ca1a43b3de 100644 --- a/runtime/runtime/src/lib.rs +++ b/runtime/runtime/src/lib.rs @@ -2071,7 +2071,8 @@ impl Runtime { let apply_state = processing_state.apply_state; let epoch_info_provider = processing_state.epoch_info_provider; let mut state_update = processing_state.state_update; - let delayed_receipts = processing_state.delayed_receipts; + let pending_delayed_receipts = processing_state.delayed_receipts; + let processed_delayed_receipts = process_receipts_result.processed_delayed_receipts; let promise_yield_result = process_receipts_result.promise_yield_result; if promise_yield_result.promise_yield_indices @@ -2086,10 +2087,10 @@ impl Runtime { // Congestion info needs a final touch to select an allowed shard if // this shard is fully congested. - let delayed_receipts_count = delayed_receipts.upper_bound_len(); + let delayed_receipts_count = pending_delayed_receipts.upper_bound_len(); let mut own_congestion_info = receipt_sink.own_congestion_info(); if let Some(congestion_info) = &mut own_congestion_info { - delayed_receipts.apply_congestion_changes(congestion_info)?; + pending_delayed_receipts.apply_congestion_changes(congestion_info)?; let protocol_version = apply_state.current_protocol_version; let (all_shards, shard_seed) = @@ -2123,6 +2124,7 @@ impl Runtime { &state_update, validator_accounts_update, processing_state.incoming_receipts, + &processed_delayed_receipts, &promise_yield_result.timeout_receipts, processing_state.transactions, &receipt_sink.outgoing_receipts(), @@ -2187,7 +2189,6 @@ impl Runtime { .observe(chunk_recorded_size_upper_bound / f64::max(1.0, chunk_recorded_size)); metrics::report_recorded_column_sizes(&trie, &apply_state); let proof = trie.recorded_storage(); - let processed_delayed_receipts = process_receipts_result.processed_delayed_receipts; let processed_yield_timeouts = promise_yield_result.processed_yield_timeouts; let bandwidth_scheduler_state_hash = receipt_sink .bandwidth_scheduler_output() From 6bc2df9d94aa9ebeafbf7024e1aa302b0bc292bb Mon Sep 17 00:00:00 2001 From: Jan Malinowski <149345204+jancionear@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:51:35 +0000 Subject: [PATCH 4/8] fix(tests): Mark some tests as slower (#12615) When I run the tests with `--test-threads 16` these three tests often timeout. They work with `--test-threads 1`, but then it takes forever to run all of the tests. Could we move them one slowness class up? 
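For context on the slowness classes: they are a pure naming convention, so bumping a test one class up is just a rename (plus, for the ultra-slow class, the matching entries in nightly/expensive.txt shown below). The exact CI filters live outside this patch, and the test names in the following sketch are made up purely for illustration.

    #[test]
    fn test_something_fast() {
        // Runs in the regular test pass.
    }

    #[test]
    fn slow_test_something_heavier() {
        // Skipped in the regular pass by its name prefix and run in a slower CI bucket.
    }

    #[test]
    fn ultra_slow_test_something_heaviest() {
        // Run only through the expensive/nightly lists such as nightly/expensive.txt.
    }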
--- core/store/src/opener.rs | 2 +- integration-tests/src/tests/client/state_dump.rs | 2 +- integration-tests/src/tests/client/sync_state_nodes.rs | 2 +- nightly/expensive.txt | 2 ++ 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 2302255dd43..0e896d2bf4f 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -648,7 +648,7 @@ mod tests { } #[test] - fn test_checkpoint_hot_storage_and_cleanup_columns() { + fn slow_test_checkpoint_hot_storage_and_cleanup_columns() { let (home_dir, opener) = NodeStorage::test_opener(); let node_storage = opener.open().unwrap(); let hot_store = Store { storage: node_storage.hot_storage.clone() }; diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs index 00ea5dc72a3..5ca06f2d4bb 100644 --- a/integration-tests/src/tests/client/state_dump.rs +++ b/integration-tests/src/tests/client/state_dump.rs @@ -27,7 +27,7 @@ use std::sync::Arc; #[test] /// Produce several blocks, wait for the state dump thread to notice and /// write files to a temp dir. -fn test_state_dump() { +fn slow_test_state_dump() { init_test_logger(); let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); diff --git a/integration-tests/src/tests/client/sync_state_nodes.rs b/integration-tests/src/tests/client/sync_state_nodes.rs index fe1af6fb67c..38e1bcc996a 100644 --- a/integration-tests/src/tests/client/sync_state_nodes.rs +++ b/integration-tests/src/tests/client/sync_state_nodes.rs @@ -443,7 +443,7 @@ fn ultra_slow_test_sync_state_dump() { #[test] // Test that state sync behaves well when the chunks are absent at the end of the epoch. -fn slow_test_dump_epoch_missing_chunk_in_last_block() { +fn ultra_slow_test_dump_epoch_missing_chunk_in_last_block() { heavy_test(|| { init_test_logger(); let epoch_length = 10; diff --git a/nightly/expensive.txt b/nightly/expensive.txt index 9ef69e96528..24fc89f88b8 100644 --- a/nightly/expensive.txt +++ b/nightly/expensive.txt @@ -160,6 +160,8 @@ expensive integration-tests integration_tests tests::client::sync_state_nodes::u expensive integration-tests integration_tests tests::client::sync_state_nodes::ultra_slow_test_sync_state_nodes --features nightly expensive integration-tests integration_tests tests::client::sync_state_nodes::ultra_slow_test_sync_state_nodes_multishard expensive integration-tests integration_tests tests::client::sync_state_nodes::ultra_slow_test_sync_state_nodes_multishard --features nightly +expensive integration-tests integration_tests tests::client::sync_state_nodes::ultra_slow_test_dump_epoch_missing_chunk_in_last_block +expensive integration-tests integration_tests tests::client::sync_state_nodes::ultra_slow_test_dump_epoch_missing_chunk_in_last_block --features nightly # other tests expensive --timeout=300 near-chain near_chain tests::garbage_collection::ultra_slow_test_clear_old_data_too_many_heights From d3b8e0b47cb4de9517c9235a9a879f9e938e4aac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:11:12 +0100 Subject: [PATCH 5/8] feat(resharding): resharding archival node (#12578) Mapping from child to parent shard is stored in `DBCol::StateShardUIdMapping`, which is marked as a cold column. By default, it would be copied from hot to cold storage at each block. 
In this PR, we do add `is_last_block_in_epoch` parameter to `update_cold_db()` so that it is only copied once per epoch. For that, a slight refactor of `cold_store_copy()` was needed. If it is possible to have skips at epoch boundary, then `cold_store_copy()` had a bug, where previous epoch shard layout could be used to copy data for a block from the new shard layout. Added archival node and rpc node to reshardingv3 testloop. In testloop, create temporary account before resharding and remove it after resharding. After gc period, the account is available at the archival node and it is not available at rpc node. However, it works without actually testing the `cold_store_copy` path, because neither `cold_store_copy` nor garbage collection is called for archival node in testloop. So it needs fixing testloop and a follow-up testing. --- chain/chain/src/garbage_collection.rs | 2 +- chain/chain/src/resharding/manager.rs | 2 +- core/store/src/adapter/trie_store.rs | 19 +- core/store/src/archive/cold_storage.rs | 37 ++-- core/store/src/columns.rs | 1 - core/store/src/trie/mem/parallel_loader.rs | 2 +- .../src/test_loop/tests/max_receipt_size.rs | 20 +- .../src/test_loop/tests/resharding_v3.rs | 173 +++++++++++++----- .../src/test_loop/utils/transactions.rs | 23 ++- .../src/tests/client/cold_storage.rs | 27 ++- .../src/tests/client/process_blocks.rs | 12 +- nearcore/src/cold_storage.rs | 38 ++-- tools/cold-store/src/cli.rs | 18 +- 13 files changed, 259 insertions(+), 115 deletions(-) diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 4059996a814..ea574264f55 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -507,7 +507,7 @@ impl<'a> ChainStoreUpdate<'a> { Ok(()) } - // TODO(reshardingV3) Revisit this function, probably it is not needed anymore. + // TODO(resharding) Revisit this function, probably it is not needed anymore. fn get_shard_uids_to_gc( &mut self, epoch_manager: &dyn EpochManagerAdapter, diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 82b757f211f..5d3bd24e3fd 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -147,7 +147,7 @@ impl ReshardingManager { ) -> io::Result<()> { let mut store_update = self.store.trie_store().store_update(); let parent_shard_uid = split_shard_event.parent_shard; - // TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding? + // TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding? for child_shard_uid in split_shard_event.children_shards() { store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid); } diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index bedec4578ea..1671acdeef1 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -173,6 +173,20 @@ impl<'a> TrieStoreUpdateAdapter<'a> { } } +/// Get the `ShardUId` mapping for child_shard_uid. If the mapping does not exist, map the shard to itself. +/// Used by Resharding V3 for State mapping. +/// +/// It is kept out of `TrieStoreAdapter`, so that `TrieStoreUpdateAdapter` can use it without +/// cloning `store` each time, see https://github.com/near/nearcore/pull/12232#discussion_r1804810508. 
+pub fn get_shard_uid_mapping(store: &Store, child_shard_uid: ShardUId) -> ShardUId { + store + .get_ser::(DBCol::StateShardUIdMapping, &child_shard_uid.to_bytes()) + .unwrap_or_else(|_| { + panic!("get_shard_uid_mapping() failed for child_shard_uid = {}", child_shard_uid) + }) + .unwrap_or(child_shard_uid) +} + /// Constructs db key to be used to access the State column. /// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix /// to its ancestor in the resharding tree (according to Resharding V3) @@ -186,10 +200,7 @@ fn get_key_from_shard_uid_and_hash( shard_uid: ShardUId, hash: &CryptoHash, ) -> [u8; 40] { - let mapped_shard_uid = store - .get_ser::(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()) - .expect("get_key_from_shard_uid_and_hash() failed") - .unwrap_or(shard_uid); + let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid); let mut key = [0; 40]; key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes()); key[8..].copy_from_slice(hash.as_ref()); diff --git a/core/store/src/archive/cold_storage.rs b/core/store/src/archive/cold_storage.rs index 72c9ef32031..20f6a2035af 100644 --- a/core/store/src/archive/cold_storage.rs +++ b/core/store/src/archive/cold_storage.rs @@ -1,3 +1,4 @@ +use crate::adapter::trie_store::get_shard_uid_mapping; use crate::columns::DBKeyType; use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY}; use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges}; @@ -57,8 +58,7 @@ struct BatchTransaction { } /// Updates provided cold database from provided hot store with information about block at `height`. -/// Returns if the block was copied (false only if height is not present in `hot_store`). -/// Block as `height` has to be final. +/// Block at `height` has to be final and present in `hot_store`. /// /// First, we read from hot store information necessary /// to determine all the keys that need to be updated in cold db. @@ -80,22 +80,29 @@ pub fn update_cold_db( hot_store: &Store, shard_layout: &ShardLayout, height: &BlockHeight, + is_last_block_in_epoch: bool, num_threads: usize, -) -> io::Result { +) -> io::Result<()> { let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height); let _timer = metrics::COLD_COPY_DURATION.start_timer(); - if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() { - return Ok(false); - } - let height_key = height.to_le_bytes(); let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?; let block_hash_key = block_hash_vec.as_slice(); let key_type_to_keys = get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?; - let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::>(); + let columns_to_update = DBCol::iter() + .filter(|col| { + if !col.is_cold() { + return false; + } + if col == &DBCol::StateShardUIdMapping && !is_last_block_in_epoch { + return false; + } + true + }) + .collect::>(); // Create new thread pool with `num_threads`. rayon::ThreadPoolBuilder::new() @@ -103,8 +110,8 @@ pub fn update_cold_db( .build() .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))? .install(|| { - cold_columns - .into_par_iter() // Process every cold column as a separate task in thread pool in parallel. + columns_to_update + .into_par_iter() // Process cold columns to update as a separate task in thread pool in parallel. // Copy column to cold db. 
.map(|col: DBCol| -> io::Result<()> { if col == DBCol::State { @@ -127,8 +134,7 @@ pub fn update_cold_db( }, ) })?; - - Ok(true) + Ok(()) } // Correctly set the key and value on DBTransaction, taking reference counting @@ -189,9 +195,10 @@ fn copy_state_from_store( let Some(trie_changes) = trie_changes else { continue }; total_keys += trie_changes.insertions().len(); + let mapped_shard_uid_key = get_shard_uid_mapping(hot_store, shard_uid).to_bytes(); for op in trie_changes.insertions() { - // TODO(reshardingV3) Handle shard_uid not mapped there - let key = join_two_keys(&shard_uid_key, op.hash().as_bytes()); + // TODO(resharding) Test it properly. Currently this path is not triggered in testloop. + let key = join_two_keys(&mapped_shard_uid_key, op.hash().as_bytes()); let value = op.payload().to_vec(); total_size += value.len(); @@ -343,7 +350,7 @@ pub fn copy_all_data_to_cold( tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold"); return Ok(CopyAllDataToColdStatus::Interrupted); } - // TODO(reshardingV3) Should do mapping here? + // TODO(resharding) Should do mapping here? let (key, value) = result?; transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?; } diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 9f6f20957a0..a6a514ac735 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -462,7 +462,6 @@ impl DBCol { | DBCol::StateHeaders | DBCol::TransactionResultForBlock | DBCol::Transactions - // TODO(reshardingV3) How the mapping will work with split storage? | DBCol::StateShardUIdMapping => true, // TODO diff --git a/core/store/src/trie/mem/parallel_loader.rs b/core/store/src/trie/mem/parallel_loader.rs index ee0a4ddee41..92969ef07d4 100644 --- a/core/store/src/trie/mem/parallel_loader.rs +++ b/core/store/src/trie/mem/parallel_loader.rs @@ -191,7 +191,7 @@ impl ParallelMemTrieLoader { arena: &mut impl ArenaMut, ) -> Result { // Figure out which range corresponds to the prefix of this subtree. - // TODO(reshardingV3) This seems fragile, potentially does not work with mapping. + // TODO(resharding) This seems fragile, potentially does not work with mapping. let (start, end) = subtree_to_load.to_iter_range(self.shard_uid); // Load all the keys in this range from the FlatState column. diff --git a/integration-tests/src/test_loop/tests/max_receipt_size.rs b/integration-tests/src/test_loop/tests/max_receipt_size.rs index bf43d348924..9d386a608bd 100644 --- a/integration-tests/src/test_loop/tests/max_receipt_size.rs +++ b/integration-tests/src/test_loop/tests/max_receipt_size.rs @@ -22,6 +22,7 @@ fn slow_test_max_receipt_size() { let account0: AccountId = "account0".parse().unwrap(); let account0_signer = &create_user_test_signer(&account0).into(); + let rpc_id = "account4".parse().unwrap(); // We can't test receipt limit by submitting large transactions because we hit the transaction size limit // before hitting the receipt size limit. @@ -33,7 +34,7 @@ fn slow_test_max_receipt_size() { get_shared_block_hash(&env.datas, &env.test_loop), ); let large_tx_exec_res = - execute_tx(&mut env.test_loop, large_tx, &env.datas, Duration::seconds(5)); + execute_tx(&mut env.test_loop, &rpc_id, large_tx, &env.datas, Duration::seconds(5)); assert_matches!(large_tx_exec_res, Err(InvalidTxError::TransactionSizeExceeded { .. })); // Let's test it by running a contract that generates a large receipt. 
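Returning to the copy_state_from_store() change earlier in this patch: with Resharding V3, a child shard's trie nodes are stored under its ancestor's ShardUId prefix, so the cold-store copy has to build State keys from the mapped ShardUId rather than the child's own. A minimal sketch of the key layout (simplified names, not the patch's code; the real path goes through get_shard_uid_mapping() and join_two_keys()):

    /// Builds a 40-byte State column key: 8 bytes of the *mapped* ShardUId
    /// followed by the 32-byte trie node hash.
    fn cold_state_key(mapped_shard_uid: [u8; 8], node_hash: [u8; 32]) -> [u8; 40] {
        let mut key = [0u8; 40];
        key[..8].copy_from_slice(&mapped_shard_uid);
        key[8..].copy_from_slice(&node_hash);
        key
    }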
@@ -44,7 +45,7 @@ fn slow_test_max_receipt_size() { &account0_signer, get_shared_block_hash(&env.datas, &env.test_loop), ); - run_tx(&mut env.test_loop, deploy_contract_tx, &env.datas, Duration::seconds(5)); + run_tx(&mut env.test_loop, &rpc_id, deploy_contract_tx, &env.datas, Duration::seconds(5)); // Calling generate_large_receipt({"account_id": "account0", "method_name": "noop", "total_args_size": 3000000}) // will generate a receipt that has ~3_000_000 bytes. It'll be a single receipt with multiple FunctionCall actions. @@ -60,7 +61,7 @@ fn slow_test_max_receipt_size() { 300 * TGAS, get_shared_block_hash(&env.datas, &env.test_loop), ); - run_tx(&mut env.test_loop, large_receipt_tx, &env.datas, Duration::seconds(5)); + run_tx(&mut env.test_loop, &rpc_id, large_receipt_tx, &env.datas, Duration::seconds(5)); // Generating a receipt that is 5 MB should fail, it's above the receipt size limit. let too_large_receipt_tx = SignedTransaction::call( @@ -74,9 +75,14 @@ fn slow_test_max_receipt_size() { 300 * TGAS, get_shared_block_hash(&env.datas, &env.test_loop), ); - let too_large_receipt_tx_exec_res = - execute_tx(&mut env.test_loop, too_large_receipt_tx, &env.datas, Duration::seconds(5)) - .unwrap(); + let too_large_receipt_tx_exec_res = execute_tx( + &mut env.test_loop, + &rpc_id, + too_large_receipt_tx, + &env.datas, + Duration::seconds(5), + ) + .unwrap(); match too_large_receipt_tx_exec_res.status { FinalExecutionStatus::Failure(TxExecutionError::ActionError(action_error)) => { @@ -111,7 +117,7 @@ fn slow_test_max_receipt_size() { 300 * TGAS, get_shared_block_hash(&env.datas, &env.test_loop), ); - let sum_4_res = run_tx(&mut env.test_loop, sum_4_tx, &env.datas, Duration::seconds(5)); + let sum_4_res = run_tx(&mut env.test_loop, &rpc_id, sum_4_tx, &env.datas, Duration::seconds(5)); assert_eq!(sum_4_res, 10u64.to_le_bytes().to_vec()); env.shutdown_and_drain_remaining_events(Duration::seconds(20)); diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 631af6bdc98..e8d26852880 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -3,17 +3,20 @@ use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle}; use near_async::time::Duration; use near_chain_configs::test_genesis::{TestGenesisBuilder, ValidatorsSpec}; use near_chain_configs::DEFAULT_GC_NUM_EPOCHS_TO_KEEP; +use near_client::Query; use near_o11y::testonly::init_test_logger; use near_primitives::epoch_manager::EpochConfigStore; use near_primitives::shard_layout::ShardLayout; -use near_primitives::types::{AccountId, BlockHeightDelta, Gas, ShardId, ShardIndex}; +use near_primitives::types::{ + AccountId, BlockHeightDelta, BlockId, BlockReference, Gas, ShardId, ShardIndex, +}; use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; use rand::seq::SliceRandom; use rand::Rng; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use std::cell::Cell; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use crate::test_loop::builder::TestLoopBuilder; @@ -26,8 +29,8 @@ use crate::test_loop::utils::sharding::{ next_block_has_new_shard_layout, print_and_assert_shard_accounts, }; use crate::test_loop::utils::transactions::{ - get_anchor_hash, get_next_nonce, get_shared_block_hash, get_smallest_height_head, run_tx, - store_and_submit_tx, submit_tx, + check_txs, create_account, delete_account, deploy_contract, 
get_anchor_hash, get_next_nonce, + get_node_data, get_smallest_height_head, store_and_submit_tx, submit_tx, }; use crate::test_loop::utils::trie_sanity::{ check_state_shard_uid_mapping_after_resharding, TrieSanityCheck, @@ -39,7 +42,7 @@ use near_crypto::Signer; use near_parameters::{vm, RuntimeConfig, RuntimeConfigStore}; use near_primitives::test_utils::create_user_test_signer; use near_primitives::transaction::SignedTransaction; -use near_primitives::views::FinalExecutionStatus; +use near_primitives::views::{FinalExecutionStatus, QueryRequest}; #[derive(derive_builder::Builder)] #[builder(pattern = "owned", build_fn(skip))] @@ -47,7 +50,7 @@ use near_primitives::views::FinalExecutionStatus; struct TestReshardingParameters { chunk_ranges_to_drop: HashMap>, num_accounts: u64, - num_clients: u64, + num_validators: u64, #[builder(setter(skip))] accounts: Vec, #[builder(setter(skip))] @@ -55,6 +58,8 @@ struct TestReshardingParameters { base_shard_layout_version: u64, #[builder(setter(skip))] block_and_chunk_producers: Vec, + rpc_clients: Vec, + archival_clients: HashSet, initial_balance: u128, epoch_length: BlockHeightDelta, shuffle_shard_assignment_for_chunk_producers: bool, @@ -85,8 +90,9 @@ struct TestReshardingParameters { impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { + // TODO(resharding) Test chunk validators, and maybe more RPC / archival nodes. let num_accounts = self.num_accounts.unwrap_or(8); - let num_clients = self.num_clients.unwrap_or(3); + let num_validators = self.num_validators.unwrap_or(3); // When there's a resharding task delay and single-shard tracking, the delay might be pushed out // even further because the resharding task might have to wait for the state snapshot to be made // before it can proceed, which might mean that flat storage won't be ready for the child shard for a whole epoch. @@ -96,18 +102,18 @@ impl TestReshardingParametersBuilder { .unwrap_or_else(|| self.delay_flat_state_resharding.map_or(6, |delay| delay + 7)); // #12195 prevents number of BPs bigger than `epoch_length`. - assert!(num_clients > 0 && num_clients <= epoch_length); + assert!(num_validators > 0 && num_validators <= epoch_length); let accounts = Self::compute_initial_accounts(num_accounts); - // This piece of code creates `num_clients` from `accounts`. First client is at index 0 and - // other clients are spaced in the accounts' space as evenly as possible. - let clients_per_account = num_clients as f64 / accounts.len() as f64; - let mut client_parts = 1.0 - clients_per_account; - let clients: Vec<_> = accounts + // This piece of code creates `num_validators` from `accounts`. First validator is at index 0 and + // other validator are spaced in the accounts' space as evenly as possible. 
+ let validators_per_account = num_validators as f64 / num_accounts as f64; + let mut client_parts = 1.0 - validators_per_account; + let block_and_chunk_producers: Vec<_> = accounts .iter() .filter(|_| { - client_parts += clients_per_account; + client_parts += validators_per_account; if client_parts >= 1.0 { client_parts -= 1.0; true @@ -118,16 +124,29 @@ impl TestReshardingParametersBuilder { .cloned() .collect(); - let block_and_chunk_producers = clients.clone(); + let non_validator_accounts: Vec<_> = accounts + .iter() + .filter(|account| !block_and_chunk_producers.contains(account)) + .collect(); + assert!(non_validator_accounts.len() >= 2); + let archival_clients = vec![non_validator_accounts[0].clone()]; + let rpc_clients = vec![non_validator_accounts[1].clone()]; + let clients = + vec![block_and_chunk_producers.clone(), archival_clients.clone(), rpc_clients.clone()] + .into_iter() + .flatten() + .collect(); TestReshardingParameters { chunk_ranges_to_drop: self.chunk_ranges_to_drop.unwrap_or_default(), num_accounts, - num_clients, + num_validators, accounts, clients, base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2), block_and_chunk_producers, + archival_clients: HashSet::from_iter(archival_clients.into_iter()), + rpc_clients, initial_balance: self.initial_balance.unwrap_or(1_000_000 * ONE_NEAR), epoch_length, shuffle_shard_assignment_for_chunk_producers: self @@ -208,7 +227,7 @@ fn execute_money_transfers(account_ids: Vec) -> LoopActionFn { const NUM_TRANSFERS_PER_BLOCK: usize = 20; let latest_height = Cell::new(0); - // TODO: to be fixed when all shard tracking gets disabled. + // TODO(resharding) Make it work with the RPC from TestReshardingParameters. let rpc_id: AccountId = "account0".parse().unwrap(); let seed = rand::thread_rng().gen::(); println!("Random seed: {}", seed); @@ -277,8 +296,8 @@ fn call_burn_gas_contract( let nonce = Cell::new(102); let txs = Cell::new(vec![]); let latest_height = Cell::new(0); - // TODO: to be fixed when all shard tracking gets disabled. - let rpc_id: AccountId = "account0".parse().unwrap(); + // TODO(resharding) Make it work with the RPC from TestReshardingParameters. + let rpc_id = "account0".parse().unwrap(); Box::new( move |node_datas: &[TestData], @@ -366,7 +385,7 @@ fn call_promise_yield( let resharding_height: Cell> = Cell::new(None); let txs = Cell::new(vec![]); let latest_height = Cell::new(0); - // TODO: to be fixed when all shard tracking gets disabled. + // TODO(resharding) Make it work with the RPC from TestReshardingParameters. let rpc_id: AccountId = "account0".parse().unwrap(); let promise_txs_sent = Cell::new(false); let nonce = Cell::new(102); @@ -499,6 +518,37 @@ fn get_base_shard_layout(version: u64) -> ShardLayout { } } +// After resharding and gc-period, assert the deleted `account_id` +// is still accessible through archival node view client, +// and it is not accessible through a regular, RPC node. 
+fn check_deleted_account_availability( + env: &mut TestLoopEnv, + archival_id: &AccountId, + rpc_id: &AccountId, + account_id: AccountId, + height: u64, +) { + let archival_node_data = get_node_data(&env.datas, &archival_id); + let rpc_node_data = get_node_data(&env.datas, &rpc_id); + let archival_view_client_handle = archival_node_data.view_client_sender.actor_handle(); + let rpc_view_client_handle = rpc_node_data.view_client_sender.actor_handle(); + + let block_reference = BlockReference::BlockId(BlockId::Height(height)); + let request = QueryRequest::ViewAccount { account_id }; + let msg = Query::new(block_reference, request); + + let archival_node_result = { + let view_client = env.test_loop.data.get_mut(&archival_view_client_handle); + near_async::messaging::Handler::handle(view_client, msg.clone()) + }; + let rpc_node_result = { + let view_client = env.test_loop.data.get_mut(&rpc_view_client_handle); + near_async::messaging::Handler::handle(view_client, msg) + }; + assert!(archival_node_result.is_ok()); + assert!(!rpc_node_result.is_ok()); +} + /// Base setup to check sanity of Resharding V3. /// TODO(#11881): add the following scenarios: /// - Nodes must not track all shards. State sync must succeed. @@ -530,6 +580,10 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { base_epoch_config_store.get_config(base_protocol_version).as_ref().clone(); base_epoch_config.shuffle_shard_assignment_for_chunk_producers = params.shuffle_shard_assignment_for_chunk_producers; + // TODO(resharding) Test chunk validators too (would need to change the lines below). + base_epoch_config.num_block_producer_seats = params.num_validators; + base_epoch_config.num_chunk_producer_seats = params.num_validators; + base_epoch_config.num_chunk_validator_seats = params.num_validators; if !params.chunk_ranges_to_drop.is_empty() { base_epoch_config.block_producer_kickout_threshold = 0; base_epoch_config.chunk_producer_kickout_threshold = 0; @@ -543,7 +597,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { let parent_shard_uid = base_shard_layout.account_id_to_shard_uid(&new_boundary_account); let mut epoch_config = base_epoch_config.clone(); epoch_config.shard_layout = - ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account); + ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account.clone()); tracing::info!(target: "test", ?base_shard_layout, new_shard_layout=?epoch_config.shard_layout, "shard layout"); let expected_num_shards = epoch_config.shard_layout.num_shards(); @@ -593,10 +647,14 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { builder = builder.runtime_config_store(runtime_config_store); } - let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = builder + let archival_id = params.archival_clients.iter().next().unwrap().clone(); + let rpc_id = params.rpc_clients[0].clone(); + + let mut env = builder .genesis(genesis) .epoch_config_store(epoch_config_store) .clients(params.clients) + .archival_clients(params.archival_clients) .load_mem_tries_for_tracked_shards(params.load_mem_tries_for_tracked_shards) .drop_protocol_upgrade_chunks( base_protocol_version + 1, @@ -604,26 +662,48 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { ) .build(); + let mut test_setup_transactions = vec![]; for contract_id in ¶ms.deploy_test_contract { - let signer = &create_user_test_signer(&contract_id).into(); - let deploy_contract_tx = SignedTransaction::deploy_contract( - 101, - &contract_id, + let deploy_contract_tx = 
deploy_contract( + &mut env.test_loop, + &env.datas, + &rpc_id, + contract_id, near_test_contracts::rs_contract().into(), - &signer, - get_shared_block_hash(&node_datas, &test_loop), + 1, ); - run_tx(&mut test_loop, deploy_contract_tx, &node_datas, Duration::seconds(5)); + test_setup_transactions.push(deploy_contract_tx); } + // Create an account that is: + // 1) Subaccount of a future resharding boundary account. + // 2) Temporary, because we will remove it after resharding. + // The goal is to test removing some state and see if it is kept on archival node. + // The secondary goal is to catch potential bugs due to the above two conditions making it a special case. + let temporary_account = + format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap(); + let create_account_tx = create_account( + &mut env, + &rpc_id, + &new_boundary_account, + &temporary_account, + 10 * ONE_NEAR, + 2, + ); + test_setup_transactions.push(create_account_tx); + + // Wait for the test setup transactions to settle and ensure they all succeeded. + env.test_loop.run_for(Duration::seconds(2)); + check_txs(&env.test_loop, &env.datas, &rpc_id, &test_setup_transactions); + let client_handles = - node_datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec(); + env.datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec(); #[cfg(feature = "test_features")] { if params.delay_flat_state_resharding > 0 { client_handles.iter().for_each(|handle| { - let client = &mut test_loop.data.get_mut(handle).client; + let client = &mut env.test_loop.data.get_mut(handle).client; client.chain.resharding_manager.flat_storage_resharder.adv_task_delay_by_blocks = params.delay_flat_state_resharding; }); @@ -631,7 +711,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { } let clients = - client_handles.iter().map(|handle| &test_loop.data.get(handle).client).collect_vec(); + client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec(); let mut trie_sanity_check = TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards); @@ -640,7 +720,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { params .loop_actions .iter() - .for_each(|action| action(&node_datas, test_loop_data, client_handles[0].clone())); + .for_each(|action| action(&env.datas, test_loop_data, client_handles[0].clone())); let clients = client_handles.iter().map(|handle| &test_loop_data.get(handle).client).collect_vec(); @@ -679,19 +759,30 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { return true; }; - test_loop.run_until( + env.test_loop.run_until( success_condition, // Give enough time to produce ~7 epochs. Duration::seconds((7 * params.epoch_length) as i64), ); - let client = &test_loop.data.get(&client_handles[0]).client; + let client = &env.test_loop.data.get(&client_handles[0]).client; trie_sanity_check.check_epochs(client); - // Wait for garbage collection to kick in, so that it is tested as well. - test_loop + let height_after_resharding = latest_block_height.get(); + + // Delete `temporary_account`. + delete_account(&mut env, &rpc_id, &temporary_account, &rpc_id); + // Wait for garbage collection to kick in. + env.test_loop .run_for(Duration::seconds((DEFAULT_GC_NUM_EPOCHS_TO_KEEP * params.epoch_length) as i64)); + // Check that the deleted account is still accessible at archival node, but not at a regular node. 
+ check_deleted_account_availability( + &mut env, + &archival_id, + &rpc_id, + temporary_account, + height_after_resharding, + ); - TestLoopEnv { test_loop, datas: node_datas, tempdir } - .shutdown_and_drain_remaining_events(Duration::seconds(20)); + env.shutdown_and_drain_remaining_events(Duration::seconds(20)); } #[test] @@ -746,7 +837,7 @@ fn test_resharding_v3_drop_chunks_all() { fn test_resharding_v3_resharding_block_in_fork() { test_resharding_v3_base( TestReshardingParametersBuilder::default() - .num_clients(1) + .num_validators(1) .add_loop_action(fork_before_resharding_block(false)) .build(), ); @@ -761,7 +852,7 @@ fn test_resharding_v3_resharding_block_in_fork() { fn test_resharding_v3_double_sign_resharding_block() { test_resharding_v3_base( TestReshardingParametersBuilder::default() - .num_clients(1) + .num_validators(1) .add_loop_action(fork_before_resharding_block(true)) .build(), ); diff --git a/integration-tests/src/test_loop/utils/transactions.rs b/integration-tests/src/test_loop/utils/transactions.rs index 7bcd6d32e80..0f1049ea34d 100644 --- a/integration-tests/src/test_loop/utils/transactions.rs +++ b/integration-tests/src/test_loop/utils/transactions.rs @@ -168,7 +168,8 @@ pub fn do_create_account( amount: u128, ) { tracing::info!(target: "test", "Creating account."); - let tx = create_account(env, rpc_id, originator, new_account_id, amount); + let nonce = get_next_nonce(&env.test_loop.data, &env.datas, originator); + let tx = create_account(env, rpc_id, originator, new_account_id, amount, nonce); env.test_loop.run_for(Duration::seconds(5)); check_txs(&env.test_loop, &env.datas, rpc_id, &[tx]); } @@ -228,12 +229,11 @@ pub fn create_account( originator: &AccountId, new_account_id: &AccountId, amount: u128, + nonce: u64, ) -> CryptoHash { let block_hash = get_shared_block_hash(&env.datas, &env.test_loop); - - let nonce = get_next_nonce(&env.test_loop.data, &env.datas, originator); - let signer = create_user_test_signer(&originator).into(); - let new_signer: Signer = create_user_test_signer(&new_account_id).into(); + let signer = create_user_test_signer(&originator); + let new_signer: Signer = create_user_test_signer(&new_account_id); let tx = SignedTransaction::create_account( nonce, @@ -408,18 +408,19 @@ pub fn get_node_data<'a>(node_datas: &'a [TestData], account_id: &AccountId) -> return node_data; } } - panic!("RPC client not found"); + panic!("client not found"); } /// Run a transaction until completion and assert that the result is "success". /// Returns the transaction result. pub fn run_tx( test_loop: &mut TestLoopV2, + rpc_id: &AccountId, tx: SignedTransaction, node_datas: &[TestData], maximum_duration: Duration, ) -> Vec { - let tx_res = execute_tx(test_loop, tx, node_datas, maximum_duration).unwrap(); + let tx_res = execute_tx(test_loop, rpc_id, tx, node_datas, maximum_duration).unwrap(); assert_matches!(tx_res.status, FinalExecutionStatus::SuccessValue(_)); match tx_res.status { FinalExecutionStatus::SuccessValue(res) => res, @@ -462,14 +463,12 @@ pub fn run_txs_parallel( /// For valid transactions returns the execution result (which could have an execution error inside, check it!). 
pub fn execute_tx( test_loop: &mut TestLoopV2, + rpc_id: &AccountId, tx: SignedTransaction, node_datas: &[TestData], maximum_duration: Duration, ) -> Result { - // Last node is usually the rpc node - let rpc_node_id = node_datas.len().checked_sub(1).unwrap(); - - let client_sender = &node_datas[rpc_node_id].client_sender; + let client_sender = &get_node_data(node_datas, rpc_id).client_sender; let future_spawner = test_loop.future_spawner(); let mut tx_runner = TransactionRunner::new(tx, true); @@ -477,7 +476,7 @@ pub fn execute_tx( let mut res = None; test_loop.run_until( |tl_data| { - let client = &tl_data.get(&node_datas[rpc_node_id].client_sender.actor_handle()).client; + let client = &tl_data.get(&client_sender.actor_handle()).client; match tx_runner.poll(client_sender, client, &future_spawner) { Poll::Pending => false, Poll::Ready(tx_res) => { diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs index 64e05437bf2..0512eb602b4 100644 --- a/integration-tests/src/tests/client/cold_storage.rs +++ b/integration-tests/src/tests/client/cold_storage.rs @@ -147,9 +147,12 @@ fn test_storage_after_commit_of_cold_update() { let client = &env.clients[0]; let client_store = client.runtime_adapter.store(); - let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap(); + let epoch_id = client.epoch_manager.get_epoch_id(block.hash()).unwrap(); let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap(); - update_cold_db(cold_db, &client_store, &shard_layout, &height, 4).unwrap(); + let is_last_block_in_epoch = + client.epoch_manager.is_next_block_epoch_start(block.hash()).unwrap(); + update_cold_db(cold_db, &client_store, &shard_layout, &height, is_last_block_in_epoch, 4) + .unwrap(); last_hash = *block.hash(); } @@ -281,14 +284,22 @@ fn test_cold_db_copy_with_height_skips() { }; let client = &env.clients[0]; - let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap(); + let hot_store = client.runtime_adapter.store(); + let block_hash = + hot_store.get_ser::(DBCol::BlockHeight, &height.to_le_bytes()).unwrap(); + let Some(block) = block else { + assert!(block_hash.is_none()); + continue; + }; + let block_hash = block_hash.unwrap(); + assert!(&block_hash == block.hash()); + let epoch_id = client.epoch_manager.get_epoch_id(&block_hash).unwrap(); let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap(); - update_cold_db(&cold_db, &client.runtime_adapter.store(), &shard_layout, &height, 1) + let is_last_block_in_epoch = + client.epoch_manager.is_next_block_epoch_start(&block_hash).unwrap(); + update_cold_db(&cold_db, hot_store, &shard_layout, &height, is_last_block_in_epoch, 1) .unwrap(); - - if block.is_some() { - last_hash = *block.unwrap().hash(); - } + last_hash = block_hash; } // We still need to filter out one chunk diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index bb4f2d29379..ec486b6366c 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -1464,11 +1464,21 @@ fn test_archival_gc_common( let header = block.header(); let epoch_id = header.epoch_id(); let shard_layout = env.clients[0].epoch_manager.get_shard_layout(epoch_id).unwrap(); + let is_last_block_in_epoch = + env.clients[0].epoch_manager.is_next_block_epoch_start(header.hash()).unwrap(); blocks.push(block); if i <= max_cold_head_height { - 
update_cold_db(storage.cold_db().unwrap(), hot_store, &shard_layout, &i, 1).unwrap(); + update_cold_db( + storage.cold_db().unwrap(), + hot_store, + &shard_layout, + &i, + is_last_block_in_epoch, + 1, + ) + .unwrap(); update_cold_head(storage.cold_db().unwrap(), &hot_store, &i).unwrap(); } } diff --git a/nearcore/src/cold_storage.rs b/nearcore/src/cold_storage.rs index b465bb60864..a868ce8184b 100644 --- a/nearcore/src/cold_storage.rs +++ b/nearcore/src/cold_storage.rs @@ -113,20 +113,8 @@ fn cold_store_copy( return Ok(ColdStoreCopyResult::NoBlockCopied); } - // Here it should be sufficient to just read from hot storage. - // Because BlockHeight is never garbage collectable and is not even copied to cold. - let cold_head_hash = - hot_store.get_ser::(DBCol::BlockHeight, &cold_head_height.to_le_bytes())?; - let cold_head_hash = - cold_head_hash.ok_or(ColdStoreError::ColdHeadHashReadError { cold_head_height })?; - - // The previous block is the cold head so we can use it to get epoch id. - let epoch_id = epoch_manager.get_epoch_id_from_prev_block(&cold_head_hash)?; - let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?; - let mut next_height = cold_head_height + 1; - while !update_cold_db(cold_db, hot_store, &shard_layout, &next_height, num_threads)? { - next_height += 1; + let next_height_block_hash = loop { if next_height > hot_final_head_height { return Err(ColdStoreError::SkippedBlocksBetweenColdHeadAndNextHeightError { cold_head_height, @@ -134,8 +122,28 @@ fn cold_store_copy( hot_final_head_height, }); } - } - + // Here it should be sufficient to just read from hot storage. + // Because BlockHeight is never garbage collectable and is not even copied to cold. + let next_height_block_hash = + hot_store.get_ser::(DBCol::BlockHeight, &next_height.to_le_bytes())?; + if let Some(next_height_block_hash) = next_height_block_hash { + break next_height_block_hash; + } + next_height = next_height + 1; + }; + // The next block hash exists in hot store so we can use it to get epoch id. + let epoch_id = epoch_manager.get_epoch_id(&next_height_block_hash)?; + let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?; + let is_last_block_in_epoch = + epoch_manager.is_next_block_epoch_start(&next_height_block_hash)?; + update_cold_db( + cold_db, + hot_store, + &shard_layout, + &next_height, + is_last_block_in_epoch, + num_threads, + )?; update_cold_head(cold_db, hot_store, &next_height)?; let result = if next_height >= hot_final_head_height { diff --git a/tools/cold-store/src/cli.rs b/tools/cold-store/src/cli.rs index 651ab4069ab..7fc2c99ea50 100644 --- a/tools/cold-store/src/cli.rs +++ b/tools/cold-store/src/cli.rs @@ -215,24 +215,26 @@ fn copy_next_block(store: &NodeStorage, config: &NearConfig, epoch_manager: &Epo // Here it should be sufficient to just read from hot storage. // Because BlockHeight is never garbage collectable and is not even copied to cold. - let cold_head_hash = get_ser_from_store::( + let next_height_block_hash = get_ser_from_store::( &store.get_hot_store(), DBCol::BlockHeight, - &cold_head_height.to_le_bytes(), + &next_height.to_le_bytes(), ) - .unwrap_or_else(|| panic!("No block hash in hot storage for height {}", cold_head_height)); + .unwrap_or_else(|| panic!("No block hash in hot storage for height {}", next_height)); // For copying block we need to have shard_layout. // For that we need epoch_id. - // For that we might use prev_block_hash, and because next_hight = cold_head_height + 1, - // we use cold_head_hash. + // For that we might use the hash of the block. 
+ let epoch_id = &epoch_manager.get_epoch_id(&next_height_block_hash).unwrap(); + let shard_layout = &epoch_manager.get_shard_layout(epoch_id).unwrap(); + let is_last_block_in_epoch = + epoch_manager.is_next_block_epoch_start(&next_height_block_hash).unwrap(); update_cold_db( &*store.cold_db().unwrap(), &store.get_hot_store(), - &epoch_manager - .get_shard_layout(&epoch_manager.get_epoch_id_from_prev_block(&cold_head_hash).unwrap()) - .unwrap(), + shard_layout, &next_height, + is_last_block_in_epoch, 1, ) .unwrap_or_else(|_| panic!("Failed to copy block at height {} to cold db", next_height)); From 164dc4899308a0fa9d7356bcb191cad29cc415a9 Mon Sep 17 00:00:00 2001 From: Stefan <30928612+stedfn@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:35:19 +0100 Subject: [PATCH 6/8] chore: enable simd-accel feature for reed solomon crate (#12629) Using simd will speed up the reed solomon encoding and decoding --- Cargo.lock | 2 ++ Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ebc33b1821a..c2a49977688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6794,6 +6794,8 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" dependencies = [ + "cc", + "libc", "libm", "lru 0.7.8", "parking_lot 0.11.2", diff --git a/Cargo.toml b/Cargo.toml index de169f5038f..05e063e0363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -321,7 +321,7 @@ rand_hc = "0.3.1" rand_xorshift = "0.3" rayon = "1.5" redis = "0.23.0" -reed-solomon-erasure = "6.0.0" +reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } regex = "1.7.1" region = "3.0" reqwest = { version = "0.11.14", features = ["blocking"] } From a7940809752f799ed657563f28fe2d27ff2bee2b Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Tue, 17 Dec 2024 00:07:09 +0100 Subject: [PATCH 7/8] fix: use T1 for sending chunk endorsements (#12632) We missed that when refactoring chunk endorsement logic. The old `ChunkEndorsement` message was removed in #12131, but we forgot to add the new message introduced in #11091. I've noticed that by looking at mainnet metrics where `VersionedChunkEndorsement` message is only sent via T2. --- chain/network/src/peer_manager/connection/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index 0f9d16376fe..18dd886faf0 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -50,6 +50,7 @@ impl tcp::Tier { pub(crate) fn is_allowed_routed(self, body: &RoutedMessageBody) -> bool { match body { RoutedMessageBody::BlockApproval(..) + | RoutedMessageBody::VersionedChunkEndorsement(..) | RoutedMessageBody::PartialEncodedStateWitness(..) | RoutedMessageBody::PartialEncodedStateWitnessForward(..) | RoutedMessageBody::VersionedPartialEncodedChunk(..) 
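As a pointer for the reed-solomon-erasure change above: the `simd-accel` feature only swaps in SIMD-accelerated Galois-field arithmetic, the crate's API is unchanged. A small self-contained example of that API, based on the crate's documented usage (illustrative only, not code from nearcore):

    use reed_solomon_erasure::galois_8::ReedSolomon;

    fn main() -> Result<(), reed_solomon_erasure::Error> {
        // 3 data shards + 2 parity shards; with `simd-accel` enabled, the encode/verify/
        // reconstruct calls below run on SIMD-accelerated Galois-field math.
        let r = ReedSolomon::new(3, 2)?;
        let mut shards: Vec<Vec<u8>> = vec![
            vec![0, 1, 2, 3],
            vec![4, 5, 6, 7],
            vec![8, 9, 10, 11],
            vec![0; 4], // parity, filled in by encode()
            vec![0; 4], // parity, filled in by encode()
        ];
        r.encode(&mut shards)?;
        assert!(r.verify(&shards)?);

        // Lose any two shards and reconstruct them from the remaining three.
        let mut maybe_shards: Vec<Option<Vec<u8>>> = shards.into_iter().map(Some).collect();
        maybe_shards[0] = None;
        maybe_shards[4] = None;
        r.reconstruct(&mut maybe_shards)?;
        assert!(maybe_shards.iter().all(|s| s.is_some()));
        Ok(())
    }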
From 7712f8bc9ff7e15b22952c8158d1de7ec2831d2f Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Tue, 17 Dec 2024 19:05:55 +0800 Subject: [PATCH 8/8] chore: check publishable crates separately (#12628) closes #12625 cc @staffik --- .github/workflows/ci.yml | 20 ++++++++++++++++---- Justfile | 10 +++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c89a30674b9..ac9fda7a174 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -425,13 +425,25 @@ jobs: fail_ci_if_error: true flags: pytests,upgradability,linux - windows_public_libraries_check: - name: "Windows check for building public libraries" - runs-on: "windows-latest" + publishable_packages_check: + name: "Cargo check publishable packages separately" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + include: + - name: Linux + id: linux + os: ubuntu-latest + - name: Windows + id: win + os: windows-latest timeout-minutes: 30 steps: - uses: actions/checkout@v4 - uses: taiki-e/install-action@5ce83af8b5520828f63d83d98df0eea6a66c7978 with: tool: just - - run: just check_build_public_libraries + - run: just check-publishable-separately + - run: just check-publishable-separately --no-default-features + - run: just check-publishable-separately --all-features diff --git a/Justfile b/Justfile index 80851906e70..91dfadfa9d2 100644 --- a/Justfile +++ b/Justfile @@ -10,7 +10,6 @@ platform_excludes := if os() == "macos" { } nightly_flags := "--features nightly,test_features" -public_libraries := "-p near-primitives -p near-crypto -p near-jsonrpc-primitives -p near-chain-configs -p near-primitives-core" export RUST_BACKTRACE := env("RUST_BACKTRACE", "short") ci_hack_nextest_profile := if env("CI_HACKS", "0") == "1" { "--profile ci" } else { "" } @@ -150,5 +149,10 @@ check-protocol-schema: env {{protocol_schema_env}} cargo test -p protocol-schema-check --profile dev-artifacts env {{protocol_schema_env}} cargo run -p protocol-schema-check --profile dev-artifacts -check_build_public_libraries: - cargo check {{public_libraries}} +publishable := "cargo metadata --no-deps --format-version 1 | jq -r '.packages[] | select(.publish == null or (.publish | length > 0)) | .name'" +check-publishable-separately *OPTIONS: + #!/usr/bin/env bash + for pkg in $({{ publishable }}); do + echo "Checking $pkg..." + cargo check -p $pkg {{ OPTIONS }} + done
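The new check-publishable-separately recipe treats a package as publishable when its `publish` manifest field is either unset or a non-empty registry list, which is exactly what the `cargo metadata ... | jq` pipeline selects. For readers who prefer the filter spelled out in Rust, a hedged sketch using the cargo_metadata crate (an assumption for illustration only; the crate is not added by this patch):

    use cargo_metadata::MetadataCommand;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let metadata = MetadataCommand::new().no_deps().exec()?;
        for pkg in metadata.packages {
            // `publish: None` means "no restriction" (publishable anywhere), while
            // `Some(vec![])` is what `publish = false` in Cargo.toml is parsed into.
            let publishable =
                pkg.publish.as_ref().map_or(true, |registries| !registries.is_empty());
            if publishable {
                println!("{}", pkg.name);
            }
        }
        Ok(())
    }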