Skip to content

Commit

Permalink
Merge branch 'master' into stafik/resharding/testloop-epoch-length
Browse files Browse the repository at this point in the history
  • Loading branch information
staffik authored Dec 23, 2024
2 parents 85d8837 + 33afc1d commit 0de1ae3
Show file tree
Hide file tree
Showing 30 changed files with 352 additions and 147 deletions.
79 changes: 43 additions & 36 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl FlatStorageResharder {
parent_shard,
left_child_shard,
right_child_shard,
resharding_hash,
resharding_block,
..
} = split_params;
info!(target: "resharding", ?split_params, "initiating flat storage shard split");
Expand All @@ -192,7 +192,7 @@ impl FlatStorageResharder {
left_child_shard,
right_child_shard,
shard_layout: shard_layout.clone(),
resharding_hash,
resharding_block,
flat_head,
};
store_update.set_flat_storage_status(
Expand Down Expand Up @@ -390,11 +390,11 @@ impl FlatStorageResharder {
let mut iter = match self.flat_storage_iterator(
&flat_store,
&parent_shard,
&split_params.resharding_hash,
&split_params.resharding_block.hash,
) {
Ok(iter) => iter,
Err(err) => {
error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator");
error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_block.hash, ?err, "failed to build flat storage iterator");
return FlatStorageReshardingTaskResult::Failed;
}
};
Expand Down Expand Up @@ -482,7 +482,7 @@ impl FlatStorageResharder {
left_child_shard,
right_child_shard,
flat_head,
resharding_hash,
resharding_block,
..
} = split_params;
let flat_store = self.runtime.store().flat_store();
Expand All @@ -507,7 +507,7 @@ impl FlatStorageResharder {
store_update.set_flat_storage_status(
child_shard,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
resharding_hash,
resharding_block,
)),
);
// Catchup will happen in a separate task, so send a request to schedule the
Expand Down Expand Up @@ -670,9 +670,8 @@ impl FlatStorageResharder {
.flat_store()
.get_flat_storage_status(shard_uid)
.map_err(|e| Into::<StorageError>::into(e))?;
let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
mut flat_head_block_hash,
)) = status
let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(mut flat_head)) =
status
else {
return Err(Error::Other(format!(
"unexpected resharding catchup flat storage status for {}: {:?}",
Expand All @@ -686,16 +685,16 @@ impl FlatStorageResharder {
target: "resharding",
"shard_catchup_apply_deltas/batch",
?shard_uid,
?flat_head_block_hash,
?flat_head,
batch_id = ?num_batches_done)
.entered();
let chain_final_head = chain_store.final_head()?;

// If we reached the desired new flat head, we can terminate the delta application step.
if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) {
return Ok(Some((
num_batches_done,
Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?),
Tip::from_header(&chain_store.get_block_header(&flat_head.hash)?),
)));
}

Expand All @@ -706,26 +705,32 @@ impl FlatStorageResharder {

// Merge deltas from the next blocks until we reach chain final head.
for _ in 0..catch_up_blocks {
let height = chain_store.get_block_height(&flat_head_block_hash)?;
debug_assert!(
height <= chain_final_head.height,
"flat head: {flat_head_block_hash}"
flat_head.height <= chain_final_head.height,
"flat head: {:?}",
&flat_head,
);
// Stop if we reached the desired new flat head.
if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) {
break;
}
if self.coordinate_snapshot(height) {
if self.coordinate_snapshot(flat_head.height) {
postpone = true;
break;
}
flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?;
let next_hash = chain_store.get_next_block_hash(&flat_head.hash)?;
let next_header = chain_store.get_block_header(&next_hash)?;
flat_head = BlockInfo {
hash: *next_header.hash(),
height: next_header.height(),
prev_hash: *next_header.prev_hash(),
};
if let Some(changes) = store
.get_delta(shard_uid, flat_head_block_hash)
.get_delta(shard_uid, flat_head.hash)
.map_err(|err| Into::<StorageError>::into(err))?
{
merged_changes.merge(changes);
store_update.remove_delta(shard_uid, flat_head_block_hash);
store_update.remove_delta(shard_uid, flat_head.hash);
}
// TODO(resharding): if flat_head_block_hash == state sync hash -> do snapshot
}
Expand All @@ -734,14 +739,12 @@ impl FlatStorageResharder {
merged_changes.apply_to_flat_state(&mut store_update, shard_uid);
store_update.set_flat_storage_status(
shard_uid,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
flat_head_block_hash,
)),
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(flat_head)),
);
store_update.commit()?;

num_batches_done += 1;
metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?);
metrics.set_head_height(flat_head.height);

if postpone {
return Ok(None);
Expand Down Expand Up @@ -1099,7 +1102,7 @@ impl FlatStorageReshardingEventStatus {
fn resharding_hash(&self) -> CryptoHash {
match self {
FlatStorageReshardingEventStatus::SplitShard(_, split_status, ..) => {
split_status.resharding_hash
split_status.resharding_block.hash
}
}
}
Expand Down Expand Up @@ -1384,12 +1387,9 @@ mod tests {
chain: &Chain,
new_shard_layout: &ShardLayout,
) -> ReshardingEventType {
ReshardingEventType::from_shard_layout(
&new_shard_layout,
chain.head().unwrap().last_block_hash,
)
.unwrap()
.unwrap()
ReshardingEventType::from_shard_layout(&new_shard_layout, chain.head().unwrap().into())
.unwrap()
.unwrap()
}

enum PreviousBlockHeight {
Expand Down Expand Up @@ -1524,7 +1524,11 @@ mod tests {
left_child_shard,
right_child_shard,
shard_layout: new_shard_layout,
resharding_hash: CryptoHash::default(),
resharding_block: BlockInfo {
hash: CryptoHash::default(),
height: 2,
prev_hash: CryptoHash::default(),
},
flat_head: BlockInfo {
hash: CryptoHash::default(),
height: 1,
Expand Down Expand Up @@ -1597,7 +1601,7 @@ mod tests {
assert_eq!(
flat_store.get_flat_storage_status(child),
Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
chain.final_head().unwrap().last_block_hash
chain.final_head().unwrap().into()
)))
);
}
Expand Down Expand Up @@ -2219,7 +2223,7 @@ mod tests {
parent_shard,
left_child_shard,
right_child_shard,
resharding_hash,
resharding_block,
..
} = match resharding_event_type.clone() {
ReshardingEventType::SplitShard(params) => params,
Expand Down Expand Up @@ -2278,12 +2282,15 @@ mod tests {
resharder.resharding_config,
);
assert!(resharder
.resume(left_child_shard, &FlatStorageReshardingStatus::CatchingUp(resharding_hash))
.resume(
left_child_shard,
&FlatStorageReshardingStatus::CatchingUp(resharding_block)
)
.is_ok());
assert!(resharder
.resume(
right_child_shard,
&FlatStorageReshardingStatus::CatchingUp(resharding_hash)
&FlatStorageReshardingStatus::CatchingUp(resharding_block)
)
.is_ok());
}
Expand Down
17 changes: 11 additions & 6 deletions chain/chain/src/resharding/event_type.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Collection of all resharding V3 event types.
use near_chain_primitives::Error;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::AccountId;
use near_store::flat::BlockInfo;
use near_store::ShardUId;
use tracing::error;

Expand All @@ -27,7 +27,7 @@ pub struct ReshardingSplitShardParams {
/// The account at the boundary between the two children.
pub boundary_account: AccountId,
/// Hash of the last block having the old shard layout.
pub resharding_hash: CryptoHash,
pub resharding_block: BlockInfo,
}

impl ReshardingSplitShardParams {
Expand All @@ -48,7 +48,7 @@ impl ReshardingEventType {
/// `next_shard_layout`, otherwise returns `None`.
pub fn from_shard_layout(
next_shard_layout: &ShardLayout,
resharding_hash: CryptoHash,
resharding_block: BlockInfo,
) -> Result<Option<ReshardingEventType>, Error> {
let log_and_error = |err_msg: &str| {
error!(target: "resharding", ?next_shard_layout, err_msg);
Expand Down Expand Up @@ -104,7 +104,7 @@ impl ReshardingEventType {
left_child_shard,
right_child_shard,
boundary_account,
resharding_hash,
resharding_block,
}));
}
_ => {
Expand All @@ -123,6 +123,7 @@ impl ReshardingEventType {
#[cfg(test)]
mod tests {
use super::*;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::{AccountId, ShardId};
use near_store::ShardUId;
Expand All @@ -138,7 +139,11 @@ mod tests {
/// Verify that the correct type of resharding is deduced from a new shard layout.
#[test]
fn parse_event_type_from_shard_layout() {
let block = CryptoHash::hash_bytes(&[1]);
let block = BlockInfo {
hash: CryptoHash::hash_bytes(&[1]),
height: 1,
prev_hash: CryptoHash::hash_bytes(&[2]),
};

let s0 = ShardId::new(0);
let s1 = ShardId::new(1);
Expand Down Expand Up @@ -177,7 +182,7 @@ mod tests {
parent_shard: ShardUId { version: 3, shard_id: 1 },
left_child_shard: ShardUId { version: 3, shard_id: 2 },
right_child_shard: ShardUId { version: 3, shard_id: 3 },
resharding_hash: block,
resharding_block: block,
boundary_account: account!("pp")
}))
);
Expand Down
8 changes: 7 additions & 1 deletion chain/chain/src/resharding/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::BlockInfo;
use near_store::trie::mem::mem_trie_update::TrackingMode;
use near_store::trie::ops::resharding::RetainMode;
use near_store::trie::TrieRecorder;
Expand Down Expand Up @@ -83,8 +84,13 @@ impl ReshardingManager {
return Ok(());
}

let block_info = BlockInfo {
hash: *block.hash(),
height: block.header().height(),
prev_hash: *block.header().prev_hash(),
};
let resharding_event_type =
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash)?;
ReshardingEventType::from_shard_layout(&next_shard_layout, block_info)?;
match resharding_event_type {
Some(ReshardingEventType::SplitShard(split_shard_event)) => {
self.split_shard(
Expand Down
22 changes: 14 additions & 8 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_pool::TransactionGroupIteratorWrapper;
use near_primitives::apply::ApplyChunkReason;
use near_primitives::block::Block;
use near_primitives::block::{Block, BlockHeader};
use near_primitives::checked_feature;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::merklize;
Expand All @@ -33,6 +33,7 @@ use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{AccountId, ProtocolVersion, ShardId, ShardIndex};
use near_primitives::utils::compression::CompressedData;
use near_store::flat::BlockInfo;
use near_store::trie::ops::resharding::RetainMode;
use near_store::{PartialStorage, Trie};
use std::collections::HashMap;
Expand Down Expand Up @@ -202,7 +203,7 @@ fn get_state_witness_block_range(

if let Some(transition) = get_resharding_transition(
epoch_manager,
prev_hash,
position.prev_block.header(),
shard_uid,
position.num_new_chunks_seen,
)? {
Expand Down Expand Up @@ -272,7 +273,7 @@ fn get_state_witness_block_range(
/// so, returns the corresponding resharding transition parameters.
fn get_resharding_transition(
epoch_manager: &dyn EpochManagerAdapter,
prev_hash: &CryptoHash,
prev_header: &BlockHeader,
shard_uid: ShardUId,
num_new_chunks_seen: u32,
) -> Result<Option<ImplicitTransitionParams>, Error> {
Expand All @@ -282,17 +283,22 @@ fn get_resharding_transition(
return Ok(None);
}

let shard_layout = epoch_manager.get_shard_layout_from_prev_block(prev_hash)?;
let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(prev_hash)?;
let shard_layout = epoch_manager.get_shard_layout_from_prev_block(prev_header.hash())?;
let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(prev_header.hash())?;
let prev_shard_layout = epoch_manager.get_shard_layout(&prev_epoch_id)?;
let block_has_new_shard_layout =
epoch_manager.is_next_block_epoch_start(prev_hash)? && shard_layout != prev_shard_layout;
let block_has_new_shard_layout = epoch_manager.is_next_block_epoch_start(prev_header.hash())?
&& shard_layout != prev_shard_layout;

if !block_has_new_shard_layout {
return Ok(None);
}

let params = match ReshardingEventType::from_shard_layout(&shard_layout, *prev_hash)? {
let block_info = BlockInfo {
hash: *prev_header.hash(),
height: prev_header.height(),
prev_hash: *prev_header.prev_hash(),
};
let params = match ReshardingEventType::from_shard_layout(&shard_layout, block_info)? {
Some(ReshardingEventType::SplitShard(params)) => params,
None => return Ok(None),
};
Expand Down
6 changes: 3 additions & 3 deletions chain/epoch-manager/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2514,13 +2514,13 @@ fn test_epoch_validators_cache() {
#[test]
fn test_chunk_producers() {
let amount_staked = 1_000_000;
// Make sure that last validator has at least 160/1'000'000 / num_shards of stake.
// Make sure that last validator has at least 160/1'000'000 of stake.
// We're running with 2 shards and test1 + test2 has 2'000'000 tokens - so chunk_only should have over 160.
let validators = vec![
("test1".parse().unwrap(), amount_staked),
("test2".parse().unwrap(), amount_staked),
("chunk_only".parse().unwrap(), 200),
("not_enough_producer".parse().unwrap(), 100),
("chunk_only".parse().unwrap(), 321),
("not_enough_producer".parse().unwrap(), 320),
];

// There are 2 shards, and 2 block producers seats.
Expand Down
Loading

0 comments on commit 0de1ae3

Please sign in to comment.