feat(en): Support Merkle tree recovery with pruning enabled #3172
base: main
Changes from all commits: 93256b8, 138b164, fc05dcb, 1c14e45, 76883cd, 3067f03
```diff
@@ -32,16 +32,14 @@ use std::{
 };

 use anyhow::Context as _;
+use async_trait::async_trait;
 use futures::future;
 use tokio::sync::{watch, Mutex, Semaphore};
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_health_check::HealthUpdater;
 use zksync_merkle_tree::TreeEntry;
 use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
-use zksync_types::{
-    snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus},
-    L2BlockNumber, H256,
-};
+use zksync_types::{snapshots::uniform_hashed_keys_chunk, L1BatchNumber, L2BlockNumber, H256};

 use super::{
     helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree, MerkleTreeHealth},
```
```diff
@@ -54,12 +52,13 @@ mod tests;

 /// Handler of recovery life cycle events. This functionality is encapsulated in a trait to be able
 /// to control recovery behavior in tests.
+#[async_trait]
 trait HandleRecoveryEvent: fmt::Debug + Send + Sync {
     fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) {
         // Default implementation does nothing
     }

-    fn chunk_recovered(&self) {
+    async fn chunk_recovered(&self) {
         // Default implementation does nothing
     }
 }
```
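For context on the `#[async_trait]` additions in this hunk and the next two: the event handler is held as a boxed trait object (`events: Box::new(RecoveryHealthUpdater::new(...))` later in the diff), and native `async fn` in traits does not support dynamic dispatch, so making `chunk_recovered` async requires the `async_trait` macro. A minimal, self-contained sketch of the pattern; all names other than `async_trait` are illustrative:

```rust
use async_trait::async_trait;

// `#[async_trait]` desugars `async fn` into a method returning a boxed future,
// which keeps the trait object-safe (usable as `Box<dyn ...>`).
#[async_trait]
trait HandleEvent: Send + Sync {
    async fn chunk_recovered(&self) {
        // Default implementation does nothing.
    }
}

struct LoggingHandler;

#[async_trait]
impl HandleEvent for LoggingHandler {
    async fn chunk_recovered(&self) {
        // An async body may now await, e.g. persist progress to a database.
        println!("chunk recovered");
    }
}

#[tokio::main]
async fn main() {
    // Dynamic dispatch over an async method, which is what the recovery options need.
    let events: Box<dyn HandleEvent> = Box::new(LoggingHandler);
    events.chunk_recovered().await;
}
```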
```diff
@@ -82,6 +81,7 @@ impl<'a> RecoveryHealthUpdater<'a> {
     }
 }

+#[async_trait]
 impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
     fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) {
         self.chunk_count = chunk_count;
```
```diff
@@ -91,7 +91,7 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
             .set(recovered_chunk_count);
     }

-    fn chunk_recovered(&self) {
+    async fn chunk_recovered(&self) {
         let recovered_chunk_count = self.recovered_chunk_count.fetch_add(1, Ordering::SeqCst) + 1;
         let chunks_left = self.chunk_count.saturating_sub(recovered_chunk_count);
         tracing::info!(
```
```diff
@@ -110,34 +110,68 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
 }

 #[derive(Debug, Clone, Copy)]
-struct SnapshotParameters {
+struct InitParameters {
+    l1_batch: L1BatchNumber,
     l2_block: L2BlockNumber,
-    expected_root_hash: H256,
+    expected_root_hash: Option<H256>,
     log_count: u64,
     desired_chunk_size: u64,
 }

-impl SnapshotParameters {
+impl InitParameters {
     async fn new(
         pool: &ConnectionPool<Core>,
-        recovery: &SnapshotRecoveryStatus,
         config: &MetadataCalculatorRecoveryConfig,
-    ) -> anyhow::Result<Self> {
-        let l2_block = recovery.l2_block_number;
-        let expected_root_hash = recovery.l1_batch_root_hash;
-
+    ) -> anyhow::Result<Option<Self>> {
+        let mut storage = pool.connection_tagged("metadata_calculator").await?;
+        let recovery_status = storage
+            .snapshot_recovery_dal()
+            .get_applied_snapshot_status()
+            .await?;
+        let pruning_info = storage.pruning_dal().get_pruning_info().await?;
+
+        let (l1_batch, l2_block);
+        let mut expected_root_hash = None;
+        match (recovery_status, pruning_info.last_hard_pruned_l2_block) {
+            (Some(recovery), None) => {
+                tracing::warn!(
+                    "Snapshot recovery {recovery:?} is present on the node, but pruning info is empty; assuming no pruning happened"
+                );
+                l1_batch = recovery.l1_batch_number;
+                l2_block = recovery.l2_block_number;
+                expected_root_hash = Some(recovery.l1_batch_root_hash);
+            }
+            (Some(recovery), Some(pruned_l2_block)) => {
+                // We have both recovery and some pruning on top of it.
+                l2_block = pruned_l2_block.max(recovery.l2_block_number);
+                l1_batch = pruning_info
+                    .last_hard_pruned_l1_batch
+                    .with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
+                if l1_batch == recovery.l1_batch_number {
+                    expected_root_hash = Some(recovery.l1_batch_root_hash);
+                }
+            }
+            (None, Some(pruned_l2_block)) => {
+                l2_block = pruned_l2_block;
+                l1_batch = pruning_info
+                    .last_hard_pruned_l1_batch
+                    .with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
+            }
+            (None, None) => return Ok(None),
+        };
+
         let log_count = storage
             .storage_logs_dal()
             .get_storage_logs_row_count(l2_block)
             .await?;

-        Ok(Self {
+        Ok(Some(Self {
+            l1_batch,
             l2_block,
             expected_root_hash,
             log_count,
             desired_chunk_size: config.desired_chunk_size,
-        })
+        }))
     }

     fn chunk_count(&self) -> u64 {
```
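The match above is the core of the change: recovery init parameters are now derived from two independent Postgres sources, the applied snapshot recovery status and the pruning info, instead of the snapshot status alone. A standalone sketch of the resulting decision table, using simplified stand-in types rather than the actual DAL structs:

```rust
// Simplified stand-ins for `SnapshotRecoveryStatus` / `PruningInfo`; plain
// tuples and integers are illustrative, not the real types.
#[derive(Debug)]
struct Target {
    l1_batch: u32,
    l2_block: u32,
    // `None` means the snapshot root hash can no longer be verified.
    expected_root_hash: Option<u64>,
}

fn init_target(
    recovery: Option<(u32, u32, u64)>, // (l1_batch, l2_block, root_hash) of the snapshot
    pruned: Option<(u32, u32)>,        // (last_hard_pruned_l1_batch, last_hard_pruned_l2_block)
) -> Option<Target> {
    match (recovery, pruned) {
        // Recovery without pruning: target the snapshot itself and check its hash.
        (Some((batch, block, hash)), None) => Some(Target {
            l1_batch: batch,
            l2_block: block,
            expected_root_hash: Some(hash),
        }),
        // Recovery plus pruning on top: target the later point; the snapshot hash
        // stays meaningful only if pruning hasn't moved past the snapshot batch.
        (Some((batch, block, hash)), Some((p_batch, p_block))) => Some(Target {
            l1_batch: p_batch,
            l2_block: block.max(p_block),
            expected_root_hash: (p_batch == batch).then_some(hash),
        }),
        // Pruning without recovery status: target the pruned point; no hash to check.
        (None, Some((p_batch, p_block))) => Some(Target {
            l1_batch: p_batch,
            l2_block: p_block,
            expected_root_hash: None,
        }),
        // Genesis node: no recovery necessary.
        (None, None) => None,
    }
}

fn main() {
    // Pruning advanced past the snapshot batch: recover to the pruned point, skip the hash check.
    let t = init_target(Some((10, 100, 0xabc)), Some((12, 140))).unwrap();
    assert_eq!((t.l1_batch, t.l2_block, t.expected_root_hash), (12, 140, None));
}
```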
```diff
@@ -168,47 +202,44 @@ impl GenericAsyncTree {
         stop_receiver: &watch::Receiver<bool>,
     ) -> anyhow::Result<Option<AsyncTree>> {
         let started_at = Instant::now();
-        let (tree, snapshot_recovery) = match self {
+        let (tree, init_params) = match self {
             Self::Ready(tree) => return Ok(Some(tree)),
             Self::Recovering(tree) => {
-                let snapshot_recovery = get_snapshot_recovery(main_pool).await?.context(
+                let params = InitParameters::new(main_pool, config).await?.context(
                     "Merkle tree is recovering, but Postgres doesn't contain snapshot recovery information",
                 )?;
                 let recovered_version = tree.recovered_version();
                 anyhow::ensure!(
-                    u64::from(snapshot_recovery.l1_batch_number.0) == recovered_version,
-                    "Snapshot L1 batch in Postgres ({snapshot_recovery:?}) differs from the recovered Merkle tree version \
+                    u64::from(params.l1_batch.0) == recovered_version,
+                    "Snapshot L1 batch in Postgres ({params:?}) differs from the recovered Merkle tree version \
                      ({recovered_version})"
                 );
-                tracing::info!("Resuming tree recovery with status: {snapshot_recovery:?}");
-                (tree, snapshot_recovery)
+                tracing::info!("Resuming tree recovery with status: {params:?}");
+                (tree, params)
             }
             Self::Empty { db, mode } => {
-                if let Some(snapshot_recovery) = get_snapshot_recovery(main_pool).await? {
-                    tracing::info!(
-                        "Starting Merkle tree recovery with status {snapshot_recovery:?}"
-                    );
-                    let l1_batch = snapshot_recovery.l1_batch_number;
+                if let Some(params) = InitParameters::new(main_pool, config).await? {
+                    tracing::info!("Starting Merkle tree recovery with status {params:?}");
+                    let l1_batch = params.l1_batch;
                     let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode, config)?;
-                    (tree, snapshot_recovery)
+                    (tree, params)
                 } else {
                     // Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`.
                     return Ok(Some(AsyncTree::new(db, mode)?));
                 }
             }
         };

-        let snapshot = SnapshotParameters::new(main_pool, &snapshot_recovery, config).await?;
         tracing::debug!(
-            "Obtained snapshot parameters: {snapshot:?} based on recovery configuration {config:?}"
+            "Obtained recovery init parameters: {init_params:?} based on recovery configuration {config:?}"
         );
         let recovery_options = RecoveryOptions {
-            chunk_count: snapshot.chunk_count(),
+            chunk_count: init_params.chunk_count(),
             concurrency_limit: recovery_pool.max_size() as usize,
             events: Box::new(RecoveryHealthUpdater::new(health_updater)),
         };
         let tree = tree
-            .recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
+            .recover(init_params, recovery_options, &recovery_pool, stop_receiver)
             .await?;
         if tree.is_some() {
             // Only report latency if recovery wasn't canceled
```
```diff
@@ -223,12 +254,12 @@
 impl AsyncTreeRecovery {
     async fn recover(
         mut self,
-        snapshot: SnapshotParameters,
+        init_params: InitParameters,
         mut options: RecoveryOptions<'_>,
         pool: &ConnectionPool<Core>,
         stop_receiver: &watch::Receiver<bool>,
     ) -> anyhow::Result<Option<AsyncTree>> {
-        self.ensure_desired_chunk_size(snapshot.desired_chunk_size)
+        self.ensure_desired_chunk_size(init_params.desired_chunk_size)
             .await?;

         let start_time = Instant::now();
```
```diff
@@ -237,13 +268,15 @@
             .map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count))
             .collect();
         tracing::info!(
-            "Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}",
+            "Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}. \
+             Be aware that enabling node pruning during recovery will probably result in a recovery error; always disable pruning \
+             until recovery is complete",
             options.concurrency_limit
         );

         let mut storage = pool.connection_tagged("metadata_calculator").await?;
         let remaining_chunks = self
-            .filter_chunks(&mut storage, snapshot.l2_block, &chunks)
+            .filter_chunks(&mut storage, init_params.l2_block, &chunks)
             .await?;
         drop(storage);
         options
```
```diff
@@ -261,9 +294,10 @@
                 .acquire()
                 .await
                 .context("semaphore is never closed")?;
-            if Self::recover_key_chunk(&tree, snapshot.l2_block, chunk, pool, stop_receiver).await?
+            if Self::recover_key_chunk(&tree, init_params.l2_block, chunk, pool, stop_receiver)
+                .await?
             {
-                options.events.chunk_recovered();
+                options.events.chunk_recovered().await;
             }
             anyhow::Ok(())
         });
```
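For context on this hunk: chunk workers run concurrently under a semaphore sized to the recovery pool (`concurrency_limit` above), so each in-flight chunk can hold its own DB connection; the change merely awaits the now-async event callback inside a worker. A minimal sketch of the concurrency pattern, not the exact code:

```rust
use std::sync::Arc;

use anyhow::Context as _;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Mirrors `concurrency_limit: recovery_pool.max_size()`: at most this many
    // chunk workers run (and can hold DB connections) at any one time.
    let semaphore = Arc::new(Semaphore::new(4));
    let chunk_tasks = (0..16_u64).map(|chunk_id| {
        let semaphore = Arc::clone(&semaphore);
        async move {
            let _permit = semaphore
                .acquire()
                .await
                .context("semaphore is never closed")?;
            // A real worker would load the chunk's tree entries and extend the tree here.
            println!("recovered chunk {chunk_id}");
            anyhow::Ok(())
        }
    });
    // Fail fast if any worker errors, like the recovery loop does.
    futures::future::try_join_all(chunk_tasks).await?;
    Ok(())
}
```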
```diff
@@ -279,13 +313,18 @@

         let finalize_latency = RECOVERY_METRICS.latency[&RecoveryStage::Finalize].start();
         let actual_root_hash = tree.root_hash().await;
-        anyhow::ensure!(
-            actual_root_hash == snapshot.expected_root_hash,
-            "Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {:?}. \
-             If pruning is enabled and the tree is initialized some time after node recovery, \
-             this is caused by snapshot storage logs getting pruned; this setup is currently not supported",
-            snapshot.expected_root_hash
-        );
+        if let Some(expected_root_hash) = init_params.expected_root_hash {
+            anyhow::ensure!(
+                actual_root_hash == expected_root_hash,
+                "Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {expected_root_hash:?}"
+            );
+        }
+
+        // Check pruning info one last time before finalizing the tree.
+        let mut storage = pool.connection_tagged("metadata_calculator").await?;
+        Self::check_pruning_info(&mut storage, init_params.l2_block).await?;
+        drop(storage);

         let tree = tree.finalize().await?;
         finalize_latency.observe();
         tracing::info!(
```
```diff
@@ -340,6 +379,21 @@
         Ok(output)
     }

+    async fn check_pruning_info(
+        storage: &mut Connection<'_, Core>,
+        snapshot_l2_block: L2BlockNumber,
+    ) -> anyhow::Result<()> {
+        let pruning_info = storage.pruning_dal().get_pruning_info().await?;
+        if let Some(last_hard_pruned_l2_block) = pruning_info.last_hard_pruned_l2_block {
+            anyhow::ensure!(
+                last_hard_pruned_l2_block == snapshot_l2_block,
+                "Additional data was pruned compared to tree recovery L2 block #{snapshot_l2_block}: {pruning_info:?}. \
+                 Continuing recovery is impossible; to recover the tree, drop its RocksDB directory, stop pruning and restart recovery"
+            );
+        }
+        Ok(())
+    }
+
     /// Returns `Ok(true)` if the chunk was recovered, `Ok(false)` if the recovery process was interrupted.
     async fn recover_key_chunk(
         tree: &Mutex<AsyncTreeRecovery>,
```

Review thread on `check_pruning_info`:

> WDYM by "if pruning is configured to never happen in practice"?

> For example,
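The new `check_pruning_info` above is called after each chunk is loaded (next hunk) and once more before finalization, because every chunk is read at the fixed `snapshot_l2_block`: if a hard prune moves past that block mid-recovery, storage logs needed by the remaining chunks may already be gone and the loaded entries would be silently incomplete. The check turns this into an explicit error instead. A tiny distillation of the enforced invariant, as a hypothetical helper that is not part of the PR:

```rust
/// Hypothetical distillation of `check_pruning_info`: recovery stays valid only
/// while the last hard-pruned L2 block (if any) equals the block recovery is
/// anchored to.
fn pruning_is_consistent(last_hard_pruned_l2_block: Option<u32>, snapshot_l2_block: u32) -> bool {
    match last_hard_pruned_l2_block {
        None => true, // no pruning happened; all logs up to the snapshot block are intact
        Some(pruned) => pruned == snapshot_l2_block,
    }
}

fn main() {
    assert!(pruning_is_consistent(None, 100));
    assert!(pruning_is_consistent(Some(100), 100));
    // Pruning advanced beyond the recovery block: logs for unrecovered chunks may be gone.
    assert!(!pruning_is_consistent(Some(150), 100));
}
```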
```diff
@@ -363,7 +417,9 @@
             .storage_logs_dal()
             .get_tree_entries_for_l2_block(snapshot_l2_block, key_chunk.clone())
             .await?;
+        Self::check_pruning_info(&mut storage, snapshot_l2_block).await?;
         drop(storage);

         let entries_latency = entries_latency.observe();
         tracing::debug!(
             "Loaded {} entries for chunk {key_chunk:?} in {entries_latency:?}",
```
```diff
@@ -414,13 +470,3 @@
         Ok(true)
     }
 }
-
-async fn get_snapshot_recovery(
-    pool: &ConnectionPool<Core>,
-) -> anyhow::Result<Option<SnapshotRecoveryStatus>> {
-    let mut storage = pool.connection_tagged("metadata_calculator").await?;
-    Ok(storage
-        .snapshot_recovery_dal()
-        .get_applied_snapshot_status()
-        .await?)
-}
```
Review comment:

> It would be beneficial to record the state root hash of the last hard-pruned L1 batch somewhere (in pruning logs?) so that it can be checked here. If this sounds OK, I'd prefer to implement it in a follow-up PR.
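A rough sketch of what that follow-up might look like, assuming pruning gained a recorded root hash for the last hard-pruned L1 batch; `PruningLogEntry` and `recorded_root_hash` are hypothetical names, not part of this PR or the current DAL:

```rust
use anyhow::ensure;

// Hypothetical extension of the pruning info with the root hash of the last
// hard-pruned L1 batch, recorded at the time the batch is pruned.
struct PruningLogEntry {
    last_hard_pruned_l1_batch: u32,
    recorded_root_hash: [u8; 32],
}

fn verify_recovered_root(
    actual_root_hash: [u8; 32],
    pruning_log: Option<&PruningLogEntry>,
) -> anyhow::Result<()> {
    if let Some(entry) = pruning_log {
        // With the hash recorded at pruning time, finalization could verify the
        // tree even when the snapshot root hash is no longer applicable.
        ensure!(
            actual_root_hash == entry.recorded_root_hash,
            "root hash of recovered tree differs from hash recorded for pruned L1 batch #{}",
            entry.last_hard_pruned_l1_batch
        );
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let entry = PruningLogEntry {
        last_hard_pruned_l1_batch: 42,
        recorded_root_hash: [0xab; 32],
    };
    verify_recovered_root([0xab; 32], Some(&entry))
}
```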