Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove UnprocessedTransactionStorage #4604

Merged
merged 17 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 2 additions & 64 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_core::{
banking_stage::{
committer::Committer,
consumer::Consumer,
leader_slot_metrics::LeaderSlotMetricsTracker,
qos_service::QosService,
unprocessed_packet_batches::*,
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStage, BankingStageStats,
},
banking_trace::BankingTracer,
},
solana_core::{banking_stage::BankingStage, banking_trace::BankingTracer},
solana_entry::entry::{next_hash, Entry},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -38,10 +27,7 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
},
solana_perf::{
packet::{to_packet_batches, Packet},
test_tx::test_tx,
},
solana_perf::packet::to_packet_batches,
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
Expand Down Expand Up @@ -82,54 +68,6 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
assert_eq!(total, ref_tx_count);
}

#[bench]
fn bench_consume_buffered(bencher: &mut Bencher) {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
let bank = Bank::new_for_benches(&genesis_config)
.wrap_with_bank_forks_for_tests()
.0;
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(bank, blockstore, None, None);

let recorder = poh_recorder.read().unwrap().new_recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions
.iter()
.filter_map(|transaction| {
let packet = Packet::from_data(None, transaction).ok().unwrap();
DeserializedPacket::new(packet).ok()
})
.collect::<Vec<_>>();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches, 2 * batches_len),
ThreadType::Transactions,
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
consumer.consume_buffered_packets(
&bank_start,
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
});

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
let to_pubkey = pubkey::new_rand();
let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash);
Expand Down
40 changes: 17 additions & 23 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
leader_slot_metrics::LeaderSlotMetricsTracker,
packet_receiver::PacketReceiver,
qos_service::QosService,
unprocessed_transaction_storage::UnprocessedTransactionStorage,
vote_storage::VoteStorage,
},
crate::{
banking_stage::{
Expand Down Expand Up @@ -65,7 +65,7 @@ pub mod consumer;
pub mod leader_slot_metrics;
pub mod qos_service;
pub mod unprocessed_packet_batches;
pub mod unprocessed_transaction_storage;
pub mod vote_storage;

mod consume_worker;
mod decision_maker;
Expand Down Expand Up @@ -441,10 +441,7 @@ impl BankingStage {
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
vote_source,
),
VoteStorage::new(latest_unprocessed_votes.clone(), vote_source),
));
}

Expand Down Expand Up @@ -590,7 +587,7 @@ impl BankingStage {
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
vote_storage: VoteStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
Expand All @@ -609,7 +606,7 @@ impl BankingStage {
&bank_forks,
&consumer,
id,
unprocessed_transaction_storage,
vote_storage,
)
})
.unwrap()
Expand All @@ -620,19 +617,16 @@ impl BankingStage {
decision_maker: &mut DecisionMaker,
bank_forks: &RwLock<BankForks>,
consumer: &Consumer,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
vote_storage: &mut VoteStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
if unprocessed_transaction_storage.should_not_process() {
if vote_storage.should_not_process() {
return;
}
let (decision, make_decision_us) =
measure_us!(decision_maker.make_consume_or_forward_decision());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(
decision.bank_start(),
Some(unprocessed_transaction_storage),
);
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start());
slot_metrics_tracker.increment_make_decision_us(make_decision_us);

match decision {
Expand All @@ -645,7 +639,7 @@ impl BankingStage {
let (_, consume_buffered_packets_us) = measure_us!(consumer
.consume_buffered_packets(
&bank_start,
unprocessed_transaction_storage,
vote_storage,
banking_stage_stats,
slot_metrics_tracker,
));
Expand All @@ -656,14 +650,14 @@ impl BankingStage {
// get current working bank from bank_forks, use it to sanitize transaction and
// load all accounts from address loader;
let current_bank = bank_forks.read().unwrap().working_bank();
unprocessed_transaction_storage.cache_epoch_boundary_info(&current_bank);
unprocessed_transaction_storage.clear();
vote_storage.cache_epoch_boundary_info(&current_bank);
vote_storage.clear();
}
BufferedPacketsDecision::ForwardAndHold => {
// get current working bank from bank_forks, use it to sanitize transaction and
// load all accounts from address loader;
let current_bank = bank_forks.read().unwrap().working_bank();
unprocessed_transaction_storage.cache_epoch_boundary_info(&current_bank);
vote_storage.cache_epoch_boundary_info(&current_bank);
}
BufferedPacketsDecision::Hold => {}
}
Expand All @@ -675,22 +669,22 @@ impl BankingStage {
bank_forks: &RwLock<BankForks>,
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
mut vote_storage: VoteStorage,
) {
let mut banking_stage_stats = BankingStageStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();

loop {
if !unprocessed_transaction_storage.is_empty()
if !vote_storage.is_empty()
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
{
let (_, process_buffered_packets_us) = measure_us!(Self::process_buffered_packets(
decision_maker,
bank_forks,
consumer,
&mut unprocessed_transaction_storage,
&mut vote_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
));
Expand All @@ -700,7 +694,7 @@ impl BankingStage {
}

match packet_receiver.receive_and_buffer_packets(
&mut unprocessed_transaction_storage,
&mut vote_storage,
&mut banking_stage_stats,
&mut slot_metrics_tracker,
) {
Expand Down Expand Up @@ -1291,7 +1285,7 @@ mod tests {

#[test_case(TransactionStructure::Sdk)]
#[test_case(TransactionStructure::View)]
fn test_unprocessed_transaction_storage_full_send(transaction_struct: TransactionStructure) {
fn test_vote_storage_full_send(transaction_struct: TransactionStructure) {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
Expand Down
Loading
Loading