fix(resharding): fix Chain::get_shards_to_state_sync() #12617

Merged (12 commits) on Jan 6, 2025
164 changes: 93 additions & 71 deletions chain/chain/src/chain.rs
@@ -795,28 +795,27 @@ impl Chain {
fn get_state_sync_info(
&self,
me: &Option<AccountId>,
epoch_first_block: &Block,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Option<StateSyncInfo>, Error> {
let prev_hash = *epoch_first_block.header().prev_hash();
let shards_to_state_sync = Chain::get_shards_to_state_sync(
self.epoch_manager.as_ref(),
&self.shard_tracker,
me,
&prev_hash,
prev_hash,
prev_prev_hash,
)?;
if shards_to_state_sync.is_empty() {
Ok(None)
} else {
debug!(target: "chain", "Downloading state for {:?}, I'm {:?}", shards_to_state_sync, me);
let epoch_id = epoch_first_block.header().epoch_id();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
// Note that this block is the first block in an epoch because this function is only called
// in get_catchup_and_state_sync_infos() when that is the case.
let state_sync_info = StateSyncInfo::new(
protocol_version,
*epoch_first_block.header().hash(),
shards_to_state_sync,
);
let state_sync_info =
StateSyncInfo::new(protocol_version, *block_hash, shards_to_state_sync);
Ok(Some(state_sync_info))
}
}
@@ -2271,7 +2270,7 @@ impl Chain {
// First real I/O expense.
let prev = self.get_previous_header(header)?;
let prev_hash = *prev.hash();
let prev_prev_hash = *prev.prev_hash();
let prev_prev_hash = prev.prev_hash();
let gas_price = prev.next_gas_price();
let prev_random_value = *prev.random_value();
let prev_height = prev.height();
@@ -2281,8 +2280,13 @@
return Err(Error::InvalidBlockHeight(prev_height));
}

let (is_caught_up, state_sync_info) =
self.get_catchup_and_state_sync_infos(header, prev_hash, prev_prev_hash, me, block)?;
let (is_caught_up, state_sync_info) = self.get_catchup_and_state_sync_infos(
header.epoch_id(),
header.hash(),
&prev_hash,
prev_prev_hash,
me,
)?;

self.check_if_challenged_block_on_chain(header)?;

@@ -2375,29 +2379,32 @@

fn get_catchup_and_state_sync_infos(
&self,
header: &BlockHeader,
prev_hash: CryptoHash,
prev_prev_hash: CryptoHash,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
me: &Option<AccountId>,
block: &MaybeValidated<Block>,
) -> Result<(bool, Option<StateSyncInfo>), Error> {
if self.epoch_manager.is_next_block_epoch_start(&prev_hash)? {
debug!(target: "chain", block_hash=?header.hash(), "block is the first block of an epoch");
if !self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start download states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info = self.get_state_sync_info(me, block)?;
Ok((state_sync_info.is_none(), state_sync_info))
} else {
Ok((self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)?, None))
if !self.epoch_manager.is_next_block_epoch_start(prev_hash)? {
return Ok((self.prev_block_is_caught_up(prev_prev_hash, prev_hash)?, None));
}
if !self.prev_block_is_caught_up(prev_prev_hash, prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start download states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info =
self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?;
debug!(
target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()),
"Checked for shards to sync for epoch T+1 upon processing first block of epoch T"
);
Ok((state_sync_info.is_none(), state_sync_info))
}
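
For orientation when reading the diff above, here is a minimal, self-contained sketch of the decision get_catchup_and_state_sync_infos() now makes, using plain booleans and a hypothetical enum rather than the real nearcore types:

/// Minimal sketch (hypothetical types, not the nearcore API) of the catch-up
/// decision made by get_catchup_and_state_sync_infos() above.
#[derive(Debug, PartialEq)]
enum CatchupDecision {
    /// The previous block is not yet caught up for this epoch: orphan the block for now.
    Orphan,
    /// Process the block; `is_caught_up == false` means state sync must be started.
    Proceed { is_caught_up: bool },
}

fn catchup_decision(
    is_first_block_of_epoch: bool,
    prev_block_caught_up: bool,
    num_shards_to_sync: usize,
) -> CatchupDecision {
    if !is_first_block_of_epoch {
        // Mid-epoch blocks simply inherit the catch-up status of the previous block.
        return CatchupDecision::Proceed { is_caught_up: prev_block_caught_up };
    }
    if !prev_block_caught_up {
        return CatchupDecision::Orphan;
    }
    // First block of the epoch: caught up only if there is nothing to state sync.
    CatchupDecision::Proceed { is_caught_up: num_shards_to_sync == 0 }
}

#[test]
fn first_block_with_shards_to_sync_is_not_caught_up() {
    assert_eq!(catchup_decision(true, true, 2), CatchupDecision::Proceed { is_caught_up: false });
}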

pub fn prev_block_is_caught_up(
@@ -2425,56 +2432,71 @@ impl Chain {
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
Contributor: <unrelated rant> Why do we have two names - prev and parent - for the same thing? 😢 </unrelated rant>

prev_prev_hash: &CryptoHash,
) -> Result<Vec<ShardId>, Error> {
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?;
Ok((epoch_manager.shard_ids(&epoch_id)?)
.into_iter()
.filter(|shard_id| {
Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
parent_hash,
*shard_id,
)
})
.collect())
let mut shards_to_sync = Vec::new();
for shard_id in epoch_manager.shard_ids(&epoch_id)? {
Contributor, on lines +2438 to +2439: +1 to for loop

if Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
&epoch_id,
parent_hash,
prev_prev_hash,
shard_id,
)? {
shards_to_sync.push(shard_id)
}
}
Ok(shards_to_sync)
}

/// Returns whether we need to initiate state sync for the given `shard_id` for the epoch
/// beginning after the block `epoch_last_block`. If that epoch is epoch T, the logic is:
/// - will track the shard in epoch T+1
/// - AND not tracking it in T
/// - AND didn't track it in T-1
/// We check that we didn't track it in T-1 because if so, and we're in the relatively rare case
/// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs,
/// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync.
fn should_catch_up_shard(
epoch_manager: &dyn EpochManagerAdapter,
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
epoch_id: &EpochId,
epoch_last_block: &CryptoHash,
epoch_last_block_prev: &CryptoHash,
shard_id: ShardId,
) -> bool {
let result = epoch_manager.will_shard_layout_change(parent_hash);
let will_shard_layout_change = match result {
Ok(_will_shard_layout_change) => {
// TODO(#11881): before state sync is fixed, we don't catch up
// split shards. Assume that all needed shards are tracked
// already.
// will_shard_layout_change,
false
}
Err(err) => {
// TODO(resharding) This is a problem, if this happens the node
// will not perform resharding and fall behind the network.
tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
false
}
};
// if shard layout will change the next epoch, we should catch up the shard regardless
// whether we already have the shard's state this epoch, because we need to generate
// new states for shards split from the current shard for the next epoch
let will_care_about_shard =
shard_tracker.will_care_about_shard(me.as_ref(), parent_hash, shard_id, true);
let does_care_about_shard =
shard_tracker.care_about_shard(me.as_ref(), parent_hash, shard_id, true);
) -> Result<bool, Error> {
Contributor:
I agree that the old condition is no longer relevant but the logic here seems too complicated.

In V2 we needed to get a second copy of the parent, even if the node tracks it, to perform the preprocessing:
will_care_about_shard && (will_shard_layout_change || !does_care_about_shard)

In V3 this shouldn't be needed, so it feels like the following should be sufficient:
will_care_about_shard && !does_care_about_shard

Please keep in mind that will_care_about_shard is resharding aware. We should aim to keep the resharding specific part of this logic in one place if possible.

marcelo-gonzalez (Contributor Author) replied on Dec 15, 2024:

> In V3 this shouldn't be needed, so it feels like the following should be sufficient: will_care_about_shard && !does_care_about_shard

So, relative to the current PR, something like:

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 931f711f8..ac6b3d650 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -2465,32 +2465,7 @@ impl Chain {
         if shard_tracker.care_about_shard(me.as_ref(), parent_hash, shard_id, true) {
             return Ok(false);
         }
-        // Now we need to state sync it unless this is a post-resharding child shard whose parent we were tracking in the
-        // previous epoch, in which case we don't need to state sync because we'll generate the child when we do the resharding
-
-        let shard_layout = epoch_manager.get_shard_layout(epoch_id)?;
-        let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(parent_hash)?;
-        let prev_shard_layout = epoch_manager.get_shard_layout(&prev_epoch_id)?;
-
-        let resharded = shard_layout != prev_shard_layout;
-        if !resharded {
-            return Ok(true);
-        }
-        let Some(parent_shard_id) = shard_layout.try_get_parent_shard_id(shard_id)? else {
-            return Ok(true);
-        };
-        let was_split = parent_shard_id != shard_id;
-        if !was_split {
-            return Ok(true);
-        }
-
-        // Note that here passing `prev_prev_hash` to care_about_shard() will have us check whether we were tracking it in
-        // the previous epoch, because the current block is the first block of an epoch, so prev_prev_hash is the "parent_hash"
-        // of the last block of the previous epoch. TODO: consider refactoring these ShardTracker functions to accept an epoch_id
-        // to make this less tricky.
-        let splitting_child =
-            shard_tracker.care_about_shard(me.as_ref(), prev_prev_hash, parent_shard_id, true);
-        Ok(!splitting_child)
+        Ok(true)
     }
 
     /// Check if any block with missing chunk is ready to be processed and start processing these blocks

Now that the other PR is merged, we can try the modification to test_resharding_v3_shard_shuffling_slower_post_processing_tasks that I mentioned caused a crash:

diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs
index c04d89ba6..457db7723 100644
--- a/integration-tests/src/test_loop/tests/resharding_v3.rs
+++ b/integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -985,6 +985,7 @@ fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() {
         .track_all_shards(false)
         .all_chunks_expected(false)
         .delay_flat_state_resharding(2)
+        .epoch_length(10)
         .build();
     test_resharding_v3_base(params);
 }

Now unfortunately this passes because something about the order of the test loop task processing for state sync and resharding catchup has changed between when I wrote that and the current head of master. Without trying to figure it out, we can just hack it to run in the same order as before:

diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs
index 83bd481f3..a401c51ce 100644
--- a/chain/chain/src/flat_storage_resharder.rs
+++ b/chain/chain/src/flat_storage_resharder.rs
@@ -609,6 +609,16 @@ impl FlatStorageResharder {
         if self.controller.is_cancelled() {
             return FlatStorageReshardingTaskResult::Cancelled;
         }
+        let head = chain_store.head().unwrap();
+        let Some(sync_hash) = chain_store.get_current_epoch_sync_hash(&head.epoch_id).unwrap() else {
+            return FlatStorageReshardingTaskResult::Postponed;
+        };
+        let key = borsh::to_vec(&near_primitives::state_sync::StatePartKey(sync_hash, shard_uid.shard_id(), 0)).unwrap();
+        let part = chain_store.store().get(near_store::DBCol::StateParts, &key).unwrap();
+        if !part.is_some() {
+            return FlatStorageReshardingTaskResult::Postponed;
+        }
+
         info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started");
         let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid);
         // Apply deltas and then create the flat storage.

Then (with the above epoch length change), run it and you get:

thread 'test_loop::tests::resharding_v3::test_resharding_v3_shard_shuffling_slower_post_processing_tasks' panicked at chain/chain/src/resharding/resharding_actor.rs:93:17:
impossible to recover from a flat storage shard catchup failure!

with this error message at the end of the logs:

 5.661s ERROR resharding: flat storage shard catchup delta application failed! shard_uid=s7.v3 err=Other("unexpected resharding catchup flat storage status for s7.v3: Ready(FlatStorageReadyStatus { flat_head: BlockInfo { hash: Gposh9D271WMUy3wt4Re1pXtFViNEdiMEgWyjm8eoYHH, height: 24, prev_hash: 7d7uMfUA8dXHjGcixM88gVVnt7jaak8N5y7gfv7C58af } })")

What's happening here is the problem with account6 I mentioned in the other comment. It's not tracking the child in the current epoch, will track it in the next, but is performing the split because it tracked the parent in the previous epoch.

> Please keep in mind that will_care_about_shard is resharding aware. We should aim to keep the resharding specific part of this logic in one place if possible.

Yeah it's a bit messy... maybe we should split something out of this?

// Won't care about it next epoch, no need to state sync it.
if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}
// Currently tracking the shard, so no need to state sync it.
if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}

tracing::debug!(target: "chain", does_care_about_shard, will_care_about_shard, will_shard_layout_change, "should catch up shard");
// Now we need to state sync it unless we were tracking the parent in the previous epoch,
// in which case we don't need to because we already have the state, and can just continue applying chunks
if epoch_id == &EpochId::default() {
return Ok(true);
}

will_care_about_shard && (will_shard_layout_change || !does_care_about_shard)
let (_layout, parent_shard_id, _index) =
epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?;
// Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in
// the previous epoch, because it is the "parent_hash" of the last block of the previous epoch.
// TODO: consider refactoring these ShardTracker functions to accept an epoch_id
// to make this less tricky.
let tracked_before = shard_tracker.care_about_shard(
me.as_ref(),
epoch_last_block_prev,
parent_shard_id,
true,
);
Ok(!tracked_before)
}
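
The doc comment on should_catch_up_shard() above reduces to a small predicate over the tracking schedule. The following is an illustrative sketch with plain booleans (it glosses over the fact that the T-1 check is done against the parent shard when a resharding happened), not the ShardTracker API:

/// Illustrative sketch of the rule documented on should_catch_up_shard():
/// state sync a shard for epoch T+1 only if we will track it in T+1, are not
/// tracking it in T, and did not track it (or its parent) in T-1.
fn needs_state_sync(tracked_next: bool, tracked_now: bool, tracked_prev: bool) -> bool {
    tracked_next && !tracked_now && !tracked_prev
}

#[test]
fn untrack_then_track_does_not_state_sync() {
    // The case this PR fixes: tracked the parent in T-1, track an unrelated shard
    // in T, will track the child in T+1 -> no state sync; the node already has the
    // state and can produce the child via resharding and keep applying chunks.
    assert!(!needs_state_sync(true, false, true));
    // Plain case: never tracked the shard before and will start tracking it next epoch.
    assert!(needs_state_sync(true, false, false));
}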

/// Check if any block with missing chunk is ready to be processed and start processing these blocks
32 changes: 32 additions & 0 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -686,6 +686,38 @@ fn test_resharding_v3_shard_shuffling() {
test_resharding_v3_base(params);
}

/// This tests an edge case where we track the parent in the pre-resharding epoch, then we
/// track an unrelated shard in the first epoch after resharding, then we track a child of the resharding
/// in the next epoch after that. In that case we don't want to state sync because we can just perform
/// the resharding and continue applying chunks for the child in the first epoch post-resharding.
#[test]
fn test_resharding_v3_shard_shuffling_untrack_then_track() {
let account_in_stable_shard: AccountId = "account0".parse().unwrap();
let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap();
let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION);
let new_shard_layout =
ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone());
let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account);
let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account);
let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard);

let tracked_shard_sequence =
vec![parent_shard_id, parent_shard_id, unrelated_shard_id, child_shard_id];
let num_clients = 8;
let tracked_shard_schedule = TrackedShardSchedule {
client_index: (num_clients - 1) as usize,
schedule: shard_sequence_to_schedule(tracked_shard_sequence),
};
let params = TestReshardingParametersBuilder::default()
.shuffle_shard_assignment_for_chunk_producers(true)
.num_clients(num_clients)
.tracked_shard_schedule(Some(tracked_shard_schedule))
// TODO(resharding): uncomment after fixing test_resharding_v3_state_cleanup()
//.add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule))
.build();
test_resharding_v3_base(params);
}
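
For reference, this is roughly the per-epoch tracking that the schedule above is meant to express for the dedicated client (client_index = num_clients - 1). The labels and epoch alignment are illustrative assumptions; the concrete shard ids and boundaries come from the derived shard layouts and shard_sequence_to_schedule():

/// Hypothetical illustration of the tracked-shard schedule in the test above.
/// Each entry is (epoch, shard tracked by the dedicated client).
fn untrack_then_track_schedule() -> Vec<(&'static str, &'static str)> {
    vec![
        ("epoch before resharding", "parent"),
        ("resharding epoch", "parent"),
        ("first epoch after resharding", "unrelated shard"),
        ("second epoch after resharding", "child"),
    ]
}

#[test]
fn child_is_tracked_only_after_an_untracked_epoch() {
    let schedule = untrack_then_track_schedule();
    // The node stops tracking the split shards for one epoch and then returns to a
    // child, which is exactly the case where the fix avoids a spurious state sync.
    assert_eq!(schedule[2].1, "unrelated shard");
    assert_eq!(schedule[3].1, "child");
}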

#[test]
fn test_resharding_v3_shard_shuffling_intense() {
let chunk_ranges_to_drop = HashMap::from([(0, -1..2), (1, -3..0), (2, -3..3), (3, 0..1)]);