-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POS efficient l1 polling #2506
Draft
tbro
wants to merge
19
commits into
main
Choose a base branch
from
tb/pos/efficient-l1-polling
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+172
−40
Draft
POS efficient l1 polling #2506
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
6cfed7f
Begin reworking L1Client logic
tbro d4c87be
Pos State
tbro 34b6a41
clippy
tbro 69cf1ea
add some preliminary cache functionality
tbro d699431
call state update method from `update_loop`
tbro f5d1ea2
move update to it's own task
tbro f4bc1d1
update stake tables in their own thread
tbro 60e95c8
Isolate l1 fetching to pure fn
tbro e956d6e
pass contract to fetcher
tbro ae558a7
Reinstate `get_stake_tables`
tbro 810a0f0
add back chunky to get_stake_tables
tbro f6007a5
chuky2 part 1
tbro 0526e94
chunky2 part 2
tbro 22b6861
chunky2 part 3
tbro d49e600
cleanup
tbro 6f5731e
make chunker a pure fn
tbro c53eb8e
cleanup
tbro 6f8664e
Merge remote-tracking branch 'origin/main' into tb/pos/efficient-l1-p…
tbro 32a490b
fix conflict res
tbro File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use std::{ | ||
cmp::{min, Ordering}, | ||
fmt::Debug, | ||
iter::FromFn, | ||
num::NonZeroUsize, | ||
sync::Arc, | ||
time::Instant, | ||
|
@@ -11,7 +12,8 @@ use async_trait::async_trait; | |
use clap::Parser; | ||
use committable::{Commitment, Committable, RawCommitmentBuilder}; | ||
use contract_bindings::{ | ||
fee_contract::FeeContract, permissioned_stake_table::PermissionedStakeTable, | ||
fee_contract::FeeContract, | ||
permissioned_stake_table::{PermissionedStakeTable, StakersUpdatedFilter}, | ||
}; | ||
use ethers::{ | ||
prelude::{Address, BlockNumber, Middleware, Provider, H256, U256}, | ||
|
@@ -27,8 +29,8 @@ use reqwest::StatusCode; | |
use serde::{de::DeserializeOwned, Serialize}; | ||
use std::time::Duration; | ||
use tokio::{ | ||
spawn, | ||
sync::{Mutex, MutexGuard}, | ||
task::JoinSet, | ||
time::sleep, | ||
}; | ||
use tracing::Instrument; | ||
|
@@ -88,8 +90,8 @@ impl L1BlockInfo { | |
|
||
impl Drop for L1UpdateTask { | ||
fn drop(&mut self) { | ||
if let Some(task) = self.0.get_mut().take() { | ||
task.abort(); | ||
if let Some(mut tasks) = self.0.get_mut().take() { | ||
tasks.abort_all() | ||
} | ||
} | ||
} | ||
|
@@ -307,7 +309,10 @@ impl L1Client { | |
pub async fn spawn_tasks(&self) { | ||
let mut update_task = self.update_task.0.lock().await; | ||
if update_task.is_none() { | ||
*update_task = Some(spawn(self.update_loop())); | ||
let mut tasks = JoinSet::new(); | ||
tasks.spawn(self.update_loop()); | ||
tasks.spawn(self.stake_update_loop()); | ||
*update_task = Some(tasks); | ||
} | ||
} | ||
|
||
|
@@ -316,14 +321,69 @@ impl L1Client { | |
/// The L1 client will still be usable, but will stop updating until [`start`](Self::start) is | ||
/// called again. | ||
pub async fn shut_down_tasks(&self) { | ||
if let Some(update_task) = self.update_task.0.lock().await.take() { | ||
update_task.abort(); | ||
if let Some(mut update_task) = self.update_task.0.lock().await.take() { | ||
// TODO review order | ||
update_task.abort_all(); | ||
} | ||
} | ||
|
||
pub fn provider(&self) -> &impl Middleware<Error: 'static> { | ||
&self.provider | ||
} | ||
/// Update stake-table cache on `L1Event::NewHead`. | ||
fn stake_update_loop(&self) -> impl Future<Output = ()> { | ||
let opt = self.options(); | ||
let retry_delay = opt.l1_retry_delay; | ||
let chunk_size = opt.l1_events_max_block_range as usize; | ||
let span = tracing::warn_span!("L1 client update"); | ||
let state = self.state.clone(); | ||
let stake_table_contract = | ||
// TODO attach address to L1Client | ||
PermissionedStakeTable::new(Address::default(), self.provider.clone()); | ||
|
||
let mut events = self.receiver.activate_cloned(); | ||
|
||
async move { | ||
loop { | ||
let last_head = { | ||
let state = state.lock().await; | ||
state.snapshot.head | ||
}; | ||
while let Some(event) = events.next().await { | ||
let L1Event::NewHead { head } = event else { | ||
continue; | ||
}; | ||
|
||
let chunks = L1Client::chunky2(last_head, head, chunk_size); | ||
let mut events: Vec<StakersUpdatedFilter> = Vec::new(); | ||
for (from, to) in chunks { | ||
tracing::debug!(from, to, "fetch stake table events in range"); | ||
match stake_table_contract | ||
.stakers_updated_filter() | ||
.from_block(from) | ||
.to_block(to) | ||
.query() | ||
.await | ||
{ | ||
Ok(e) => { | ||
for event in e { | ||
events.push(event) | ||
} | ||
break; | ||
} | ||
Err(err) => { | ||
tracing::warn!(from, to, %err, "Stake Table L1Event Error"); | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
} | ||
sleep(retry_delay).await; | ||
} | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
.instrument(span) | ||
} | ||
|
||
fn update_loop(&self) -> impl Future<Output = ()> { | ||
let opt = self.options(); | ||
|
@@ -435,7 +495,8 @@ impl L1Client { | |
|
||
// Update the state snapshot; | ||
let mut state = state.lock().await; | ||
if head > state.snapshot.head { | ||
let snapshot_head = state.snapshot.head; | ||
if head > snapshot_head { | ||
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); | ||
metrics.head.set(head as usize); | ||
state.snapshot.head = head; | ||
|
@@ -679,6 +740,40 @@ impl L1Client { | |
(state, block) | ||
} | ||
|
||
/// Divide the range `start..=end` into chunks of size | ||
/// `events_max_block_range`. | ||
fn chunky(&self, start: u64, end: u64) -> FromFn<impl FnMut() -> Option<(u64, u64)>> { | ||
let mut start = start; | ||
let chunk_size = self.options().l1_events_max_block_range; | ||
std::iter::from_fn(move || { | ||
let chunk_end = min(start + chunk_size - 1, end); | ||
if chunk_end < start { | ||
return None; | ||
} | ||
|
||
let chunk = (start, chunk_end); | ||
start = chunk_end + 1; | ||
Some(chunk) | ||
}) | ||
} | ||
|
||
/// Divide the range `start..=end` into chunks of size | ||
/// `events_max_block_range`. | ||
fn chunky2(start: u64, end: u64, chunk_size: usize) -> Vec<(u64, u64)> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about returning a |
||
// let opt = self.options(); | ||
// let chunk_size = opt.l1_events_max_block_range as usize; | ||
let chunks: Vec<u64> = (start..=end).collect(); | ||
let tups: Vec<(u64, u64)> = chunks | ||
.chunks(chunk_size) | ||
.map(|s| { | ||
// should never be empty | ||
s.first().cloned().zip(s.last().cloned()).unwrap() | ||
}) | ||
.collect(); | ||
|
||
tups | ||
} | ||
|
||
/// Get fee info for each `Deposit` occurring between `prev` | ||
/// and `new`. Returns `Vec<FeeInfo>` | ||
pub async fn get_finalized_deposits( | ||
|
@@ -701,19 +796,7 @@ impl L1Client { | |
|
||
// Divide the range `prev_finalized..=new_finalized` into chunks of size | ||
// `events_max_block_range`. | ||
let mut start = prev; | ||
let end = new_finalized; | ||
let chunk_size = opt.l1_events_max_block_range; | ||
let chunks = std::iter::from_fn(move || { | ||
let chunk_end = min(start + chunk_size - 1, end); | ||
if chunk_end < start { | ||
return None; | ||
} | ||
|
||
let chunk = (start, chunk_end); | ||
start = chunk_end + 1; | ||
Some(chunk) | ||
}); | ||
let chunks = self.chunky(prev, new_finalized); | ||
|
||
// Fetch events for each chunk. | ||
let events = stream::iter(chunks).then(|(from, to)| { | ||
|
@@ -744,25 +827,55 @@ impl L1Client { | |
events.flatten().map(FeeInfo::from).collect().await | ||
} | ||
|
||
/// Get `StakeTable` at block height. | ||
// /// Get `StakeTable` at block height. If unavailable in local cache, poll the l1. | ||
pub async fn get_stake_table( | ||
&self, | ||
contract: Address, | ||
contract_address: Address, | ||
block: u64, | ||
) -> anyhow::Result<StakeTables> { | ||
// TODO stake_table_address needs to be passed in to L1Client | ||
// before update loop starts. | ||
let stake_table_contract = PermissionedStakeTable::new(contract, self.provider.clone()); | ||
|
||
let events = stake_table_contract | ||
.stakers_updated_filter() | ||
.from_block(0) | ||
.to_block(block) | ||
.query() | ||
.await?; | ||
) -> Option<StakeTables> { | ||
let opt = self.options(); | ||
let retry_delay = opt.l1_retry_delay; | ||
let chunk_size = opt.l1_events_max_block_range as usize; | ||
|
||
let last_head = { | ||
let mut state = self.state.lock().await; | ||
if let Some(st) = state.stake.get(&block) { | ||
return Some(st.clone()); | ||
} else { | ||
state.snapshot.head | ||
} | ||
}; | ||
|
||
Ok(StakeTables::from_l1_events(events.clone())) | ||
let chunks = L1Client::chunky2(last_head, block, chunk_size); | ||
let contract = PermissionedStakeTable::new(contract_address, self.provider.clone()); | ||
|
||
let mut events: Vec<StakersUpdatedFilter> = Vec::new(); | ||
for (from, to) in chunks { | ||
tracing::debug!(from, to, "fetch stake table events in range"); | ||
loop { | ||
match contract | ||
.stakers_updated_filter() | ||
.from_block(from) | ||
.to_block(to) | ||
.query() | ||
.await | ||
{ | ||
Ok(e) => { | ||
for event in e { | ||
events.push(event) | ||
} | ||
break; | ||
} | ||
Err(err) => { | ||
tracing::warn!(from, to, %err, "Stake Table L1Event Error"); | ||
sleep(retry_delay).await; | ||
} | ||
} | ||
} | ||
} | ||
Some(StakeTables::from_l1_events(events)) | ||
} | ||
|
||
fn options(&self) -> &L1ClientOptions { | ||
(*self.provider).as_ref().options() | ||
} | ||
|
@@ -781,6 +894,7 @@ impl L1State { | |
Self { | ||
snapshot: Default::default(), | ||
finalized: LruCache::new(cache_size), | ||
stake: LruCache::new(cache_size), | ||
} | ||
} | ||
|
||
|
@@ -1182,12 +1296,30 @@ mod test { | |
tracing::info!(?final_state, "state updated"); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_chunky() { | ||
let anvil = Anvil::new().spawn(); | ||
let opt = L1ClientOptions { | ||
l1_events_max_block_range: 3, | ||
..Default::default() | ||
}; | ||
let l1_client = opt.connect(vec![anvil.endpoint().parse().unwrap()]); | ||
|
||
let chunks = l1_client.chunky(3, 10); | ||
let tups = stream::iter(chunks).collect::<Vec<_>>().await; | ||
|
||
assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups]; | ||
|
||
let tups = L1Client::chunky2(3, 10, 3); | ||
assert_eq![vec![(3, 5), (6, 8), (9, 10)], tups]; | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_fetch_stake_table() -> anyhow::Result<()> { | ||
setup_test(); | ||
|
||
let anvil = Anvil::new().spawn(); | ||
let l1_client = L1Client::new(anvil.endpoint().parse().unwrap()); | ||
let l1_client = new_l1_client(&anvil, false).await; | ||
let wallet: LocalWallet = anvil.keys()[0].clone().into(); | ||
|
||
// In order to deposit we need a provider that can sign. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the latest L1 block (that may not be finalized yet). We should only consider finalized blocks for the stake table because otherwise the stake table may change on L1 after we fetched it.