Skip to content

Commit

Permalink
Remove dead code and test
Browse files Browse the repository at this point in the history
  • Loading branch information
ksolana committed Feb 25, 2025
1 parent 2d5005b commit 826b995
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 554 deletions.
50 changes: 1 addition & 49 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
leader_slot_metrics::LeaderSlotMetricsTracker,
qos_service::QosService,
unprocessed_packet_batches::*,
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
unprocessed_transaction_storage::{ThreadType, VoteStorage},
BankingStage, BankingStageStats,
},
banking_trace::BankingTracer,
Expand Down Expand Up @@ -82,54 +82,6 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
assert_eq!(total, ref_tx_count);
}

#[bench]
fn bench_consume_buffered(bencher: &mut Bencher) {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
let bank = Bank::new_for_benches(&genesis_config)
.wrap_with_bank_forks_for_tests()
.0;
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(bank, blockstore, None, None);

let recorder = poh_recorder.read().unwrap().new_recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions
.iter()
.filter_map(|transaction| {
let packet = Packet::from_data(None, transaction).ok().unwrap();
DeserializedPacket::new(packet).ok()
})
.collect::<Vec<_>>();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches, 2 * batches_len),
ThreadType::Transactions,
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
consumer.consume_buffered_packets(
&bank_start,
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
});

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
let to_pubkey = pubkey::new_rand();
let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash);
Expand Down
5 changes: 1 addition & 4 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,7 @@ impl BankingStage {
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
VoteStorage::new(
latest_unprocessed_votes.clone(),
vote_source,
),
VoteStorage::new(latest_unprocessed_votes.clone(), vote_source),
));
}

Expand Down
306 changes: 1 addition & 305 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ impl Consumer {
);

if reached_end_of_slot {
slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(
vote_storage.len() as u64,
);
slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(vote_storage.len() as u64);
}

proc_start.stop();
Expand Down Expand Up @@ -2147,308 +2145,6 @@ mod tests {
let _ = poh_simulator.join();
}

#[test]
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank_for_test(bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
num_conflicting_transactions as u64
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}

#[test]
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let (mut transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
.message
.account_keys
.push(duplicate_account_key); // corrupt transaction
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank_for_test(bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
assert!(buffered_packet_batches.is_empty());
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}

#[test]
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let retryable_packet = deserialized_packets[0].clone();
let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed
// except except for retryable errors. Manually take the lock of a transaction to
// simulate another thread processing a transaction with that lock.
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let lock_account = transactions[0].message.account_keys[1];
let manual_lock_tx = RuntimeTransaction::from_transaction_for_tests(
system_transaction::transfer(&Keypair::new(), &lock_account, 1, bank.last_blockhash()),
);
let _ = bank_start.working_bank.accounts().lock_accounts(
std::iter::once(&manual_lock_tx),
bank_start.working_bank.get_transaction_account_lock_limit(),
);

let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all but 1 transaction was processed. And that it was rebuffered.
assert_eq!(buffered_packet_batches.len(), 1);
assert_eq!(
buffered_packet_batches.iter().next().unwrap(),
&retryable_packet
);
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions - 1,
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
1
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
num_conflicting_transactions as u64 - 1
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}

#[test]
fn test_consume_buffered_packets_batch_priority_guard() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let (
_,
bank,
_bank_forks,
poh_recorder,
_entry_receiver,
genesis_config_info,
poh_simulator,
) = setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();

// Setup transactions:
// [(AB), (BC), (CD)]
// (AB) and (BC) are conflicting, and cannot go into the same batch.
// (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC).
let keypair_a = Keypair::new();
let keypair_b = Keypair::new();
let keypair_c = Keypair::new();
let keypair_d = Keypair::new();
for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] {
bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey())
.unwrap();
}

let make_prioritized_transfer = |from: &Keypair, to, lamports, priority| -> Transaction {
let ixs = vec![
system_instruction::transfer(&from.pubkey(), to, lamports),
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from.pubkey()));
Transaction::new(&[from], message, bank.last_blockhash())
};

let transactions = vec![
make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3),
make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2),
make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1),
];

let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank_for_test(bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
4 + num_conflicting_transactions as u64 // 4 for funding transfers
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}

#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();
Expand Down
Loading

0 comments on commit 826b995

Please sign in to comment.