From 40a731d81187921bffca71b5d12464e425e40e8d Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 12:17:31 -0800 Subject: [PATCH 01/17] ThreadLocalUnprocessedPackets Towards: #3357 --- .../unprocessed_transaction_storage.rs | 140 ++++-------------- 1 file changed, 26 insertions(+), 114 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 5947f774d902dc..a34d9e7c4bc497 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -38,13 +38,6 @@ const MAX_NUM_VOTES_RECEIVE: usize = 10_000; #[derive(Debug)] pub enum UnprocessedTransactionStorage { VoteStorage(VoteStorage), - LocalTransactionStorage(ThreadLocalUnprocessedPackets), -} - -#[derive(Debug)] -pub struct ThreadLocalUnprocessedPackets { - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, } #[derive(Debug)] @@ -248,15 +241,6 @@ where } impl UnprocessedTransactionStorage { - pub fn new_transaction_storage( - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, - ) -> Self { - Self::LocalTransactionStorage(ThreadLocalUnprocessedPackets { - unprocessed_packet_batches, - thread_type, - }) - } pub fn new_vote_storage( latest_unprocessed_votes: Arc, @@ -271,32 +255,24 @@ impl UnprocessedTransactionStorage { pub fn is_empty(&self) -> bool { match self { Self::VoteStorage(vote_storage) => vote_storage.is_empty(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.is_empty(), } } pub fn len(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.len(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.len(), } } pub fn get_min_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_min_compute_unit_price() - } } } pub fn get_max_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_max_compute_unit_price() - } } } @@ -304,9 +280,6 @@ impl UnprocessedTransactionStorage { pub fn max_receive_size(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.max_receive_size(), - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.max_receive_size() - } } } @@ -322,11 +295,22 @@ impl UnprocessedTransactionStorage { #[cfg(test)] pub fn iter(&mut self) -> impl Iterator { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.iter(), _ => panic!(), } } + pub fn forward_option(&self) -> ForwardOption { + match self { + Self::VoteStorage(vote_storage) => vote_storage.forward_option(), + } + } + + pub fn clear_forwarded_packets(&mut self) { + match self { + Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(), + } + } + pub(crate) fn insert_batch( &mut self, deserialized_packets: Vec, @@ -335,9 +319,20 @@ impl UnprocessedTransactionStorage { Self::VoteStorage(vote_storage) => { InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } - Self::LocalTransactionStorage(transaction_storage) => InsertPacketBatchSummary::from( - transaction_storage.insert_batch(deserialized_packets), - ), + } + } + + pub fn filter_forwardable_packets_and_add_batches( + &mut self, + bank: Arc, + forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, + ) -> FilterForwardingResults { + match self { + Self::VoteStorage(vote_storage) => vote_storage + .filter_forwardable_packets_and_add_batches( + bank, + forward_packet_batches_by_accounts, + ), } } @@ -359,13 +354,6 @@ impl UnprocessedTransactionStorage { ) -> Option>, { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage - .process_packets( - &bank, - banking_stage_stats, - slot_metrics_tracker, - processing_function, - ), Self::VoteStorage(vote_storage) => vote_storage.process_packets( bank, banking_stage_stats, @@ -384,7 +372,6 @@ impl UnprocessedTransactionStorage { pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { match self { - Self::LocalTransactionStorage(_) => (), Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), } } @@ -744,81 +731,6 @@ mod tests { assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); } - #[test] - fn test_unprocessed_transaction_storage_insert() -> Result<(), Box> { - let keypair = Keypair::new(); - let vote_keypair = Keypair::new(); - let pubkey = solana_pubkey::new_rand(); - - let small_transfer = Packet::from_data( - None, - system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()), - )?; - let mut vote = Packet::from_data( - None, - new_tower_sync_transaction( - TowerSync::default(), - Hash::new_unique(), - &keypair, - &vote_keypair, - &vote_keypair, - None, - ), - )?; - vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); - let big_transfer = Packet::from_data( - None, - system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()), - )?; - - for thread_type in [ - ThreadType::Transactions, - ThreadType::Voting(VoteSource::Gossip), - ThreadType::Voting(VoteSource::Tpu), - ] { - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::with_capacity(100), - thread_type, - ); - transaction_storage.insert_batch(vec![ - ImmutableDeserializedPacket::new(small_transfer.clone())?, - ImmutableDeserializedPacket::new(vote.clone())?, - ImmutableDeserializedPacket::new(big_transfer.clone())?, - ]); - let deserialized_packets = transaction_storage - .iter() - .map(|packet| packet.immutable_section().original_packet().clone()) - .collect_vec(); - assert_eq!(3, deserialized_packets.len()); - assert!(deserialized_packets.contains(&small_transfer)); - assert!(deserialized_packets.contains(&vote)); - assert!(deserialized_packets.contains(&big_transfer)); - } - - for (vote_source, staked) in iproduct!( - [VoteSource::Gossip, VoteSource::Tpu].into_iter(), - [true, false].into_iter() - ) { - let staked_keys = if staked { - vec![vote_keypair.pubkey()] - } else { - vec![] - }; - let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&staked_keys); - let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(latest_unprocessed_votes), - vote_source, - ); - transaction_storage.insert_batch(vec![ - ImmutableDeserializedPacket::new(small_transfer.clone())?, - ImmutableDeserializedPacket::new(vote.clone())?, - ImmutableDeserializedPacket::new(big_transfer.clone())?, - ]); - assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len()); - } - Ok(()) - } - #[test] fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box> { let node_keypair = Keypair::new(); From 78cf33f366c5510928c1d8628aded0733e5c5a4d Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 12:31:23 -0800 Subject: [PATCH 02/17] Remove match with let --- .../unprocessed_transaction_storage.rs | 84 +++++++------------ 1 file changed, 28 insertions(+), 56 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index a34d9e7c4bc497..ad7ac3b6d8d91f 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -241,7 +241,6 @@ where } impl UnprocessedTransactionStorage { - pub fn new_vote_storage( latest_unprocessed_votes: Arc, vote_source: VoteSource, @@ -253,73 +252,52 @@ impl UnprocessedTransactionStorage { } pub fn is_empty(&self) -> bool { - match self { - Self::VoteStorage(vote_storage) => vote_storage.is_empty(), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.is_empty() } pub fn len(&self) -> usize { - match self { - Self::VoteStorage(vote_storage) => vote_storage.len(), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.len() } pub fn get_min_priority(&self) -> Option { - match self { - Self::VoteStorage(_) => None, - } + None } pub fn get_max_priority(&self) -> Option { - match self { - Self::VoteStorage(_) => None, - } + None } /// Returns the maximum number of packets a receive should accept pub fn max_receive_size(&self) -> usize { - match self { - Self::VoteStorage(vote_storage) => vote_storage.max_receive_size(), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.max_receive_size() } pub fn should_not_process(&self) -> bool { // The gossip vote thread does not need to process any votes, that is // handled by the tpu vote thread - if let Self::VoteStorage(vote_storage) = self { - return matches!(vote_storage.vote_source, VoteSource::Gossip); - } - false - } - - #[cfg(test)] - pub fn iter(&mut self) -> impl Iterator { - match self { - _ => panic!(), - } + let Self::VoteStorage(vote_storage) = self; + matches!(vote_storage.vote_source, VoteSource::Gossip) } pub fn forward_option(&self) -> ForwardOption { - match self { - Self::VoteStorage(vote_storage) => vote_storage.forward_option(), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.forward_option() } pub fn clear_forwarded_packets(&mut self) { - match self { - Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.clear_forwarded_packets() } pub(crate) fn insert_batch( &mut self, deserialized_packets: Vec, ) -> InsertPacketBatchSummary { - match self { - Self::VoteStorage(vote_storage) => { - InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) - } - } + let Self::VoteStorage(vote_storage) = self; + InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } pub fn filter_forwardable_packets_and_add_batches( @@ -327,13 +305,9 @@ impl UnprocessedTransactionStorage { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { - match self { - Self::VoteStorage(vote_storage) => vote_storage - .filter_forwardable_packets_and_add_batches( - bank, - forward_packet_batches_by_accounts, - ), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage + .filter_forwardable_packets_and_add_batches(bank, forward_packet_batches_by_accounts) } /// The processing function takes a stream of packets ready to process, and returns the indices @@ -353,14 +327,13 @@ impl UnprocessedTransactionStorage { &mut ConsumeScannerPayload, ) -> Option>, { - match self { - Self::VoteStorage(vote_storage) => vote_storage.process_packets( - bank, - banking_stage_stats, - slot_metrics_tracker, - processing_function, - ), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.process_packets( + bank, + banking_stage_stats, + slot_metrics_tracker, + processing_function, + ) } pub(crate) fn clear(&mut self) { @@ -371,9 +344,8 @@ impl UnprocessedTransactionStorage { } pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { - match self { - Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), - } + let Self::VoteStorage(vote_storage) = self; + vote_storage.cache_epoch_boundary_info(bank); } } From c0feafb2b016ed27031876133bbfb1b37aa0f209 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 12:55:34 -0800 Subject: [PATCH 03/17] Move filter_processed_packets to tests --- .../unprocessed_transaction_storage.rs | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index ad7ac3b6d8d91f..122421fb8b150e 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -95,25 +95,6 @@ impl From for InsertPacketBatchSummary { } } -fn filter_processed_packets<'a, F>( - retryable_transaction_indexes: impl Iterator, - mut f: F, -) where - F: FnMut(usize, usize), -{ - let mut prev_retryable_index = 0; - for (i, retryable_index) in retryable_transaction_indexes.enumerate() { - let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; - - let end = *retryable_index; - prev_retryable_index = *retryable_index; - - if start < end { - f(start, end) - } - } -} - /// Convenient wrapper for shared-state between banking stage processing and the /// multi-iterator checking function. pub struct ConsumeScannerPayload<'a> { @@ -652,6 +633,25 @@ mod tests { std::error::Error, }; + fn filter_processed_packets<'a, F>( + retryable_transaction_indexes: impl Iterator, + mut f: F, + ) where + F: FnMut(usize, usize), + { + let mut prev_retryable_index = 0; + for (i, retryable_index) in retryable_transaction_indexes.enumerate() { + let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; + + let end = *retryable_index; + prev_retryable_index = *retryable_index; + + if start < end { + f(start, end) + } + } + } + #[test] fn test_filter_processed_packets() { let retryable_indexes = [0, 1, 2, 3]; From 92c5ae1ac6dc0493202901c1129c23c6cc7a2130 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:14:31 -0800 Subject: [PATCH 04/17] s/UnprocessedTransactionStorage/VoteStorage --- core/src/banking_stage.rs | 10 ++--- core/src/banking_stage/consumer.rs | 18 ++++---- core/src/banking_stage/leader_slot_metrics.rs | 8 ++-- core/src/banking_stage/packet_receiver.rs | 10 ++--- .../unprocessed_transaction_storage.rs | 44 +++++++++++++++---- 5 files changed, 57 insertions(+), 33 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 14176c17cb65a5..48b0501083b19f 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -13,7 +13,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, packet_receiver::PacketReceiver, qos_service::QosService, - unprocessed_transaction_storage::UnprocessedTransactionStorage, + unprocessed_transaction_storage::VoteStorage, }, crate::{ banking_stage::{ @@ -441,7 +441,7 @@ impl BankingStage { committer.clone(), transaction_recorder.clone(), log_messages_bytes_limit, - UnprocessedTransactionStorage::new_vote_storage( + VoteStorage::new( latest_unprocessed_votes.clone(), vote_source, ), @@ -590,7 +590,7 @@ impl BankingStage { committer: Committer, transaction_recorder: TransactionRecorder, log_messages_bytes_limit: Option, - unprocessed_transaction_storage: UnprocessedTransactionStorage, + unprocessed_transaction_storage: VoteStorage, ) -> JoinHandle<()> { let mut packet_receiver = PacketReceiver::new(id, packet_receiver); let consumer = Consumer::new( @@ -620,7 +620,7 @@ impl BankingStage { decision_maker: &mut DecisionMaker, bank_forks: &RwLock, consumer: &Consumer, - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + unprocessed_transaction_storage: &mut VoteStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { @@ -675,7 +675,7 @@ impl BankingStage { bank_forks: &RwLock, consumer: &Consumer, id: u32, - mut unprocessed_transaction_storage: UnprocessedTransactionStorage, + mut unprocessed_transaction_storage: VoteStorage, ) { let mut banking_stage_stats = BankingStageStats::new(id); diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 117e4b61d08bfc..31070dc5a8dee4 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -8,7 +8,7 @@ use { leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, qos_service::QosService, scheduler_messages::MaxAge, - unprocessed_transaction_storage::{ConsumeScannerPayload, UnprocessedTransactionStorage}, + unprocessed_transaction_storage::{ConsumeScannerPayload, VoteStorage}, BankingStageStats, }, itertools::Itertools, @@ -111,16 +111,16 @@ impl Consumer { pub fn consume_buffered_packets( &self, bank_start: &BankStart, - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + vote_storage: &mut VoteStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; let mut proc_start = Measure::start("consume_buffered_process"); - let num_packets_to_process = unprocessed_transaction_storage.len(); + let num_packets_to_process = vote_storage.len(); - let reached_end_of_slot = unprocessed_transaction_storage.process_packets( + let reached_end_of_slot = vote_storage.process_packets( bank_start.working_bank.clone(), banking_stage_stats, slot_metrics_tracker, @@ -138,7 +138,7 @@ impl Consumer { if reached_end_of_slot { slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len( - unprocessed_transaction_storage.len() as u64, + vote_storage.len() as u64, ); } @@ -852,8 +852,7 @@ mod tests { crate::banking_stage::{ immutable_deserialized_packet::DeserializedPacketError, tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh}, - unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, - unprocessed_transaction_storage::ThreadType, + unprocessed_packet_batches::DeserializedPacket, }, crossbeam_channel::{unbounded, Receiver}, solana_cost_model::{cost_model::CostModel, transaction_cost::TransactionCost}, @@ -877,13 +876,12 @@ mod tests { self, state::{AddressLookupTable, LookupTableMeta}, }, - compute_budget, fee_calculator::FeeCalculator, hash::Hash, instruction::InstructionError, message::{ v0::{self, MessageAddressTableLookup}, - Message, MessageHeader, VersionedMessage, + MessageHeader, VersionedMessage, }, nonce::{self, state::DurableNonce}, nonce_account::verify_nonce_account, @@ -892,7 +890,7 @@ mod tests { reserved_account_keys::ReservedAccountKeys, signature::Keypair, signer::Signer, - system_instruction, system_program, system_transaction, + system_program, system_transaction, transaction::{Transaction, VersionedTransaction}, }, solana_svm::account_loader::CheckedTransactionDetails, diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index beee3fb31d53ac..d1c2a30a946d22 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -4,7 +4,7 @@ use { leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, unprocessed_transaction_storage::{ - InsertPacketBatchSummary, UnprocessedTransactionStorage, + InsertPacketBatchSummary, VoteStorage, }, }, solana_poh::poh_recorder::BankStart, @@ -104,7 +104,7 @@ struct LeaderPrioritizationFeesMetrics { } impl LeaderPrioritizationFeesMetrics { - fn new(unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>) -> Self { + fn new(unprocessed_transaction_storage: Option<&VoteStorage>) -> Self { if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage { Self { min_prioritization_fees_per_cu: unprocessed_transaction_storage @@ -476,7 +476,7 @@ impl LeaderSlotMetrics { id: u32, slot: Slot, bank_creation_time: &Instant, - unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>, + unprocessed_transaction_storage: Option<&VoteStorage>, ) -> Self { Self { id: id.to_string(), @@ -571,7 +571,7 @@ impl LeaderSlotMetricsTracker { pub(crate) fn check_leader_slot_boundary( &mut self, bank_start: Option<&BankStart>, - unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>, + unprocessed_transaction_storage: Option<&VoteStorage>, ) -> MetricsTrackerAction { match (self.leader_slot_metrics.as_mut(), bank_start) { (None, None) => MetricsTrackerAction::Noop, diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index e9e760ede455fe..a6ac1530be850d 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -3,7 +3,7 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, leader_slot_metrics::LeaderSlotMetricsTracker, packet_deserializer::{PacketDeserializer, ReceivePacketResults}, - unprocessed_transaction_storage::UnprocessedTransactionStorage, + unprocessed_transaction_storage::VoteStorage, BankingStageStats, }, agave_banking_stage_ingress_types::BankingPacketReceiver, @@ -29,7 +29,7 @@ impl PacketReceiver { /// Receive incoming packets, push into unprocessed buffer with packet indexes pub fn receive_and_buffer_packets( &mut self, - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + unprocessed_transaction_storage: &mut VoteStorage, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { @@ -69,7 +69,7 @@ impl PacketReceiver { } fn get_receive_timeout( - unprocessed_transaction_storage: &UnprocessedTransactionStorage, + unprocessed_transaction_storage: &VoteStorage, ) -> Duration { // Gossip thread (does not process) should not continuously receive with 0 duration. // This can cause the thread to run at 100% CPU because it is continuously polling. @@ -93,7 +93,7 @@ impl PacketReceiver { deserialized_packets, packet_stats, }: ReceivePacketResults, - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + unprocessed_transaction_storage: &mut VoteStorage, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { @@ -130,7 +130,7 @@ impl PacketReceiver { } fn push_unprocessed( - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + unprocessed_transaction_storage: &mut VoteStorage, deserialized_packets: Vec, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 122421fb8b150e..c9e27995d94f4a 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -331,23 +331,43 @@ impl UnprocessedTransactionStorage { } impl VoteStorage { - fn is_empty(&self) -> bool { + pub fn new( + latest_unprocessed_votes: Arc, + vote_source: VoteSource, + ) -> Self { + Self { + latest_unprocessed_votes, + vote_source, + } + } + + // TODO: Remove this. + pub fn get_min_priority(&self) -> Option { + None + } + + // TODO: Remove this. + pub fn get_max_priority(&self) -> Option { + None + } + + pub fn is_empty(&self) -> bool { self.latest_unprocessed_votes.is_empty() } - fn len(&self) -> usize { + pub fn len(&self) -> usize { self.latest_unprocessed_votes.len() } - fn max_receive_size(&self) -> usize { + pub fn max_receive_size(&self) -> usize { MAX_NUM_VOTES_RECEIVE } - fn insert_batch( + pub(crate) fn insert_batch( &mut self, deserialized_packets: Vec, - ) -> VoteBatchInsertionMetrics { - self.latest_unprocessed_votes.insert_batch( + ) -> InsertPacketBatchSummary { + InsertPacketBatchSummary::from(self.latest_unprocessed_votes.insert_batch( deserialized_packets .into_iter() .filter_map(|deserialized_packet| { @@ -360,11 +380,11 @@ impl VoteStorage { .ok() }), false, // should_replenish_taken_votes - ) + )) } // returns `true` if the end of slot is reached - fn process_packets( + pub fn process_packets( &mut self, bank: Arc, banking_stage_stats: &BankingStageStats, @@ -450,6 +470,12 @@ impl VoteStorage { self.latest_unprocessed_votes .cache_epoch_boundary_info(bank); } + + pub fn should_not_process(&self) -> bool { + // The gossip vote thread does not need to process or forward any votes, that is + // handled by the tpu vote thread + matches!(self.vote_source, VoteSource::Gossip) + } } impl ThreadLocalUnprocessedPackets { @@ -726,7 +752,7 @@ mod tests { let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[vote_keypair.pubkey()]); - let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + let mut transaction_storage = VoteStorage::new( Arc::new(latest_unprocessed_votes), VoteSource::Tpu, ); From 247be45399694bd0abda6d696ab674ce75964b34 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:19:13 -0800 Subject: [PATCH 05/17] Remove dead imports and code --- .../unprocessed_transaction_storage.rs | 117 +----------------- 1 file changed, 1 insertion(+), 116 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index c9e27995d94f4a..2ed3b1b54cb449 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -10,12 +10,11 @@ use { multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, read_write_account_set::ReadWriteAccountSet, unprocessed_packet_batches::{ - DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches, + DeserializedPacket, PacketBatchInsertionMetrics, }, BankingStageStats, }, itertools::Itertools, - min_max_heap::MinMaxHeap, solana_accounts_db::account_locks::validate_account_locks, solana_measure::measure_us, solana_runtime::bank::Bank, @@ -35,11 +34,6 @@ pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 64; /// Maximum number of votes a single receive call will accept const MAX_NUM_VOTES_RECEIVE: usize = 10_000; -#[derive(Debug)] -pub enum UnprocessedTransactionStorage { - VoteStorage(VoteStorage), -} - #[derive(Debug)] pub struct VoteStorage { latest_unprocessed_votes: Arc, @@ -221,115 +215,6 @@ where ) } -impl UnprocessedTransactionStorage { - pub fn new_vote_storage( - latest_unprocessed_votes: Arc, - vote_source: VoteSource, - ) -> Self { - Self::VoteStorage(VoteStorage { - latest_unprocessed_votes, - vote_source, - }) - } - - pub fn is_empty(&self) -> bool { - let Self::VoteStorage(vote_storage) = self; - vote_storage.is_empty() - } - - pub fn len(&self) -> usize { - let Self::VoteStorage(vote_storage) = self; - vote_storage.len() - } - - pub fn get_min_priority(&self) -> Option { - None - } - - pub fn get_max_priority(&self) -> Option { - None - } - - /// Returns the maximum number of packets a receive should accept - pub fn max_receive_size(&self) -> usize { - let Self::VoteStorage(vote_storage) = self; - vote_storage.max_receive_size() - } - - pub fn should_not_process(&self) -> bool { - // The gossip vote thread does not need to process any votes, that is - // handled by the tpu vote thread - let Self::VoteStorage(vote_storage) = self; - matches!(vote_storage.vote_source, VoteSource::Gossip) - } - - pub fn forward_option(&self) -> ForwardOption { - let Self::VoteStorage(vote_storage) = self; - vote_storage.forward_option() - } - - pub fn clear_forwarded_packets(&mut self) { - let Self::VoteStorage(vote_storage) = self; - vote_storage.clear_forwarded_packets() - } - - pub(crate) fn insert_batch( - &mut self, - deserialized_packets: Vec, - ) -> InsertPacketBatchSummary { - let Self::VoteStorage(vote_storage) = self; - InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) - } - - pub fn filter_forwardable_packets_and_add_batches( - &mut self, - bank: Arc, - forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, - ) -> FilterForwardingResults { - let Self::VoteStorage(vote_storage) = self; - vote_storage - .filter_forwardable_packets_and_add_batches(bank, forward_packet_batches_by_accounts) - } - - /// The processing function takes a stream of packets ready to process, and returns the indices - /// of the unprocessed packets that are eligible for retry. A return value of None means that - /// all packets are unprocessed and eligible for retry. - #[must_use] - pub fn process_packets( - &mut self, - bank: Arc, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - processing_function: F, - ) -> bool - where - F: FnMut( - &Vec>, - &mut ConsumeScannerPayload, - ) -> Option>, - { - let Self::VoteStorage(vote_storage) = self; - vote_storage.process_packets( - bank, - banking_stage_stats, - slot_metrics_tracker, - processing_function, - ) - } - - pub(crate) fn clear(&mut self) { - match self { - Self::LocalTransactionStorage(_) => {} - Self::VoteStorage(vote_storage) => vote_storage.clear(), - } - } - - pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { - let Self::VoteStorage(vote_storage) = self; - vote_storage.cache_epoch_boundary_info(bank); - } -} - impl VoteStorage { pub fn new( latest_unprocessed_votes: Arc, From b00b8641d094b5d1d7dac20faa7030bc5d11aee9 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Tue, 25 Feb 2025 08:36:41 -0800 Subject: [PATCH 06/17] Remove dead code and test --- core/benches/banking_stage.rs | 50 +-- core/src/banking_stage.rs | 5 +- core/src/banking_stage/consumer.rs | 306 +----------------- core/src/banking_stage/leader_slot_metrics.rs | 4 +- core/src/banking_stage/packet_receiver.rs | 4 +- .../unprocessed_transaction_storage.rs | 210 ++---------- 6 files changed, 27 insertions(+), 552 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 8b83407334d54c..665dfdd42c2bbe 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -25,7 +25,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, qos_service::QosService, unprocessed_packet_batches::*, - unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, + unprocessed_transaction_storage::{ThreadType, VoteStorage}, BankingStage, BankingStageStats, }, banking_trace::BankingTracer, @@ -82,54 +82,6 @@ fn check_txs(receiver: &Arc>, ref_tx_count: usize) { assert_eq!(total, ref_tx_count); } -#[bench] -fn bench_consume_buffered(bencher: &mut Bencher) { - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000); - let bank = Bank::new_for_benches(&genesis_config) - .wrap_with_bank_forks_for_tests() - .0; - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"), - ); - let (exit, poh_recorder, poh_service, _signal_receiver) = - create_test_recorder(bank, blockstore, None, None); - - let recorder = poh_recorder.read().unwrap().new_recorder(); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - - let tx = test_tx(); - let transactions = vec![tx; 4194304]; - let batches = transactions - .iter() - .filter_map(|transaction| { - let packet = Packet::from_data(None, transaction).ok().unwrap(); - DeserializedPacket::new(packet).ok() - }) - .collect::>(); - let batches_len = batches.len(); - let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(batches, 2 * batches_len), - ThreadType::Transactions, - ); - let (s, _r) = unbounded(); - let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64))); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - // This tests the performance of buffering packets. - // If the packet buffers are copied, performance will be poor. - bencher.iter(move || { - consumer.consume_buffered_packets( - &bank_start, - &mut transaction_buffer, - &BankingStageStats::default(), - &mut LeaderSlotMetricsTracker::new(0), - ); - }); - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); -} - fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec { let to_pubkey = pubkey::new_rand(); let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48b0501083b19f..e628601000651b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -441,10 +441,7 @@ impl BankingStage { committer.clone(), transaction_recorder.clone(), log_messages_bytes_limit, - VoteStorage::new( - latest_unprocessed_votes.clone(), - vote_source, - ), + VoteStorage::new(latest_unprocessed_votes.clone(), vote_source), )); } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 31070dc5a8dee4..6c3cba30504134 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -137,9 +137,7 @@ impl Consumer { ); if reached_end_of_slot { - slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len( - vote_storage.len() as u64, - ); + slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(vote_storage.len() as u64); } proc_start.stop(); @@ -2147,308 +2145,6 @@ mod tests { let _ = poh_simulator.join(); } - #[test] - fn test_consume_buffered_packets() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all packets were processed without retrying - assert!(buffered_packet_batches.is_empty()); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 0 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - num_conflicting_transactions as u64 - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_sanitization_error() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (mut transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let duplicate_account_key = transactions[0].message.account_keys[0]; - transactions[0] - .message - .account_keys - .push(duplicate_account_key); // corrupt transaction - let recorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &BankingStageStats::default(), - &mut LeaderSlotMetricsTracker::new(0), - ); - assert!(buffered_packet_batches.is_empty()); - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_retryable() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let (transactions, bank, _bank_forks, poh_recorder, _entry_receiver, _, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.read().unwrap().new_recorder(); - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let retryable_packet = deserialized_packets[0].clone(); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed - // except except for retryable errors. Manually take the lock of a transaction to - // simulate another thread processing a transaction with that lock. - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - - let lock_account = transactions[0].message.account_keys[1]; - let manual_lock_tx = RuntimeTransaction::from_transaction_for_tests( - system_transaction::transfer(&Keypair::new(), &lock_account, 1, bank.last_blockhash()), - ); - let _ = bank_start.working_bank.accounts().lock_accounts( - std::iter::once(&manual_lock_tx), - bank_start.working_bank.get_transaction_account_lock_limit(), - ); - - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all but 1 transaction was processed. And that it was rebuffered. - assert_eq!(buffered_packet_batches.len(), 1); - assert_eq!( - buffered_packet_batches.iter().next().unwrap(), - &retryable_packet - ); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - 1, - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 1 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - num_conflicting_transactions as u64 - 1 - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - - #[test] - fn test_consume_buffered_packets_batch_priority_guard() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let ( - _, - bank, - _bank_forks, - poh_recorder, - _entry_receiver, - genesis_config_info, - poh_simulator, - ) = setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.read().unwrap().new_recorder(); - - // Setup transactions: - // [(AB), (BC), (CD)] - // (AB) and (BC) are conflicting, and cannot go into the same batch. - // (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC). - let keypair_a = Keypair::new(); - let keypair_b = Keypair::new(); - let keypair_c = Keypair::new(); - let keypair_d = Keypair::new(); - for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] { - bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey()) - .unwrap(); - } - - let make_prioritized_transfer = |from: &Keypair, to, lamports, priority| -> Transaction { - let ixs = vec![ - system_instruction::transfer(&from.pubkey(), to, lamports), - compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority), - ]; - let message = Message::new(&ixs, Some(&from.pubkey())); - Transaction::new(&[from], message, bank.last_blockhash()) - }; - - let transactions = vec![ - make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3), - make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2), - make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1), - ]; - - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter(deserialized_packets, num_conflicting_transactions), - ThreadType::Transactions, - ); - - let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None); - - // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) - assert!(!poh_recorder.read().unwrap().has_bank()); - assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); - // When the working bank in poh_recorder is Some, all packets should be processed. - // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank_for_test(bank); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let banking_stage_stats = BankingStageStats::default(); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - - // Check that all packets were processed without retrying - assert!(buffered_packet_batches.is_empty()); - assert_eq!( - banking_stage_stats - .consumed_buffered_packets_count - .load(Ordering::Relaxed), - num_conflicting_transactions - ); - assert_eq!( - banking_stage_stats - .rebuffered_packets_count - .load(Ordering::Relaxed), - 0 - ); - // Use bank to check the number of entries (batches) - assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); - assert_eq!( - bank_start.working_bank.transaction_entries_count(), - 4 + num_conflicting_transactions as u64 // 4 for funding transfers - ); - - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - let _ = poh_simulator.join(); - } - #[test] fn test_accumulate_execute_units_and_time() { let mut execute_timings = ExecuteTimings::default(); diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index d1c2a30a946d22..02b6c60a404998 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -3,9 +3,7 @@ use { consumer::LeaderProcessedTransactionCounts, leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, - unprocessed_transaction_storage::{ - InsertPacketBatchSummary, VoteStorage, - }, + unprocessed_transaction_storage::{InsertPacketBatchSummary, VoteStorage}, }, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index a6ac1530be850d..99fa2435287e98 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -68,9 +68,7 @@ impl PacketReceiver { result } - fn get_receive_timeout( - unprocessed_transaction_storage: &VoteStorage, - ) -> Duration { + fn get_receive_timeout(unprocessed_transaction_storage: &VoteStorage) -> Duration { // Gossip thread (does not process) should not continuously receive with 0 duration. // This can cause the thread to run at 100% CPU because it is continuously polling. if !unprocessed_transaction_storage.should_not_process() diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 2ed3b1b54cb449..dc55674ddacd0e 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -9,9 +9,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, read_write_account_set::ReadWriteAccountSet, - unprocessed_packet_batches::{ - DeserializedPacket, PacketBatchInsertionMetrics, - }, + unprocessed_packet_batches::{DeserializedPacket, PacketBatchInsertionMetrics}, BankingStageStats, }, itertools::Itertools, @@ -252,20 +250,22 @@ impl VoteStorage { &mut self, deserialized_packets: Vec, ) -> InsertPacketBatchSummary { - InsertPacketBatchSummary::from(self.latest_unprocessed_votes.insert_batch( - deserialized_packets - .into_iter() - .filter_map(|deserialized_packet| { - LatestValidatorVotePacket::new_from_immutable( - Arc::new(deserialized_packet), - self.vote_source, - self.latest_unprocessed_votes - .should_deprecate_legacy_vote_ixs(), - ) - .ok() - }), - false, // should_replenish_taken_votes - )) + InsertPacketBatchSummary::from( + self.latest_unprocessed_votes.insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), + ) + .ok() + }), + false, // should_replenish_taken_votes + ), + ) } // returns `true` if the end of slot is reached @@ -344,11 +344,11 @@ impl VoteStorage { scanner.finalize().payload.reached_end_of_slot } - fn clear(&mut self) { + pub fn clear(&mut self) { self.latest_unprocessed_votes.clear(); } - fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + pub fn cache_epoch_boundary_info(&mut self, bank: &Bank) { if matches!(self.vote_source, VoteSource::Gossip) { panic!("Gossip vote thread should not be checking epoch boundary"); } @@ -363,181 +363,17 @@ impl VoteStorage { } } -impl ThreadLocalUnprocessedPackets { - fn is_empty(&self) -> bool { - self.unprocessed_packet_batches.is_empty() - } - - pub fn thread_type(&self) -> ThreadType { - self.thread_type - } - - fn len(&self) -> usize { - self.unprocessed_packet_batches.len() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_min_compute_unit_price() - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_max_compute_unit_price() - } - - fn max_receive_size(&self) -> usize { - self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len() - } - - #[cfg(test)] - fn iter(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter_mut() - } - - fn insert_batch( - &mut self, - deserialized_packets: Vec, - ) -> PacketBatchInsertionMetrics { - self.unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ) - } - - /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn take_priority_queue(&mut self) -> MinMaxHeap> { - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::new(), // <-- no need to reserve capacity as we will replace this - ) - } - - /// Verify that the priority queue and map are consistent and that original capacity is maintained. - fn verify_priority_queue(&self, original_capacity: usize) { - // Assert unprocessed queue is still consistent and maintains original capacity - assert_eq!( - self.unprocessed_packet_batches - .packet_priority_queue - .capacity(), - original_capacity - ); - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - } - - fn collect_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - message_hash_to_transaction, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - message_hash_to_transaction.remove(processed_packet.message_hash()); - } - }, - ) - } - - // returns `true` if reached end of slot - fn process_packets( - &mut self, - bank: &Bank, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - mut processing_function: F, - ) -> bool - where - F: FnMut( - &Vec>, - &mut ConsumeScannerPayload, - ) -> Option>, - { - let mut retryable_packets = self.take_priority_queue(); - let original_capacity = retryable_packets.capacity(); - let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity); - let all_packets_to_process = retryable_packets.drain_desc().collect_vec(); - - let should_process_packet = - |packet: &Arc, payload: &mut ConsumeScannerPayload| { - consume_scan_should_process_packet(bank, banking_stage_stats, packet, payload) - }; - let mut scanner = create_consume_multi_iterator( - &all_packets_to_process, - slot_metrics_tracker, - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - should_process_packet, - ); - - while let Some((packets_to_process, payload)) = scanner.iterate() { - let packets_to_process = packets_to_process - .iter() - .map(|p| (*p).clone()) - .collect_vec(); - let retryable_packets = if let Some(retryable_transaction_indexes) = - processing_function(&packets_to_process, payload) - { - Self::collect_retained_packets( - payload.message_hash_to_transaction, - &packets_to_process, - &retryable_transaction_indexes, - ) - } else { - packets_to_process - }; - - new_retryable_packets.extend(retryable_packets); - } - - let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot; - - self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; - self.verify_priority_queue(original_capacity); - - reached_end_of_slot - } -} - #[cfg(test)] mod tests { use { super::*, - itertools::iproduct, solana_perf::packet::{Packet, PacketFlags}, solana_runtime::genesis_utils, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, system_transaction, + transaction::Transaction, }, solana_vote::vote_transaction::new_tower_sync_transaction, solana_vote_program::vote_state::TowerSync, @@ -637,10 +473,8 @@ mod tests { let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[vote_keypair.pubkey()]); - let mut transaction_storage = VoteStorage::new( - Arc::new(latest_unprocessed_votes), - VoteSource::Tpu, - ); + let mut transaction_storage = + VoteStorage::new(Arc::new(latest_unprocessed_votes), VoteSource::Tpu); transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]); assert_eq!(1, transaction_storage.len()); From d157146e1307144b03a460b3605d8a116fd821d7 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Tue, 25 Feb 2025 21:55:26 -0800 Subject: [PATCH 07/17] Remove unused imports --- core/benches/banking_stage.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 665dfdd42c2bbe..332c8e5264258d 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -19,15 +19,7 @@ use { rand::{thread_rng, Rng}, rayon::prelude::*, solana_core::{ - banking_stage::{ - committer::Committer, - consumer::Consumer, - leader_slot_metrics::LeaderSlotMetricsTracker, - qos_service::QosService, - unprocessed_packet_batches::*, - unprocessed_transaction_storage::{ThreadType, VoteStorage}, - BankingStage, BankingStageStats, - }, + banking_stage::BankingStage, banking_trace::BankingTracer, }, solana_entry::entry::{next_hash, Entry}, @@ -38,10 +30,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, }, - solana_perf::{ - packet::{to_packet_batches, Packet}, - test_tx::test_tx, - }, + solana_perf::packet::to_packet_batches, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{ bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, From 8126701445635cfb3283e86a1cd9bd7f78141012 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Tue, 25 Feb 2025 21:57:50 -0800 Subject: [PATCH 08/17] Remove LeaderPrioritizationFeesMetrics --- core/src/banking_stage.rs | 3 +- core/src/banking_stage/leader_slot_metrics.rs | 84 ++++--------------- 2 files changed, 16 insertions(+), 71 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e628601000651b..ddb9b29d30bc0d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -627,8 +627,7 @@ impl BankingStage { let (decision, make_decision_us) = measure_us!(decision_maker.make_consume_or_forward_decision()); let metrics_action = slot_metrics_tracker.check_leader_slot_boundary( - decision.bank_start(), - Some(unprocessed_transaction_storage), + decision.bank_start() ); slot_metrics_tracker.increment_make_decision_us(make_decision_us); diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 02b6c60a404998..7d99a2ceef07bd 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -3,7 +3,7 @@ use { consumer::LeaderProcessedTransactionCounts, leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, - unprocessed_transaction_storage::{InsertPacketBatchSummary, VoteStorage}, + unprocessed_transaction_storage::InsertPacketBatchSummary, }, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, @@ -92,50 +92,6 @@ impl CommittedTransactionsCounts { } } -// Metrics describing prioritization fee information for each transaction storage before processing transactions -#[derive(Debug, Default)] -struct LeaderPrioritizationFeesMetrics { - // minimum prioritization fees in the MinMaxHeap - min_prioritization_fees_per_cu: u64, - // maximum prioritization fees in the MinMaxHeap - max_prioritization_fees_per_cu: u64, -} - -impl LeaderPrioritizationFeesMetrics { - fn new(unprocessed_transaction_storage: Option<&VoteStorage>) -> Self { - if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage { - Self { - min_prioritization_fees_per_cu: unprocessed_transaction_storage - .get_min_priority() - .unwrap_or_default(), - max_prioritization_fees_per_cu: unprocessed_transaction_storage - .get_max_priority() - .unwrap_or_default(), - } - } else { - Self::default() - } - } - - fn report(&self, id: &str, slot: Slot) { - datapoint_info!( - "banking_stage-leader_prioritization_fees_info", - "id" => id, - ("slot", slot, i64), - ( - "min_prioritization_fees_per_cu", - self.min_prioritization_fees_per_cu, - i64 - ), - ( - "max_prioritization_fees_per_cu", - self.max_prioritization_fees_per_cu, - i64 - ) - ); - } -} - // Metrics describing packets ingested/processed in various parts of BankingStage during this // validator's leader slot #[derive(Debug, Default)] @@ -463,8 +419,6 @@ pub(crate) struct LeaderSlotMetrics { timing_metrics: LeaderSlotTimingMetrics, - prioritization_fees_metric: LeaderPrioritizationFeesMetrics, - // Used by tests to check if the `self.report()` method was called is_reported: bool, } @@ -474,7 +428,6 @@ impl LeaderSlotMetrics { id: u32, slot: Slot, bank_creation_time: &Instant, - unprocessed_transaction_storage: Option<&VoteStorage>, ) -> Self { Self { id: id.to_string(), @@ -483,9 +436,6 @@ impl LeaderSlotMetrics { transaction_error_metrics: TransactionErrorMetrics::new(), vote_packet_count_metrics: VotePacketCountMetrics::new(), timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time), - prioritization_fees_metric: LeaderPrioritizationFeesMetrics::new( - unprocessed_transaction_storage, - ), is_reported: false, } } @@ -497,7 +447,6 @@ impl LeaderSlotMetrics { report_transaction_error_metrics(&self.transaction_error_metrics, &self.id, self.slot); self.packet_count_metrics.report(&self.id, self.slot); self.vote_packet_count_metrics.report(&self.id, self.slot); - self.prioritization_fees_metric.report(&self.id, self.slot); } /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None @@ -569,7 +518,6 @@ impl LeaderSlotMetricsTracker { pub(crate) fn check_leader_slot_boundary( &mut self, bank_start: Option<&BankStart>, - unprocessed_transaction_storage: Option<&VoteStorage>, ) -> MetricsTrackerAction { match (self.leader_slot_metrics.as_mut(), bank_start) { (None, None) => MetricsTrackerAction::Noop, @@ -585,7 +533,6 @@ impl LeaderSlotMetricsTracker { self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, - unprocessed_transaction_storage, ))) } @@ -597,7 +544,6 @@ impl LeaderSlotMetricsTracker { self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, - unprocessed_transaction_storage, ))) } else { MetricsTrackerAction::Noop @@ -1023,7 +969,7 @@ mod tests { .. } = setup_test_slot_boundary_banks(); // Test that with no bank being tracked, and no new bank being tracked, nothing is reported - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -1044,7 +990,7 @@ mod tests { // Metrics should not be reported because leader slot has not ended assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::NewTracker(None)), mem::discriminant(&action) @@ -1068,12 +1014,12 @@ mod tests { { // Setup first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -1086,7 +1032,7 @@ mod tests { } { // Assert no-op if still no new bank - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -1108,13 +1054,13 @@ mod tests { { // Setup with first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert nop-op if same bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -1123,7 +1069,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -1152,13 +1098,13 @@ mod tests { { // Setup with first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) @@ -1171,7 +1117,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -1199,13 +1145,13 @@ mod tests { { // Setup with next_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) @@ -1218,7 +1164,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) From 6aaafeda76083b59e4563b205ea3ca65ec52eff5 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Tue, 25 Feb 2025 22:19:23 -0800 Subject: [PATCH 09/17] cargo clippy --- core/benches/banking_stage.rs | 5 +- core/src/banking_stage.rs | 4 +- core/src/banking_stage/consumer.rs | 82 +------------------ core/src/banking_stage/leader_slot_metrics.rs | 10 +-- .../unprocessed_transaction_storage.rs | 2 - 5 files changed, 9 insertions(+), 94 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 332c8e5264258d..f46bdf6d800a57 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -18,10 +18,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_core::{ - banking_stage::BankingStage, - banking_trace::BankingTracer, - }, + solana_core::{banking_stage::BankingStage, banking_trace::BankingTracer}, solana_entry::entry::{next_hash, Entry}, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ddb9b29d30bc0d..7c12f1f9738480 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -626,9 +626,7 @@ impl BankingStage { } let (decision, make_decision_us) = measure_us!(decision_maker.make_consume_or_forward_decision()); - let metrics_action = slot_metrics_tracker.check_leader_slot_boundary( - decision.bank_start() - ); + let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start()); slot_metrics_tracker.increment_make_decision_us(make_decision_us); match decision { diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 6c3cba30504134..f8ffee9fe4d798 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -847,10 +847,8 @@ impl Consumer { mod tests { use { super::*, - crate::banking_stage::{ - immutable_deserialized_packet::DeserializedPacketError, - tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh}, - unprocessed_packet_batches::DeserializedPacket, + crate::banking_stage::tests::{ + create_slow_genesis_config, sanitize_transactions, simulate_poh, }, crossbeam_channel::{unbounded, Receiver}, solana_cost_model::{cost_model::CostModel, transaction_cost::TransactionCost}, @@ -862,10 +860,9 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::Packet, - solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, + solana_poh::poh_recorder::{PohRecorder, Record}, solana_rpc::transaction_status_service::TransactionStatusService, - solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, + solana_runtime::prioritization_fee_cache::PrioritizationFeeCache, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ account::AccountSharedData, @@ -896,7 +893,6 @@ mod tests { solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, std::{ borrow::Cow, - path::Path, sync::{ atomic::{AtomicBool, AtomicU64}, RwLock, @@ -999,76 +995,6 @@ mod tests { account } - #[allow(clippy::type_complexity)] - fn setup_conflicting_transactions( - ledger_path: &Path, - ) -> ( - Vec, - Arc, - Arc>, - Arc>, - Receiver, - GenesisConfigInfo, - JoinHandle<()>, - ) { - Blockstore::destroy(ledger_path).unwrap(); - let genesis_config_info = create_slow_genesis_config(100_000_000); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = &genesis_config_info; - let blockstore = - Blockstore::open(ledger_path).expect("Expected to be able to open database ledger"); - let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let exit = Arc::new(AtomicBool::default()); - let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.clone(), - Some((4, 4)), - bank.ticks_per_slot(), - Arc::new(blockstore), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - &PohConfig::default(), - exit, - ); - let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - - // Set up unparallelizable conflicting transactions - let pubkey0 = solana_pubkey::new_rand(); - let pubkey1 = solana_pubkey::new_rand(); - let pubkey2 = solana_pubkey::new_rand(); - let transactions = vec![ - system_transaction::transfer(mint_keypair, &pubkey0, 1, genesis_config.hash()), - system_transaction::transfer(mint_keypair, &pubkey1, 1, genesis_config.hash()), - system_transaction::transfer(mint_keypair, &pubkey2, 1, genesis_config.hash()), - ]; - let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - - ( - transactions, - bank, - bank_forks, - poh_recorder, - entry_receiver, - genesis_config_info, - poh_simulator, - ) - } - - fn transactions_to_deserialized_packets( - transactions: &[Transaction], - ) -> Result, DeserializedPacketError> { - transactions - .iter() - .map(|transaction| { - let packet = Packet::from_data(None, transaction)?; - DeserializedPacket::new(packet) - }) - .collect() - } - #[test] fn test_bank_process_and_record_transactions() { solana_logger::setup(); diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 7d99a2ceef07bd..8b2a3ebe491868 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -424,11 +424,7 @@ pub(crate) struct LeaderSlotMetrics { } impl LeaderSlotMetrics { - pub(crate) fn new( - id: u32, - slot: Slot, - bank_creation_time: &Instant, - ) -> Self { + pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self { Self { id: id.to_string(), slot, @@ -989,8 +985,8 @@ mod tests { // Test case where the thread has not detected a leader bank, and now sees a leader bank. // Metrics should not be reported because leader slot has not ended assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = + leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::NewTracker(None)), mem::discriminant(&action) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index dc55674ddacd0e..f5fa68017712b0 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -372,8 +372,6 @@ mod tests { solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, - system_transaction, - transaction::Transaction, }, solana_vote::vote_transaction::new_tower_sync_transaction, solana_vote_program::vote_state::TowerSync, From 9bc2c43202b0c22ceb1e08d106b098a8ca1248dd Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:11:44 -0800 Subject: [PATCH 10/17] Rename unprocessed_transaction_storage to vote_storage --- core/src/banking_stage.rs | 30 +++++++++---------- core/src/banking_stage/consumer.rs | 2 +- core/src/banking_stage/leader_slot_metrics.rs | 2 +- core/src/banking_stage/packet_receiver.rs | 26 ++++++++-------- ...transaction_storage.rs => vote_storage.rs} | 0 5 files changed, 30 insertions(+), 30 deletions(-) rename core/src/banking_stage/{unprocessed_transaction_storage.rs => vote_storage.rs} (100%) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7c12f1f9738480..4aab0260ee635b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -13,7 +13,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, packet_receiver::PacketReceiver, qos_service::QosService, - unprocessed_transaction_storage::VoteStorage, + vote_storage::VoteStorage, }, crate::{ banking_stage::{ @@ -65,7 +65,7 @@ pub mod consumer; pub mod leader_slot_metrics; pub mod qos_service; pub mod unprocessed_packet_batches; -pub mod unprocessed_transaction_storage; +pub mod vote_storage; mod consume_worker; mod decision_maker; @@ -587,7 +587,7 @@ impl BankingStage { committer: Committer, transaction_recorder: TransactionRecorder, log_messages_bytes_limit: Option, - unprocessed_transaction_storage: VoteStorage, + vote_storage: VoteStorage, ) -> JoinHandle<()> { let mut packet_receiver = PacketReceiver::new(id, packet_receiver); let consumer = Consumer::new( @@ -606,7 +606,7 @@ impl BankingStage { &bank_forks, &consumer, id, - unprocessed_transaction_storage, + vote_storage, ) }) .unwrap() @@ -617,11 +617,11 @@ impl BankingStage { decision_maker: &mut DecisionMaker, bank_forks: &RwLock, consumer: &Consumer, - unprocessed_transaction_storage: &mut VoteStorage, + vote_storage: &mut VoteStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { - if unprocessed_transaction_storage.should_not_process() { + if vote_storage.should_not_process() { return; } let (decision, make_decision_us) = @@ -639,7 +639,7 @@ impl BankingStage { let (_, consume_buffered_packets_us) = measure_us!(consumer .consume_buffered_packets( &bank_start, - unprocessed_transaction_storage, + vote_storage, banking_stage_stats, slot_metrics_tracker, )); @@ -650,14 +650,14 @@ impl BankingStage { // get current working bank from bank_forks, use it to sanitize transaction and // load all accounts from address loader; let current_bank = bank_forks.read().unwrap().working_bank(); - unprocessed_transaction_storage.cache_epoch_boundary_info(¤t_bank); - unprocessed_transaction_storage.clear(); + vote_storage.cache_epoch_boundary_info(¤t_bank); + vote_storage.clear(); } BufferedPacketsDecision::ForwardAndHold => { // get current working bank from bank_forks, use it to sanitize transaction and // load all accounts from address loader; let current_bank = bank_forks.read().unwrap().working_bank(); - unprocessed_transaction_storage.cache_epoch_boundary_info(¤t_bank); + vote_storage.cache_epoch_boundary_info(¤t_bank); } BufferedPacketsDecision::Hold => {} } @@ -669,7 +669,7 @@ impl BankingStage { bank_forks: &RwLock, consumer: &Consumer, id: u32, - mut unprocessed_transaction_storage: VoteStorage, + mut vote_storage: VoteStorage, ) { let mut banking_stage_stats = BankingStageStats::new(id); @@ -677,14 +677,14 @@ impl BankingStage { let mut last_metrics_update = Instant::now(); loop { - if !unprocessed_transaction_storage.is_empty() + if !vote_storage.is_empty() || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, process_buffered_packets_us) = measure_us!(Self::process_buffered_packets( decision_maker, bank_forks, consumer, - &mut unprocessed_transaction_storage, + &mut vote_storage, &banking_stage_stats, &mut slot_metrics_tracker, )); @@ -694,7 +694,7 @@ impl BankingStage { } match packet_receiver.receive_and_buffer_packets( - &mut unprocessed_transaction_storage, + &mut vote_storage, &mut banking_stage_stats, &mut slot_metrics_tracker, ) { @@ -1285,7 +1285,7 @@ mod tests { #[test_case(TransactionStructure::Sdk)] #[test_case(TransactionStructure::View)] - fn test_unprocessed_transaction_storage_full_send(transaction_struct: TransactionStructure) { + fn test_vote_storage_full_send(transaction_struct: TransactionStructure) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f8ffee9fe4d798..9dbc47b5c013c7 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -8,7 +8,7 @@ use { leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, qos_service::QosService, scheduler_messages::MaxAge, - unprocessed_transaction_storage::{ConsumeScannerPayload, VoteStorage}, + vote_storage::{ConsumeScannerPayload, VoteStorage}, BankingStageStats, }, itertools::Itertools, diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 8b2a3ebe491868..5eca7f60b611c1 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -3,7 +3,7 @@ use { consumer::LeaderProcessedTransactionCounts, leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, - unprocessed_transaction_storage::InsertPacketBatchSummary, + vote_storage::InsertPacketBatchSummary, }, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 99fa2435287e98..be395ff408ed91 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -3,7 +3,7 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, leader_slot_metrics::LeaderSlotMetricsTracker, packet_deserializer::{PacketDeserializer, ReceivePacketResults}, - unprocessed_transaction_storage::VoteStorage, + vote_storage::VoteStorage, BankingStageStats, }, agave_banking_stage_ingress_types::BankingPacketReceiver, @@ -29,17 +29,17 @@ impl PacketReceiver { /// Receive incoming packets, push into unprocessed buffer with packet indexes pub fn receive_and_buffer_packets( &mut self, - unprocessed_transaction_storage: &mut VoteStorage, + vote_storage: &mut VoteStorage, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let (result, recv_time_us) = measure_us!({ - let recv_timeout = Self::get_receive_timeout(unprocessed_transaction_storage); + let recv_timeout = Self::get_receive_timeout(vote_storage); let mut recv_and_buffer_measure = Measure::start("recv_and_buffer"); self.packet_deserializer .receive_packets( recv_timeout, - unprocessed_transaction_storage.max_receive_size(), + vote_storage.max_receive_size(), |packet| { packet.check_insufficent_compute_unit_limit()?; packet.check_excessive_precompiles()?; @@ -50,7 +50,7 @@ impl PacketReceiver { .map(|receive_packet_results| { self.buffer_packets( receive_packet_results, - unprocessed_transaction_storage, + vote_storage, banking_stage_stats, slot_metrics_tracker, ); @@ -68,11 +68,11 @@ impl PacketReceiver { result } - fn get_receive_timeout(unprocessed_transaction_storage: &VoteStorage) -> Duration { + fn get_receive_timeout(vote_storage: &VoteStorage) -> Duration { // Gossip thread (does not process) should not continuously receive with 0 duration. // This can cause the thread to run at 100% CPU because it is continuously polling. - if !unprocessed_transaction_storage.should_not_process() - && !unprocessed_transaction_storage.is_empty() + if !vote_storage.should_not_process() + && !vote_storage.is_empty() { // If there are buffered packets, run the equivalent of try_recv to try reading more // packets. This prevents starving BankingStage::consume_buffered_packets due to @@ -91,7 +91,7 @@ impl PacketReceiver { deserialized_packets, packet_stats, }: ReceivePacketResults, - unprocessed_transaction_storage: &mut VoteStorage, + vote_storage: &mut VoteStorage, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { @@ -104,7 +104,7 @@ impl PacketReceiver { let mut newly_buffered_packets_count = 0; let mut newly_buffered_forwarded_packets_count = 0; Self::push_unprocessed( - unprocessed_transaction_storage, + vote_storage, deserialized_packets, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -124,11 +124,11 @@ impl PacketReceiver { .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats .current_buffered_packets_count - .swap(unprocessed_transaction_storage.len(), Ordering::Relaxed); + .swap(vote_storage.len(), Ordering::Relaxed); } fn push_unprocessed( - unprocessed_transaction_storage: &mut VoteStorage, + vote_storage: &mut VoteStorage, deserialized_packets: Vec, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, @@ -150,7 +150,7 @@ impl PacketReceiver { .increment_newly_buffered_packets_count(deserialized_packets.len() as u64); let insert_packet_batches_summary = - unprocessed_transaction_storage.insert_batch(deserialized_packets); + vote_storage.insert_batch(deserialized_packets); slot_metrics_tracker .accumulate_insert_packet_batches_summary(&insert_packet_batches_summary); saturating_add_assign!( diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/vote_storage.rs similarity index 100% rename from core/src/banking_stage/unprocessed_transaction_storage.rs rename to core/src/banking_stage/vote_storage.rs From d4bdea281620a679b79c9a14cbeda0df8a974f82 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:14:50 -0800 Subject: [PATCH 11/17] Remove ThreadType --- core/src/banking_stage/vote_storage.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/banking_stage/vote_storage.rs b/core/src/banking_stage/vote_storage.rs index f5fa68017712b0..b3a0dc833ee4b2 100644 --- a/core/src/banking_stage/vote_storage.rs +++ b/core/src/banking_stage/vote_storage.rs @@ -38,12 +38,6 @@ pub struct VoteStorage { vote_source: VoteSource, } -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum ThreadType { - Voting(VoteSource), - Transactions, -} - #[derive(Debug)] pub(crate) enum InsertPacketBatchSummary { VoteBatchInsertionMetrics(VoteBatchInsertionMetrics), From 4ba65d454de888975c669297faefcdc57a648cec Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:16:18 -0800 Subject: [PATCH 12/17] Remove get_min_priority get_max_priority --- core/src/banking_stage/vote_storage.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/src/banking_stage/vote_storage.rs b/core/src/banking_stage/vote_storage.rs index b3a0dc833ee4b2..0461a9de5ee2bd 100644 --- a/core/src/banking_stage/vote_storage.rs +++ b/core/src/banking_stage/vote_storage.rs @@ -218,16 +218,6 @@ impl VoteStorage { } } - // TODO: Remove this. - pub fn get_min_priority(&self) -> Option { - None - } - - // TODO: Remove this. - pub fn get_max_priority(&self) -> Option { - None - } - pub fn is_empty(&self) -> bool { self.latest_unprocessed_votes.is_empty() } From 2e83926abaaebe20856a037e2233a8809ce53160 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:17:35 -0800 Subject: [PATCH 13/17] Remove filter_processed_packets --- core/src/banking_stage/vote_storage.rs | 70 -------------------------- 1 file changed, 70 deletions(-) diff --git a/core/src/banking_stage/vote_storage.rs b/core/src/banking_stage/vote_storage.rs index 0461a9de5ee2bd..f1a24e87021f2c 100644 --- a/core/src/banking_stage/vote_storage.rs +++ b/core/src/banking_stage/vote_storage.rs @@ -362,76 +362,6 @@ mod tests { std::error::Error, }; - fn filter_processed_packets<'a, F>( - retryable_transaction_indexes: impl Iterator, - mut f: F, - ) where - F: FnMut(usize, usize), - { - let mut prev_retryable_index = 0; - for (i, retryable_index) in retryable_transaction_indexes.enumerate() { - let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; - - let end = *retryable_index; - prev_retryable_index = *retryable_index; - - if start < end { - f(start, end) - } - } - } - - #[test] - fn test_filter_processed_packets() { - let retryable_indexes = [0, 1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert!(non_retryable_indexes.is_empty()); - - let retryable_indexes = [0, 1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(4, 5)]); - - let retryable_indexes = [1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1)]); - - let retryable_indexes = [1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]); - - let retryable_indexes = [1, 2, 3, 5, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); - - let retryable_indexes = [1, 2, 3, 5, 8, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); - } - #[test] fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box> { let node_keypair = Keypair::new(); From 01bfc6aeabce66d4bdbf6bfd8c340874981a413c Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:25:01 -0800 Subject: [PATCH 14/17] Remove PacketBatchInsertionMetrics --- .../unprocessed_packet_batches.rs | 25 ------------------- core/src/banking_stage/vote_storage.rs | 12 +-------- 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index 76325d37946286..c1944da85fa5dc 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -51,11 +51,6 @@ impl Ord for DeserializedPacket { } } -#[derive(Debug)] -pub struct PacketBatchInsertionMetrics { - pub(crate) num_dropped_packets: usize, -} - /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. @@ -89,26 +84,6 @@ impl UnprocessedPacketBatches { self.message_hash_to_transaction.clear(); } - /// Insert new `deserialized_packet_batch` into inner `MinMaxHeap`, - /// ordered by the tx priority. - /// If buffer is at the max limit, the lowest priority packet is dropped - /// - /// Returns tuple of number of packets dropped - pub fn insert_batch( - &mut self, - deserialized_packets: impl Iterator, - ) -> PacketBatchInsertionMetrics { - let mut num_dropped_packets = 0; - for deserialized_packet in deserialized_packets { - if let Some(_dropped_packet) = self.push(deserialized_packet) { - num_dropped_packets += 1; - } - } - PacketBatchInsertionMetrics { - num_dropped_packets, - } - } - /// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already /// exist. /// diff --git a/core/src/banking_stage/vote_storage.rs b/core/src/banking_stage/vote_storage.rs index f1a24e87021f2c..47fc09447d5401 100644 --- a/core/src/banking_stage/vote_storage.rs +++ b/core/src/banking_stage/vote_storage.rs @@ -9,7 +9,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, read_write_account_set::ReadWriteAccountSet, - unprocessed_packet_batches::{DeserializedPacket, PacketBatchInsertionMetrics}, + unprocessed_packet_batches::DeserializedPacket, BankingStageStats, }, itertools::Itertools, @@ -41,7 +41,6 @@ pub struct VoteStorage { #[derive(Debug)] pub(crate) enum InsertPacketBatchSummary { VoteBatchInsertionMetrics(VoteBatchInsertionMetrics), - PacketBatchInsertionMetrics(PacketBatchInsertionMetrics), } impl InsertPacketBatchSummary { @@ -50,21 +49,18 @@ impl InsertPacketBatchSummary { Self::VoteBatchInsertionMetrics(metrics) => { metrics.num_dropped_gossip + metrics.num_dropped_tpu } - Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_packets, } } pub fn dropped_gossip_packets(&self) -> usize { match self { Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_gossip, - _ => 0, } } pub fn dropped_tpu_packets(&self) -> usize { match self { Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_tpu, - _ => 0, } } } @@ -75,12 +71,6 @@ impl From for InsertPacketBatchSummary { } } -impl From for InsertPacketBatchSummary { - fn from(metrics: PacketBatchInsertionMetrics) -> Self { - Self::PacketBatchInsertionMetrics(metrics) - } -} - /// Convenient wrapper for shared-state between banking stage processing and the /// multi-iterator checking function. pub struct ConsumeScannerPayload<'a> { From 841f4068adb73681149af275e56af6c205eb25c6 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:31:14 -0800 Subject: [PATCH 15/17] cargo clippy --- core/src/banking_stage/packet_receiver.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index be395ff408ed91..3f592d7ab604b5 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -37,15 +37,11 @@ impl PacketReceiver { let recv_timeout = Self::get_receive_timeout(vote_storage); let mut recv_and_buffer_measure = Measure::start("recv_and_buffer"); self.packet_deserializer - .receive_packets( - recv_timeout, - vote_storage.max_receive_size(), - |packet| { - packet.check_insufficent_compute_unit_limit()?; - packet.check_excessive_precompiles()?; - Ok(packet) - }, - ) + .receive_packets(recv_timeout, vote_storage.max_receive_size(), |packet| { + packet.check_insufficent_compute_unit_limit()?; + packet.check_excessive_precompiles()?; + Ok(packet) + }) // Consumes results if Ok, otherwise we keep the Err .map(|receive_packet_results| { self.buffer_packets( @@ -71,9 +67,7 @@ impl PacketReceiver { fn get_receive_timeout(vote_storage: &VoteStorage) -> Duration { // Gossip thread (does not process) should not continuously receive with 0 duration. // This can cause the thread to run at 100% CPU because it is continuously polling. - if !vote_storage.should_not_process() - && !vote_storage.is_empty() - { + if !vote_storage.should_not_process() && !vote_storage.is_empty() { // If there are buffered packets, run the equivalent of try_recv to try reading more // packets. This prevents starving BankingStage::consume_buffered_packets due to // buffered_packet_batches containing transactions that exceed the cost model for @@ -149,8 +143,7 @@ impl PacketReceiver { slot_metrics_tracker .increment_newly_buffered_packets_count(deserialized_packets.len() as u64); - let insert_packet_batches_summary = - vote_storage.insert_batch(deserialized_packets); + let insert_packet_batches_summary = vote_storage.insert_batch(deserialized_packets); slot_metrics_tracker .accumulate_insert_packet_batches_summary(&insert_packet_batches_summary); saturating_add_assign!( From 90e068bde9a532aabe0743871f4ee38621f60dc8 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:50:25 -0800 Subject: [PATCH 16/17] Collapse InsertPacketBatchSummary into VoteBatchInsertionMetrics --- .../banking_stage/latest_unprocessed_votes.rs | 14 +++++ core/src/banking_stage/leader_slot_metrics.rs | 12 ++-- core/src/banking_stage/packet_receiver.rs | 6 +- core/src/banking_stage/vote_storage.rs | 63 +++++-------------- 4 files changed, 37 insertions(+), 58 deletions(-) diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index b201b786f3ec3f..7770db3a7ebacc 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -146,6 +146,20 @@ pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_tpu: usize, } +impl VoteBatchInsertionMetrics { + pub fn total_dropped_packets(&self) -> usize { + self.num_dropped_gossip + self.num_dropped_tpu + } + + pub fn dropped_gossip_packets(&self) -> usize { + self.num_dropped_gossip + } + + pub fn dropped_tpu_packets(&self) -> usize { + self.num_dropped_tpu + } +} + #[derive(Debug)] pub struct LatestUnprocessedVotes { latest_vote_per_vote_pubkey: RwLock>>>, diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 5eca7f60b611c1..605dea1ce8cc17 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -1,9 +1,9 @@ use { super::{ consumer::LeaderProcessedTransactionCounts, + latest_unprocessed_votes::VoteBatchInsertionMetrics, leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, packet_deserializer::PacketReceiverStats, - vote_storage::InsertPacketBatchSummary, }, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, @@ -688,18 +688,18 @@ impl LeaderSlotMetricsTracker { } } - pub(crate) fn accumulate_insert_packet_batches_summary( + pub(crate) fn accumulate_vote_batch_insertion_metrics( &mut self, - insert_packet_batches_summary: &InsertPacketBatchSummary, + vote_batch_insertion_metrics: &VoteBatchInsertionMetrics, ) { self.increment_exceeded_buffer_limit_dropped_packets_count( - insert_packet_batches_summary.total_dropped_packets() as u64, + vote_batch_insertion_metrics.total_dropped_packets() as u64, ); self.increment_dropped_gossip_vote_count( - insert_packet_batches_summary.dropped_gossip_packets() as u64, + vote_batch_insertion_metrics.dropped_gossip_packets() as u64, ); self.increment_dropped_tpu_vote_count( - insert_packet_batches_summary.dropped_tpu_packets() as u64 + vote_batch_insertion_metrics.dropped_tpu_packets() as u64 ); } diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 3f592d7ab604b5..07b6be66c868db 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -143,12 +143,12 @@ impl PacketReceiver { slot_metrics_tracker .increment_newly_buffered_packets_count(deserialized_packets.len() as u64); - let insert_packet_batches_summary = vote_storage.insert_batch(deserialized_packets); + let vote_batch_insertion_metrics = vote_storage.insert_batch(deserialized_packets); slot_metrics_tracker - .accumulate_insert_packet_batches_summary(&insert_packet_batches_summary); + .accumulate_vote_batch_insertion_metrics(&vote_batch_insertion_metrics); saturating_add_assign!( *dropped_packets_count, - insert_packet_batches_summary.total_dropped_packets() + vote_batch_insertion_metrics.total_dropped_packets() ); } } diff --git a/core/src/banking_stage/vote_storage.rs b/core/src/banking_stage/vote_storage.rs index 47fc09447d5401..b2a37b50f63c63 100644 --- a/core/src/banking_stage/vote_storage.rs +++ b/core/src/banking_stage/vote_storage.rs @@ -38,39 +38,6 @@ pub struct VoteStorage { vote_source: VoteSource, } -#[derive(Debug)] -pub(crate) enum InsertPacketBatchSummary { - VoteBatchInsertionMetrics(VoteBatchInsertionMetrics), -} - -impl InsertPacketBatchSummary { - pub fn total_dropped_packets(&self) -> usize { - match self { - Self::VoteBatchInsertionMetrics(metrics) => { - metrics.num_dropped_gossip + metrics.num_dropped_tpu - } - } - } - - pub fn dropped_gossip_packets(&self) -> usize { - match self { - Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_gossip, - } - } - - pub fn dropped_tpu_packets(&self) -> usize { - match self { - Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_tpu, - } - } -} - -impl From for InsertPacketBatchSummary { - fn from(metrics: VoteBatchInsertionMetrics) -> Self { - Self::VoteBatchInsertionMetrics(metrics) - } -} - /// Convenient wrapper for shared-state between banking stage processing and the /// multi-iterator checking function. pub struct ConsumeScannerPayload<'a> { @@ -223,22 +190,20 @@ impl VoteStorage { pub(crate) fn insert_batch( &mut self, deserialized_packets: Vec, - ) -> InsertPacketBatchSummary { - InsertPacketBatchSummary::from( - self.latest_unprocessed_votes.insert_batch( - deserialized_packets - .into_iter() - .filter_map(|deserialized_packet| { - LatestValidatorVotePacket::new_from_immutable( - Arc::new(deserialized_packet), - self.vote_source, - self.latest_unprocessed_votes - .should_deprecate_legacy_vote_ixs(), - ) - .ok() - }), - false, // should_replenish_taken_votes - ), + ) -> VoteBatchInsertionMetrics { + self.latest_unprocessed_votes.insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), + ) + .ok() + }), + false, // should_replenish_taken_votes ) } From b5b33ee75931c3fdcfa383cdfed2440fe0d977c1 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Mon, 3 Mar 2025 15:48:15 -0800 Subject: [PATCH 17/17] Remove UnprocessedPacketBatches --- .../unprocessed_packet_batches.rs | 325 +----------------- 1 file changed, 3 insertions(+), 322 deletions(-) diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index c1944da85fa5dc..468aa9ab51e5a3 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -1,13 +1,7 @@ use { super::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, - min_max_heap::MinMaxHeap, solana_perf::packet::Packet, - solana_sdk::hash::Hash, - std::{ - cmp::Ordering, - collections::{hash_map::Entry, HashMap}, - sync::Arc, - }, + std::{cmp::Ordering, sync::Arc}, }; /// Holds deserialized messages, as well as computed message_hash and other things needed to create @@ -51,194 +45,6 @@ impl Ord for DeserializedPacket { } } -/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store -/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer -/// to pick proper packets to add to the block. -#[derive(Debug, Default)] -pub struct UnprocessedPacketBatches { - pub packet_priority_queue: MinMaxHeap>, - pub message_hash_to_transaction: HashMap, - batch_limit: usize, -} - -impl UnprocessedPacketBatches { - pub fn from_iter>(iter: I, capacity: usize) -> Self { - let mut unprocessed_packet_batches = Self::with_capacity(capacity); - for deserialized_packet in iter.into_iter() { - unprocessed_packet_batches.push(deserialized_packet); - } - - unprocessed_packet_batches - } - - pub fn with_capacity(capacity: usize) -> Self { - UnprocessedPacketBatches { - packet_priority_queue: MinMaxHeap::with_capacity(capacity), - message_hash_to_transaction: HashMap::with_capacity(capacity), - batch_limit: capacity, - } - } - - pub fn clear(&mut self) { - self.packet_priority_queue.clear(); - self.message_hash_to_transaction.clear(); - } - - /// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already - /// exist. - /// - /// Returns and drops the lowest priority packet if the buffer is at capacity. - pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option { - if self - .message_hash_to_transaction - .contains_key(deserialized_packet.immutable_section().message_hash()) - { - return None; - } - - if self.len() == self.batch_limit { - // Optimized to not allocate by calling `MinMaxHeap::push_pop_min()` - Some(self.push_pop_min(deserialized_packet)) - } else { - self.push_internal(deserialized_packet); - None - } - } - - pub fn iter(&mut self) -> impl Iterator { - self.message_hash_to_transaction.values() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v) - } - - pub fn retain(&mut self, mut f: F) - where - F: FnMut(&mut DeserializedPacket) -> bool, - { - // TODO: optimize this only when number of packets - // with outdated blockhash is high - let new_packet_priority_queue: MinMaxHeap> = self - .packet_priority_queue - .drain() - .filter(|immutable_packet| { - match self - .message_hash_to_transaction - .entry(*immutable_packet.message_hash()) - { - Entry::Vacant(_vacant_entry) => { - panic!( - "entry {} must exist to be consistent with `packet_priority_queue`", - immutable_packet.message_hash() - ); - } - Entry::Occupied(mut occupied_entry) => { - let should_retain = f(occupied_entry.get_mut()); - if !should_retain { - occupied_entry.remove_entry(); - } - should_retain - } - } - }) - .collect(); - self.packet_priority_queue = new_packet_priority_queue; - } - - pub fn len(&self) -> usize { - self.packet_priority_queue.len() - } - - pub fn is_empty(&self) -> bool { - self.packet_priority_queue.is_empty() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.packet_priority_queue - .peek_min() - .map(|x| x.compute_unit_price()) - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.packet_priority_queue - .peek_max() - .map(|x| x.compute_unit_price()) - } - - fn push_internal(&mut self, deserialized_packet: DeserializedPacket) { - // Push into the priority queue - self.packet_priority_queue - .push(deserialized_packet.immutable_section().clone()); - - // Keep track of the original packet in the tracking hashmap - self.message_hash_to_transaction.insert( - *deserialized_packet.immutable_section().message_hash(), - deserialized_packet, - ); - } - - /// Returns the popped minimum packet from the priority queue. - fn push_pop_min(&mut self, deserialized_packet: DeserializedPacket) -> DeserializedPacket { - let immutable_packet = deserialized_packet.immutable_section().clone(); - - // Push into the priority queue - let popped_immutable_packet = self.packet_priority_queue.push_pop_min(immutable_packet); - - if popped_immutable_packet.message_hash() - != deserialized_packet.immutable_section().message_hash() - { - // Remove the popped entry from the tracking hashmap. Unwrap call is safe - // because the priority queue and hashmap are kept consistent at all times. - let removed_min = self - .message_hash_to_transaction - .remove(popped_immutable_packet.message_hash()) - .unwrap(); - - // Keep track of the original packet in the tracking hashmap - self.message_hash_to_transaction.insert( - *deserialized_packet.immutable_section().message_hash(), - deserialized_packet, - ); - removed_min - } else { - deserialized_packet - } - } - - #[cfg(test)] - fn pop_max(&mut self) -> Option { - self.packet_priority_queue - .pop_max() - .map(|immutable_packet| { - self.message_hash_to_transaction - .remove(immutable_packet.message_hash()) - .unwrap() - }) - } - - /// Pop up to the next `n` highest priority transactions from the queue. - /// Returns `None` if the queue is empty - #[cfg(test)] - fn pop_max_n(&mut self, n: usize) -> Option> { - let current_len = self.len(); - if self.is_empty() { - None - } else { - let num_to_pop = std::cmp::min(current_len, n); - Some( - std::iter::from_fn(|| Some(self.pop_max().unwrap())) - .take(num_to_pop) - .collect::>(), - ) - } - } - - pub fn capacity(&self) -> usize { - self.packet_priority_queue.capacity() - } -} - #[cfg(test)] mod tests { use { @@ -246,141 +52,16 @@ mod tests { solana_perf::packet::PacketFlags, solana_runtime::bank::Bank, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, - message::Message, + hash::Hash, reserved_account_keys::ReservedAccountKeys, signature::{Keypair, Signer}, - system_instruction, system_transaction, + system_transaction, transaction::Transaction, }, solana_vote::vote_transaction, solana_vote_program::vote_state::TowerSync, }; - fn simple_deserialized_packet() -> DeserializedPacket { - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_pubkey::new_rand(), - 1, - Hash::new_unique(), - ); - let packet = Packet::from_data(None, tx).unwrap(); - DeserializedPacket::new(packet).unwrap() - } - - fn packet_with_compute_budget_details( - compute_unit_price: u64, - compute_unit_limit: u64, - ) -> DeserializedPacket { - let from_account = solana_pubkey::new_rand(); - let tx = Transaction::new_unsigned(Message::new( - &[ - ComputeBudgetInstruction::set_compute_unit_limit(compute_unit_limit as u32), - ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), - system_instruction::transfer(&from_account, &solana_pubkey::new_rand(), 1), - ], - Some(&from_account), - )); - DeserializedPacket::new(Packet::from_data(None, tx).unwrap()).unwrap() - } - - #[test] - fn test_unprocessed_packet_batches_insert_pop_same_packet() { - let packet = simple_deserialized_packet(); - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); - unprocessed_packet_batches.push(packet.clone()); - unprocessed_packet_batches.push(packet.clone()); - - // There was only one unique packet, so that one should be the - // only packet returned - assert_eq!( - unprocessed_packet_batches.pop_max_n(2).unwrap(), - vec![packet] - ); - } - - #[test] - fn test_unprocessed_packet_batches_insert_minimum_packet_over_capacity() { - let heavier_packet_weight = 2; - let heavier_packet = packet_with_compute_budget_details(heavier_packet_weight, 200_000); - - let lesser_packet_weight = heavier_packet_weight - 1; - let lesser_packet = packet_with_compute_budget_details(lesser_packet_weight, 200_000); - - // Test that the heavier packet is actually heavier - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); - unprocessed_packet_batches.push(heavier_packet.clone()); - unprocessed_packet_batches.push(lesser_packet.clone()); - assert_eq!( - unprocessed_packet_batches.pop_max().unwrap(), - heavier_packet - ); - - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(1); - unprocessed_packet_batches.push(heavier_packet); - - // Buffer is now at capacity, pushing the smaller weighted - // packet should immediately pop it - assert_eq!( - unprocessed_packet_batches - .push(lesser_packet.clone()) - .unwrap(), - lesser_packet - ); - } - - #[test] - fn test_unprocessed_packet_batches_pop_max_n() { - let num_packets = 10; - let packets_iter = std::iter::repeat_with(simple_deserialized_packet).take(num_packets); - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); - - // Test with small step size - let step_size = 1; - for _ in 0..num_packets { - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - step_size - ); - } - - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - assert!(unprocessed_packet_batches.pop_max_n(1).is_none()); - - // Test with step size larger than `num_packets` - let step_size = num_packets + 1; - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - num_packets - ); - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - - // Test with step size equal to `num_packets` - let step_size = num_packets; - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter, num_packets); - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - step_size - ); - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - } - #[cfg(test)] fn make_test_packets( transactions: Vec,