Skip to content

Commit

Permalink
fix(resharding): Increase epoch length for shard shuffling (#12674)
Browse files Browse the repository at this point in the history
Some resharding tests were failing with single shard tracking enabled
because of epoch length was sligthly too short.
Under the hood, if we are close to epoch length (4 blocks or less), we
forward transactions to chunk producer for the next epoch too. But if
the epoch length is 6, it means the chunk producer has only 2 blocks to
catchup before it is supposed to start accepting transactions for the
new shard. In such case, `TX_CHECK_BLOCKS_AFTER_RESHARDING` needs also
be increased.

**Changes:**
This PR sets `INCREASED_EPOCH_LENGTH: 8` for these tests.
For remaining tests, I keep the old `DEFAULT_EPOCH_LENGTH: 6`, to test
potentially more corner cases, and some tests seem to be designed with
this specific epoch length in mind.
`TX_CHECK_BLOCKS_AFTER_RESHARDING ` is increased to almost max
(`epoch_length - 2`).
Loop actions are slightly refactored, introducing `LoopAction` to make
sure these actions actually succeed before the test is over. It happened
to me several times they did not, if some parameter was changed.
  • Loading branch information
staffik authored Jan 2, 2025
1 parent edaccb6 commit 2338f72
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 40 deletions.
90 changes: 60 additions & 30 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use crate::test_loop::builder::TestLoopBuilder;
use crate::test_loop::env::{TestData, TestLoopEnv};
use crate::test_loop::utils::loop_action::{LoopAction, LoopActionStatus};
use crate::test_loop::utils::receipts::{
check_receipts_presence_after_resharding_block, check_receipts_presence_at_resharding_block,
ReceiptKind,
Expand All @@ -35,14 +36,25 @@ use crate::test_loop::utils::transactions::{
use crate::test_loop::utils::trie_sanity::{
check_state_shard_uid_mapping_after_resharding, TrieSanityCheck,
};
use crate::test_loop::utils::{get_node_data, retrieve_client_actor, LoopActionFn, ONE_NEAR, TGAS};
use crate::test_loop::utils::{get_node_data, retrieve_client_actor, ONE_NEAR, TGAS};
use assert_matches::assert_matches;
use near_crypto::Signer;
use near_parameters::{vm, RuntimeConfig, RuntimeConfigStore};
use near_primitives::test_utils::create_user_test_signer;
use near_primitives::transaction::SignedTransaction;
use near_primitives::views::{FinalExecutionStatus, QueryRequest};

/// Default and minimal epoch length used in resharding tests.
const DEFAULT_EPOCH_LENGTH: u64 = 6;

/// Increased epoch length that has to be used in some tests due to the delay caused by catch up.
///
/// With shorter epoch length, a chunk producer might not finish catch up on time,
/// before it is supposed to accept transactions for the next epoch.
/// That would result in chunk producer rejecting a transaction
/// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests.
const INCREASED_EPOCH_LENGTH: u64 = 8;

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned", build_fn(skip))]
#[allow(unused)]
Expand Down Expand Up @@ -82,7 +94,7 @@ struct TestReshardingParameters {
load_mem_tries_for_tracked_shards: bool,
/// Custom behavior executed at every iteration of test loop.
#[builder(setter(custom))]
loop_actions: Vec<LoopActionFn>,
loop_actions: Vec<LoopAction>,
// When enabling shard shuffling with a short epoch length, sometimes a node might not finish
// catching up by the end of the epoch, and then misses a chunk. This can be fixed by using a longer
// epoch length, but it's good to also check what happens with shorter ones.
Expand All @@ -105,7 +117,7 @@ struct TestReshardingParameters {

impl TestReshardingParametersBuilder {
fn build(self) -> TestReshardingParameters {
let epoch_length = self.epoch_length.unwrap_or(6);
let epoch_length = self.epoch_length.unwrap_or(DEFAULT_EPOCH_LENGTH);

let num_accounts = self.num_accounts.unwrap_or(8);
let num_clients = self.num_clients.unwrap_or(7);
Expand Down Expand Up @@ -193,7 +205,7 @@ impl TestReshardingParametersBuilder {
}
}

fn add_loop_action(mut self, loop_action: LoopActionFn) -> Self {
fn add_loop_action(mut self, loop_action: LoopAction) -> Self {
self.loop_actions.get_or_insert_default().push(loop_action);
self
}
Expand All @@ -212,12 +224,12 @@ impl TestReshardingParametersBuilder {

// Returns a callable function that, when invoked inside a test loop iteration, can force the creation of a chain fork.
#[cfg(feature = "test_features")]
fn fork_before_resharding_block(double_signing: bool) -> LoopActionFn {
fn fork_before_resharding_block(double_signing: bool) -> LoopAction {
use crate::test_loop::utils::retrieve_client_actor;
use near_client::client_actor::AdvProduceBlockHeightSelection;

let done = Cell::new(false);
Box::new(
let (done, succeeded) = LoopAction::shared_success_flag();
let action_fn = Box::new(
move |node_datas: &[TestData],
test_loop_data: &mut TestLoopData,
client_account_id: AccountId| {
Expand Down Expand Up @@ -248,17 +260,19 @@ fn fork_before_resharding_block(double_signing: bool) -> LoopActionFn {
done.set(true);
}
},
)
);
LoopAction::new(action_fn, succeeded)
}

fn execute_money_transfers(account_ids: Vec<AccountId>) -> LoopActionFn {
fn execute_money_transfers(account_ids: Vec<AccountId>) -> LoopAction {
const NUM_TRANSFERS_PER_BLOCK: usize = 20;

let latest_height = Cell::new(0);
let seed = rand::thread_rng().gen::<u64>();
println!("Random seed: {}", seed);

Box::new(
let (ran_transfers, succeeded) = LoopAction::shared_success_flag();
let action_fn = Box::new(
move |node_datas: &[TestData],
test_loop_data: &mut TestLoopData,
client_account_id: AccountId| {
Expand Down Expand Up @@ -301,8 +315,10 @@ fn execute_money_transfers(account_ids: Vec<AccountId>) -> LoopActionFn {
);
submit_tx(&node_datas, &client_account_id, tx);
}
ran_transfers.set(true);
},
)
);
LoopAction::new(action_fn, succeeded)
}

/// Returns a loop action that invokes a costly method from a contract
Expand All @@ -315,17 +331,20 @@ fn call_burn_gas_contract(
signer_ids: Vec<AccountId>,
receiver_ids: Vec<AccountId>,
gas_burnt_per_call: Gas,
) -> LoopActionFn {
// Must be less than epoch length, otherwise transactions won't be checked.
const TX_CHECK_BLOCKS_AFTER_RESHARDING: u64 = 5;
epoch_length: u64,
) -> LoopAction {
const CALLS_PER_BLOCK_HEIGHT: usize = 5;
// Set to a value large enough, so that transactions from the past epoch are settled.
// Must be less than epoch length, otherwise won't be triggered before the test is finished.
let tx_check_blocks_after_resharding = epoch_length - 2;

let resharding_height = Cell::new(None);
let nonce = Cell::new(102);
let txs = Cell::new(vec![]);
let latest_height = Cell::new(0);
let (checked_transactions, succeeded) = LoopAction::shared_success_flag();

Box::new(
let action_fn = Box::new(
move |node_datas: &[TestData],
test_loop_data: &mut TestLoopData,
client_account_id: AccountId| {
Expand All @@ -341,7 +360,7 @@ fn call_burn_gas_contract(

// After resharding: wait some blocks and check that all txs have been executed correctly.
if let Some(height) = resharding_height.get() {
if tip.height > height + TX_CHECK_BLOCKS_AFTER_RESHARDING {
if tip.height > height + tx_check_blocks_after_resharding {
for (tx, tx_height) in txs.take() {
let tx_outcome =
client_actor.client.chain.get_partial_transaction_result(&tx);
Expand All @@ -350,6 +369,7 @@ fn call_burn_gas_contract(
tracing::debug!(target: "test", ?tx_height, ?tx, ?status, "transaction status");
assert_matches!(status, FinalExecutionStatus::SuccessValue(_));
}
checked_transactions.set(true);
}
} else {
if next_block_has_new_shard_layout(client_actor.client.epoch_manager.as_ref(), &tip)
Expand Down Expand Up @@ -395,7 +415,8 @@ fn call_burn_gas_contract(
}
}
},
)
);
LoopAction::new(action_fn, succeeded)
}

/// Sends a promise-yield transaction before resharding. Then, if `call_resume` is `true` also sends
Expand All @@ -408,15 +429,16 @@ fn call_promise_yield(
call_resume: bool,
signer_ids: Vec<AccountId>,
receiver_ids: Vec<AccountId>,
) -> LoopActionFn {
) -> LoopAction {
let resharding_height: Cell<Option<u64>> = Cell::new(None);
let txs = Cell::new(vec![]);
let latest_height = Cell::new(0);
let promise_txs_sent = Cell::new(false);
let nonce = Cell::new(102);
let yield_payload = vec![];
let (checked_transactions, succeeded) = LoopAction::shared_success_flag();

Box::new(
let action_fn = Box::new(
move |node_datas: &[TestData],
test_loop_data: &mut TestLoopData,
client_account_id: AccountId| {
Expand Down Expand Up @@ -476,6 +498,7 @@ fn call_promise_yield(
tracing::debug!(target: "test", ?tx_height, ?tx, ?status, "transaction status");
assert_matches!(status, FinalExecutionStatus::SuccessValue(_));
}
checked_transactions.set(true);
}
(Some(_resharding), _latest) => {}
// Resharding didn't happen in the past.
Expand Down Expand Up @@ -523,7 +546,8 @@ fn call_promise_yield(
}
}
},
)
);
LoopAction::new(action_fn, succeeded)
}

fn get_base_shard_layout(version: u64) -> ShardLayout {
Expand Down Expand Up @@ -738,7 +762,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
params
.loop_actions
.iter()
.for_each(|action| action(&env.datas, test_loop_data, client_account_id.clone()));
.for_each(|action| action.call(&env.datas, test_loop_data, client_account_id.clone()));

let clients =
client_handles.iter().map(|handle| &test_loop_data.get(handle).client).collect_vec();
Expand Down Expand Up @@ -774,6 +798,9 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
println!("State after resharding:");
print_and_assert_shard_accounts(&clients, &tip);
check_state_shard_uid_mapping_after_resharding(&client, parent_shard_uid);
for loop_action in &params.loop_actions {
assert_matches!(loop_action.get_status(), LoopActionStatus::Succeeded);
}
return true;
};

Expand Down Expand Up @@ -907,7 +934,7 @@ fn test_resharding_v3_shard_shuffling_intense() {
let chunk_ranges_to_drop = HashMap::from([(0, -1..2), (1, -3..0), (2, -3..3), (3, 0..1)]);
let params = TestReshardingParametersBuilder::default()
.num_accounts(8)
.epoch_length(8)
.epoch_length(INCREASED_EPOCH_LENGTH)
.shuffle_shard_assignment_for_chunk_producers(true)
.chunk_ranges_to_drop(chunk_ranges_to_drop)
.add_loop_action(execute_money_transfers(
Expand All @@ -927,6 +954,7 @@ fn test_resharding_v3_delayed_receipts_left_child() {
vec![account.clone()],
vec![account.clone()],
275 * TGAS,
DEFAULT_EPOCH_LENGTH,
))
.add_loop_action(check_receipts_presence_at_resharding_block(
vec![account],
Expand All @@ -947,14 +975,14 @@ fn test_resharding_v3_delayed_receipts_right_child() {
vec![account.clone()],
vec![account.clone()],
275 * TGAS,
INCREASED_EPOCH_LENGTH,
))
.add_loop_action(check_receipts_presence_at_resharding_block(
vec![account],
ReceiptKind::Delayed,
))
.allow_negative_refcount(true)
// TODO(resharding): test should work without changes to track_all_shards
.track_all_shards(true)
.epoch_length(INCREASED_EPOCH_LENGTH)
.build();
test_resharding_v3_base(params);
}
Expand All @@ -972,6 +1000,7 @@ fn test_resharding_v3_split_parent_buffered_receipts_base(base_shard_layout_vers
vec![account_in_left_child.clone(), account_in_right_child],
vec![receiver_account],
10 * TGAS,
INCREASED_EPOCH_LENGTH,
))
.add_loop_action(check_receipts_presence_at_resharding_block(
vec![account_in_parent],
Expand All @@ -981,8 +1010,7 @@ fn test_resharding_v3_split_parent_buffered_receipts_base(base_shard_layout_vers
vec![account_in_left_child],
ReceiptKind::Buffered,
))
// TODO(resharding): test should work without changes to track_all_shards
.track_all_shards(true)
.epoch_length(INCREASED_EPOCH_LENGTH)
.build();
test_resharding_v3_base(params);
}
Expand Down Expand Up @@ -1015,6 +1043,7 @@ fn test_resharding_v3_buffered_receipts_towards_splitted_shard_base(
vec![account_in_stable_shard.clone()],
vec![account_in_left_child, account_in_right_child],
10 * TGAS,
DEFAULT_EPOCH_LENGTH,
))
.add_loop_action(check_receipts_presence_at_resharding_block(
vec![account_in_stable_shard.clone()],
Expand Down Expand Up @@ -1052,6 +1081,7 @@ fn test_resharding_v3_outgoing_receipts_towards_splitted_shard() {
vec![account_1_in_stable_shard, account_2_in_stable_shard],
vec![receiver_account],
5 * TGAS,
DEFAULT_EPOCH_LENGTH,
))
.build();
test_resharding_v3_base(params);
Expand All @@ -1069,9 +1099,9 @@ fn test_resharding_v3_outgoing_receipts_from_splitted_shard() {
vec![account_in_left_child, account_in_right_child],
vec![receiver_account],
5 * TGAS,
INCREASED_EPOCH_LENGTH,
))
// TODO(resharding): test should work without changes to track_all_shards
.track_all_shards(true)
.epoch_length(INCREASED_EPOCH_LENGTH)
.build();
test_resharding_v3_base(params);
}
Expand Down Expand Up @@ -1108,7 +1138,7 @@ fn test_resharding_v3_slower_post_processing_tasks() {
test_resharding_v3_base(
TestReshardingParametersBuilder::default()
.delay_flat_state_resharding(2)
.epoch_length(13)
.epoch_length(INCREASED_EPOCH_LENGTH)
.build(),
);
}
Expand All @@ -1119,7 +1149,7 @@ fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() {
let params = TestReshardingParametersBuilder::default()
.shuffle_shard_assignment_for_chunk_producers(true)
.delay_flat_state_resharding(2)
.epoch_length(13)
.epoch_length(INCREASED_EPOCH_LENGTH)
.build();
test_resharding_v3_base(params);
}
Expand Down
72 changes: 72 additions & 0 deletions integration-tests/src/test_loop/utils/loop_action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::cell::Cell;
use std::rc::Rc;

use near_async::test_loop::data::TestLoopData;
use near_primitives::types::AccountId;

use crate::test_loop::env::TestData;

/// Signature of functions callable from inside the inner loop of a testloop test.
pub(crate) type LoopActionFn = Box<dyn Fn(&[TestData], &mut TestLoopData, AccountId)>;

/// A wrapper for a callable (action) to be used inside a testloop test.
///
/// The action has two failure modes:
/// 1. It can fail during an execution of the callable (e.g. assert failure).
/// 2. The `succeeded` has never been set by the action and the testloop test is over.
///
/// The expectation is that `succeeded` would eventually be set to true by some iteration of `action_fn`.
pub(crate) struct LoopAction {
action_fn: LoopActionFn,
started: Cell<bool>,
succeeded: Rc<Cell<bool>>,
}

impl LoopAction {
/// Returns a pair of pointers to the same flag, initially set to false.
/// To be used for a succees flag that is shared between `LoopAction` and its `LoopActionFn`.
pub fn shared_success_flag() -> (Rc<Cell<bool>>, Rc<Cell<bool>>) {
let flag = Rc::new(Cell::new(false));
(flag.clone(), flag)
}
}

/// Current status for a `LoopAction`.
#[derive(Debug)]
pub enum LoopActionStatus {
/// The action has never been called.
NotStarted,
/// The action has been called, but it has not set the `succeeded` flag yet.
Started,
/// The `succeeded` flag has been set.
Succeeded,
}

impl LoopAction {
/// The `succeeded` flag should be shared with `action_fn` that will update the flag at some point.
pub fn new(action_fn: LoopActionFn, succeeded: Rc<Cell<bool>>) -> LoopAction {
LoopAction { action_fn, started: Cell::new(false), succeeded }
}

/// Call the action callable with provided arguments.
pub fn call(
&self,
test_data: &[TestData],
test_loop_data: &mut TestLoopData,
account_id: AccountId,
) {
self.started.set(true);
(self.action_fn)(test_data, test_loop_data, account_id)
}

/// Return the current status of the loop action.
pub fn get_status(&self) -> LoopActionStatus {
if self.succeeded.get() {
return LoopActionStatus::Succeeded;
}
if self.started.get() {
return LoopActionStatus::Started;
}
LoopActionStatus::NotStarted
}
}
Loading

0 comments on commit 2338f72

Please sign in to comment.