diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index c1944da85fa5dc..468aa9ab51e5a3 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -1,13 +1,7 @@ use { super::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, - min_max_heap::MinMaxHeap, solana_perf::packet::Packet, - solana_sdk::hash::Hash, - std::{ - cmp::Ordering, - collections::{hash_map::Entry, HashMap}, - sync::Arc, - }, + std::{cmp::Ordering, sync::Arc}, }; /// Holds deserialized messages, as well as computed message_hash and other things needed to create @@ -51,194 +45,6 @@ impl Ord for DeserializedPacket { } } -/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store -/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer -/// to pick proper packets to add to the block. -#[derive(Debug, Default)] -pub struct UnprocessedPacketBatches { - pub packet_priority_queue: MinMaxHeap>, - pub message_hash_to_transaction: HashMap, - batch_limit: usize, -} - -impl UnprocessedPacketBatches { - pub fn from_iter>(iter: I, capacity: usize) -> Self { - let mut unprocessed_packet_batches = Self::with_capacity(capacity); - for deserialized_packet in iter.into_iter() { - unprocessed_packet_batches.push(deserialized_packet); - } - - unprocessed_packet_batches - } - - pub fn with_capacity(capacity: usize) -> Self { - UnprocessedPacketBatches { - packet_priority_queue: MinMaxHeap::with_capacity(capacity), - message_hash_to_transaction: HashMap::with_capacity(capacity), - batch_limit: capacity, - } - } - - pub fn clear(&mut self) { - self.packet_priority_queue.clear(); - self.message_hash_to_transaction.clear(); - } - - /// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already - /// exist. - /// - /// Returns and drops the lowest priority packet if the buffer is at capacity. - pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option { - if self - .message_hash_to_transaction - .contains_key(deserialized_packet.immutable_section().message_hash()) - { - return None; - } - - if self.len() == self.batch_limit { - // Optimized to not allocate by calling `MinMaxHeap::push_pop_min()` - Some(self.push_pop_min(deserialized_packet)) - } else { - self.push_internal(deserialized_packet); - None - } - } - - pub fn iter(&mut self) -> impl Iterator { - self.message_hash_to_transaction.values() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v) - } - - pub fn retain(&mut self, mut f: F) - where - F: FnMut(&mut DeserializedPacket) -> bool, - { - // TODO: optimize this only when number of packets - // with outdated blockhash is high - let new_packet_priority_queue: MinMaxHeap> = self - .packet_priority_queue - .drain() - .filter(|immutable_packet| { - match self - .message_hash_to_transaction - .entry(*immutable_packet.message_hash()) - { - Entry::Vacant(_vacant_entry) => { - panic!( - "entry {} must exist to be consistent with `packet_priority_queue`", - immutable_packet.message_hash() - ); - } - Entry::Occupied(mut occupied_entry) => { - let should_retain = f(occupied_entry.get_mut()); - if !should_retain { - occupied_entry.remove_entry(); - } - should_retain - } - } - }) - .collect(); - self.packet_priority_queue = new_packet_priority_queue; - } - - pub fn len(&self) -> usize { - self.packet_priority_queue.len() - } - - pub fn is_empty(&self) -> bool { - self.packet_priority_queue.is_empty() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.packet_priority_queue - .peek_min() - .map(|x| x.compute_unit_price()) - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.packet_priority_queue - .peek_max() - .map(|x| x.compute_unit_price()) - } - - fn push_internal(&mut self, deserialized_packet: DeserializedPacket) { - // Push into the priority queue - self.packet_priority_queue - .push(deserialized_packet.immutable_section().clone()); - - // Keep track of the original packet in the tracking hashmap - self.message_hash_to_transaction.insert( - *deserialized_packet.immutable_section().message_hash(), - deserialized_packet, - ); - } - - /// Returns the popped minimum packet from the priority queue. - fn push_pop_min(&mut self, deserialized_packet: DeserializedPacket) -> DeserializedPacket { - let immutable_packet = deserialized_packet.immutable_section().clone(); - - // Push into the priority queue - let popped_immutable_packet = self.packet_priority_queue.push_pop_min(immutable_packet); - - if popped_immutable_packet.message_hash() - != deserialized_packet.immutable_section().message_hash() - { - // Remove the popped entry from the tracking hashmap. Unwrap call is safe - // because the priority queue and hashmap are kept consistent at all times. - let removed_min = self - .message_hash_to_transaction - .remove(popped_immutable_packet.message_hash()) - .unwrap(); - - // Keep track of the original packet in the tracking hashmap - self.message_hash_to_transaction.insert( - *deserialized_packet.immutable_section().message_hash(), - deserialized_packet, - ); - removed_min - } else { - deserialized_packet - } - } - - #[cfg(test)] - fn pop_max(&mut self) -> Option { - self.packet_priority_queue - .pop_max() - .map(|immutable_packet| { - self.message_hash_to_transaction - .remove(immutable_packet.message_hash()) - .unwrap() - }) - } - - /// Pop up to the next `n` highest priority transactions from the queue. - /// Returns `None` if the queue is empty - #[cfg(test)] - fn pop_max_n(&mut self, n: usize) -> Option> { - let current_len = self.len(); - if self.is_empty() { - None - } else { - let num_to_pop = std::cmp::min(current_len, n); - Some( - std::iter::from_fn(|| Some(self.pop_max().unwrap())) - .take(num_to_pop) - .collect::>(), - ) - } - } - - pub fn capacity(&self) -> usize { - self.packet_priority_queue.capacity() - } -} - #[cfg(test)] mod tests { use { @@ -246,141 +52,16 @@ mod tests { solana_perf::packet::PacketFlags, solana_runtime::bank::Bank, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, - message::Message, + hash::Hash, reserved_account_keys::ReservedAccountKeys, signature::{Keypair, Signer}, - system_instruction, system_transaction, + system_transaction, transaction::Transaction, }, solana_vote::vote_transaction, solana_vote_program::vote_state::TowerSync, }; - fn simple_deserialized_packet() -> DeserializedPacket { - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_pubkey::new_rand(), - 1, - Hash::new_unique(), - ); - let packet = Packet::from_data(None, tx).unwrap(); - DeserializedPacket::new(packet).unwrap() - } - - fn packet_with_compute_budget_details( - compute_unit_price: u64, - compute_unit_limit: u64, - ) -> DeserializedPacket { - let from_account = solana_pubkey::new_rand(); - let tx = Transaction::new_unsigned(Message::new( - &[ - ComputeBudgetInstruction::set_compute_unit_limit(compute_unit_limit as u32), - ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), - system_instruction::transfer(&from_account, &solana_pubkey::new_rand(), 1), - ], - Some(&from_account), - )); - DeserializedPacket::new(Packet::from_data(None, tx).unwrap()).unwrap() - } - - #[test] - fn test_unprocessed_packet_batches_insert_pop_same_packet() { - let packet = simple_deserialized_packet(); - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); - unprocessed_packet_batches.push(packet.clone()); - unprocessed_packet_batches.push(packet.clone()); - - // There was only one unique packet, so that one should be the - // only packet returned - assert_eq!( - unprocessed_packet_batches.pop_max_n(2).unwrap(), - vec![packet] - ); - } - - #[test] - fn test_unprocessed_packet_batches_insert_minimum_packet_over_capacity() { - let heavier_packet_weight = 2; - let heavier_packet = packet_with_compute_budget_details(heavier_packet_weight, 200_000); - - let lesser_packet_weight = heavier_packet_weight - 1; - let lesser_packet = packet_with_compute_budget_details(lesser_packet_weight, 200_000); - - // Test that the heavier packet is actually heavier - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); - unprocessed_packet_batches.push(heavier_packet.clone()); - unprocessed_packet_batches.push(lesser_packet.clone()); - assert_eq!( - unprocessed_packet_batches.pop_max().unwrap(), - heavier_packet - ); - - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(1); - unprocessed_packet_batches.push(heavier_packet); - - // Buffer is now at capacity, pushing the smaller weighted - // packet should immediately pop it - assert_eq!( - unprocessed_packet_batches - .push(lesser_packet.clone()) - .unwrap(), - lesser_packet - ); - } - - #[test] - fn test_unprocessed_packet_batches_pop_max_n() { - let num_packets = 10; - let packets_iter = std::iter::repeat_with(simple_deserialized_packet).take(num_packets); - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); - - // Test with small step size - let step_size = 1; - for _ in 0..num_packets { - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - step_size - ); - } - - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - assert!(unprocessed_packet_batches.pop_max_n(1).is_none()); - - // Test with step size larger than `num_packets` - let step_size = num_packets + 1; - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - num_packets - ); - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - - // Test with step size equal to `num_packets` - let step_size = num_packets; - let mut unprocessed_packet_batches = - UnprocessedPacketBatches::from_iter(packets_iter, num_packets); - assert_eq!( - unprocessed_packet_batches - .pop_max_n(step_size) - .unwrap() - .len(), - step_size - ); - assert!(unprocessed_packet_batches.is_empty()); - assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); - } - #[cfg(test)] fn make_test_packets( transactions: Vec,