Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update sync_headers_in_reverse logic to match lotus #4291

Merged
merged 18 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ impl Tipset {
}
anyhow::bail!("Genesis block not found")
}

/// Check if `self` is the child of `other`
pub fn is_child_of(&self, other: &Self) -> bool {
// Note: the extra `&& self.epoch() > other.epoch()` check in lotus is dropped
// See <https://github.com/filecoin-project/lotus/blob/01ec22974942fb7328a1e665704c6cfd75d93372/chain/types/tipset.go#L258>
Comment on lines +408 to +409
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent comment

self.parents() == other.key()
}
}

/// `FullTipset` is an expanded version of a tipset that contains all the blocks
Expand Down
161 changes: 84 additions & 77 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,10 @@ async fn sync_tipset_range<DB: Blockstore + Sync + Send + 'static>(
bad_block_cache: Arc<BadBlockCache>,
genesis: Arc<Tipset>,
) -> Result<(), TipsetRangeSyncerError> {
if proposed_head == current_head {
return Ok(());
}

tracker
.write()
.init(current_head.clone(), proposed_head.clone());
Expand Down Expand Up @@ -807,111 +811,114 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
chain_store: &ChainStore<DB>,
network: SyncNetworkContext<DB>,
) -> Result<NonEmpty<Arc<Tipset>>, TipsetRangeSyncerError> {
let mut parent_blocks: Vec<Cid> = vec![];
let mut parent_tipsets = nonempty![proposed_head.clone()];
let until_epoch = current_head.epoch() + 1;
let total_size = proposed_head.epoch() - until_epoch + 1;

let mut accepted_blocks: Vec<Cid> = vec![];
let mut pending_tipsets = nonempty![proposed_head];
tracker.write().set_epoch(current_head.epoch());

let total_size = proposed_head.epoch() - current_head.epoch();
#[allow(deprecated)] // Tracking issue: https://github.com/ChainSafe/forest/issues/3157
let wp = WithProgressRaw::new("Downloading headers", total_size as u64);

'sync: loop {
let oldest_parent = parent_tipsets.last();
let work_to_be_done = oldest_parent.epoch() - current_head.epoch();
while pending_tipsets.last().epoch() > until_epoch {
let oldest_pending_tipset = pending_tipsets.last();
let work_to_be_done = oldest_pending_tipset.epoch() - until_epoch + 1;
wp.set((work_to_be_done - total_size).unsigned_abs());
validate_tipset_against_cache(bad_block_cache, oldest_parent.parents(), &parent_blocks)?;
validate_tipset_against_cache(
bad_block_cache,
oldest_pending_tipset.parents(),
&accepted_blocks,
)?;

// Check if we are at the end of the range
if oldest_parent.epoch() <= current_head.epoch() {
// Current tipset epoch is less than or equal to the epoch of
// Tipset we a synchronizing toward, stop.
break;
}
// Attempt to load the parent tipset from local store
if let Some(tipset) = chain_store
.chain_index
.load_tipset(oldest_parent.parents())?
.load_tipset(oldest_pending_tipset.parents())?
{
parent_blocks.extend(tipset.cids());
parent_tipsets.push(tipset);
accepted_blocks.extend(tipset.cids());
pending_tipsets.push(tipset);
continue;
}

let epoch_diff = oldest_parent.epoch() - current_head.epoch();
let epoch_diff = oldest_pending_tipset.epoch() - current_head.epoch();
let window = min(epoch_diff, MAX_TIPSETS_TO_REQUEST as i64);
let network_tipsets = network
.chain_exchange_headers(None, oldest_parent.parents(), window as u64)
.chain_exchange_headers(None, oldest_pending_tipset.parents(), window as u64)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;

for tipset in network_tipsets {
// Break if have already traversed the entire tipset range
if tipset.epoch() < current_head.epoch() {
break 'sync;
}
validate_tipset_against_cache(bad_block_cache, tipset.key(), &parent_blocks)?;
parent_blocks.extend(tipset.cids());
for tipset in network_tipsets
.into_iter()
.take_while(|ts| ts.epoch() >= until_epoch)
{
validate_tipset_against_cache(bad_block_cache, tipset.key(), &accepted_blocks)?;
accepted_blocks.extend(tipset.cids());
tracker.write().set_epoch(tipset.epoch());
parent_tipsets.push(tipset);
pending_tipsets.push(tipset);
}
}
drop(wp);

let oldest_tipset = parent_tipsets.last().clone();
// Determine if the local chain was a fork.
// If it was, then sync the fork tipset range by iteratively walking back
let oldest_pending_tipset = pending_tipsets.last();
// common case: receiving a block that's potentially part of the same tipset as our best block
if oldest_pending_tipset.as_ref() == current_head
|| oldest_pending_tipset.is_child_of(current_head)
{
return Ok(pending_tipsets);
}

// Fork detected, sync the fork tipset range by iteratively walking back
// from the oldest tipset synced until we find a common ancestor
if oldest_tipset.parents() != current_head.parents() {
info!("Fork detected, searching for a common ancestor between the local chain and the network chain");
const FORK_LENGTH_THRESHOLD: u64 = 500;
let fork_tipsets = network
.chain_exchange_headers(None, oldest_tipset.parents(), FORK_LENGTH_THRESHOLD)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;
let mut potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(current_head.parents())?;
let mut i = 0;
let mut fork_length = 1;
while let Some(fork_tipset) = fork_tipsets.get(i) {
if fork_tipset.epoch() == 0 {
return Err(TipsetRangeSyncerError::ForkAtGenesisBlock(format!(
"{:?}",
oldest_tipset.cids()
)));
}
if &potential_common_ancestor == fork_tipset {
// Remove elements from the vector since the Drain
// iterator is immediately dropped
let mut fork_tipsets = fork_tipsets;
fork_tipsets.drain((i + 1)..);
parent_tipsets.extend(fork_tipsets);
break;
}
info!("Fork detected, searching for a common ancestor between the local chain and the network chain");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this log message is basically the same as the comment above, you can probably delete the comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

const FORK_LENGTH_THRESHOLD: u64 = 500;
let fork_tipsets = network
.chain_exchange_headers(None, oldest_pending_tipset.parents(), FORK_LENGTH_THRESHOLD)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;
let mut potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(current_head.parents())?;
let mut i = 0;
let mut fork_length = 1;
while let Some(fork_tipset) = fork_tipsets.get(i) {
if fork_tipset.epoch() == 0 {
return Err(TipsetRangeSyncerError::ForkAtGenesisBlock(format!(
"{:?}",
oldest_pending_tipset.cids()
)));
}
if &potential_common_ancestor == fork_tipset {
// Remove elements from the vector since the Drain
// iterator is immediately dropped
let mut fork_tipsets = fork_tipsets;
fork_tipsets.drain((i + 1)..);
pending_tipsets.extend(fork_tipsets);
break;
}

// If the potential common ancestor has an epoch which
// is lower than the current fork tipset under evaluation
// move to the next iteration without updated the potential common ancestor
if potential_common_ancestor.epoch() < fork_tipset.epoch() {
i += 1;
} else {
fork_length += 1;
// Increment the fork length and enforce the fork length check
if fork_length > FORK_LENGTH_THRESHOLD {
return Err(TipsetRangeSyncerError::ChainForkLengthExceedsMaximum);
}
// If we have not found a common ancestor by the last iteration, then return an
// error
if i == (fork_tipsets.len() - 1) {
return Err(TipsetRangeSyncerError::ChainForkLengthExceedsFinalityThreshold);
}
potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(potential_common_ancestor.parents())?;
// If the potential common ancestor has an epoch which
// is lower than the current fork tipset under evaluation
// move to the next iteration without updated the potential common ancestor
if potential_common_ancestor.epoch() < fork_tipset.epoch() {
i += 1;
} else {
fork_length += 1;
// Increment the fork length and enforce the fork length check
if fork_length > FORK_LENGTH_THRESHOLD {
return Err(TipsetRangeSyncerError::ChainForkLengthExceedsMaximum);
}
// If we have not found a common ancestor by the last iteration, then return an
// error
if i == (fork_tipsets.len() - 1) {
return Err(TipsetRangeSyncerError::ChainForkLengthExceedsFinalityThreshold);
}
potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(potential_common_ancestor.parents())?;
}
}
Ok(parent_tipsets)

Ok(pending_tipsets)
}

#[allow(clippy::too_many_arguments)]
Expand Down
Loading