Commit
fallback json rpc syncing for consensus
pompon0 committed Oct 31, 2024
1 parent 9654097 commit 2c3a252
Showing 8 changed files with 247 additions and 161 deletions.
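In brief: this commit replaces the temporary block fetcher, which ran once and stopped as soon as certificates were backfilled, with a fallback fetcher that keeps running and activates whenever the external node's p2p syncing lags the main node by more than `FALLBACK_FETCHER_THRESHOLD` blocks. A minimal sketch of that activation predicate, assuming a simplified `BlockNumber` stand-in for `validator::BlockNumber`:

```rust
/// Simplified stand-in for `validator::BlockNumber` from `zksync_consensus_roles`.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
struct BlockNumber(u64);

const FALLBACK_FETCHER_THRESHOLD: u64 = 10;

/// Mirrors the `is_lagging` predicate in `fallback_block_fetcher`: fall back to
/// JSON-RPC fetching only while p2p syncing is this far behind the main node.
fn is_lagging(main: BlockNumber, persisted_next: BlockNumber) -> bool {
    main.0 >= persisted_next.0 + FALLBACK_FETCHER_THRESHOLD
}

fn main() {
    // 5 blocks behind: keep p2p syncing only.
    assert!(!is_lagging(BlockNumber(15), BlockNumber(10)));
    // 15 blocks behind: the fallback fetcher activates.
    assert!(is_lagging(BlockNumber(25), BlockNumber(10)));
}
```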
1 change: 1 addition & 0 deletions Cargo.lock


92 changes: 45 additions & 47 deletions core/node/consensus/src/en.rs
@@ -21,9 +21,9 @@ use crate::{
storage::{self, ConnectionPool},
};

/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
/// the temporary fetcher will stop fetching blocks.
pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;
/// Whenever more than FALLBACK_FETCHER_THRESHOLD certificates are missing,
/// the fallback fetcher is active.
pub(crate) const FALLBACK_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
@@ -115,11 +115,9 @@ impl EN {
let store = store.clone();
async {
let store = store;
self.temporary_block_fetcher(ctx, &store).await?;
tracing::info!(
"temporary block fetcher finished, switching to p2p fetching only"
);
Ok(())
self.fallback_block_fetcher(ctx, &store)
.await
.wrap("fallback_block_fetcher()")
}
});

@@ -191,7 +189,7 @@ impl EN {
.new_payload_queue(ctx, actions, self.sync_state.clone())
.await
.wrap("new_fetcher_cursor()")?;
self.fetch_blocks(ctx, &mut payload_queue, None).await
self.fetch_blocks(ctx, &mut payload_queue).await
})
.await;
match res {
@@ -362,9 +360,13 @@ impl EN {
}

/// Fetches (with retries) the given block from the main node.
async fn fetch_block(&self, ctx: &ctx::Ctx, n: L2BlockNumber) -> ctx::Result<FetchedBlock> {
async fn fetch_block(
&self,
ctx: &ctx::Ctx,
n: validator::BlockNumber,
) -> ctx::Result<FetchedBlock> {
const RETRY_INTERVAL: time::Duration = time::Duration::seconds(5);

let n = L2BlockNumber(n.0.try_into().context("overflow")?);
loop {
match ctx.wait(self.client.sync_l2_block(n, true)).await? {
Ok(Some(block)) => return Ok(block.try_into()?),
@@ -376,76 +378,72 @@
}
}

/// Fetches blocks from the main node directly, until the certificates
/// are backfilled. This allows for smooth transition from json RPC to p2p block syncing.
pub(crate) async fn temporary_block_fetcher(
/// Fetches blocks from the main node directly whenever the EN is lagging behind too much.
pub(crate) async fn fallback_block_fetcher(
&self,
ctx: &ctx::Ctx,
store: &Store,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
let Some(mut next) = store.next_block(ctx).await? else {
return Ok(());
};
while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
// TODO: metrics.
s.spawn::<()>(async {
let send = send;
let is_lagging =
|main| main >= store.persisted().borrow().next() + FALLBACK_FETCHER_THRESHOLD;
let mut next = store.next_block(ctx).await.wrap("next_block()")?;
loop {
// Wait until p2p syncing is lagging.
self.sync_state
.wait_for_main_node_block(ctx, is_lagging)
.await?;
// Determine the next block to fetch and wait for it to be available.
next = next.max(store.next_block(ctx).await.wrap("next_block()")?);
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
// Fetch the block asynchronously.
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
drop(send);
Ok(())
});
while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
loop {
let block = recv.recv(ctx).await?;
store
.queue_next_fetched_block(ctx, block.join(ctx).await?)
.await
.wrap("queue_next_fetched_block()")?;
}
Ok(())
})
.await
}

/// Fetches blocks from the main node in range `[cursor.next()..end)`.
/// Fetches blocks starting with `queue.next()`.
async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
queue: &mut storage::PayloadQueue,
end: Option<validator::BlockNumber>,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
let first = queue.next();
let mut next = first;
let mut next = queue.next();
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
s.spawn::<()>(async {
let send = send;
while end.map_or(true, |end| next < end) {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
loop {
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
Ok(())
});
while end.map_or(true, |end| queue.next() < end) {
loop {
let block = recv.recv(ctx).await?.join(ctx).await?;
queue.send(block).await.context("queue.send()")?;
}
Ok(())
})
.await?;
// If fetched anything, wait for the last block to be stored persistently.
if first < queue.next() {
self.pool
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await
.wrap("wait_for_payload()")?;
}
Ok(())
.await
}
}
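Both `fallback_block_fetcher` and `fetch_blocks` above share one pipeline shape: a producer task spawns block fetches eagerly and pushes their join handles into a bounded channel, capping in-flight requests at `MAX_CONCURRENT_REQUESTS`, while the consumer awaits the handles in order so blocks are queued sequentially. A hedged sketch of that pattern with plain tokio primitives (the real code uses `zksync_concurrency`'s ctx/scope and channels; `fetch_block` below is a hypothetical stand-in for the retrying JSON-RPC call):

```rust
use tokio::{sync::mpsc, task::JoinHandle};

const MAX_CONCURRENT_REQUESTS: usize = 30;

/// Stand-in for the real `fetch_block` (a `sync_l2_block` RPC with retries).
async fn fetch_block(n: u64) -> String {
    format!("block {n}")
}

#[tokio::main]
async fn main() {
    // Bounded channel of join handles: `send` blocks once the channel is full,
    // so at most MAX_CONCURRENT_REQUESTS fetches are in flight at a time.
    let (send, mut recv) = mpsc::channel::<JoinHandle<String>>(MAX_CONCURRENT_REQUESTS);
    let producer = tokio::spawn(async move {
        for n in 0..100u64 {
            // Spawn the fetch immediately; send the handle so the consumer
            // can await results strictly in block-number order.
            send.send(tokio::spawn(fetch_block(n))).await.unwrap();
        }
        // Dropping `send` here closes the channel and ends the consumer loop.
    });
    while let Some(handle) = recv.recv().await {
        let _block = handle.await.unwrap();
        // The real code hands each block to `queue.send` / `queue_next_fetched_block`.
    }
    producer.await.unwrap();
}
```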
8 changes: 3 additions & 5 deletions core/node/consensus/src/storage/store.rs
@@ -114,14 +114,12 @@ impl Store {
}

/// Number of the next block to queue.
pub(crate) async fn next_block(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
pub(crate) async fn next_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
Ok(sync::lock(ctx, &self.block_payloads)
.await?
.as_ref()
.map(|p| p.next()))
.context("payload_queue not set")?
.next())
}

/// Queues the next block.
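`next_block` now fails loudly instead of returning `None` when the payload queue is unset, using `anyhow::Context` to turn the `Option` into a descriptive error. A tiny sketch of that conversion, with illustrative names rather than the real `Store` internals:

```rust
use anyhow::Context as _;

/// `Option::context` (from `anyhow::Context`) maps `None` to an error,
/// sparing callers an extra `Option` layer.
fn next_block(payload_queue: Option<u64>) -> anyhow::Result<u64> {
    payload_queue.context("payload_queue not set")
}

fn main() {
    assert_eq!(next_block(Some(42)).unwrap(), 42);
    assert!(next_block(None).is_err());
}
```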
39 changes: 1 addition & 38 deletions core/node/consensus/src/testonly.rs
@@ -45,10 +45,7 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

use crate::{
en,
storage::{ConnectionPool, Store},
};
use crate::{en, storage::ConnectionPool};

/// Fake StateKeeper for tests.
#[derive(Debug)]
@@ -413,40 +410,6 @@ impl StateKeeper {
.await
}

pub async fn run_temporary_fetcher(
self,
ctx: &ctx::Ctx,
client: Box<DynClient<L2>>,
) -> ctx::Result<()> {
scope::run!(ctx, |ctx, s| async {
let payload_queue = self
.pool
.connection(ctx)
.await
.wrap("connection()")?
.new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;
let (store, runner) = Store::new(
ctx,
self.pool.clone(),
Some(payload_queue),
Some(client.clone()),
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
en::EN {
pool: self.pool.clone(),
client,
sync_state: self.sync_state.clone(),
}
.temporary_block_fetcher(ctx, &store)
.await
})
.await
}

/// Runs consensus node for the external node.
pub async fn run_consensus(
self,
65 changes: 4 additions & 61 deletions core/node/consensus/src/tests/mod.rs
@@ -16,7 +16,7 @@ use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use crate::{
en::TEMPORARY_FETCHER_THRESHOLD,
en::FALLBACK_FETCHER_THRESHOLD,
mn::run_main_node,
storage::{ConnectionPool, Store},
testonly,
Expand Down Expand Up @@ -665,7 +665,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
// Test temporary fetcher fetching blocks if a lot of certs are missing.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
async fn test_fallback_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
Expand Down Expand Up @@ -705,7 +705,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_fetcher(ctx, client.clone()));
validator
.push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
.push_random_blocks(rng, account, FALLBACK_FETCHER_THRESHOLD as usize + 1)
.await;
node_pool
.wait_for_payload(ctx, validator.last_block())
@@ -715,58 +715,7 @@
.await
.unwrap();

tracing::info!(
"Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
);
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();
Ok(())
})
.await
.unwrap();
}

// Test that temporary fetcher terminates once enough blocks have certs.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

let node_pool = ConnectionPool::test(from_snapshot, version).await;

// Run the EN so the consensus is initialized on EN and wait for it to sync.
tracing::info!("Run p2p fetcher. Blocks should be fetched by the fallback fetcher anyway.");
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
@@ -779,12 +728,6 @@
})
.await
.unwrap();

// Run the temporary fetcher. It should terminate immediately, since EN is synced.
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
node.run_temporary_fetcher(ctx, client).await?;

Ok(())
})
.await
1 change: 1 addition & 0 deletions core/node/node_sync/Cargo.toml
@@ -24,6 +24,7 @@ zksync_health_check.workspace = true
zksync_utils.workspace = true
zksync_eth_client.workspace = true
zksync_concurrency.workspace = true
zksync_consensus_roles.workspace = true
vise.workspace = true
zksync_vm_executor.workspace = true

21 changes: 12 additions & 9 deletions core/node/node_sync/src/sync_state.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
use serde::Serialize;
use tokio::sync::watch;
use zksync_concurrency::{ctx, sync};
use zksync_consensus_roles::validator;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::{CheckHealth, Health, HealthStatus};
use zksync_shared_metrics::EN_METRICS;
@@ -50,18 +51,20 @@ impl SyncState {
.unwrap();
}

/// Waits until the main node block is greater or equal to the given block number.
/// Returns the current main node block number.
pub async fn wait_for_main_node_block(
&self,
ctx: &ctx::Ctx,
want: L2BlockNumber,
) -> ctx::OrCanceled<()> {
sync::wait_for(
ctx,
&mut self.0.subscribe(),
|inner| matches!(inner.main_node_block, Some(got) if got >= want),
)
.await?;
Ok(())
pred: impl Fn(validator::BlockNumber) -> bool,
) -> ctx::OrCanceled<validator::BlockNumber> {
sync::wait_for_some(ctx, &mut self.0.subscribe(), |inner| {
inner
.main_node_block
.map(|n| validator::BlockNumber(n.0.into()))
.filter(|n| pred(*n))
})
.await
}

pub fn set_main_node_block(&self, block: L2BlockNumber) {
Expand Down
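The reworked `wait_for_main_node_block` accepts an arbitrary predicate over the main-node block number and returns the first block satisfying it; this is what lets the fallback fetcher express both 'the EN is lagging by more than the threshold' and 'block `next` is available' through one API. A sketch of the predicate-wait pattern over a plain tokio watch channel (the real code goes through `zksync_concurrency::sync::wait_for_some` and converts `L2BlockNumber` into `validator::BlockNumber`):

```rust
use tokio::sync::watch;

/// Wait until the observed main-node block satisfies `pred`; return that block.
async fn wait_for_main_node_block(
    recv: &mut watch::Receiver<Option<u64>>,
    pred: impl Fn(u64) -> bool,
) -> u64 {
    loop {
        // `borrow_and_update` reads the latest value and marks it as seen.
        if let Some(n) = *recv.borrow_and_update() {
            if pred(n) {
                return n;
            }
        }
        // Sleep until the sender publishes a newer value.
        recv.changed().await.expect("sender dropped");
    }
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = watch::channel(None);
    tokio::spawn(async move {
        for n in 0..20u64 {
            send.send(Some(n)).unwrap();
            tokio::task::yield_now().await;
        }
    });
    let got = wait_for_main_node_block(&mut recv, |n| n >= 15).await;
    assert!(got >= 15);
}
```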
