diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 29c903b0..0c996467 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -5,11 +5,14 @@ //! This is NOT intended to be run in production so it has no nice configuration, poor error checking and some hardcoded values. use std::{path::PathBuf, sync::Arc, thread::sleep, time::Duration}; +use alloy_primitives::U256; use jsonrpsee::RpcModule; use rbuilder::{ beacon_api_client::Client, building::{ builders::{ + best_block_store::BestBlockTracker, + best_block_store::GlobalBestBlockStore, block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider}, BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, OrderConsumer, UnfinishedBlockBuildingSink, UnfinishedBlockBuildingSinkFactory, @@ -47,7 +50,7 @@ use tokio::{ sync::{broadcast, mpsc}, }; use tokio_util::sync::CancellationToken; -use tracing::{info, level_filters::LevelFilter}; +use tracing::{error, info, level_filters::LevelFilter}; const RETH_DB_PATH: &str = DEFAULT_RETH_DB_PATH; @@ -110,7 +113,9 @@ async fn main() -> eyre::Result<()> { global_cancellation: cancel.clone(), extra_rpc: RpcModule::new(()), sink_factory: Box::new(TraceBlockSinkFactory {}), - builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], + builders: vec![Arc::new( + DummyBuildingAlgorithm::new(10, GlobalBestBlockStore::new()).await, + )], run_sparse_trie_prefetcher: false, orderpool_sender, orderpool_receiver, @@ -157,7 +162,6 @@ impl UnfinishedBlockBuildingSink for TracingBlockSink { false } } - //////////////////////////// /// BUILDING ALGORITHM //////////////////////////// @@ -168,13 +172,18 @@ impl UnfinishedBlockBuildingSink for TracingBlockSink { struct DummyBuildingAlgorithm { /// Amnount of used orders to build a block orders_to_use: usize, + best_block_tracker: BestBlockTracker, } const ORDER_POLLING_PERIOD: Duration = Duration::from_millis(10); const BUILDER_NAME: &str = "DUMMY"; impl DummyBuildingAlgorithm { - pub fn new(orders_to_use: usize) -> Self { - Self { orders_to_use } + pub async fn new(orders_to_use: usize, best_block_store: GlobalBestBlockStore) -> Self { + let best_block_tracker = BestBlockTracker::new(best_block_store); + Self { + orders_to_use, + best_block_tracker, + } } fn wait_for_orders( @@ -247,7 +256,19 @@ where let block = self .build_block(orders, input.provider, &input.ctx) .unwrap(); - input.sink.new_block(block); + + match block.finalize_block(Some(U256::from(0))) { + Ok(res) => { + // block on the async operation + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(self.best_block_tracker.try_and_update(res.block)) + }); + } + Err(e) => { + error!("Error on finalize_block on DummyBuildingAlgorithm: {:?}", e); + } + } } } } diff --git a/crates/rbuilder/src/building/builders/best_block_store.rs b/crates/rbuilder/src/building/builders/best_block_store.rs new file mode 100644 index 00000000..60bca560 --- /dev/null +++ b/crates/rbuilder/src/building/builders/best_block_store.rs @@ -0,0 +1,433 @@ +use futures::stream::{Stream, StreamExt}; +use std::sync::Arc; +use tokio::sync::{broadcast, Mutex}; +use tokio_stream::wrappers::BroadcastStream; +use tracing::debug; + +use super::Block; + +const DEFAULT_BUFFER_SIZE: usize = 64; + +/// Error returned when an update is rejected. +#[derive(Debug)] +pub struct UpdateRejected; + +/// GlobalBestBlockStore maintains a single global best block. +/// It is updated by the best block builder and broadcast to all listeners. +#[derive(Clone, Debug)] +pub struct GlobalBestBlockStore { + best_block: Arc>>, + tx: broadcast::Sender, +} + +impl Default for GlobalBestBlockStore { + fn default() -> Self { + Self::new() + } +} + +impl GlobalBestBlockStore { + /// Create a new global best block store with default buffer size. + pub fn new() -> Self { + Self::with_buffer_size(DEFAULT_BUFFER_SIZE) + } + + /// Create a new global best block store with specified buffer size. + pub fn with_buffer_size(buffer_size: usize) -> Self { + let (tx, _rx) = broadcast::channel(buffer_size); + Self { + best_block: Arc::new(Mutex::new(None)), + tx, + } + } + + /// Compare the new block with the current best block and update if the new block is better. + pub async fn compare_and_update(&self, new_block: Block) -> Result<(), UpdateRejected> { + // Create a new scope to hold the lock for the entire function + // Lock is released here when guard goes out of scope + + { + let mut guard = self.best_block.lock().await; + + if guard.is_none() || new_block.has_higher_bid_value_than(guard.as_ref()) { + debug!( + "Updating best block bid value to {:?} for builder {:?} and block number {:?}, from {:?} for builder {:?}", + new_block.trace.bid_value, + new_block.builder_name, + new_block.sealed_block.number, + guard.as_ref().map(|block| block.trace.bid_value).unwrap_or_default(), + guard.as_ref().map(|block| block.builder_name.clone()).unwrap_or_default() + ); + *guard = Some(new_block.clone()); + let _ = self.tx.send(new_block); + Ok(()) + } else { + Err(UpdateRejected) + } + } + } + + /// Subscribe to updates on the best block. + pub fn subscribe(&self) -> impl Stream { + let rx = self.tx.subscribe(); + BroadcastStream::new(rx).filter_map(|res| async move { res.ok() }) + } + + /// Get the current best block. + pub async fn get_best_block(&self) -> Option { + let guard = self.best_block.lock().await; + guard.clone() + } + + /// Get a broadcast receiver for best block updates + pub fn subscribe_to_updates(&self) -> broadcast::Receiver { + self.tx.subscribe() + } +} + +/// A tracker that keeps a local view of the best block. +/// It provides a method to try to update the best block and a method to get the best block. +#[derive(Debug, Clone)] +pub struct BestBlockTracker { + store: GlobalBestBlockStore, + best_block: Arc>>, +} + +impl BestBlockTracker { + /// Create a new uninitialized tracker + pub fn new(store: GlobalBestBlockStore) -> Self { + Self { + store, + best_block: Arc::new(Mutex::new(None)), + } + } + + /// Initialize the tracker with the current best block and start background tracking + pub async fn init(&self) { + let initial_block = self.store.get_best_block().await; + *self.best_block.lock().await = initial_block; + + let best_block = self.best_block.clone(); + let mut rx = self.store.subscribe_to_updates(); + + // start background task to track best blocks + tokio::spawn(async move { + while let Ok(block) = rx.recv().await { + *best_block.lock().await = Some(block); + } + }); + } + + /// Attempt to update the best block with `new_block`. + /// If `new_block` is better than the current best block, it tries to update the store. + /// Returns `true` if the global store was successfully updated. + pub async fn try_and_update(&self, new_block: Block) -> bool { + let current_best = self.best_block.lock().await; + if current_best.is_none() || new_block.has_higher_bid_value_than(current_best.as_ref()) { + self.store.compare_and_update(new_block).await.is_ok() + } else { + false + } + } + + /// Get current best block + pub async fn get_best_block(&self) -> Option { + self.best_block.lock().await.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::building::{BuiltBlockTrace, SealedBlock}; + use alloy_primitives::U256; + use futures::pin_mut; + use std::time::Duration; + use time::OffsetDateTime; + + #[allow(dead_code)] + fn make_block(bid: u64) -> Block { + Block { + trace: BuiltBlockTrace { + included_orders: vec![], + bid_value: U256::from(bid), + true_bid_value: U256::from(bid), + got_no_signer_error: false, + orders_closed_at: OffsetDateTime::now_utc(), + orders_sealed_at: OffsetDateTime::now_utc(), + fill_time: Duration::from_secs(0), + finalize_time: Duration::from_secs(0), + root_hash_time: Duration::from_secs(0), + }, + sealed_block: SealedBlock::default(), + txs_blobs_sidecars: vec![], + execution_requests: vec![], + builder_name: "test_builder".to_string(), + } + } + + #[tokio::test] + async fn test_compare_and_update() { + let store = GlobalBestBlockStore::new(); + let initial_block = make_block(100); + let better_block = make_block(101); + let worse_block = make_block(99); + + // Initially store is empty so any block is better + assert!(store + .compare_and_update(initial_block.clone()) + .await + .is_ok()); + assert_eq!( + store.get_best_block().await.unwrap().trace.bid_value, + U256::from(100) + ); + + // Update with a better block + assert!(store.compare_and_update(better_block.clone()).await.is_ok()); + assert_eq!( + store.get_best_block().await.unwrap().trace.bid_value, + U256::from(101) + ); + + // Update with a worse block + assert!(store.compare_and_update(worse_block.clone()).await.is_err()); + assert_eq!( + store.get_best_block().await.unwrap().trace.bid_value, + U256::from(101) + ); + } + + #[tokio::test] + async fn test_subscribe() { + let store = GlobalBestBlockStore::new(); + let stream = store.subscribe(); + pin_mut!(stream); + + let block = make_block(100); + + // No updates yet, stream should yield None. + tokio::select! { + biased; + _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {}, + val = stream.next() => { + panic!("Received a block when none was broadcast: {:?}", val); + } + } + + // Update the store; subscribers should see this. + assert!(store.compare_and_update(block.clone()).await.is_ok()); + + let received = stream.next().await.unwrap(); + assert_eq!(received.trace.bid_value, U256::from(100)); + } + + #[tokio::test] + async fn test_concurrent_updates() { + let store = GlobalBestBlockStore::new(); + let mut handles = vec![]; + use rand::Rng; + let mut rng = rand::thread_rng(); + + // Generate random bid values and store the max for later assertion. + let mut values = vec![]; + for _ in 0..10 { + values.push(rng.gen_range(50..200u64)); + } + let max_value = *values.iter().max().unwrap(); + + // Spawn tasks to update the store concurrently. + for v in values.into_iter() { + let s = store.clone(); + handles.push(tokio::spawn(async move { + let block = make_block(v); + let _ = s.compare_and_update(block).await; + })); + } + + for h in handles { + h.await.unwrap(); + } + + // After all updates, ensure the best block matches the highest submitted value. + let best_block = store.get_best_block().await.unwrap(); + assert_eq!(best_block.trace.bid_value, U256::from(max_value)); + } + + #[tokio::test] + async fn test_multiple_subscribers() { + let store = GlobalBestBlockStore::new(); + + // Create three subscribers before any updates + let stream1 = store.subscribe(); + let stream2 = store.subscribe(); + let stream3 = store.subscribe(); + + // Pin them for iteration + tokio::pin!(stream1, stream2, stream3); + + let block = make_block(150); + + // Update the store + assert!(store.compare_and_update(block.clone()).await.is_ok()); + + // All subscribers should receive the update + let received1 = stream1.next().await.unwrap(); + let received2 = stream2.next().await.unwrap(); + let received3 = stream3.next().await.unwrap(); + + assert_eq!(received1.trace.bid_value, U256::from(150)); + assert_eq!(received2.trace.bid_value, U256::from(150)); + assert_eq!(received3.trace.bid_value, U256::from(150)); + + // Now create a new subscriber after the first update + let late_subscriber = store.subscribe(); + pin_mut!(late_subscriber); + + let better_block = make_block(200); + + // Update again + assert!(store.compare_and_update(better_block.clone()).await.is_ok()); + + let late_received = late_subscriber.next().await.unwrap(); + assert_eq!(late_received.trace.bid_value, U256::from(200)); + } + + #[tokio::test] + async fn test_tracker_initially_empty_store() { + let store = GlobalBestBlockStore::new(); + let tracker = BestBlockTracker::new(store.clone()); + + // Store is empty initially, tracker should also reflect no best block + assert!(tracker.get_best_block().await.is_none()); + } + + #[tokio::test] + async fn test_tracker_initially_with_store_data() { + let store = GlobalBestBlockStore::new(); + let initial_block = make_block(100); + store + .compare_and_update(initial_block.clone()) + .await + .unwrap(); + + let tracker = BestBlockTracker::new(store.clone()); + tracker.init().await; + + // Tracker should immediately see the block that was in the store + let best_block = tracker.get_best_block().await; + assert!(best_block.is_some()); + assert_eq!(best_block.unwrap().trace.bid_value, U256::from(100)); + } + + #[tokio::test] + async fn test_tracker_updates_on_new_broadcast() { + let store = GlobalBestBlockStore::new(); + let tracker = BestBlockTracker::new(store.clone()); + tracker.init().await; + + // Initially empty + assert!(tracker.get_best_block().await.is_none()); + + // Update the store + let better_block = make_block(200); + store + .compare_and_update(better_block.clone()) + .await + .unwrap(); + + // Allow some time for the background task to pick up changes + tokio::time::sleep(Duration::from_millis(100)).await; + + let best_block = tracker.get_best_block().await; + assert!(best_block.is_some()); + assert_eq!(best_block.unwrap().trace.bid_value, U256::from(200)); + } + + #[tokio::test] + async fn test_try_and_update_improves_block() { + let store = GlobalBestBlockStore::new(); + let tracker = BestBlockTracker::new(store.clone()); + tracker.init().await; + let block_100 = make_block(100); + assert!(tracker.try_and_update(block_100.clone()).await); + + let current_best = store.get_best_block().await.unwrap(); + assert_eq!(current_best.trace.bid_value, U256::from(100)); + + // Try a worse block, should fail to update + let block_50 = make_block(50); + assert!(!tracker.try_and_update(block_50.clone()).await); + + let current_best = store.get_best_block().await.unwrap(); + assert_eq!(current_best.trace.bid_value, U256::from(100)); + + // Try a better block, should succeed + let block_150 = make_block(150); + assert!(tracker.try_and_update(block_150.clone()).await); + + let current_best = store.get_best_block().await.unwrap(); + assert_eq!(current_best.trace.bid_value, U256::from(150)); + } + + #[tokio::test] + async fn test_multiple_trackers_see_same_updates() { + let store = GlobalBestBlockStore::new(); + let tracker1 = BestBlockTracker::new(store.clone()); + let tracker2 = BestBlockTracker::new(store.clone()); + + // Initialize both trackers + tracker1.init().await; + tracker2.init().await; + + // No best block initially + assert!(tracker1.get_best_block().await.is_none()); + assert!(tracker2.get_best_block().await.is_none()); + + // Update from tracker1 + let block_100 = make_block(100); + assert!(tracker1.try_and_update(block_100.clone()).await); + + // Both should now see the updated block + tokio::time::sleep(Duration::from_millis(100)).await; + let t1_best = tracker1.get_best_block().await; + let t2_best = tracker2.get_best_block().await; + assert_eq!(t1_best.unwrap().trace.bid_value, U256::from(100)); + assert_eq!(t2_best.unwrap().trace.bid_value, U256::from(100)); + + // Update directly via store to simulate external update + let block_200 = make_block(200); + store.compare_and_update(block_200.clone()).await.unwrap(); + + // Both trackers should see the improved block after some time + tokio::time::sleep(Duration::from_millis(100)).await; + let t1_best = tracker1.get_best_block().await.unwrap(); + let t2_best = tracker2.get_best_block().await.unwrap(); + assert_eq!(t1_best.trace.bid_value, U256::from(200)); + assert_eq!(t2_best.trace.bid_value, U256::from(200)); + } + + #[tokio::test] + async fn test_concurrent_try_and_update() { + let store = GlobalBestBlockStore::new(); + let tracker = BestBlockTracker::new(store.clone()); + + // We'll try multiple updates concurrently + let candidates = vec![50, 150, 120, 200, 100]; + let mut handles = vec![]; + + for bid in candidates { + let t = tracker.clone(); + let block = make_block(bid); + let handle = tokio::spawn(async move { t.try_and_update(block).await }); + handles.push(handle); + } + + let _results = futures::future::join_all(handles).await; + // Some updates might fail (lower bids), some might succeed. + // We only care that the final store best_block matches the highest bid submitted (200). + + let final_best = store.get_best_block().await.unwrap(); + assert_eq!(final_best.trace.bid_value, U256::from(200)); + } +} diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index c71f53e5..badd77a6 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -1,4 +1,5 @@ //! builders is a subprocess that builds a block +pub mod best_block_store; pub mod block_building_helper; pub mod mock_block_building_helper; pub mod ordering_builder; @@ -37,6 +38,14 @@ pub struct Block { pub builder_name: String, } +impl Block { + /// Returns true if this block has a higher bid value than the other block. + pub fn has_higher_bid_value_than(&self, other: Option<&Block>) -> bool { + let current_best_value = other.map(|b| b.trace.bid_value).unwrap_or_default(); + self.trace.bid_value > current_best_value + } +} + #[derive(Debug)] pub struct LiveBuilderInput { pub provider: P, diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index 6c233595..435358ce 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -85,7 +85,6 @@ where config.clone(), input.root_hash_config, ); - // this is a hack to mark used orders until built block trace is implemented as a sane thing let mut removed_orders = Vec::new(); let mut use_suggested_fee_recipient_as_coinbase = config.coinbase_payment; diff --git a/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs b/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs index 4f3d4eeb..d836872b 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs @@ -4,9 +4,10 @@ use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tracing::error; -use crate::live_builder::block_output::relay_submit::BlockBuildingSink; +use crate::building::builders::best_block_store::BestBlockTracker; use super::interfaces::{Bid, BidMaker}; +use tracing::info; /// BidMaker with a background task sealing multiple parallel bids concurrently. /// If several bids arrive while we hit the max of concurrent sealings we keep only the last one since we assume new is better. @@ -53,13 +54,12 @@ impl PendingBid { impl ParallelSealerBidMaker { pub fn new( max_concurrent_seals: usize, - sink: Arc, + best_block_tracker: BestBlockTracker, cancel: CancellationToken, ) -> Self { let notify = Arc::new(Notify::new()); let pending_bid = Arc::new(PendingBid::new(notify.clone())); let mut sealing_process = ParallelSealerBidMakerProcess { - sink, cancel, pending_bid: pending_bid.clone(), notify: notify.clone(), @@ -68,6 +68,7 @@ impl ParallelSealerBidMaker { seals_in_progress: Default::default(), }), max_concurrent_seals, + best_block_tracker: best_block_tracker.clone(), }; tokio::task::spawn(async move { @@ -86,8 +87,6 @@ struct SealsInProgress { /// Background task waiting for new bids to seal. struct ParallelSealerBidMakerProcess { - /// Destination of the finished blocks. - sink: Arc, cancel: CancellationToken, pending_bid: Arc, /// Signaled when we set a new bid or a sealing finishes. @@ -96,10 +95,14 @@ struct ParallelSealerBidMakerProcess { seal_control: Arc, /// Maximum number of concurrent sealings. max_concurrent_seals: usize, + /// Best block tracker. + best_block_tracker: BestBlockTracker, } impl ParallelSealerBidMakerProcess { async fn run(&mut self) { + // Initialize best block tracker + self.best_block_tracker.init().await; loop { tokio::select! { _ = self.wait_for_change() => self.check_for_new_bid().await, @@ -112,8 +115,9 @@ impl ParallelSealerBidMakerProcess { self.notify.notified().await } - /// block.finalize_block + self.sink.new_block inside spawn_blocking. + /// block.finalize_block inside spawn_blocking. async fn check_for_new_bid(&mut self) { + info!("Checking for new bid"); if *self.seal_control.seals_in_progress.lock() >= self.max_concurrent_seals { return; } @@ -124,10 +128,12 @@ impl ParallelSealerBidMakerProcess { // Take sealing "slot" *self.seal_control.seals_in_progress.lock() += 1; let seal_control = self.seal_control.clone(); - let sink = self.sink.clone(); - tokio::task::spawn_blocking(move || { + let best_block_tracker = self.best_block_tracker.clone(); + tokio::spawn(async move { match block.finalize_block(payout_tx_val) { - Ok(res) => sink.new_block(res.block), + Ok(res) => { + best_block_tracker.try_and_update(res.block).await; + } Err(error) => error!( block_number, ?error, diff --git a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs index 08f38cb6..6f400a0f 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs @@ -1,4 +1,4 @@ -use crate::live_builder::block_output::relay_submit::BlockBuildingSink; +use crate::building::builders::best_block_store::BestBlockTracker; use parking_lot::Mutex; use std::sync::Arc; use tokio::sync::Notify; @@ -51,12 +51,12 @@ impl PendingBid { } impl SequentialSealerBidMaker { - pub fn new(sink: Arc, cancel: CancellationToken) -> Self { + pub fn new(best_block_tracker: BestBlockTracker, cancel: CancellationToken) -> Self { let pending_bid = Arc::new(PendingBid::new()); let mut sealing_process = SequentialSealerBidMakerProcess { - sink, cancel, pending_bid: pending_bid.clone(), + best_block_tracker: best_block_tracker.clone(), }; tokio::task::spawn(async move { @@ -69,13 +69,15 @@ impl SequentialSealerBidMaker { /// Background task waiting for new bids to seal. struct SequentialSealerBidMakerProcess { /// Destination of the finished blocks. - sink: Arc, cancel: CancellationToken, pending_bid: Arc, + best_block_tracker: BestBlockTracker, } impl SequentialSealerBidMakerProcess { async fn run(&mut self) { + // Initialize best block tracker + self.best_block_tracker.init().await; loop { tokio::select! { _ = self.pending_bid.wait_for_change() => self.check_for_new_bid().await, @@ -84,7 +86,7 @@ impl SequentialSealerBidMakerProcess { } } - /// block.finalize_block + self.sink.new_block inside spawn_blocking. + /// block.finalize_block inside spawn_blocking. async fn check_for_new_bid(&mut self) { if let Some(bid) = self.pending_bid.consume_bid() { let payout_tx_val = bid.payout_tx_value(); @@ -92,7 +94,9 @@ impl SequentialSealerBidMakerProcess { let block_number = block.building_context().block(); match tokio::task::spawn_blocking(move || block.finalize_block(payout_tx_val)).await { Ok(finalize_res) => match finalize_res { - Ok(res) => self.sink.new_block(res.block), + Ok(res) => { + self.best_block_tracker.try_and_update(res.block).await; + } Err(error) => { if error.is_critical() { error!( diff --git a/crates/rbuilder/src/live_builder/block_output/block_sealing_bidder_factory.rs b/crates/rbuilder/src/live_builder/block_output/block_sealing_bidder_factory.rs index 80f855ce..37274a36 100644 --- a/crates/rbuilder/src/live_builder/block_output/block_sealing_bidder_factory.rs +++ b/crates/rbuilder/src/live_builder/block_output/block_sealing_bidder_factory.rs @@ -1,5 +1,8 @@ use crate::{ - building::builders::{UnfinishedBlockBuildingSink, UnfinishedBlockBuildingSinkFactory}, + building::builders::{ + best_block_store::{BestBlockTracker, GlobalBestBlockStore}, + UnfinishedBlockBuildingSink, UnfinishedBlockBuildingSinkFactory, + }, live_builder::payload_events::MevBoostSlotData, }; use alloy_primitives::U256; @@ -15,7 +18,7 @@ use super::{ sequential_sealer_bid_maker::SequentialSealerBidMaker, wallet_balance_watcher::WalletBalanceWatcher, }, - relay_submit::BuilderSinkFactory, + relay_submit::RelayCoordinator, }; /// UnfinishedBlockBuildingSinkFactory to bid blocks against the competition. @@ -27,7 +30,7 @@ pub struct BlockSealingBidderFactory

{ /// Factory for the SlotBidder for blocks. bidding_service: Box, /// Factory for the final destination for blocks. - block_sink_factory: Box, + relay_coordinator: RelayCoordinator, /// SlotBidder are subscribed to the proper block in the bid_value_source. competition_bid_value_source: Arc, wallet_balance_watcher: WalletBalanceWatcher

, @@ -52,14 +55,14 @@ impl

Debug for BlockSealingBidderFactory

{ impl

BlockSealingBidderFactory

{ pub fn new( bidding_service: Box, - block_sink_factory: Box, + relay_coordinator: RelayCoordinator, competition_bid_value_source: Arc, wallet_balance_watcher: WalletBalanceWatcher

, max_concurrent_seals: usize, ) -> Self { Self { bidding_service, - block_sink_factory, + relay_coordinator, competition_bid_value_source, wallet_balance_watcher, max_concurrent_seals, @@ -88,6 +91,12 @@ where slot_data: MevBoostSlotData, cancel: tokio_util::sync::CancellationToken, ) -> std::sync::Arc { + // Create a new store for this block + let best_block_store = GlobalBestBlockStore::new(); + + // Create a new best block tracker for this block + let best_block_tracker = BestBlockTracker::new(best_block_store.clone()); + match self .wallet_balance_watcher .update_to_block(slot_data.block() - 1) @@ -102,20 +111,23 @@ where } } - let finished_block_sink = self.block_sink_factory.create_builder_sink( + // start submission job + self.relay_coordinator.start_submission_job( slot_data.clone(), self.competition_bid_value_source.clone(), cancel.clone(), + best_block_store.clone(), ); + let sealer: Box = if self.max_concurrent_seals == 1 { Box::new(SequentialSealerBidMaker::new( - Arc::from(finished_block_sink), + best_block_tracker.clone(), cancel.clone(), )) } else { Box::new(ParallelSealerBidMaker::new( self.max_concurrent_seals, - Arc::from(finished_block_sink), + best_block_tracker.clone(), cancel.clone(), )) }; diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index 52053b1d..741a355b 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -1,5 +1,5 @@ use crate::{ - building::builders::Block, + building::builders::best_block_store::GlobalBestBlockStore, live_builder::payload_events::MevBoostSlotData, mev_boost::{ sign_block_for_relay, BLSBlockSigner, RelayError, SubmitBlockErr, SubmitBlockRequest, @@ -16,14 +16,13 @@ use crate::{ }; use ahash::HashMap; use alloy_primitives::{utils::format_ether, U256}; -use mockall::automock; -use parking_lot::Mutex; +use futures::StreamExt; use reth_chainspec::ChainSpec; use reth_primitives::SealedBlock; use std::sync::Arc; -use tokio::{sync::Notify, time::Instant}; +use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level}; +use tracing::{debug, error, event, info, info_span, trace, warn, Instrument, Level}; use super::{ bid_observer::BidObserver, @@ -32,68 +31,6 @@ use super::{ const SIM_ERROR_CATEGORY: &str = "submit_block_simulation"; const VALIDATION_ERROR_CATEGORY: &str = "validate_block_simulation"; - -/// Contains the best block so far. -/// Building updates via compare_and_update while relay submitter polls via take_best_block. -/// A new block can be waited without polling via wait_for_change. -#[derive(Debug, Default)] -pub struct BestBlockCell { - block: Mutex>, - block_notify: Notify, -} - -impl BestBlockCell { - pub fn compare_and_update(&self, block: Block) { - let mut best_block = self.block.lock(); - let old_value = best_block - .as_ref() - .map(|b| b.trace.bid_value) - .unwrap_or_default(); - if block.trace.bid_value > old_value { - *best_block = Some(block); - self.block_notify.notify_one(); - } - } - - pub fn take_best_block(&self) -> Option { - self.block.lock().take() - } - - pub async fn wait_for_change(&self) { - self.block_notify.notified().await - } -} - -/// Adapts BestBlockCell to BlockBuildingSink by calling compare_and_update on new_block. -#[derive(Debug)] -struct BestBlockCellToBlockBuildingSink { - best_block_cell: Arc, -} - -impl BlockBuildingSink for BestBlockCellToBlockBuildingSink { - fn new_block(&self, block: Block) { - self.best_block_cell.compare_and_update(block); - } -} - -/// Final destination of blocks (eg: submit to the relays). -#[automock] -pub trait BlockBuildingSink: std::fmt::Debug + Send + Sync { - fn new_block(&self, block: Block); -} - -/// Factory used to create BlockBuildingSink.. -pub trait BuilderSinkFactory: std::fmt::Debug + Send + Sync { - /// # Arguments - /// slot_bidder: Not always needed but simplifies the design. - fn create_builder_sink( - &self, - slot_data: MevBoostSlotData, - competition_bid_value_source: Arc, - cancel: CancellationToken, - ) -> Box; -} - #[derive(Debug)] pub struct SubmissionConfig { pub chain_spec: Arc, @@ -128,12 +65,12 @@ struct BuiltBlockInfo { /// returns the best bid made #[allow(clippy::too_many_arguments)] async fn run_submit_to_relays_job( - best_bid: Arc, slot_data: MevBoostSlotData, relays: Vec, config: Arc, cancel: CancellationToken, competition_bid_value_source: Arc, + best_block_store: GlobalBestBlockStore, ) -> Option { let best_bid_sync_source = BestBidSyncSource::new( competition_bid_value_source, @@ -155,24 +92,16 @@ async fn run_submit_to_relays_job( (normal_relays, optimistic_relays) }; - let mut last_bid_value = U256::from(0); - 'submit: loop { - if cancel.is_cancelled() { - break 'submit res; - } + // Create a stream that automatically ends when cancelled + let block_stream = best_block_store.subscribe().take_until(cancel.cancelled()); - best_bid.wait_for_change().await; - let block = if let Some(new_block) = best_bid.take_best_block() { - if new_block.trace.bid_value > last_bid_value { - last_bid_value = new_block.trace.bid_value; - new_block - } else { - continue 'submit; - } - } else { - continue 'submit; - }; + tokio::pin!(block_stream); + while let Some(block) = block_stream.next().await { + info!( + "got a block from the store! builder: {:?}, block number: {:?}", + block.builder_name, block.sealed_block.number + ); res = Some(BuiltBlockInfo { bid_value: block.trace.bid_value, true_bid_value: block.trace.true_bid_value, @@ -223,7 +152,7 @@ async fn run_submit_to_relays_job( Ok(res) => res, Err(err) => { error!(parent: &submission_span, err = ?err, "Error signing block for relay"); - continue 'submit; + return res; } }; let optimistic_signed_submission = match sign_block_for_relay( @@ -239,7 +168,7 @@ async fn run_submit_to_relays_job( Ok(res) => res, Err(err) => { error!(parent: &submission_span, err = ?err, "Error signing block for relay"); - continue 'submit; + return res; } }; (normal_signed_submission, optimistic_signed_submission) @@ -256,7 +185,7 @@ async fn run_submit_to_relays_job( ) .instrument(submission_span) .await; - continue 'submit; + return res; } measure_block_e2e_latency(&block.trace.included_orders); @@ -331,23 +260,24 @@ async fn run_submit_to_relays_job( ); }) } + res } pub async fn run_submit_to_relays_job_and_metrics( - best_bid: Arc, slot_data: MevBoostSlotData, relays: Vec, config: Arc, cancel: CancellationToken, competition_bid_value_source: Arc, + best_block_store: GlobalBestBlockStore, ) { let last_build_block_info = run_submit_to_relays_job( - best_bid, slot_data, relays, config, cancel, competition_bid_value_source, + best_block_store, ) .await; if let Some(last_build_block_info) = last_build_block_info { @@ -504,12 +434,12 @@ async fn submit_bid_to_the_relay( /// Real life BuilderSinkFactory that send the blocks to the Relay #[derive(Debug)] -pub struct RelaySubmitSinkFactory { +pub struct RelayCoordinator { submission_config: Arc, relays: HashMap, } -impl RelaySubmitSinkFactory { +impl RelayCoordinator { pub fn new(submission_config: SubmissionConfig, relays: Vec) -> Self { let relays = relays .into_iter() @@ -520,17 +450,14 @@ impl RelaySubmitSinkFactory { relays, } } -} -impl BuilderSinkFactory for RelaySubmitSinkFactory { - fn create_builder_sink( + pub fn start_submission_job( &self, slot_data: MevBoostSlotData, competition_bid_value_source: Arc, cancel: CancellationToken, - ) -> Box { - let best_block_cell = Arc::new(BestBlockCell::default()); - + best_block_store: GlobalBestBlockStore, + ) { let relays = slot_data .relays .iter() @@ -542,13 +469,12 @@ impl BuilderSinkFactory for RelaySubmitSinkFactory { }) .collect(); tokio::spawn(run_submit_to_relays_job_and_metrics( - best_block_cell.clone(), slot_data, relays, self.submission_config.clone(), cancel, competition_bid_value_source, + best_block_store, )); - Box::new(BestBlockCellToBlockBuildingSink { best_block_cell }) } } diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 508787d7..c2599907 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -11,7 +11,7 @@ use super::{ wallet_balance_watcher::WalletBalanceWatcher, }, block_sealing_bidder_factory::BlockSealingBidderFactory, - relay_submit::{RelaySubmitSinkFactory, SubmissionConfig}, + relay_submit::{RelayCoordinator, SubmissionConfig}, }, }; use crate::{ @@ -27,8 +27,7 @@ use crate::{ Sorting, }, live_builder::{ - base_config::EnvOrValue, block_output::relay_submit::BuilderSinkFactory, - cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, + base_config::EnvOrValue, cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, }, mev_boost::BLSBlockSigner, primitives::mev_boost::{MevBoostRelay, RelayConfig}, @@ -255,12 +254,12 @@ impl L1Config { }) } - /// Creates the RelaySubmitSinkFactory and also returns the associated relays. - pub fn create_relays_sealed_sink_factory( + /// Creates the relay coordinator and also returns the associated relays. + pub fn create_relays_and_relay_coordinator( &self, chain_spec: Arc, bid_observer: Box, - ) -> eyre::Result<(Box, Vec)> { + ) -> eyre::Result<(RelayCoordinator, Vec)> { let submission_config = self.submission_config(chain_spec, bid_observer)?; info!( "Builder mev boost normal relay pubkey: {:?}", @@ -278,11 +277,9 @@ impl L1Config { ); let relays = self.create_relays()?; - let sink_factory: Box = Box::new(RelaySubmitSinkFactory::new( - submission_config, - relays.clone(), - )); - Ok((sink_factory, relays)) + let relay_coordinator: RelayCoordinator = + RelayCoordinator::new(submission_config, relays.clone()); + Ok((relay_coordinator, relays)) } } @@ -303,7 +300,7 @@ impl LiveBuilderConfig for Config { + Clone + 'static, { - let (sink_sealed_factory, relays) = self.l1_config.create_relays_sealed_sink_factory( + let (relay_coordinator, relays) = self.l1_config.create_relays_and_relay_coordinator( self.base_config.chain_spec()?, Box::new(NullBidObserver {}), )?; @@ -318,7 +315,7 @@ impl LiveBuilderConfig for Config { let sink_factory = Box::new(BlockSealingBidderFactory::new( bidding_service, - sink_sealed_factory, + relay_coordinator, Arc::new(NullBidValueSource {}), wallet_balance_watcher, self.l1_config.max_concurrent_seals as usize, @@ -340,6 +337,7 @@ impl LiveBuilderConfig for Config { ) .await?; let root_hash_config = self.base_config.live_root_hash_config()?; + let builders = create_builders( self.live_builders()?, root_hash_config, diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index abdbd973..6d314c46 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -252,7 +252,6 @@ where self.global_cancellation.clone(), time_until_slot_end.try_into().unwrap_or_default(), ); - if let Some(watchdog_sender) = watchdog_sender.as_ref() { watchdog_sender.try_send(()).unwrap_or_default(); };