From 826b995e21598d2c00b21d090bfc93033a16a59f Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Tue, 25 Feb 2025 08:36:41 -0800 Subject: [PATCH] Remove dead code and test --- core/benches/banking_stage.rs | 50 +-- core/src/banking_stage.rs | 5 +- core/src/banking_stage/consumer.rs | 306 +----------------- core/src/banking_stage/leader_slot_metrics.rs | 4 +- core/src/banking_stage/packet_receiver.rs | 4 +- .../unprocessed_transaction_storage.rs | 214 ++---------- 6 files changed, 29 insertions(+), 554 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 8b83407334d54c..665dfdd42c2bbe 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -25,7 +25,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, qos_service::QosService, unprocessed_packet_batches::*, - unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, + unprocessed_transaction_storage::{ThreadType, VoteStorage}, BankingStage, BankingStageStats, }, banking_trace::BankingTracer, @@ -82,54 +82,6 @@ fn check_txs(receiver: &Arc>, ref_tx_count: usize) { assert_eq!(total, ref_tx_count); } -#[bench] -fn bench_consume_buffered(bencher: &mut Bencher) { - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000); - let bank = Bank::new_for_benches(&genesis_config) - .wrap_with_bank_forks_for_tests() - .0; - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"), - ); - let (exit, poh_recorder, poh_service, _signal_receiver) = - create_test_recorder(bank, blockstore, None, None); - - let recorder = poh_recorder.read().unwrap().new_recorder(); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - - let tx = test_tx(); - let transactions = vec![tx; 4194304]; - let batches = transactions - .iter() - .filter_map(|transaction| { - let packet = Packet::from_data(None, transaction).ok().unwrap(); - DeserializedPacket::new(packet).ok() - }) - .collect::>(); - let batches_len = batches.len(); - let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(batches, 2 * batches_len), - ThreadType::Transactions, - ); - let (s, _r) = unbounded(); - let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64))); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - // This tests the performance of buffering packets. - // If the packet buffers are copied, performance will be poor. - bencher.iter(move || { - consumer.consume_buffered_packets( - &bank_start, - &mut transaction_buffer, - &BankingStageStats::default(), - &mut LeaderSlotMetricsTracker::new(0), - ); - }); - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); -} - fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec { let to_pubkey = pubkey::new_rand(); let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48b0501083b19f..e628601000651b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -441,10 +441,7 @@ impl BankingStage { committer.clone(), transaction_recorder.clone(), log_messages_bytes_limit, - VoteStorage::new( - latest_unprocessed_votes.clone(), - vote_source, - ), + VoteStorage::new(latest_unprocessed_votes.clone(), vote_source), )); } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 31070dc5a8dee4..6c3cba30504134 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -137,9 +137,7 @@ impl Consumer { ); if reached_end_of_slot { - slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len( - vote_storage.len() as u64, - ); + slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(vote_storage.len() as u64); } proc_start.stop(); @@ -2147,308 +2145,6 @@ mod tests { let _ = poh_simulator.join(); } - #[test] - fn test_consume_buffered_packets() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all packets were processed without retrying - assert!(buffered_packet_batches.is_empty()); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 0 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - num_conflicting_transactions as u64 - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_sanitization_error() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (mut transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let duplicate_account_key = transactions[0].message.account_keys[0]; - transactions[0] - .message - .account_keys - .push(duplicate_account_key); // corrupt transaction - let recorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &BankingStageStats::default(), - &mut LeaderSlotMetricsTracker::new(0), - ); - assert!(buffered_packet_batches.is_empty()); - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_retryable() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let retryable_packet = deserialized_packets[0].clone(); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed - // except except for retryable errors. Manually take the lock of a transaction to - // simulate another thread processing a transaction with that lock. - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - - let lock_account = transactions[0].message.account_keys[1]; - let manual_lock_tx = RuntimeTransaction::from_transaction_for_tests( - system_transaction::transfer(&Keypair::new(), &lock_account, 1, bank.last_blockhash()), - ); - let _ = bank_start.working_bank.accounts().lock_accounts( - std::iter::once(&manual_lock_tx), - bank_start.working_bank.get_transaction_account_lock_limit(), - ); - - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all but 1 transaction was processed. And that it was rebuffered. - assert_eq!(buffered_packet_batches.len(), 1); - assert_eq!( - buffered_packet_batches.iter().next().unwrap(), - &retryable_packet - ); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - 1, - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 1 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - num_conflicting_transactions as u64 - 1 - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_batch_priority_guard() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let ( - _, - bank, - _bank_forks, - poh_recorder, - _entry_receiver, - genesis_config_info, - poh_simulator, - ) = setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.read().unwrap().new_recorder(); - - // Setup transactions: - // [(AB), (BC), (CD)] - // (AB) and (BC) are conflicting, and cannot go into the same batch. - // (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC). - let keypair_a = Keypair::new(); - let keypair_b = Keypair::new(); - let keypair_c = Keypair::new(); - let keypair_d = Keypair::new(); - for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] { - bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey()) - .unwrap(); - } - - let make_prioritized_transfer = |from: &Keypair, to, lamports, priority| -> Transaction { - let ixs = vec![ - system_instruction::transfer(&from.pubkey(), to, lamports), - compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority), - ]; - let message = Message::new(&ixs, Some(&from.pubkey())); - Transaction::new(&[from], message, bank.last_blockhash()) - }; - - let transactions = vec![ - make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3), - make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2), - make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1), - ]; - - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all packets were processed without retrying - assert!(buffered_packet_batches.is_empty()); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 0 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - 4 + num_conflicting_transactions as u64 // 4 for funding transfers - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - #[test] fn test_accumulate_execute_units_and_time() { let mut execute_timings = ExecuteTimings::default(); diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index d1c2a30a946d22..02b6c60a404998 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -3,9 +3,7 @@ use { consumer::LeaderProcessedTransactionCounts, leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, - unprocessed_transaction_storage::{ - InsertPacketBatchSummary, VoteStorage, - }, + unprocessed_transaction_storage::{InsertPacketBatchSummary, VoteStorage}, }, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index a6ac1530be850d..99fa2435287e98 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -68,9 +68,7 @@ impl PacketReceiver { result } - fn get_receive_timeout( - unprocessed_transaction_storage: &VoteStorage, - ) -> Duration { + fn get_receive_timeout(unprocessed_transaction_storage: &VoteStorage) -> Duration { // Gossip thread (does not process) should not continuously receive with 0 duration. // This can cause the thread to run at 100% CPU because it is continuously polling. if !unprocessed_transaction_storage.should_not_process() diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 6573f064f6a0f5..5a9354912ff035 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -9,9 +9,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, read_write_account_set::ReadWriteAccountSet, - unprocessed_packet_batches::{ - DeserializedPacket, PacketBatchInsertionMetrics, - }, + unprocessed_packet_batches::{DeserializedPacket, PacketBatchInsertionMetrics}, BankingStageStats, }, itertools::Itertools, @@ -252,20 +250,22 @@ impl VoteStorage { &mut self, deserialized_packets: Vec, ) -> InsertPacketBatchSummary { - InsertPacketBatchSummary::from(self.latest_unprocessed_votes.insert_batch( - deserialized_packets - .into_iter() - .filter_map(|deserialized_packet| { - LatestValidatorVotePacket::new_from_immutable( - Arc::new(deserialized_packet), - self.vote_source, - self.latest_unprocessed_votes - .should_deprecate_legacy_vote_ixs(), - ) - .ok() - }), - false, // should_replenish_taken_votes - )) + InsertPacketBatchSummary::from( + self.latest_unprocessed_votes.insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), + ) + .ok() + }), + false, // should_replenish_taken_votes + ), + ) } // returns `true` if the end of slot is reached @@ -344,11 +344,11 @@ impl VoteStorage { scanner.finalize().payload.reached_end_of_slot } - fn clear(&mut self) { + pub fn clear(&mut self) { self.latest_unprocessed_votes.clear(); } - fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + pub fn cache_epoch_boundary_info(&mut self, bank: &Bank) { if matches!(self.vote_source, VoteSource::Gossip) { panic!("Gossip vote thread should not be checking epoch boundary"); } @@ -363,181 +363,17 @@ impl VoteStorage { } } -impl ThreadLocalUnprocessedPackets { - fn is_empty(&self) -> bool { - self.unprocessed_packet_batches.is_empty() - } - - pub fn thread_type(&self) -> ThreadType { - self.thread_type - } - - fn len(&self) -> usize { - self.unprocessed_packet_batches.len() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_min_compute_unit_price() - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_max_compute_unit_price() - } - - fn max_receive_size(&self) -> usize { - self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len() - } - - #[cfg(test)] - fn iter(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter_mut() - } - - fn insert_batch( - &mut self, - deserialized_packets: Vec, - ) -> PacketBatchInsertionMetrics { - self.unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ) - } - - /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn take_priority_queue(&mut self) -> MinMaxHeap> { - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::new(), // <-- no need to reserve capacity as we will replace this - ) - } - - /// Verify that the priority queue and map are consistent and that original capacity is maintained. - fn verify_priority_queue(&self, original_capacity: usize) { - // Assert unprocessed queue is still consistent and maintains original capacity - assert_eq!( - self.unprocessed_packet_batches - .packet_priority_queue - .capacity(), - original_capacity - ); - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - } - - fn collect_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - message_hash_to_transaction, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - message_hash_to_transaction.remove(processed_packet.message_hash()); - } - }, - ) - } - - // returns `true` if reached end of slot - fn process_packets( - &mut self, - bank: &Bank, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - mut processing_function: F, - ) -> bool - where - F: FnMut( - &Vec>, - &mut ConsumeScannerPayload, - ) -> Option>, - { - let mut retryable_packets = self.take_priority_queue(); - let original_capacity = retryable_packets.capacity(); - let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity); - let all_packets_to_process = retryable_packets.drain_desc().collect_vec(); - - let should_process_packet = - |packet: &Arc, payload: &mut ConsumeScannerPayload| { - consume_scan_should_process_packet(bank, banking_stage_stats, packet, payload) - }; - let mut scanner = create_consume_multi_iterator( - &all_packets_to_process, - slot_metrics_tracker, - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - should_process_packet, - ); - - while let Some((packets_to_process, payload)) = scanner.iterate() { - let packets_to_process = packets_to_process - .iter() - .map(|p| (*p).clone()) - .collect_vec(); - let retryable_packets = if let Some(retryable_transaction_indexes) = - processing_function(&packets_to_process, payload) - { - Self::collect_retained_packets( - payload.message_hash_to_transaction, - &packets_to_process, - &retryable_transaction_indexes, - ) - } else { - packets_to_process - }; - - new_retryable_packets.extend(retryable_packets); - } - - let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot; - - self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; - self.verify_priority_queue(original_capacity); - - reached_end_of_slot - } -} - #[cfg(test)] mod tests { use { super::*, - itertools::iproduct, solana_perf::packet::{Packet, PacketFlags}, solana_runtime::genesis_utils, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, system_transaction, + transaction::Transaction, }, solana_vote::vote_transaction::new_tower_sync_transaction, solana_vote_program::vote_state::TowerSync, @@ -617,11 +453,11 @@ mod tests { #[test] fn test_filter_and_forward_with_account_limits() { solana_logger::setup(); - let GenesisConfigInfo { + let genesis_utils::GenesisConfigInfo { genesis_config, mint_keypair, .. - } = create_genesis_config(10); + } = genesis_utils::create_genesis_config(10); let (current_bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); let simple_transactions: Vec = (0..256) @@ -672,10 +508,8 @@ mod tests { let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[vote_keypair.pubkey()]); - let mut transaction_storage = VoteStorage::new( - Arc::new(latest_unprocessed_votes), - VoteSource::Tpu, - ); + let mut transaction_storage = + VoteStorage::new(Arc::new(latest_unprocessed_votes), VoteSource::Tpu); transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]); assert_eq!(1, transaction_storage.len());