Skip to content

Commit

Permalink
Fix bug in the txPoolContentWithMinTip API
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp committed Oct 21, 2024
1 parent 7a33374 commit 95fc4ea
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 127 deletions.
74 changes: 22 additions & 52 deletions crates/taiko/consensus/proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,13 @@ impl Storage {
transactions: Vec<TransactionSigned>,
ommers: Vec<Header>,
provider: &Provider,
chain_spec: Arc<ChainSpec>,
chain_spec: &ChainSpec,
executor: &Executor,
beneficiary: Address,
block_max_gas_limit: u64,
max_bytes_per_tx_list: u64,
max_transactions_lists: u64,
base_fee: u64,
) -> Result<Vec<TaskResult>, RethError>
) -> Result<TaskResult, RethError>
where
Executor: BlockExecutorProvider,
Provider: StateProviderFactory + BlockReaderIdExt,
Expand All @@ -286,7 +285,7 @@ impl Storage {
provider,
withdrawals.as_ref(),
requests.as_ref(),
&chain_spec,
chain_spec,
beneficiary,
block_max_gas_limit,
base_fee,
Expand All @@ -309,61 +308,32 @@ impl Storage {

debug!(target: "taiko::proposer", transactions=?body, "after executing transactions");

let mut tx_lists = vec![];
let mut chunk_start = 0;
let mut last_compressed_buf = None;
let mut gas_used_start = 0;
let mut index = 0;
for idx in 0..body.len() {
if let Some((txs_range, estimated_gas_used, compressed_buf)) = {
let compressed_buf = encode_and_compress_tx_list(&body[chunk_start..=idx])
.map_err(BlockExecutionError::other)?;

if compressed_buf.len() > max_bytes_per_tx_list as usize {
// the first transaction in chunk is too large, so we need to skip it
if idx == chunk_start {
gas_used_start = receipts[idx].cumulative_gas_used;
chunk_start += 1;
// the first transaction in chunk is too large, so we need to skip it
None
} else {
// current chunk reaches the max_transactions_lists or max_bytes_per_tx_list
// and use previous transaction's data
let estimated_gas_used =
receipts[idx - 1].cumulative_gas_used - gas_used_start;
gas_used_start = receipts[idx - 1].cumulative_gas_used;
let range = chunk_start..idx;
chunk_start = idx;
Some((range, estimated_gas_used, last_compressed_buf.clone()))
}
}
// reach the limitation of max_transactions_lists or max_bytes_per_tx_list
else if idx - chunk_start + 1 == max_transactions_lists as usize {
let estimated_gas_used = receipts[idx].cumulative_gas_used - gas_used_start;
gas_used_start = receipts[idx].cumulative_gas_used;
let range = chunk_start..idx + 1;
chunk_start = idx + 1;
Some((range, estimated_gas_used, Some(compressed_buf)))
} else {
last_compressed_buf = Some(compressed_buf);
None
}
} {
tx_lists.push(TaskResult {
txs: body[txs_range]
.iter()
.cloned()
.map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap())
.collect(),
estimated_gas_used,
bytes_length: compressed_buf.map_or(0, |b| b.len() as u64),
});
let compressed_buf = encode_and_compress_tx_list(&body[..=idx])
.map_err(BlockExecutionError::other)?;
if compressed_buf.len() <= max_bytes_per_tx_list as usize {
last_compressed_buf = Some(compressed_buf);
index = idx;
} else {
break;
}
}

Ok(tx_lists)

Ok(TaskResult {
txs: body[0..=index]
.iter()
.cloned()
.map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap())
.collect(),
estimated_gas_used: receipts[index].cumulative_gas_used,
bytes_length: last_compressed_buf.map_or(0, |b| b.len() as u64),
})
}
}


fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result<Vec<u8>> {
let encoded_buf = alloy_rlp::encode(TransactionSignedList(txs));
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
Expand Down
134 changes: 59 additions & 75 deletions crates/taiko/consensus/proposer/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Storage, TaskArgs};
use crate::{Storage, TaskArgs, TaskResult};
use futures_util::{future::BoxFuture, FutureExt};
use reth_chainspec::ChainSpec;
use reth_evm::execute::BlockExecutorProvider;
Expand All @@ -14,6 +14,8 @@ use std::{
};
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::debug;
use reth_errors::RethError;
use reth_revm::database::StateProviderDatabase;

/// A Future that listens for new ready transactions and puts new blocks into storage
pub struct ProposerTask<Provider, Pool: TransactionPool, Executor> {
Expand Down Expand Up @@ -74,55 +76,38 @@ where

// this drives block production and
loop {
if let Some(trigger_args) = match this.trigger_args_rx.poll_recv(cx) {
Poll::Ready(Some(args)) => Some(args),
Poll::Ready(None) => return Poll::Ready(()),
_ => None,
} {
let mut best_txs = this.pool.best_transactions();
best_txs.skip_blobs();
debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions");
let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs
.filter(|tx| {
tx.effective_tip_per_gas(trigger_args.base_fee)
.map_or(false, |tip| tip >= trigger_args.min_tip as u128)
})
.partition(|tx| {
trigger_args
.local_accounts
.as_ref()
.map(|local_accounts| local_accounts.contains(&tx.sender()))
.unwrap_or_default()
});
local_txs.extend(remote_txs);
debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions");
// miner returned a set of transaction that we feed to the producer
this.queued.push_back((trigger_args, local_txs));
};

if this.insert_task.is_none() {
if this.queued.is_empty() {
// nothing to insert
break;
match this.trigger_args_rx.poll_recv(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(None) => {
return Poll::Ready(());
}
Poll::Ready(Some(args)) => {
let mut best_txs = this.pool.best_transactions();
best_txs.skip_blobs();
debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions");
let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs
.filter(|tx| {
tx.effective_tip_per_gas(args.base_fee)
.map_or(false, |tip| tip >= args.min_tip as u128)
})
.partition(|tx| {
args
.local_accounts
.as_ref()
.map(|local_accounts| local_accounts.contains(&tx.sender()))
.unwrap_or_default()
});
local_txs.extend(remote_txs);
debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions");

// ready to queue in new insert task;
let (trigger_args, txs) = this.queued.pop_front().expect("not empty");

let client = this.provider.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let executor = this.block_executor.clone();

// Create the mining future that creates a block, notifies the engine that drives
// the pipeline
this.insert_task = Some(Box::pin(async move {
let txs: Vec<_> = txs
let client = this.provider.clone();
let executor = this.block_executor.clone();
let txs: Vec<_> = local_txs
.into_iter()
.map(|tx| tx.to_recovered_transaction().into_signed())
.collect();
let ommers = vec![];

let TaskArgs {
tx,
beneficiary,
Expand All @@ -131,44 +116,43 @@ where
max_transactions_lists,
base_fee,
..
} = trigger_args;
let res = Storage::build_and_execute(
txs.clone(),
ommers,
&client,
chain_spec,
&executor,
beneficiary,
block_max_gas_limit,
max_bytes_per_tx_list,
max_transactions_lists,
base_fee,
);
if res.is_ok() {
// clear all transactions from pool
pool.remove_transactions(txs.iter().map(|tx| tx.hash()).collect());
}
let _ = tx.send(res);
}));
}

if let Some(mut fut) = this.insert_task.take() {
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
this.insert_task = Some(fut);
break;
} = args;
let mut target_list: Vec<TaskResult> = vec![];
let mut result: Result<Vec<TaskResult>, RethError>;
for _ in 0..max_transactions_lists {
let res = Storage::build_and_execute(
txs.clone(),
vec![],
&client,
&this.chain_spec,
&executor,
beneficiary,
block_max_gas_limit,
max_bytes_per_tx_list,
base_fee,
);
match res {
Ok(target) => if target.txs.is_empty() {
break;
} else {
target_list.push(target);
}
Err(err) => {
result = Err(err);
break;
}
}
}
result = Ok(target_list);
let _ = tx.send(result);
}
}
}

Poll::Pending
}
}

impl<Client, Pool: TransactionPool, EvmConfig: std::fmt::Debug> std::fmt::Debug
for ProposerTask<Client, Pool, EvmConfig>
for ProposerTask<Client, Pool, EvmConfig>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MiningTask").finish_non_exhaustive()
Expand Down

0 comments on commit 95fc4ea

Please sign in to comment.