diff --git a/host/src/preflight.rs b/host/src/preflight.rs
index 90ceb1eb..35f20089 100644
--- a/host/src/preflight.rs
+++ b/host/src/preflight.rs
@@ -13,7 +13,9 @@ use anyhow::{anyhow, bail, Result};
 use c_kzg::{Blob, KzgCommitment};
 use hashbrown::HashSet;
 use raiko_lib::{
-    builder::{prepare::TaikoHeaderPrepStrategy, BlockBuilder, TkoTxExecStrategy},
+    builder::{
+        prepare::TaikoHeaderPrepStrategy, BlockBuilder, OptimisticDatabase, TkoTxExecStrategy,
+    },
     consts::{get_network_spec, Network},
     input::{
         decode_anchor, proposeBlockCall, taiko_a6::BlockProposed as TestnetBlockProposed,
@@ -28,7 +30,7 @@ use raiko_primitives::{
 };
 use serde::{Deserialize, Serialize};
 
-use crate::provider_db::{MeasuredProviderDb, ProviderDb};
+use crate::provider_db::ProviderDb;
 
 pub fn preflight(
     rpc_url: Option<String>,
@@ -41,6 +43,7 @@ pub fn preflight(
     let provider = ProviderBuilder::new().provider(RootProvider::new_http(
         reqwest::Url::parse(&rpc_url.clone().unwrap()).expect("invalid rpc url"),
     ));
+    let is_local = provider.client().is_local();
 
     let measurement = Measurement::start("Fetching block data...", true);
@@ -189,13 +192,30 @@ pub fn preflight(
         network,
         parent_block.header.number.unwrap().try_into().unwrap(),
     )?;
+
     let mut builder = BlockBuilder::new(&input)
-        .with_db(MeasuredProviderDb::new(provider_db))
-        .prepare_header::<TaikoHeaderPrepStrategy>()?
-        .execute_transactions::<TkoTxExecStrategy>()?;
+        .with_db(provider_db)
+        .prepare_header::<TaikoHeaderPrepStrategy>()?;
+
+    // Optimize data gathering by executing the transactions multiple times so data can be requested in batches
+    let max_iterations = if is_local { 1 } else { 50 };
+    let mut done = false;
+    let mut num_iterations = 0;
+    while !done {
+        println!("Execution iteration {num_iterations}...");
+        builder.mut_db().unwrap().optimistic = if num_iterations + 1 < max_iterations {
+            true
+        } else {
+            false
+        };
+        builder = builder.execute_transactions::<TkoTxExecStrategy>()?;
+        if builder.mut_db().unwrap().fetch_data() {
+            done = true;
+        }
+        num_iterations += 1;
+    }
+    builder = builder.prepare_header::<TaikoHeaderPrepStrategy>()?;
     let provider_db = builder.mut_db().unwrap();
-    provider_db.print_report();
-    let provider_db = provider_db.db();
 
     // Gather inclusion proofs for the initial and final state
     let measurement = Measurement::start("Fetching storage proofs...", true);
diff --git a/host/src/provider_db.rs b/host/src/provider_db.rs
index 1d6c9eb6..59b57977 100644
--- a/host/src/provider_db.rs
+++ b/host/src/provider_db.rs
@@ -11,19 +11,16 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::{
-    ops::AddAssign,
-    time::{Duration, Instant},
-};
+use std::{collections::HashSet, mem::take};
 
 use alloy_consensus::Header as AlloyConsensusHeader;
-use alloy_primitives::{Bytes, Uint};
+use alloy_primitives::{Bytes, StorageKey, Uint};
 use alloy_provider::{Provider, ReqwestProvider};
 use alloy_rpc_client::{ClientBuilder, RpcClient};
 use alloy_rpc_types::{Block, BlockId, BlockNumberOrTag, EIP1186AccountProofResponse};
 use alloy_transport_http::Http;
 use raiko_lib::{
-    clear_line, consts::Network, inplace_print, mem_db::MemDb, print_duration,
+    builder::OptimisticDatabase, clear_line, consts::Network, inplace_print, mem_db::MemDb,
     taiko_utils::to_header,
 };
 use raiko_primitives::{Address, B256, U256};
@@ -44,6 +41,12 @@ pub struct ProviderDb {
     pub initial_headers: HashMap<u64, AlloyConsensusHeader>,
     pub current_db: MemDb,
     async_executor: Handle,
+
+    pub optimistic: bool,
+    pub staging_db: MemDb,
+    pub pending_accounts: HashSet<Address>,
+    pub pending_slots: HashSet<(Address, U256)>,
+    pub pending_block_hashes: HashSet<u64>,
 }
 
 impl ProviderDb {
@@ -63,11 +66,18 @@ impl ProviderDb {
             initial_headers: Default::default(),
             current_db: Default::default(),
             async_executor: tokio::runtime::Handle::current(),
+            optimistic: false,
+            staging_db: Default::default(),
+            pending_accounts: HashSet::new(),
+            pending_slots: HashSet::new(),
+            pending_block_hashes: HashSet::new(),
         };
         if network.is_taiko() {
            // Get the 256 history block hashes from the provider at first time for anchor
            // transaction.
-            let initial_history_blocks = provider_db.batch_get_history_headers(block_number)?;
+            let start = block_number.saturating_sub(255);
+            let block_numbers = (start..=block_number).collect();
+            let initial_history_blocks = provider_db.fetch_blocks(&block_numbers)?;
            for block in initial_history_blocks {
                let block_number: u64 = block.header.number.unwrap().try_into().unwrap();
                let block_hash = block.header.hash.unwrap();
@@ -82,32 +92,144 @@ impl ProviderDb {
         Ok(provider_db)
     }
 
-    fn batch_get_history_headers(
-        &mut self,
-        block_number: u64,
-    ) -> Result<Vec<Block>, anyhow::Error> {
-        let mut batch = self.client.new_batch();
-        let start = block_number.saturating_sub(255);
-        let mut requests = vec![];
-
-        for block_number in start..=block_number {
-            requests.push(Box::pin(batch.add_call(
-                "eth_getBlockByNumber",
-                &(BlockNumberOrTag::from(block_number), false),
-            )?));
+    fn fetch_blocks(&mut self, block_numbers: &Vec<u64>) -> Result<Vec<Block>, anyhow::Error> {
+        let mut all_blocks = Vec::new();
+
+        let max_batch_size = 32;
+        for block_numbers in block_numbers.chunks(max_batch_size) {
+            let mut batch = self.client.new_batch();
+            let mut requests = vec![];
+
+            for block_number in block_numbers.iter() {
+                requests.push(Box::pin(batch.add_call(
+                    "eth_getBlockByNumber",
+                    &(BlockNumberOrTag::from(*block_number), false),
+                )?));
+            }
+
+            let mut blocks = self.async_executor.block_on(async {
+                batch.send().await?;
+                let mut blocks = vec![];
+                // Collect the data from the batch
+                for request in requests.into_iter() {
+                    blocks.push(request.await?);
+                }
+                Ok::<_, anyhow::Error>(blocks)
+            })?;
+
+            all_blocks.append(&mut blocks);
         }
 
-        let blocks = self.async_executor.block_on(async {
-            batch.send().await?;
-            let mut blocks = vec![];
-            // Collect the data from the batch
-            for request in requests.into_iter() {
-                blocks.push(request.await?);
+        Ok(all_blocks)
+    }
+
+    fn fetch_accounts(&self, accounts: &Vec<Address>) -> Result<Vec<AccountInfo>, anyhow::Error> {
+        let mut all_accounts = Vec::new();
+
+        let max_batch_size = 250;
+        for accounts in accounts.chunks(max_batch_size) {
+            let mut batch = self.client.new_batch();
+
+            let mut nonce_requests = Vec::new();
+            let mut balance_requests = Vec::new();
+            let mut code_requests = Vec::new();
+
+            for address in accounts {
+                nonce_requests.push(Box::pin(
+                    batch
+                        .add_call::<_, Uint<64, 1>>(
+                            "eth_getTransactionCount",
+                            &(address, Some(BlockId::from(self.block_number))),
+                        )
+                        .unwrap(),
+                ));
+                balance_requests.push(Box::pin(
+                    batch
+                        .add_call::<_, Uint<256, 4>>(
+                            "eth_getBalance",
+                            &(address, Some(BlockId::from(self.block_number))),
+                        )
+                        .unwrap(),
+                ));
+                code_requests.push(Box::pin(
+                    batch
+                        .add_call::<_, Bytes>(
+                            "eth_getCode",
+                            &(address, Some(BlockId::from(self.block_number))),
+                        )
+                        .unwrap(),
+                ));
+            }
+
+            let mut accounts = self.async_executor.block_on(async {
+                batch.send().await?;
+                let mut accounts = vec![];
+                // Collect the data from the batch
+                for (nonce_request, (balance_request, code_request)) in nonce_requests
+                    .into_iter()
+                    .zip(balance_requests.into_iter().zip(code_requests.into_iter()))
+                {
+                    let (nonce, balance, code) = (
+                        nonce_request.await?,
+                        balance_request.await?,
+                        code_request.await?,
+                    );
+
+                    let account_info = AccountInfo::new(
+                        balance,
+                        nonce.try_into().unwrap(),
+                        Bytecode::new_raw(code.clone()).hash_slow(),
+                        Bytecode::new_raw(code),
+                    );
+
+                    accounts.push(account_info);
+                }
+                Ok::<_, anyhow::Error>(accounts)
+            })?;
+
+            all_accounts.append(&mut accounts);
+        }
+
+        Ok(all_accounts)
+    }
+
+    fn fetch_storage_slots(
+        &self,
+        accounts: &Vec<(Address, U256)>,
+    ) -> Result<Vec<U256>, anyhow::Error> {
+        let mut all_values = Vec::new();
+
+        let max_batch_size = 1000;
+        for accounts in accounts.chunks(max_batch_size) {
+            let mut batch = self.client.new_batch();
+
+            let mut requests = Vec::new();
+
+            for (address, key) in accounts {
+                requests.push(Box::pin(
+                    batch
+                        .add_call::<_, U256>(
+                            "eth_getStorageAt",
+                            &(address, key, Some(BlockId::from(self.block_number))),
+                        )
+                        .unwrap(),
+                ));
             }
-            Ok::<_, anyhow::Error>(blocks)
-        })?;
 
-        Ok(blocks)
+            let mut values = self.async_executor.block_on(async {
+                batch.send().await?;
+                let mut values = vec![];
+                // Collect the data from the batch
+                for request in requests.into_iter() {
+                    values.push(request.await?);
+                }
+                Ok::<_, anyhow::Error>(values)
+            })?;
+
+            all_values.append(&mut values);
+        }
+
+        Ok(all_values)
     }
 
     fn get_storage_proofs(
@@ -151,7 +273,10 @@ impl ProviderDb {
                 }
 
                 // Extract the keys to process
-                let keys_to_process = keys.drain(0..num_keys_to_process).collect::<Vec<_>>();
+                let keys_to_process = keys
+                    .drain(0..num_keys_to_process)
+                    .map(|v| StorageKey::from(v))
+                    .collect::<Vec<_>>();
 
                 // Add the request
                 requests.push(Box::pin(
@@ -269,6 +394,12 @@ impl ProviderDb {
             .collect();
         Ok(headers)
     }
+
+    pub fn is_valid_run(&self) -> bool {
+        self.pending_accounts.is_empty()
+            && self.pending_slots.is_empty()
+            && self.pending_block_hashes.is_empty()
+    }
 }
 
 impl Database for ProviderDb {
@@ -282,52 +413,35 @@ impl Database for ProviderDb {
         if let Ok(db_result) = self.initial_db.basic(address) {
             return Ok(db_result);
         }
+        if let Ok(db_result) = self.staging_db.basic(address) {
+            if self.is_valid_run() {
+                self.initial_db
+                    .insert_account_info(address, db_result.clone().unwrap());
+            }
+            return Ok(db_result);
+        }
 
-        // Create a batch request for all account values
-        let mut batch = self.client.new_batch();
-
-        let nonce_request = batch
-            .add_call::<_, Uint<64, 1>>(
-                "eth_getTransactionCount",
-                &(address, Some(BlockId::from(self.block_number))),
-            )
-            .unwrap();
-        let balance_request = batch
-            .add_call::<_, Uint<256, 4>>(
-                "eth_getBalance",
-                &(address, Some(BlockId::from(self.block_number))),
-            )
-            .unwrap();
-        let code_request = batch
-            .add_call::<_, Bytes>(
-                "eth_getCode",
-                &(address, Some(BlockId::from(self.block_number))),
-            )
-            .unwrap();
+        // In optimistic mode, don't wait on the data and just return some default values
+        if self.optimistic {
+            self.pending_accounts.insert(address);
+
+            let code = Bytes::from(vec![]);
+            let account_info = AccountInfo::new(
+                U256::ZERO,
+                u64::MAX,
+                Bytecode::new_raw(code.clone()).hash_slow(),
+                Bytecode::new_raw(code),
+            );
+            return Ok(Some(account_info));
+        }
 
-        // Send the batch
-        self.async_executor.block_on(async { batch.send().await })?;
-
-        // Collect the data from the batch
-        let (nonce, balance, code) = self.async_executor.block_on(async {
-            Ok::<_, Self::Error>((
-                nonce_request.await?,
-                balance_request.await?,
-                code_request.await?,
-            ))
-        })?;
-
-        let account_info = AccountInfo::new(
-            balance,
-            nonce.try_into().unwrap(),
-            Bytecode::new_raw(code.clone()).hash_slow(),
-            Bytecode::new_raw(code),
-        );
+        // Fetch the account
+        let account = self.fetch_accounts(&vec![address])?[0].clone();
 
         // Insert the account into the initial database.
         self.initial_db
-            .insert_account_info(address, account_info.clone());
-        Ok(Some(account_info))
+            .insert_account_info(address, account.clone());
+        Ok(Some(account))
     }
 
     fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
@@ -338,43 +452,61 @@ impl Database for ProviderDb {
         if let Ok(db_result) = self.initial_db.storage(address, index) {
             return Ok(db_result);
         }
+        if let Ok(db_result) = self.staging_db.storage(address, index) {
+            if self.is_valid_run() {
+                self.initial_db
+                    .insert_account_storage(&address, index, db_result.clone());
+            }
+            return Ok(db_result);
+        }
+
+        // In optimistic mode, don't wait on the data and just return a default value
+        if self.optimistic {
+            self.basic(address)?;
+            self.pending_slots.insert((address, index));
+            return Ok(U256::default());
+        }
 
-        // Get the storage slot from the provider.
+        // Makes sure the account is also always loaded
         self.initial_db.basic(address)?;
-        let storage = self.async_executor.block_on(async {
-            self.provider
-                .get_storage_at(
-                    address.into_array().into(),
-                    index,
-                    Some(BlockId::from(self.block_number)),
-                )
-                .await
-        })?;
+
+        // Fetch the storage value
+        let value = self.fetch_storage_slots(&vec![(address, index)])?[0].clone();
+
         self.initial_db
-            .insert_account_storage(&address, index, storage);
-        Ok(storage)
+            .insert_account_storage(&address, index, value);
+        Ok(value)
     }
 
     fn block_hash(&mut self, number: U256) -> Result<B256, Self::Error> {
+        let block_number = u64::try_from(number).unwrap();
+
         // Check if the block hash is in the current database.
         if let Ok(block_hash) = self.initial_db.block_hash(number) {
             return Ok(block_hash);
         }
+        if let Ok(db_result) = self.staging_db.block_hash(number) {
+            if self.is_valid_run() {
+                self.initial_db
+                    .insert_block_hash(block_number, db_result.clone());
+            }
+            return Ok(db_result);
+        }
+
+        // In optimistic mode, don't wait on the data and just return some default values
+        if self.optimistic {
+            self.pending_block_hashes.insert(block_number);
+            return Ok(B256::default());
+        }
+
+        // Fetch the block hash
+        let block_hash = self.fetch_blocks(&vec![block_number])?[0]
+            .header
+            .hash
+            .unwrap()
+            .0
+            .into();
 
-        let block_number = u64::try_from(number).unwrap();
-        // Get the block hash from the provider.
-        let block_hash = self.async_executor.block_on(async {
-            self.provider
-                .get_block_by_number(block_number.into(), false)
-                .await
-                .unwrap()
-                .unwrap()
-                .header
-                .hash
-                .unwrap()
-                .0
-                .into()
-        });
         self.initial_db.insert_block_hash(block_number, block_hash);
         Ok(block_hash)
     }
@@ -390,96 +522,57 @@ impl DatabaseCommit for ProviderDb {
     }
 }
 
-pub struct MeasuredProviderDb {
-    pub provider: ProviderDb,
-    pub num_basic: u64,
-    pub time_basic: Duration,
-    pub num_storage: u64,
-    pub time_storage: Duration,
-    pub num_block_hash: u64,
-    pub time_block_hash: Duration,
-    pub num_code_by_hash: u64,
-    pub time_code_by_hash: Duration,
-}
-
-impl MeasuredProviderDb {
-    pub fn new(provider: ProviderDb) -> Self {
-        MeasuredProviderDb {
-            provider,
-            num_basic: 0,
-            time_basic: Duration::default(),
-            num_storage: 0,
-            time_storage: Duration::default(),
-            num_block_hash: 0,
-            time_block_hash: Duration::default(),
-            num_code_by_hash: 0,
-            time_code_by_hash: Duration::default(),
-        }
-    }
-
-    pub fn db(&mut self) -> &mut ProviderDb {
-        &mut self.provider
-    }
+impl OptimisticDatabase for ProviderDb {
+    fn fetch_data(&mut self) -> bool {
+        //println!("all accounts touched: {:?}", self.pending_accounts);
+        //println!("all slots touched: {:?}", self.pending_slots);
+        //println!("all block hashes touched: {:?}", self.pending_block_hashes);
 
-    pub fn print_report(&self) {
-        println!("db accesses: ");
-        print_duration(
-            &format!("- account [{} ops]: ", self.num_basic),
-            self.time_basic,
-        );
-        print_duration(
-            &format!("- storage [{} ops]: ", self.num_storage),
-            self.time_storage,
-        );
-        print_duration(
-            &format!("- block_hash [{} ops]: ", self.num_block_hash),
-            self.time_block_hash,
-        );
-        print_duration(
-            &format!("- code_by_hash [{} ops]: ", self.num_code_by_hash),
-            self.time_code_by_hash,
-        );
-    }
-}
+        // This run was valid when no pending work was scheduled
+        let valid_run = self.is_valid_run();
 
-impl Database for MeasuredProviderDb {
-    type Error = anyhow::Error;
+        let accounts = self
+            .fetch_accounts(&self.pending_accounts.iter().cloned().collect())
+            .unwrap();
+        for (address, account) in take(&mut self.pending_accounts)
+            .into_iter()
+            .zip(accounts.iter())
+        {
+            self.staging_db
+                .insert_account_info(address.clone(), account.clone());
+        }
 
-    fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
-        self.num_basic += 1;
-        let start = Instant::now();
-        let res = self.provider.basic(address);
-        self.time_basic.add_assign(start.elapsed());
-        res
-    }
+        let slots = self
+            .fetch_storage_slots(&self.pending_slots.iter().cloned().collect())
+            .unwrap();
+        for ((address, index), value) in take(&mut self.pending_slots).into_iter().zip(slots.iter())
+        {
+            self.staging_db
+                .insert_account_storage(&address, index.clone(), value.clone());
+        }
 
-    fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
-        self.num_storage += 1;
-        let start = Instant::now();
-        let res = self.provider.storage(address, index);
-        self.time_storage.add_assign(start.elapsed());
-        res
-    }
+        let blocks = self
+            .fetch_blocks(&self.pending_block_hashes.iter().cloned().collect())
+            .unwrap();
+        for (block_number, block) in take(&mut self.pending_block_hashes)
+            .into_iter()
+            .zip(blocks.iter())
+        {
+            self.staging_db
+                .insert_block_hash(block_number, block.header.hash.unwrap().0.into());
+            self.initial_headers
+                .insert(block_number, to_header(&block.header));
+        }
 
-    fn block_hash(&mut self, number: U256) -> Result<B256, Self::Error> {
-        self.num_block_hash += 1;
-        let start = Instant::now();
-        let res = self.provider.block_hash(number);
-        self.time_block_hash.add_assign(start.elapsed());
-        res
-    }
+        // If this wasn't a valid run, clear the post execution database
+        if !valid_run {
+            self.current_db = Default::default();
+        }
 
-    fn code_by_hash(&mut self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
-        self.num_code_by_hash += 1;
-        let start = Instant::now();
-        let res = self.provider.code_by_hash(_code_hash);
-        self.time_code_by_hash.add_assign(start.elapsed());
-        res
+        valid_run
     }
-}
 
-impl DatabaseCommit for MeasuredProviderDb {
-    fn commit(&mut self, changes: HashMap<Address, Account>) {
-        self.provider.commit(changes)
+    fn is_optimistic(&self) -> bool {
+        self.optimistic
     }
 }
diff --git a/lib/src/builder/execute.rs b/lib/src/builder/execute.rs
index cd270d18..eb0d0757 100644
--- a/lib/src/builder/execute.rs
+++ b/lib/src/builder/execute.rs
@@ -32,7 +32,7 @@ use revm::{
     taiko, Database, DatabaseCommit, Evm,
 };
 
-use super::TxExecStrategy;
+use super::{OptimisticDatabase, TxExecStrategy};
 use crate::{
     builder::BlockBuilder,
     clear_line,
@@ -51,12 +51,14 @@ pub struct TkoTxExecStrategy {}
 impl TxExecStrategy for TkoTxExecStrategy {
     fn execute_transactions<D>(mut block_builder: BlockBuilder<D>) -> Result<BlockBuilder<D>>
     where
-        D: Database + DatabaseCommit,
+        D: Database + DatabaseCommit + OptimisticDatabase,
         <D as Database>::Error: Debug,
     {
         let mut tx_transact_duration = Duration::default();
         let mut tx_misc_duration = Duration::default();
 
+        let is_optimistic = block_builder.db().unwrap().is_optimistic();
+
         let header = block_builder
             .header
             .as_mut()
@@ -208,6 +210,9 @@ impl TxExecStrategy for TkoTxExecStrategy {
             // verify transaction gas
             let block_available_gas = block_builder.input.gas_limit - cumulative_gas_used;
             if block_available_gas < tx_env.gas_limit {
+                if is_optimistic {
+                    continue;
+                }
                 if is_anchor {
                     bail!("Error at transaction {}: gas exceeds block limit", tx_no);
                 }
@@ -235,6 +240,9 @@ impl TxExecStrategy for TkoTxExecStrategy {
             let ResultAndState { result, state } = match evm.transact() {
                 Ok(result) => result,
                 Err(err) => {
+                    if is_optimistic {
+                        continue;
+                    }
                     if !is_taiko {
                         bail!("tx failed to execute successfully: {:?}", err);
                     }
@@ -265,7 +273,7 @@ impl TxExecStrategy for TkoTxExecStrategy {
             let start = Instant::now();
 
             // anchor tx needs to succeed
-            if is_anchor && !result.is_success() {
+            if is_anchor && !result.is_success() && !is_optimistic {
                 bail!(
                     "Error at transaction {tx_no}: execute anchor failed {result:?}, output {:?}",
                     result.output().map(|o| from_utf8(o).unwrap_or_default())
@@ -311,10 +319,7 @@ impl TxExecStrategy for TkoTxExecStrategy {
         // process withdrawals unconditionally after any transactions
         let measurement = Measurement::start("Processing withdrawals...", true);
         let mut withdrawals_trie = MptNode::default();
-        for (i, withdrawal) in take(&mut block_builder.input.withdrawals)
-            .into_iter()
-            .enumerate()
-        {
+        for (i, withdrawal) in block_builder.input.withdrawals.iter().enumerate() {
             // the withdrawal amount is given in Gwei
             let amount_wei = GWEI_TO_WEI
                 .checked_mul(withdrawal.amount.try_into().unwrap())
diff --git a/lib/src/builder/mod.rs b/lib/src/builder/mod.rs
index 747555ed..aeaca08e 100644
--- a/lib/src/builder/mod.rs
+++ b/lib/src/builder/mod.rs
@@ -34,6 +34,15 @@ mod finalize;
 mod initialize;
 pub mod prepare;
 
+/// Optimistic database
+pub trait OptimisticDatabase {
+    /// Handle post execution work
+    fn fetch_data(&mut self) -> bool;
+
+    /// If the current database is optimistic
+    fn is_optimistic(&self) -> bool;
+}
+
 /// A generic builder for building a block.
 #[derive(Clone, Debug)]
 pub struct BlockBuilder<D> {
@@ -45,7 +54,7 @@ pub struct BlockBuilder<D> {
 
 impl<D> BlockBuilder<D>
 where
-    D: Database + DatabaseCommit,
+    D: Database + DatabaseCommit + OptimisticDatabase,
     <D as Database>::Error: core::fmt::Debug,
 {
     /// Creates a new block builder.
@@ -123,6 +132,6 @@ impl BlockBuilderStrategy for TaikoStrategy {
 pub trait TxExecStrategy {
     fn execute_transactions<D>(block_builder: BlockBuilder<D>) -> Result<BlockBuilder<D>>
     where
-        D: Database + DatabaseCommit,
+        D: Database + DatabaseCommit + OptimisticDatabase,
         <D as Database>::Error: core::fmt::Debug;
 }
diff --git a/lib/src/mem_db.rs b/lib/src/mem_db.rs
index b3a7a15e..4c2bd98a 100644
--- a/lib/src/mem_db.rs
+++ b/lib/src/mem_db.rs
@@ -21,6 +21,7 @@ use revm::{
 use serde::{Deserialize, Serialize};
 use thiserror_no_std::Error as ThisError;
 
+use crate::builder::OptimisticDatabase;
 #[cfg(not(feature = "std"))]
 use crate::no_std::*;
@@ -280,3 +281,13 @@ impl DatabaseCommit for MemDb {
         }
     }
 }
+
+impl OptimisticDatabase for MemDb {
+    fn fetch_data(&mut self) -> bool {
+        true
+    }
+
+    fn is_optimistic(&self) -> bool {
+        false
+    }
+}
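
Reviewer note (not part of the patch): the minimal sketch below illustrates the retry pattern the new `OptimisticDatabase` trait is meant to support, as driven by the loop added to `preflight`. While `optimistic` is set, lookups return placeholder values and record what was touched; `fetch_data` then batch-fetches the pending items and only reports a valid run when nothing new was scheduled during execution. Only the trait definition is taken verbatim from the diff; `ToyDb`, its `get` method, and the driver in `main` are hypothetical stand-ins for `ProviderDb`, the `Database` lookups, and the iteration loop in `preflight.rs`.

```rust
use std::collections::HashSet;

/// Copied from `lib/src/builder/mod.rs` as introduced by this diff.
pub trait OptimisticDatabase {
    /// Handle post execution work
    fn fetch_data(&mut self) -> bool;

    /// If the current database is optimistic
    fn is_optimistic(&self) -> bool;
}

/// Hypothetical stand-in for `ProviderDb`: remembers which keys were touched
/// while optimistic and "fetches" them in one batch afterwards.
#[derive(Default)]
struct ToyDb {
    optimistic: bool,
    known: HashSet<u64>,
    pending: HashSet<u64>,
}

impl ToyDb {
    /// Analogue of `Database::basic`/`storage`/`block_hash`: returns a
    /// placeholder while optimistic and schedules the key for the next batch.
    fn get(&mut self, key: u64) -> u64 {
        if self.known.contains(&key) {
            return key; // pretend this is the previously fetched value
        }
        if self.optimistic {
            self.pending.insert(key);
            return 0; // default value, good enough to keep execution going
        }
        self.known.insert(key); // blocking single fetch in the non-optimistic path
        key
    }
}

impl OptimisticDatabase for ToyDb {
    fn fetch_data(&mut self) -> bool {
        // A run is only valid if nothing new was requested during execution.
        let valid_run = self.pending.is_empty();
        for key in self.pending.drain() {
            self.known.insert(key); // batched "RPC" fetch
        }
        valid_run
    }

    fn is_optimistic(&self) -> bool {
        self.optimistic
    }
}

fn main() {
    // Hypothetical driver mirroring the loop added to `preflight`.
    let mut db = ToyDb::default();
    let max_iterations = 50;
    for i in 0..max_iterations {
        db.optimistic = i + 1 < max_iterations;
        // "Execute the block": later lookups depend on earlier results.
        let a = db.get(1);
        let _b = db.get(a + 1);
        if db.fetch_data() {
            println!("settled after {} iteration(s)", i + 1);
            break;
        }
    }
}
```

The property the loop in `preflight.rs` relies on is the same one `ProviderDb::is_valid_run` checks: a run only counts as valid when `pending_accounts`, `pending_slots`, and `pending_block_hashes` are all empty, so the final (non-optimistic) iteration is guaranteed to have executed against fully fetched data.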