Skip to content

Commit

Permalink
Remove UnprocessedPacketBatches
Browse files Browse the repository at this point in the history
  • Loading branch information
ksolana committed Mar 3, 2025
1 parent 5076dd4 commit 5d630be
Showing 1 changed file with 3 additions and 322 deletions.
325 changes: 3 additions & 322 deletions core/src/banking_stage/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
use {
super::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
min_max_heap::MinMaxHeap,
solana_perf::packet::Packet,
solana_sdk::hash::Hash,
std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
sync::Arc,
},
std::{cmp::Ordering, sync::Arc},
};

/// Holds deserialized messages, as well as computed message_hash and other things needed to create
Expand Down Expand Up @@ -51,336 +45,23 @@ impl Ord for DeserializedPacket {
}
}

/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer
/// to pick proper packets to add to the block.
#[derive(Debug, Default)]
pub struct UnprocessedPacketBatches {
pub packet_priority_queue: MinMaxHeap<Arc<ImmutableDeserializedPacket>>,
pub message_hash_to_transaction: HashMap<Hash, DeserializedPacket>,
batch_limit: usize,
}

impl UnprocessedPacketBatches {
pub fn from_iter<I: IntoIterator<Item = DeserializedPacket>>(iter: I, capacity: usize) -> Self {
let mut unprocessed_packet_batches = Self::with_capacity(capacity);
for deserialized_packet in iter.into_iter() {
unprocessed_packet_batches.push(deserialized_packet);
}

unprocessed_packet_batches
}

pub fn with_capacity(capacity: usize) -> Self {
UnprocessedPacketBatches {
packet_priority_queue: MinMaxHeap::with_capacity(capacity),
message_hash_to_transaction: HashMap::with_capacity(capacity),
batch_limit: capacity,
}
}

pub fn clear(&mut self) {
self.packet_priority_queue.clear();
self.message_hash_to_transaction.clear();
}

/// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already
/// exist.
///
/// Returns and drops the lowest priority packet if the buffer is at capacity.
pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option<DeserializedPacket> {
if self
.message_hash_to_transaction
.contains_key(deserialized_packet.immutable_section().message_hash())
{
return None;
}

if self.len() == self.batch_limit {
// Optimized to not allocate by calling `MinMaxHeap::push_pop_min()`
Some(self.push_pop_min(deserialized_packet))
} else {
self.push_internal(deserialized_packet);
None
}
}

pub fn iter(&mut self) -> impl Iterator<Item = &DeserializedPacket> {
self.message_hash_to_transaction.values()
}

pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut DeserializedPacket> {
self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v)
}

pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
{
// TODO: optimize this only when number of packets
// with outdated blockhash is high
let new_packet_priority_queue: MinMaxHeap<Arc<ImmutableDeserializedPacket>> = self
.packet_priority_queue
.drain()
.filter(|immutable_packet| {
match self
.message_hash_to_transaction
.entry(*immutable_packet.message_hash())
{
Entry::Vacant(_vacant_entry) => {
panic!(
"entry {} must exist to be consistent with `packet_priority_queue`",
immutable_packet.message_hash()
);
}
Entry::Occupied(mut occupied_entry) => {
let should_retain = f(occupied_entry.get_mut());
if !should_retain {
occupied_entry.remove_entry();
}
should_retain
}
}
})
.collect();
self.packet_priority_queue = new_packet_priority_queue;
}

pub fn len(&self) -> usize {
self.packet_priority_queue.len()
}

pub fn is_empty(&self) -> bool {
self.packet_priority_queue.is_empty()
}

pub fn get_min_compute_unit_price(&self) -> Option<u64> {
self.packet_priority_queue
.peek_min()
.map(|x| x.compute_unit_price())
}

pub fn get_max_compute_unit_price(&self) -> Option<u64> {
self.packet_priority_queue
.peek_max()
.map(|x| x.compute_unit_price())
}

fn push_internal(&mut self, deserialized_packet: DeserializedPacket) {
// Push into the priority queue
self.packet_priority_queue
.push(deserialized_packet.immutable_section().clone());

// Keep track of the original packet in the tracking hashmap
self.message_hash_to_transaction.insert(
*deserialized_packet.immutable_section().message_hash(),
deserialized_packet,
);
}

/// Returns the popped minimum packet from the priority queue.
fn push_pop_min(&mut self, deserialized_packet: DeserializedPacket) -> DeserializedPacket {
let immutable_packet = deserialized_packet.immutable_section().clone();

// Push into the priority queue
let popped_immutable_packet = self.packet_priority_queue.push_pop_min(immutable_packet);

if popped_immutable_packet.message_hash()
!= deserialized_packet.immutable_section().message_hash()
{
// Remove the popped entry from the tracking hashmap. Unwrap call is safe
// because the priority queue and hashmap are kept consistent at all times.
let removed_min = self
.message_hash_to_transaction
.remove(popped_immutable_packet.message_hash())
.unwrap();

// Keep track of the original packet in the tracking hashmap
self.message_hash_to_transaction.insert(
*deserialized_packet.immutable_section().message_hash(),
deserialized_packet,
);
removed_min
} else {
deserialized_packet
}
}

#[cfg(test)]
fn pop_max(&mut self) -> Option<DeserializedPacket> {
self.packet_priority_queue
.pop_max()
.map(|immutable_packet| {
self.message_hash_to_transaction
.remove(immutable_packet.message_hash())
.unwrap()
})
}

/// Pop up to the next `n` highest priority transactions from the queue.
/// Returns `None` if the queue is empty
#[cfg(test)]
fn pop_max_n(&mut self, n: usize) -> Option<Vec<DeserializedPacket>> {
let current_len = self.len();
if self.is_empty() {
None
} else {
let num_to_pop = std::cmp::min(current_len, n);
Some(
std::iter::from_fn(|| Some(self.pop_max().unwrap()))
.take(num_to_pop)
.collect::<Vec<DeserializedPacket>>(),
)
}
}

pub fn capacity(&self) -> usize {
self.packet_priority_queue.capacity()
}
}

#[cfg(test)]
mod tests {
use {
super::*,
solana_perf::packet::PacketFlags,
solana_runtime::bank::Bank,
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
message::Message,
hash::Hash,
reserved_account_keys::ReservedAccountKeys,
signature::{Keypair, Signer},
system_instruction, system_transaction,
system_transaction,
transaction::Transaction,
},
solana_vote::vote_transaction,
solana_vote_program::vote_state::TowerSync,
};

fn simple_deserialized_packet() -> DeserializedPacket {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, tx).unwrap();
DeserializedPacket::new(packet).unwrap()
}

fn packet_with_compute_budget_details(
compute_unit_price: u64,
compute_unit_limit: u64,
) -> DeserializedPacket {
let from_account = solana_pubkey::new_rand();
let tx = Transaction::new_unsigned(Message::new(
&[
ComputeBudgetInstruction::set_compute_unit_limit(compute_unit_limit as u32),
ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price),
system_instruction::transfer(&from_account, &solana_pubkey::new_rand(), 1),
],
Some(&from_account),
));
DeserializedPacket::new(Packet::from_data(None, tx).unwrap()).unwrap()
}

#[test]
fn test_unprocessed_packet_batches_insert_pop_same_packet() {
let packet = simple_deserialized_packet();
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2);
unprocessed_packet_batches.push(packet.clone());
unprocessed_packet_batches.push(packet.clone());

// There was only one unique packet, so that one should be the
// only packet returned
assert_eq!(
unprocessed_packet_batches.pop_max_n(2).unwrap(),
vec![packet]
);
}

#[test]
fn test_unprocessed_packet_batches_insert_minimum_packet_over_capacity() {
let heavier_packet_weight = 2;
let heavier_packet = packet_with_compute_budget_details(heavier_packet_weight, 200_000);

let lesser_packet_weight = heavier_packet_weight - 1;
let lesser_packet = packet_with_compute_budget_details(lesser_packet_weight, 200_000);

// Test that the heavier packet is actually heavier
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2);
unprocessed_packet_batches.push(heavier_packet.clone());
unprocessed_packet_batches.push(lesser_packet.clone());
assert_eq!(
unprocessed_packet_batches.pop_max().unwrap(),
heavier_packet
);

let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(1);
unprocessed_packet_batches.push(heavier_packet);

// Buffer is now at capacity, pushing the smaller weighted
// packet should immediately pop it
assert_eq!(
unprocessed_packet_batches
.push(lesser_packet.clone())
.unwrap(),
lesser_packet
);
}

#[test]
fn test_unprocessed_packet_batches_pop_max_n() {
let num_packets = 10;
let packets_iter = std::iter::repeat_with(simple_deserialized_packet).take(num_packets);
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets);

// Test with small step size
let step_size = 1;
for _ in 0..num_packets {
assert_eq!(
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
step_size
);
}

assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());
assert!(unprocessed_packet_batches.pop_max_n(1).is_none());

// Test with step size larger than `num_packets`
let step_size = num_packets + 1;
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets);
assert_eq!(
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
num_packets
);
assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());

// Test with step size equal to `num_packets`
let step_size = num_packets;
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter, num_packets);
assert_eq!(
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
step_size
);
assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());
}

#[cfg(test)]
fn make_test_packets(
transactions: Vec<Transaction>,
Expand Down

0 comments on commit 5d630be

Please sign in to comment.