-
Notifications
You must be signed in to change notification settings - Fork 648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding) - congestion info computation #12581
base: master
Are you sure you want to change the base?
Changes from all commits
2ccd943
bb57b01
ffe30d9
9d04209
78b5bc8
f5ddc63
419e804
8b73a77
0457ee3
e432db8
4eea6ad
752e804
f49a309
f10e0ac
fbcca65
db568c2
d6e3617
c8c683a
bb0f0be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,19 +6,22 @@ use super::types::ReshardingSender; | |
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController}; | ||
use crate::types::RuntimeAdapter; | ||
use crate::ChainStoreUpdate; | ||
use itertools::Itertools; | ||
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle}; | ||
use near_chain_primitives::Error; | ||
use near_epoch_manager::EpochManagerAdapter; | ||
use near_primitives::block::Block; | ||
use near_primitives::challenge::PartialState; | ||
use near_primitives::congestion_info::CongestionInfo; | ||
use near_primitives::hash::CryptoHash; | ||
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout}; | ||
use near_primitives::types::chunk_extra::ChunkExtra; | ||
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; | ||
use near_store::trie::mem::mem_trie_update::TrackingMode; | ||
use near_store::trie::ops::resharding::RetainMode; | ||
use near_store::trie::outgoing_metadata::ReceiptGroupsQueue; | ||
use near_store::trie::TrieRecorder; | ||
use near_store::{DBCol, ShardTries, ShardUId, Store}; | ||
use near_store::{DBCol, ShardTries, ShardUId, Store, TrieAccess}; | ||
|
||
pub struct ReshardingManager { | ||
store: Store, | ||
|
@@ -177,7 +180,7 @@ impl ReshardingManager { | |
// blocks, the second finalization will crash. | ||
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?; | ||
|
||
let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?; | ||
let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?; | ||
let boundary_account = split_shard_event.boundary_account; | ||
|
||
let mut trie_store_update = self.store.store_update(); | ||
|
@@ -204,7 +207,7 @@ impl ReshardingManager { | |
let mut mem_tries = mem_tries.write().unwrap(); | ||
let mut trie_recorder = TrieRecorder::new(); | ||
let mode = TrackingMode::RefcountsAndAccesses(&mut trie_recorder); | ||
let mem_trie_update = mem_tries.update(*chunk_extra.state_root(), mode)?; | ||
let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?; | ||
|
||
let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode); | ||
let partial_storage = trie_recorder.recorded_storage(); | ||
|
@@ -213,11 +216,30 @@ impl ReshardingManager { | |
}; | ||
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap(); | ||
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes); | ||
drop(mem_tries); | ||
|
||
// Get the congestion info for the child. | ||
let parent_epoch_id = block.header().epoch_id(); | ||
let parent_shard_layout = self.epoch_manager.get_shard_layout(&parent_epoch_id)?; | ||
let child_epoch_id = self.epoch_manager.get_next_epoch_id(block.hash())?; | ||
let child_shard_layout = self.epoch_manager.get_shard_layout(&child_epoch_id)?; | ||
let child_congestion_info = Self::get_child_congestion_info( | ||
&tries, | ||
&parent_shard_layout, | ||
parent_shard_uid, | ||
&parent_chunk_extra, | ||
&child_shard_layout, | ||
new_shard_uid, | ||
retain_mode, | ||
)?; | ||
|
||
// TODO(resharding): set all fields of `ChunkExtra`. Consider stronger | ||
// typing. Clarify where it should happen when `State` and | ||
// `FlatState` update is implemented. | ||
let mut child_chunk_extra = ChunkExtra::clone(&chunk_extra); | ||
let mut child_chunk_extra = ChunkExtra::clone(&parent_chunk_extra); | ||
*child_chunk_extra.state_root_mut() = new_state_root; | ||
*child_chunk_extra.congestion_info_mut().expect("The congestion info must exist!") = | ||
child_congestion_info; | ||
|
||
chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, child_chunk_extra); | ||
chain_store_update.save_state_transition_data( | ||
|
@@ -250,6 +272,99 @@ impl ReshardingManager { | |
Ok(()) | ||
} | ||
|
||
fn get_child_congestion_info( | ||
tries: &ShardTries, | ||
parent_shard_layout: &ShardLayout, | ||
parent_shard_uid: ShardUId, | ||
parent_chunk_extra: &Arc<ChunkExtra>, | ||
child_shard_layout: &ShardLayout, | ||
child_shard_uid: ShardUId, | ||
retain_mode: RetainMode, | ||
) -> Result<CongestionInfo, Error> { | ||
let parent_congestion_info = | ||
parent_chunk_extra.congestion_info().expect("The congestion info must exist!"); | ||
|
||
// Get the congestion info based on the parent shard. | ||
let parent_state_root = *parent_chunk_extra.state_root(); | ||
let parent_trie = tries.get_trie_for_shard(parent_shard_uid, parent_state_root); | ||
let mut child_congestion_info = Self::get_child_congestion_info_not_finalized( | ||
&parent_trie, | ||
&parent_shard_layout, | ||
parent_congestion_info, | ||
retain_mode, | ||
)?; | ||
|
||
// Set the allowed shard based on the child shard. | ||
Self::finalize_allowed_shard( | ||
&child_shard_layout, | ||
child_shard_uid, | ||
&mut child_congestion_info, | ||
)?; | ||
|
||
Ok(child_congestion_info) | ||
} | ||
|
||
// Get the congestion info for the child shard. The congestion info can be | ||
// inferred efficiently from the combination of the parent shard's | ||
// congestion info and the receipt group metadata, that is available in the | ||
// parent shard's trie. | ||
pub fn get_child_congestion_info_not_finalized( | ||
parent_trie: &dyn TrieAccess, | ||
parent_shard_layout: &ShardLayout, | ||
parent_congestion_info: CongestionInfo, | ||
retain_mode: RetainMode, | ||
) -> Result<CongestionInfo, Error> { | ||
// The left child contains all the delayed and buffered receipts from the | ||
// parent so it should have identical congestion info. | ||
if retain_mode == RetainMode::Left { | ||
return Ok(parent_congestion_info); | ||
} | ||
|
||
// The right child contains all the delayed receipts from the parent but it | ||
// has no buffered receipts. It's info needs to be computed by subtracting | ||
// the parent's buffered receipts from the parent's congestion info. | ||
let mut congestion_info = parent_congestion_info; | ||
for shard_id in parent_shard_layout.shard_ids() { | ||
let receipt_groups = ReceiptGroupsQueue::load(parent_trie, shard_id)?; | ||
let Some(receipt_groups) = receipt_groups else { | ||
continue; | ||
}; | ||
|
||
let bytes = receipt_groups.total_size(); | ||
let gas = receipt_groups.total_gas(); | ||
let gas = gas.try_into().expect("ReceiptGroup gas must fit in u64"); | ||
|
||
congestion_info | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we not have to set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good sanity check! The |
||
.remove_buffered_receipt_gas(gas) | ||
.expect("Buffered gas must not exceed congestion info buffered gas"); | ||
congestion_info | ||
.remove_receipt_bytes(bytes) | ||
.expect("Buffered size must not exceed congestion info buffered size"); | ||
} | ||
|
||
// The right child does not inherit any buffered receipts. The | ||
// congestion info must match this invariant. | ||
assert_eq!(congestion_info.buffered_receipts_gas(), 0); | ||
|
||
Ok(congestion_info) | ||
} | ||
|
||
pub fn finalize_allowed_shard( | ||
child_shard_layout: &ShardLayout, | ||
child_shard_uid: ShardUId, | ||
congestion_info: &mut CongestionInfo, | ||
) -> Result<(), Error> { | ||
let all_shards = child_shard_layout.shard_ids().collect_vec(); | ||
let own_shard = child_shard_uid.shard_id(); | ||
let own_shard_index = child_shard_layout | ||
.get_shard_index(own_shard)? | ||
.try_into() | ||
.expect("ShardIndex must fit in u64"); | ||
let congestion_seed = own_shard_index; | ||
congestion_info.finalize_allowed_shard(own_shard, &all_shards, congestion_seed); | ||
Ok(()) | ||
} | ||
|
||
// TODO(store): Use proper store interface | ||
fn get_chunk_extra( | ||
&self, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is annoying but needed to prevent a deadlock.