-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POS efficient l1 polling #2506
base: main
Are you sure you want to change the base?
POS efficient l1 polling #2506
Changes from all commits
6cfed7f
d4c87be
34b6a41
69cf1ea
d699431
f5d1ea2
f4bc1d1
60e95c8
e956d6e
ae558a7
810a0f0
f6007a5
0526e94
22b6861
d49e600
6f5731e
c53eb8e
6f8664e
32a490b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use std::{ | ||
cmp::{min, Ordering}, | ||
fmt::Debug, | ||
iter::FromFn, | ||
num::NonZeroUsize, | ||
sync::Arc, | ||
time::Instant, | ||
|
@@ -11,7 +12,8 @@ use async_trait::async_trait; | |
use clap::Parser; | ||
use committable::{Commitment, Committable, RawCommitmentBuilder}; | ||
use contract_bindings::{ | ||
fee_contract::FeeContract, permissioned_stake_table::PermissionedStakeTable, | ||
fee_contract::FeeContract, | ||
permissioned_stake_table::{PermissionedStakeTable, StakersUpdatedFilter}, | ||
}; | ||
use ethers::{ | ||
prelude::{Address, BlockNumber, Middleware, Provider, H256, U256}, | ||
|
@@ -27,8 +29,8 @@ use reqwest::StatusCode; | |
use serde::{de::DeserializeOwned, Serialize}; | ||
use std::time::Duration; | ||
use tokio::{ | ||
spawn, | ||
sync::{Mutex, MutexGuard}, | ||
task::JoinSet, | ||
time::sleep, | ||
}; | ||
use tracing::Instrument; | ||
|
@@ -88,8 +90,8 @@ impl L1BlockInfo { | |
|
||
impl Drop for L1UpdateTask { | ||
fn drop(&mut self) { | ||
if let Some(task) = self.0.get_mut().take() { | ||
task.abort(); | ||
if let Some(mut tasks) = self.0.get_mut().take() { | ||
tasks.abort_all() | ||
} | ||
} | ||
} | ||
|
@@ -307,7 +309,10 @@ impl L1Client { | |
pub async fn spawn_tasks(&self) { | ||
let mut update_task = self.update_task.0.lock().await; | ||
if update_task.is_none() { | ||
*update_task = Some(spawn(self.update_loop())); | ||
let mut tasks = JoinSet::new(); | ||
tasks.spawn(self.update_loop()); | ||
tasks.spawn(self.stake_update_loop()); | ||
*update_task = Some(tasks); | ||
} | ||
} | ||
|
||
|
@@ -316,14 +321,69 @@ impl L1Client { | |
/// The L1 client will still be usable, but will stop updating until [`start`](Self::start) is | ||
/// called again. | ||
pub async fn shut_down_tasks(&self) { | ||
if let Some(update_task) = self.update_task.0.lock().await.take() { | ||
update_task.abort(); | ||
if let Some(mut update_task) = self.update_task.0.lock().await.take() { | ||
// TODO review order | ||
update_task.abort_all(); | ||
} | ||
} | ||
|
||
    /// Borrow the underlying Ethers middleware used for L1 RPC calls.
    pub fn provider(&self) -> &impl Middleware<Error: 'static> {
        &self.provider
    }
/// Update stake-table cache on `L1Event::NewHead`. | ||
fn stake_update_loop(&self) -> impl Future<Output = ()> { | ||
let opt = self.options(); | ||
let retry_delay = opt.l1_retry_delay; | ||
let chunk_size = opt.l1_events_max_block_range as usize; | ||
let span = tracing::warn_span!("L1 client update"); | ||
let state = self.state.clone(); | ||
let stake_table_contract = | ||
// TODO attach address to L1Client | ||
PermissionedStakeTable::new(Address::default(), self.provider.clone()); | ||
|
||
let mut events = self.receiver.activate_cloned(); | ||
|
||
async move { | ||
loop { | ||
let last_head = { | ||
let state = state.lock().await; | ||
state.snapshot.head | ||
}; | ||
while let Some(event) = events.next().await { | ||
let L1Event::NewHead { head } = event else { | ||
continue; | ||
}; | ||
|
||
let chunks = L1Client::chunky2(last_head, head, chunk_size); | ||
let mut events: Vec<StakersUpdatedFilter> = Vec::new(); | ||
for (from, to) in chunks { | ||
tracing::debug!(from, to, "fetch stake table events in range"); | ||
match stake_table_contract | ||
.stakers_updated_filter() | ||
.from_block(from) | ||
.to_block(to) | ||
.query() | ||
.await | ||
{ | ||
Ok(e) => { | ||
for event in e { | ||
events.push(event) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we put the events into a local Vec called |
||
} | ||
break; | ||
} | ||
Err(err) => { | ||
tracing::warn!(from, to, %err, "Stake Table L1Event Error"); | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
} | ||
sleep(retry_delay).await; | ||
} | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
.instrument(span) | ||
} | ||
|
||
fn update_loop(&self) -> impl Future<Output = ()> { | ||
let opt = self.options(); | ||
|
@@ -435,7 +495,8 @@ impl L1Client { | |
|
||
// Update the state snapshot; | ||
let mut state = state.lock().await; | ||
if head > state.snapshot.head { | ||
let snapshot_head = state.snapshot.head; | ||
if head > snapshot_head { | ||
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); | ||
metrics.head.set(head as usize); | ||
state.snapshot.head = head; | ||
|
@@ -679,6 +740,40 @@ impl L1Client { | |
(state, block) | ||
} | ||
|
||
/// Divide the range `start..=end` into chunks of size | ||
/// `events_max_block_range`. | ||
fn chunky(&self, start: u64, end: u64) -> FromFn<impl FnMut() -> Option<(u64, u64)>> { | ||
let mut start = start; | ||
let chunk_size = self.options().l1_events_max_block_range; | ||
std::iter::from_fn(move || { | ||
let chunk_end = min(start + chunk_size - 1, end); | ||
if chunk_end < start { | ||
return None; | ||
} | ||
|
||
let chunk = (start, chunk_end); | ||
start = chunk_end + 1; | ||
Some(chunk) | ||
}) | ||
} | ||
|
||
/// Divide the range `start..=end` into chunks of size | ||
/// `events_max_block_range`. | ||
fn chunky2(start: u64, end: u64, chunk_size: usize) -> Vec<(u64, u64)> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about returning a |
||
// let opt = self.options(); | ||
// let chunk_size = opt.l1_events_max_block_range as usize; | ||
let chunks: Vec<u64> = (start..=end).collect(); | ||
let tups: Vec<(u64, u64)> = chunks | ||
.chunks(chunk_size) | ||
.map(|s| { | ||
// should never be empty | ||
s.first().cloned().zip(s.last().cloned()).unwrap() | ||
}) | ||
.collect(); | ||
|
||
tups | ||
} | ||
|
||
/// Get fee info for each `Deposit` occurring between `prev` | ||
/// and `new`. Returns `Vec<FeeInfo>` | ||
pub async fn get_finalized_deposits( | ||
|
@@ -701,19 +796,7 @@ impl L1Client { | |
|
||
// Divide the range `prev_finalized..=new_finalized` into chunks of size | ||
// `events_max_block_range`. | ||
let mut start = prev; | ||
let end = new_finalized; | ||
let chunk_size = opt.l1_events_max_block_range; | ||
let chunks = std::iter::from_fn(move || { | ||
let chunk_end = min(start + chunk_size - 1, end); | ||
if chunk_end < start { | ||
return None; | ||
} | ||
|
||
let chunk = (start, chunk_end); | ||
start = chunk_end + 1; | ||
Some(chunk) | ||
}); | ||
let chunks = self.chunky(prev, new_finalized); | ||
|
||
// Fetch events for each chunk. | ||
let events = stream::iter(chunks).then(|(from, to)| { | ||
|
@@ -744,25 +827,55 @@ impl L1Client { | |
events.flatten().map(FeeInfo::from).collect().await | ||
} | ||
|
||
/// Get `StakeTable` at block height. | ||
// /// Get `StakeTable` at block height. If unavailable in local cache, poll the l1. | ||
pub async fn get_stake_table( | ||
&self, | ||
contract: Address, | ||
contract_address: Address, | ||
block: u64, | ||
) -> anyhow::Result<StakeTables> { | ||
// TODO stake_table_address needs to be passed in to L1Client | ||
// before update loop starts. | ||
let stake_table_contract = PermissionedStakeTable::new(contract, self.provider.clone()); | ||
|
||
let events = stake_table_contract | ||
.stakers_updated_filter() | ||
.from_block(0) | ||
.to_block(block) | ||
.query() | ||
.await?; | ||
) -> Option<StakeTables> { | ||
let opt = self.options(); | ||
let retry_delay = opt.l1_retry_delay; | ||
let chunk_size = opt.l1_events_max_block_range as usize; | ||
|
||
let last_head = { | ||
let mut state = self.state.lock().await; | ||
if let Some(st) = state.stake.get(&block) { | ||
return Some(st.clone()); | ||
} else { | ||
state.snapshot.head | ||
} | ||
}; | ||
|
||
Ok(StakeTables::from_l1_events(events.clone())) | ||
let chunks = L1Client::chunky2(last_head, block, chunk_size); | ||
let contract = PermissionedStakeTable::new(contract_address, self.provider.clone()); | ||
|
||
let mut events: Vec<StakersUpdatedFilter> = Vec::new(); | ||
for (from, to) in chunks { | ||
tracing::debug!(from, to, "fetch stake table events in range"); | ||
loop { | ||
match contract | ||
.stakers_updated_filter() | ||
.from_block(from) | ||
.to_block(to) | ||
.query() | ||
.await | ||
{ | ||
Ok(e) => { | ||
for event in e { | ||
events.push(event) | ||
} | ||
break; | ||
} | ||
Err(err) => { | ||
tracing::warn!(from, to, %err, "Stake Table L1Event Error"); | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
} | ||
} | ||
Some(StakeTables::from_l1_events(events)) | ||
} | ||
|
||
    /// Client-level configuration options, stored on the inner provider.
    fn options(&self) -> &L1ClientOptions {
        (*self.provider).as_ref().options()
    }
|
@@ -781,6 +894,7 @@ impl L1State { | |
Self { | ||
snapshot: Default::default(), | ||
finalized: LruCache::new(cache_size), | ||
stake: LruCache::new(cache_size), | ||
} | ||
} | ||
|
||
|
@@ -1182,12 +1296,30 @@ mod test { | |
tracing::info!(?final_state, "state updated"); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_chunky() { | ||
let anvil = Anvil::new().spawn(); | ||
let opt = L1ClientOptions { | ||
l1_events_max_block_range: 3, | ||
..Default::default() | ||
}; | ||
let l1_client = opt.connect(vec![anvil.endpoint().parse().unwrap()]); | ||
|
||
let chunks = l1_client.chunky(3, 10); | ||
let tups = stream::iter(chunks).collect::<Vec<_>>().await; | ||
|
||
assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups]; | ||
|
||
let tups = L1Client::chunky2(3, 10, 3); | ||
assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups]; | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_fetch_stake_table() -> anyhow::Result<()> { | ||
setup_test(); | ||
|
||
let anvil = Anvil::new().spawn(); | ||
let l1_client = L1Client::new(anvil.endpoint().parse().unwrap()); | ||
let l1_client = new_l1_client(&anvil, false).await; | ||
let wallet: LocalWallet = anvil.keys()[0].clone().into(); | ||
|
||
// In order to deposit we need a provider that can sign. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the latest L1 block, which may not be finalized yet. We should only consider finalized blocks for the stake table; otherwise the stake table may change on L1 after we have fetched it.