Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POS efficient L1 polling #2506

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
204 changes: 168 additions & 36 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
cmp::{min, Ordering},
fmt::Debug,
iter::FromFn,
num::NonZeroUsize,
sync::Arc,
time::Instant,
Expand All @@ -11,7 +12,8 @@ use async_trait::async_trait;
use clap::Parser;
use committable::{Commitment, Committable, RawCommitmentBuilder};
use contract_bindings::{
fee_contract::FeeContract, permissioned_stake_table::PermissionedStakeTable,
fee_contract::FeeContract,
permissioned_stake_table::{PermissionedStakeTable, StakersUpdatedFilter},
};
use ethers::{
prelude::{Address, BlockNumber, Middleware, Provider, H256, U256},
Expand All @@ -27,8 +29,8 @@ use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use tokio::{
spawn,
sync::{Mutex, MutexGuard},
task::JoinSet,
time::sleep,
};
use tracing::Instrument;
Expand Down Expand Up @@ -88,8 +90,8 @@ impl L1BlockInfo {

impl Drop for L1UpdateTask {
fn drop(&mut self) {
if let Some(task) = self.0.get_mut().take() {
task.abort();
if let Some(mut tasks) = self.0.get_mut().take() {
tasks.abort_all()
}
}
}
Expand Down Expand Up @@ -307,7 +309,10 @@ impl L1Client {
pub async fn spawn_tasks(&self) {
let mut update_task = self.update_task.0.lock().await;
if update_task.is_none() {
*update_task = Some(spawn(self.update_loop()));
let mut tasks = JoinSet::new();
tasks.spawn(self.update_loop());
tasks.spawn(self.stake_update_loop());
*update_task = Some(tasks);
}
}

Expand All @@ -316,14 +321,69 @@ impl L1Client {
/// The L1 client will still be usable, but will stop updating until [`start`](Self::start) is
/// called again.
pub async fn shut_down_tasks(&self) {
if let Some(update_task) = self.update_task.0.lock().await.take() {
update_task.abort();
if let Some(mut update_task) = self.update_task.0.lock().await.take() {
// TODO review order
update_task.abort_all();
}
}

/// Access the underlying Ethers middleware used for raw L1 RPC calls.
pub fn provider(&self) -> &impl Middleware<Error: 'static> {
    &self.provider
}
/// Update stake-table cache on `L1Event::NewHead`.
fn stake_update_loop(&self) -> impl Future<Output = ()> {
let opt = self.options();
let retry_delay = opt.l1_retry_delay;
let chunk_size = opt.l1_events_max_block_range as usize;
let span = tracing::warn_span!("L1 client update");
let state = self.state.clone();
let stake_table_contract =
// TODO attach address to L1Client
PermissionedStakeTable::new(Address::default(), self.provider.clone());

let mut events = self.receiver.activate_cloned();

async move {
loop {
let last_head = {
let state = state.lock().await;
state.snapshot.head
};
while let Some(event) = events.next().await {
let L1Event::NewHead { head } = event else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the latest L1 block (that may not be finalized yet). We should only consider finalized blocks for the stake table because otherwise the stake table may change on L1 after we fetched it.

continue;
};

let chunks = L1Client::chunky2(last_head, head, chunk_size);
let mut events: Vec<StakersUpdatedFilter> = Vec::new();
for (from, to) in chunks {
tracing::debug!(from, to, "fetch stake table events in range");
match stake_table_contract
.stakers_updated_filter()
.from_block(from)
.to_block(to)
.query()
.await
{
Ok(e) => {
for event in e {
events.push(event)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we put the events into a local Vec called events. Maybe I'm missing something but it looks to me like we are not doing anything with this events variable.

}
break;
}
Err(err) => {
tracing::warn!(from, to, %err, "Stake Table L1Event Error");
sleep(retry_delay).await;
}
}
}
sleep(retry_delay).await;
}
sleep(retry_delay).await;
}
}
.instrument(span)
}

fn update_loop(&self) -> impl Future<Output = ()> {
let opt = self.options();
Expand Down Expand Up @@ -435,7 +495,8 @@ impl L1Client {

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
let snapshot_head = state.snapshot.head;
if head > snapshot_head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
metrics.head.set(head as usize);
state.snapshot.head = head;
Expand Down Expand Up @@ -679,6 +740,40 @@ impl L1Client {
(state, block)
}

/// Divide the inclusive range `start..=end` into chunks of at most
/// `l1_events_max_block_range` blocks, yielded lazily as `(from, to)`
/// inclusive pairs.
fn chunky(&self, start: u64, end: u64) -> FromFn<impl FnMut() -> Option<(u64, u64)>> {
    let chunk_size = self.options().l1_events_max_block_range;
    let mut lo = start;
    std::iter::from_fn(move || {
        let hi = min(lo + chunk_size - 1, end);
        // Once `lo` passes `end` the clamped upper bound drops below it
        // and the iterator is exhausted.
        (lo <= hi).then(|| {
            let chunk = (lo, hi);
            lo = hi + 1;
            chunk
        })
    })
}

/// Divide the inclusive range `start..=end` into chunks of at most
/// `chunk_size` blocks, returned as inclusive `(from, to)` pairs.
///
/// Runs in O(number of chunks) time and memory. The previous version
/// collected the *entire* range into a `Vec<u64>` before chunking, which
/// allocates one `u64` per block — prohibitive when the range spans
/// millions of L1 blocks.
///
/// Returns an empty `Vec` when `start > end`.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
fn chunky2(start: u64, end: u64, chunk_size: usize) -> Vec<(u64, u64)> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let step = chunk_size as u64;
    let mut chunks = Vec::new();
    let mut lo = start;
    while lo <= end {
        // Saturating add so a range ending near `u64::MAX` cannot overflow.
        let hi = min(lo.saturating_add(step - 1), end);
        chunks.push((lo, hi));
        if hi == end {
            break;
        }
        lo = hi + 1;
    }
    chunks
}

/// Get fee info for each `Deposit` occurring between `prev`
/// and `new`. Returns `Vec<FeeInfo>`
pub async fn get_finalized_deposits(
Expand All @@ -701,19 +796,7 @@ impl L1Client {

// Divide the range `prev_finalized..=new_finalized` into chunks of size
// `events_max_block_range`.
let mut start = prev;
let end = new_finalized;
let chunk_size = opt.l1_events_max_block_range;
let chunks = std::iter::from_fn(move || {
let chunk_end = min(start + chunk_size - 1, end);
if chunk_end < start {
return None;
}

let chunk = (start, chunk_end);
start = chunk_end + 1;
Some(chunk)
});
let chunks = self.chunky(prev, new_finalized);

// Fetch events for each chunk.
let events = stream::iter(chunks).then(|(from, to)| {
Expand Down Expand Up @@ -744,25 +827,55 @@ impl L1Client {
events.flatten().map(FeeInfo::from).collect().await
}

/// Get `StakeTable` at block height.
// /// Get `StakeTable` at block height. If unavailable in local cache, poll the l1.
pub async fn get_stake_table(
&self,
contract: Address,
contract_address: Address,
block: u64,
) -> anyhow::Result<StakeTables> {
// TODO stake_table_address needs to be passed in to L1Client
// before update loop starts.
let stake_table_contract = PermissionedStakeTable::new(contract, self.provider.clone());

let events = stake_table_contract
.stakers_updated_filter()
.from_block(0)
.to_block(block)
.query()
.await?;
) -> Option<StakeTables> {
let opt = self.options();
let retry_delay = opt.l1_retry_delay;
let chunk_size = opt.l1_events_max_block_range as usize;

let last_head = {
let mut state = self.state.lock().await;
if let Some(st) = state.stake.get(&block) {
return Some(st.clone());
} else {
state.snapshot.head
}
};

Ok(StakeTables::from_l1_events(events.clone()))
let chunks = L1Client::chunky2(last_head, block, chunk_size);
let contract = PermissionedStakeTable::new(contract_address, self.provider.clone());

let mut events: Vec<StakersUpdatedFilter> = Vec::new();
for (from, to) in chunks {
tracing::debug!(from, to, "fetch stake table events in range");
loop {
match contract
.stakers_updated_filter()
.from_block(from)
.to_block(to)
.query()
.await
{
Ok(e) => {
for event in e {
events.push(event)
}
break;
}
Err(err) => {
tracing::warn!(from, to, %err, "Stake Table L1Event Error");
sleep(retry_delay).await;
}
}
}
}
Some(StakeTables::from_l1_events(events))
}

/// Shared client options stored on the inner provider.
fn options(&self) -> &L1ClientOptions {
    (*self.provider).as_ref().options()
}
Expand All @@ -781,6 +894,7 @@ impl L1State {
Self {
snapshot: Default::default(),
finalized: LruCache::new(cache_size),
stake: LruCache::new(cache_size),
}
}

Expand Down Expand Up @@ -1182,12 +1296,30 @@ mod test {
tracing::info!(?final_state, "state updated");
}

// Both chunking helpers must split `3..=10` into size-3 inclusive chunks.
#[tokio::test]
async fn test_chunky() {
    let anvil = Anvil::new().spawn();
    let opt = L1ClientOptions {
        l1_events_max_block_range: 3,
        ..Default::default()
    };
    let l1_client = opt.connect(vec![anvil.endpoint().parse().unwrap()]);

    // `chunky` reads the chunk size (3) from the client options.
    let chunks = l1_client.chunky(3, 10);
    let tups = stream::iter(chunks).collect::<Vec<_>>().await;

    assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups];

    // `chunky2` takes the chunk size explicitly and must agree with `chunky`.
    let tups = L1Client::chunky2(3, 10, 3);
    assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups];
}

#[tokio::test]
async fn test_fetch_stake_table() -> anyhow::Result<()> {
setup_test();

let anvil = Anvil::new().spawn();
let l1_client = L1Client::new(anvil.endpoint().parse().unwrap());
let l1_client = new_l1_client(&anvil, false).await;
let wallet: LocalWallet = anvil.keys()[0].clone().into();

// In order to deposit we need a provider that can sign.
Expand Down
1 change: 0 additions & 1 deletion types/src/v0/impls/stake_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ impl Membership<SeqTypes> for EpochCommittees {
self.l1_client
.get_stake_table(address, block_header.height())
.await
.ok()
.map(|stake_table| -> Box<dyn FnOnce(&mut Self) + Send> {
Box::new(move |committee: &mut Self| {
let _ = committee.update_stake_table(epoch, stake_table);
Expand Down
7 changes: 4 additions & 3 deletions types/src/v0/v0_1/l1.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::parse_duration;
use crate::{parse_duration, v0_3::StakeTables};
use async_broadcast::{InactiveReceiver, Sender};
use clap::Parser;
use derive_more::Deref;
Expand All @@ -16,7 +16,7 @@ use std::{
};
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
task::JoinSet,
};
use url::Url;

Expand Down Expand Up @@ -166,6 +166,7 @@ pub struct L1Client {
pub(crate) struct L1State {
    // Latest observed L1 snapshot (head and related info).
    pub(crate) snapshot: L1Snapshot,
    // Cache of finalized block info, keyed by block number.
    pub(crate) finalized: LruCache<u64, L1BlockInfo>,
    // Cache of stake tables, keyed by L1 block number.
    pub(crate) stake: LruCache<u64, StakeTables>,
}

#[derive(Clone, Debug)]
Expand All @@ -175,7 +176,7 @@ pub(crate) enum L1Event {
}

/// Handle to the L1 client's background polling tasks.
///
/// Wrapped in `Mutex<Option<…>>` so tasks can be spawned and shut down on
/// demand; the whole `JoinSet` is aborted together.
#[derive(Debug, Default)]
pub(crate) struct L1UpdateTask(pub(crate) Mutex<Option<JoinSet<()>>>);
pub(crate) struct L1UpdateTask(pub(crate) Mutex<Option<JoinSet<()>>>);

#[derive(Clone, Debug)]
pub(crate) struct L1ClientMetrics {
Expand Down