Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(resharding) - congestion info computation #12581

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# them at earliest convenience :)
# Also in addition to this, the `nextest-integration` test is currently disabled on macos
platform_excludes := if os() == "macos" {
"--exclude node-runtime --exclude runtime-params-estimator --exclude near-network --exclude estimator-warehouse --exclude integration-tests"
""
} else if os() == "windows" {
"--exclude node-runtime --exclude runtime-params-estimator --exclude near-network --exclude estimator-warehouse --exclude integration-tests"
} else {
Expand Down
5 changes: 4 additions & 1 deletion chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,10 @@ fn shard_split_handle_key_value(
| col::BANDWIDTH_SCHEDULER_STATE => {
copy_kv_to_all_children(&split_params, key, value, store_update)
}
col::BUFFERED_RECEIPT_INDICES | col::BUFFERED_RECEIPT => {
col::BUFFERED_RECEIPT_INDICES
| col::BUFFERED_RECEIPT
| col::BUFFERED_RECEIPT_GROUPS_QUEUE_DATA
| col::BUFFERED_RECEIPT_GROUPS_QUEUE_ITEM => {
copy_kv_to_left_child(&split_params, key, value, store_update)
}
_ => unreachable!("key: {:?} should not appear in flat store!", key),
Expand Down
123 changes: 119 additions & 4 deletions chain/chain/src/resharding/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ use super::types::ReshardingSender;
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
use crate::types::RuntimeAdapter;
use crate::ChainStoreUpdate;
use itertools::Itertools;
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Block;
use near_primitives::challenge::PartialState;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::trie::mem::mem_trie_update::TrackingMode;
use near_store::trie::ops::resharding::RetainMode;
use near_store::trie::outgoing_metadata::ReceiptGroupsQueue;
use near_store::trie::TrieRecorder;
use near_store::{DBCol, ShardTries, ShardUId, Store};
use near_store::{DBCol, ShardTries, ShardUId, Store, TrieAccess};

pub struct ReshardingManager {
store: Store,
Expand Down Expand Up @@ -177,7 +180,7 @@ impl ReshardingManager {
// blocks, the second finalization will crash.
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;

let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let boundary_account = split_shard_event.boundary_account;

let mut trie_store_update = self.store.store_update();
Expand All @@ -204,7 +207,7 @@ impl ReshardingManager {
let mut mem_tries = mem_tries.write().unwrap();
let mut trie_recorder = TrieRecorder::new();
let mode = TrackingMode::RefcountsAndAccesses(&mut trie_recorder);
let mem_trie_update = mem_tries.update(*chunk_extra.state_root(), mode)?;
let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?;

let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode);
let partial_storage = trie_recorder.recorded_storage();
Expand All @@ -213,11 +216,30 @@ impl ReshardingManager {
};
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
drop(mem_tries);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is annoying but needed to prevent a deadlock.


// Get the congestion info for the child.
let parent_epoch_id = block.header().epoch_id();
let parent_shard_layout = self.epoch_manager.get_shard_layout(&parent_epoch_id)?;
let child_epoch_id = self.epoch_manager.get_next_epoch_id(block.hash())?;
let child_shard_layout = self.epoch_manager.get_shard_layout(&child_epoch_id)?;
let child_congestion_info = Self::get_child_congestion_info(
&tries,
&parent_shard_layout,
parent_shard_uid,
&parent_chunk_extra,
&child_shard_layout,
new_shard_uid,
retain_mode,
)?;

// TODO(resharding): set all fields of `ChunkExtra`. Consider stronger
// typing. Clarify where it should happen when `State` and
// `FlatState` update is implemented.
let mut child_chunk_extra = ChunkExtra::clone(&chunk_extra);
let mut child_chunk_extra = ChunkExtra::clone(&parent_chunk_extra);
*child_chunk_extra.state_root_mut() = new_state_root;
*child_chunk_extra.congestion_info_mut().expect("The congestion info must exist!") =
child_congestion_info;

chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, child_chunk_extra);
chain_store_update.save_state_transition_data(
Expand Down Expand Up @@ -250,6 +272,99 @@ impl ReshardingManager {
Ok(())
}

/// Compute the finalized congestion info for one child shard of a split.
///
/// Reads the parent's congestion info from its chunk extra, adjusts it for
/// the given child (left keeps everything, right drops buffered receipts),
/// and then fixes up the allowed shard using the child shard layout.
fn get_child_congestion_info(
    tries: &ShardTries,
    parent_shard_layout: &ShardLayout,
    parent_shard_uid: ShardUId,
    parent_chunk_extra: &Arc<ChunkExtra>,
    child_shard_layout: &ShardLayout,
    child_shard_uid: ShardUId,
    retain_mode: RetainMode,
) -> Result<CongestionInfo, Error> {
    let parent_congestion_info =
        parent_chunk_extra.congestion_info().expect("The congestion info must exist!");

    // Derive the child's congestion info from the parent shard's state.
    let parent_state_root = *parent_chunk_extra.state_root();
    let parent_trie = tries.get_trie_for_shard(parent_shard_uid, parent_state_root);
    let mut congestion_info = Self::get_child_congestion_info_not_finalized(
        &parent_trie,
        parent_shard_layout,
        parent_congestion_info,
        retain_mode,
    )?;

    // The allowed shard is a property of the child, not the parent.
    Self::finalize_allowed_shard(child_shard_layout, child_shard_uid, &mut congestion_info)?;

    Ok(congestion_info)
}

// Get the congestion info for the child shard. The congestion info can be
// inferred efficiently from the combination of the parent shard's
// congestion info and the receipt group metadata that is available in the
// parent shard's trie. The result is "not finalized" because the allowed
// shard still has to be set for the child shard layout.
pub fn get_child_congestion_info_not_finalized(
    parent_trie: &dyn TrieAccess,
    parent_shard_layout: &ShardLayout,
    parent_congestion_info: CongestionInfo,
    retain_mode: RetainMode,
) -> Result<CongestionInfo, Error> {
    // The left child inherits all delayed and buffered receipts from the
    // parent, so its congestion info is identical to the parent's.
    if retain_mode == RetainMode::Left {
        return Ok(parent_congestion_info);
    }

    // The right child inherits the parent's delayed receipts but none of
    // its buffered receipts. Its info is computed by subtracting the
    // parent's buffered receipts from the parent's congestion info.
    let mut congestion_info = parent_congestion_info;
    for shard_id in parent_shard_layout.shard_ids() {
        // Shards without an outgoing buffer have nothing to subtract.
        let Some(receipt_groups) = ReceiptGroupsQueue::load(parent_trie, shard_id)? else {
            continue;
        };

        let bytes = receipt_groups.total_size();
        let gas = receipt_groups
            .total_gas()
            .try_into()
            .expect("ReceiptGroup gas must fit in u64");

        congestion_info
            .remove_buffered_receipt_gas(gas)
            .expect("Buffered gas must not exceed congestion info buffered gas");
        congestion_info
            .remove_receipt_bytes(bytes)
            .expect("Buffered size must not exceed congestion info buffered size");
    }

    // The right child does not inherit any buffered receipts. The
    // congestion info must match this invariant.
    assert_eq!(congestion_info.buffered_receipts_gas(), 0);

    Ok(congestion_info)
}

/// Set the allowed shard of `congestion_info` for the child shard.
///
/// The seed is the child's own shard index in the child shard layout, so
/// that the allowed-shard rotation is deterministic per shard.
pub fn finalize_allowed_shard(
    child_shard_layout: &ShardLayout,
    child_shard_uid: ShardUId,
    congestion_info: &mut CongestionInfo,
) -> Result<(), Error> {
    let own_shard = child_shard_uid.shard_id();
    let all_shards = child_shard_layout.shard_ids().collect_vec();
    let own_shard_index = child_shard_layout.get_shard_index(own_shard)?;
    // The shard index doubles as the congestion seed.
    let congestion_seed =
        own_shard_index.try_into().expect("ShardIndex must fit in u64");
    congestion_info.finalize_allowed_shard(own_shard, &all_shards, congestion_seed);
    Ok(())
}

// TODO(store): Use proper store interface
fn get_chunk_extra(
&self,
Expand Down
37 changes: 31 additions & 6 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::chain::{
};
use crate::rayon_spawner::RayonAsyncComputationSpawner;
use crate::resharding::event_type::ReshardingEventType;
use crate::resharding::manager::ReshardingManager;
use crate::sharding::shuffle_receipt_proofs;
use crate::stateless_validation::processing_tracker::ProcessingDoneTracker;
use crate::store::filter_incoming_receipts_for_shard;
Expand Down Expand Up @@ -712,12 +713,36 @@ pub fn validate_chunk_state_witness(
child_shard_uid,
) => {
let old_root = *chunk_extra.state_root();
let trie = Trie::from_recorded_storage(
PartialStorage { nodes: transition.base_state },
old_root,
true,
);
let new_root = trie.retain_split_shard(&boundary_account, retain_mode)?;
let partial_storage = PartialStorage { nodes: transition.base_state };
let parent_trie = Trie::from_recorded_storage(partial_storage, old_root, true);

// Update the congestion info based on the parent shard. It's
// important to do this step before the `retain_split_shard`
// because only the parent has the needed information.
if let Some(congestion_info) = chunk_extra.congestion_info_mut() {
// Get the congestion info based on the parent shard.
let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
let parent_shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
let parent_congestion_info = *congestion_info;
*congestion_info = ReshardingManager::get_child_congestion_info_not_finalized(
&parent_trie,
&parent_shard_layout,
parent_congestion_info,
retain_mode,
)?;

// Set the allowed shard based on the child shard.
let next_epoch_id = epoch_manager.get_next_epoch_id(&block_hash)?;
let next_shard_layout = epoch_manager.get_shard_layout(&next_epoch_id)?;
ReshardingManager::finalize_allowed_shard(
&next_shard_layout,
child_shard_uid,
congestion_info,
)?;
}

let new_root = parent_trie.retain_split_shard(&boundary_account, retain_mode)?;

(child_shard_uid, new_root)
}
};
Expand Down
29 changes: 25 additions & 4 deletions core/primitives-core/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,18 @@ impl ProtocolFeature {
// TODO(#11201): When stabilizing this feature in mainnet, also remove the temporary code
// that always enables this for mocknet (see config_mocknet function).
ProtocolFeature::ShuffleShardAssignments => 143,
// CurrentEpochStateSync must be enabled before ReshardingV3! When
// releasing this feature please make sure to schedule separate
// protocol upgrades for those features!
ProtocolFeature::CurrentEpochStateSync => 144,
ProtocolFeature::SimpleNightshadeV4 => 145,
// BandwidthScheduler must be enabled before ReshardingV3! When
// releasing this feature please make sure to schedule separate
// protocol upgrades for those features!
ProtocolFeature::BandwidthScheduler => 145,
ProtocolFeature::SimpleNightshadeV4 => 146,
#[cfg(feature = "protocol_feature_relaxed_chunk_validation")]
ProtocolFeature::RelaxedChunkValidation => 146,
ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 147,
ProtocolFeature::BandwidthScheduler => 148,
ProtocolFeature::RelaxedChunkValidation => 147,
ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 148,
ProtocolFeature::BlockHeightForReceiptId => 149,
// Place features that are not yet in Nightly below this line.
}
Expand Down Expand Up @@ -341,3 +347,18 @@ macro_rules! checked_feature {
}
}};
}

#[cfg(test)]
mod tests {
    use super::ProtocolFeature;

    /// ReshardingV3 (SimpleNightshadeV4) must be released after both
    /// CurrentEpochStateSync and BandwidthScheduler, as separate protocol
    /// upgrades. Guard that ordering here.
    #[test]
    fn test_resharding_dependencies() {
        let resharding_v3 = ProtocolFeature::SimpleNightshadeV4.protocol_version();

        assert!(ProtocolFeature::CurrentEpochStateSync.protocol_version() < resharding_v3);
        assert!(ProtocolFeature::BandwidthScheduler.protocol_version() < resharding_v3);
    }
}
10 changes: 10 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,16 @@ pub mod chunk_extra {
}
}

#[inline]
/// Mutable access to the congestion info; `None` for versions (V1, V2)
/// that predate congestion control.
pub fn congestion_info_mut(&mut self) -> Option<&mut CongestionInfo> {
    match self {
        Self::V1(_) | Self::V2(_) => None,
        Self::V3(v3) => Some(&mut v3.congestion_info),
        Self::V4(v4) => Some(&mut v4.congestion_info),
    }
}

#[inline]
pub fn bandwidth_requests(&self) -> Option<&BandwidthRequests> {
match self {
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/trie/ops/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::interface::{
};
use super::squash::GenericTrieUpdateSquash;

#[derive(Debug)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
/// Whether to retain left or right part of trie after shard split.
pub enum RetainMode {
Left,
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/test_loop/tests/resharding_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl TestReshardingParametersBuilder {
limit_outgoing_gas: self.limit_outgoing_gas.unwrap_or(false),
delay_flat_state_resharding: self.delay_flat_state_resharding.unwrap_or(0),
short_yield_timeout: self.short_yield_timeout.unwrap_or(false),
allow_negative_refcount: self.allow_negative_refcount.unwrap_or(false),
allow_negative_refcount: self.allow_negative_refcount.unwrap_or(true),
}
}

Expand Down Expand Up @@ -757,6 +757,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
if params.all_chunks_expected && params.chunk_ranges_to_drop.is_empty() {
assert!(block_header.chunk_mask().iter().all(|chunk_bit| *chunk_bit));
}

tracing::info!(target: "test", epoch_id=?tip.epoch_id, height=?tip.height, "new block");
}

// Return true if we passed an epoch with increased number of shards.
Expand Down
10 changes: 10 additions & 0 deletions runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,27 @@ impl ReceiptSinkV2 {
parent_shard_ids.intersection(&shard_ids.clone().into_iter().collect()).count() == 0
);

let mut all_buffers_empty = true;

// First forward any receipts that may still be in the outgoing buffers
// of the parent shards.
for &shard_id in &parent_shard_ids {
self.forward_from_buffer_to_shard(shard_id, state_update, apply_state, &shard_layout)?;
let is_buffer_empty = self.outgoing_buffers.to_shard(shard_id).len() == 0;
all_buffers_empty &= is_buffer_empty;
}

// Then forward receipts from the outgoing buffers of the shard in the
// current shard layout.
for &shard_id in &shard_ids {
self.forward_from_buffer_to_shard(shard_id, state_update, apply_state, &shard_layout)?;
let is_buffer_empty = self.outgoing_buffers.to_shard(shard_id).len() == 0;
all_buffers_empty &= is_buffer_empty;
}

// Assert that empty buffers match zero buffered gas.
assert_eq!(all_buffers_empty, self.own_congestion_info.buffered_receipts_gas() == 0);

Ok(())
}

Expand Down
Loading