diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index d1d4e677cc1589..12d870f8fc1014 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -276,7 +276,7 @@ pub struct ShredId(Slot, /*shred index:*/ u32, ShredType); impl ShredId { #[inline] - pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId { + pub fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId { ShredId(slot, index, shred_type) } @@ -285,6 +285,16 @@ impl ShredId { self.0 } + #[inline] + pub fn index(&self) -> u32 { + self.1 + } + + #[inline] + pub fn shred_type(&self) -> ShredType { + self.2 + } + #[inline] pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) { (self.0, self.1, self.2) diff --git a/ledger/src/shred/wire.rs b/ledger/src/shred/wire.rs index ff084e10071955..048f4ac1a6a8e0 100644 --- a/ledger/src/shred/wire.rs +++ b/ledger/src/shred/wire.rs @@ -112,9 +112,10 @@ pub(super) fn get_parent_offset(shred: &[u8]) -> Option { Some(u16::from_le_bytes(bytes)) } -// Returns DataShredHeader.flags. +// Returns DataShredHeader.flags if the shred is data. +// Returns Error::InvalidShredType for coding shreds. #[inline] -pub(crate) fn get_flags(shred: &[u8]) -> Result { +pub fn get_flags(shred: &[u8]) -> Result { match get_shred_type(shred)? { ShredType::Code => Err(Error::InvalidShredType), ShredType::Data => { diff --git a/turbine/src/addr_cache.rs b/turbine/src/addr_cache.rs new file mode 100644 index 00000000000000..8b544371f766bd --- /dev/null +++ b/turbine/src/addr_cache.rs @@ -0,0 +1,413 @@ +use { + crate::retransmit_stage::RetransmitSlotStats, + itertools::Itertools, + solana_ledger::{ + blockstore::MAX_DATA_SHREDS_PER_SLOT, + shred::{shred_code::MAX_CODE_SHREDS_PER_SLOT, ShredId, ShredType}, + }, + solana_sdk::clock::Slot, + std::{ + cmp::Reverse, + collections::{hash_map::Entry, HashMap, VecDeque}, + net::SocketAddr, + }, +}; + +// Number of most recent shreds to track slots counts based off. 
+const ROLLING_WINDOW_NUM_SHREDS: usize = 512;
+// Capacity to initially allocate for CacheEntry.{code,data}.
+const ADDR_CAPACITY: usize = 2_560;
+// How far the cached addresses are speculatively extended beyond max-index
+// observed.
+const EXTEND_BUFFER: usize = ADDR_CAPACITY / 5;
+
+// Cache of Turbine tree retransmit addresses for the most frequent slots
+// within the rolling window of shreds arriving at retransmit-stage.
+pub(crate) struct AddrCache {
+    // Number of slots to cache addresses for.
+    capacity: usize,
+    // Number of shreds observed within the rolling window.
+    // Equivalent to:
+    //   self.window.iter().map(|&(_, count)| count).sum::<usize>()
+    num_shreds: usize,
+    // Rolling window of slots and number of shreds observed.
+    // Worst case, all entries have count == 1, in which case the size of this
+    // ring buffer is bounded by ROLLING_WINDOW_NUM_SHREDS.
+    window: VecDeque<(Slot, /*count:*/ usize)>,
+    // Number of shreds observed in each slot within the rolling window.
+    // Equivalent to:
+    //   self.window.iter().fold(HashMap::new(), |mut acc, &(slot, count)| {
+    //       *acc.entry(slot).or_default() += count;
+    //       acc
+    //   })
+    // Worst case, all shreds within the rolling window are from a unique slot,
+    // in which case the size is bounded by ROLLING_WINDOW_NUM_SHREDS.
+    counts: HashMap<Slot, /*count:*/ usize>,
+    // Cache of addresses for the most frequent slots.
+    // Lazily trimmed to self.capacity size to achieve amortized O(1)
+    // complexity. After put() or record() returns, the size is bounded by
+    // 1 + self.capacity * 2.
+    cache: HashMap<Slot, CacheEntry>,
+}
+
+struct CacheEntry {
+    // Root distance and socket addresses cached either speculatively or when
+    // retransmitting incoming shreds. Indexed by shred index; None marks an
+    // index which is not cached yet.
+    code: Vec<Option<(/*root_distance:*/ u8, Box<[SocketAddr]>)>>,
+    data: Vec<Option<(/*root_distance:*/ u8, Box<[SocketAddr]>)>>,
+    // Code and data indices where [..index] are fully populated.
+    index_code: usize,
+    index_data: usize,
+    // Maximum code and data indices observed in retransmit-stage.
+    max_index_code: u32,
+    max_index_data: u32,
+    // If the last data shred in the slot is already observed, the cache is no
+    // longer extended beyond max_index_{code,data}.
+    last_shred_in_slot: bool,
+}
+
+impl AddrCache {
+    // Creates an empty cache which retains addresses for (roughly) the
+    // `capacity` most frequent slots.
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        Self {
+            capacity,
+            num_shreds: 0,
+            window: VecDeque::new(),
+            counts: HashMap::new(),
+            cache: {
+                // 2x capacity in order to implement lazy eviction.
+                let capacity = capacity.saturating_mul(2).saturating_add(1);
+                HashMap::with_capacity(capacity)
+            },
+        }
+    }
+
+    // Returns (root-distance, socket-addresses) cached for the given shred-id.
+    // Returns None if either the slot or the shred index is not cached.
+    #[inline]
+    pub(crate) fn get(&self, shred: &ShredId) -> Option<(/*root_distance:*/ u8, &[SocketAddr])> {
+        self.cache
+            .get(&shred.slot())?
+            .get(shred.shred_type(), shred.index())
+    }
+
+    // Stores (root-distance, socket-addresses) precomputed speculatively for
+    // the given shred-id.
+    pub(crate) fn put(
+        &mut self,
+        shred: &ShredId,
+        entry: (/*root_distance:*/ u8, Box<[SocketAddr]>),
+    ) {
+        self.get_cache_entry_mut(shred.slot())
+            .put(shred.shred_type(), shred.index(), entry);
+        self.maybe_trim_cache();
+    }
+
+    // Records data observed from incoming shreds at retransmit stage.
+    // Drains stats.addrs into the cache; the other fields of stats are only
+    // read.
+    pub(crate) fn record(&mut self, slot: Slot, stats: &mut RetransmitSlotStats) {
+        // All addresses should be for the same slot.
+        debug_assert!(stats.addrs.iter().all(|(shred, _, _)| shred.slot() == slot));
+        // Update rolling window count of shreds per slot.
+        let num_shreds: usize = stats.num_shreds_received.iter().sum();
+        if num_shreds > 0 {
+            self.num_shreds += num_shreds;
+            self.window.push_back((slot, num_shreds));
+            *self.counts.entry(slot).or_default() += num_shreds;
+            self.maybe_trim_slot_counts();
+        }
+        debug_assert!(self.verify());
+        // If there are no addresses to cache and the cache entry is not
+        // allocated for the slot yet, then ignore.
+        if stats.addrs.is_empty() && !self.cache.contains_key(&slot) {
+            return;
+        }
+        // Update the cached entry for the slot.
+        let entry = self.get_cache_entry_mut(slot);
+        entry.max_index_code = entry.max_index_code.max(stats.max_index_code);
+        entry.max_index_data = entry.max_index_data.max(stats.max_index_data);
+        entry.last_shred_in_slot |= stats.last_shred_in_slot;
+        for (shred, root_distance, addrs) in std::mem::take(&mut stats.addrs) {
+            debug_assert_eq!(shred.slot(), slot);
+            entry.put(shred.shred_type(), shred.index(), (root_distance, addrs));
+        }
+        self.maybe_trim_cache();
+        debug_assert!(self.verify());
+    }
+
+    // Returns num_shreds shred-ids to speculatively pre-compute turbine tree.
+    // ShredIds are chosen based on which slots have received most number of
+    // shreds within the rolling window.
+    // NOTE(review): a cache entry is allocated for every slot visited here and
+    // the cache is not trimmed until the next put() or record().
+    pub(crate) fn get_shreds(&mut self, num_shreds: usize) -> Vec<ShredId> {
+        fn make_shred(slot: Slot, (shred_type, index): (ShredType, usize)) -> ShredId {
+            ShredId::new(slot, index as u32, shred_type)
+        }
+        // With a single slot in the window there is nothing to share the
+        // budget with.
+        if self.counts.len() == 1 {
+            let slot = self.counts.keys().next().copied().unwrap();
+            return self
+                .get_cache_entry_mut(slot)
+                .get_shreds(EXTEND_BUFFER)
+                .take(num_shreds)
+                .map(|entry| make_shred(slot, entry))
+                .collect();
+        }
+        // Sorted ascending by (count, slot), so pop() yields the most
+        // frequent slot first.
+        let mut counts: Vec<(/*count:*/ usize, Slot)> = self
+            .counts
+            .iter()
+            .map(|(&slot, &count)| (count, slot))
+            .collect();
+        counts.sort_unstable();
+        let mut out = Vec::with_capacity(num_shreds);
+        while let Some(count) = num_shreds.checked_sub(out.len()).filter(|&k| k > 0) {
+            let Some((_, slot)) = counts.pop() else {
+                break;
+            };
+            // Leave some capacity for the 2nd most frequent slot. The cap is
+            // kept at least 1; otherwise num_shreds == 1 rounds down to zero
+            // and no shred could ever be returned.
+            let count = count.min((num_shreds * 3 / 4).max(1));
+            out.extend(
+                self.get_cache_entry_mut(slot)
+                    .get_shreds(EXTEND_BUFFER)
+                    .take(count)
+                    .map(|entry| make_shred(slot, entry)),
+            );
+        }
+        out
+    }
+
+    // Returns a mutable reference to the cached entry for the given slot.
+    // Initializes the entry if not allocated yet.
+    #[inline]
+    fn get_cache_entry_mut(&mut self, slot: Slot) -> &mut CacheEntry {
+        match self.cache.entry(slot) {
+            Entry::Occupied(entry) => entry.into_mut(),
+            // First time this slot is seen: allocate with the default capacity.
+            Entry::Vacant(entry) => entry.insert(CacheEntry::new(ADDR_CAPACITY)),
+        }
+    }
+
+    // Evicts the oldest shreds from the rolling window until no more than
+    // ROLLING_WINDOW_NUM_SHREDS remain, keeping self.counts in sync with
+    // self.window.
+    fn maybe_trim_slot_counts(&mut self) {
+        loop {
+            // Number of shreds by which the window exceeds its limit.
+            let excess = self.num_shreds.saturating_sub(ROLLING_WINDOW_NUM_SHREDS);
+            if excess == 0 {
+                return;
+            }
+            // The oldest window entry gives up as many shreds as it holds, up
+            // to the excess.
+            let (slot, oldest) = self.window.front_mut().unwrap();
+            let dropped = excess.min(*oldest);
+            self.num_shreds -= dropped;
+            *oldest -= dropped;
+            match self.counts.entry(*slot) {
+                Entry::Vacant(_) => panic!("Entry must exist if it has non-zero count."),
+                Entry::Occupied(mut entry) => {
+                    *entry.get_mut() -= dropped;
+                    if *entry.get() == 0 {
+                        entry.remove_entry();
+                    }
+                }
+            }
+            // A fully drained window entry is discarded.
+            if *oldest == 0 {
+                self.window.pop_front();
+            }
+        }
+    }
+
+    // Once the cache outgrows 2 * self.capacity entries, keeps only the
+    // self.capacity slots with the highest counts in the rolling window
+    // (ties broken in favor of the larger slot) and drops the rest.
+    fn maybe_trim_cache(&mut self) {
+        let max_len = self.capacity.saturating_mul(2);
+        if self.cache.len() > max_len {
+            let counts = &self.counts;
+            // Pair each cached slot with its count in the rolling window;
+            // slots which fell out of the window count as zero.
+            let mut entries: Vec<(/*count:*/ usize, (Slot, CacheEntry))> = self
+                .cache
+                .drain()
+                .map(|(slot, entry)| {
+                    let count = counts.get(&slot).copied().unwrap_or_default();
+                    (count, (slot, entry))
+                })
+                .collect();
+            // Partition so that the self.capacity largest (count, slot) keys
+            // are at the front.
+            let pivot = self.capacity.saturating_sub(1);
+            entries.select_nth_unstable_by_key(pivot, |&(count, (slot, _))| {
+                Reverse((count, slot))
+            });
+            entries.truncate(self.capacity);
+            self.cache
+                .extend(entries.into_iter().map(|(_, entry)| entry));
+        }
+    }
+
+    // Verifies internal consistency for tests and debug assertions.
+    #[must_use]
+    fn verify(&self) -> bool {
+        // Recompute the aggregates from the rolling window and compare them
+        // with the incrementally maintained values.
+        let num_shreds: usize = self.window.iter().map(|&(_, count)| count).sum();
+        let counts = self
+            .window
+            .iter()
+            .fold(HashMap::new(), |mut acc, &(slot, count)| {
+                *acc.entry(slot).or_default() += count;
+                acc
+            });
+        num_shreds <= ROLLING_WINDOW_NUM_SHREDS
+            && self.num_shreds == num_shreds
+            && self.counts == counts
+            && self.window.iter().all(|&(_, count)| count > 0)
+            && self.counts.values().all(|&count| count > 0)
+    }
+}
+
+impl CacheEntry {
+    // Creates an empty entry with `capacity` slots preallocated for each of
+    // the code and data address vectors.
+    fn new(capacity: usize) -> Self {
+        Self {
+            code: Vec::with_capacity(capacity),
+            data: Vec::with_capacity(capacity),
+            index_code: 0,
+            index_data: 0,
+            max_index_code: 0,
+            max_index_data: 0,
+            last_shred_in_slot: false,
+        }
+    }
+
+    // Returns (root-distance, socket-addresses) cached for the given shred
+    // type and index, or None if nothing is cached at that index.
+    #[inline]
+    fn get(
+        &self,
+        shred_type: ShredType,
+        shred_index: u32,
+    ) -> Option<(/*root_distance:*/ u8, &[SocketAddr])> {
+        match shred_type {
+            ShredType::Code => &self.code,
+            ShredType::Data => &self.data,
+        }
+        .get(shred_index as usize)?
+        .as_ref()
+        .map(|(root_distance, addrs)| (*root_distance, addrs.as_ref()))
+    }
+
+    // Stores (root-distance, socket-addresses) for the given shred type and
+    // index. Indices at or beyond MAX_{CODE,DATA}_SHREDS_PER_SLOT are ignored.
+    #[inline]
+    fn put(
+        &mut self,
+        shred_type: ShredType,
+        shred_index: u32,
+        entry: (/*root_distance:*/ u8, Box<[SocketAddr]>),
+    ) {
+        let (cache, max_shreds) = match shred_type {
+            ShredType::Code => (&mut self.code, MAX_CODE_SHREDS_PER_SLOT),
+            ShredType::Data => (&mut self.data, MAX_DATA_SHREDS_PER_SLOT),
+        };
+        let k = shred_index as usize;
+        // get_shreds never requests indices at or beyond the per-slot
+        // maximum, so anything out of that range is not worth caching.
+        // Dropping it also prevents a bogus index from forcing the vector to
+        // be resized to an enormous length.
+        if k >= max_shreds {
+            return;
+        }
+        if cache.len() <= k {
+            cache.resize(k + 1, None);
+        }
+        cache[k] = Some(entry)
+    }
+
+    // Returns an iterator of (shred-type, shred-index) to speculatively
+    // pre-compute turbine tree for. Code and data indices are interleaved.
+    fn get_shreds(
+        &mut self,
+        // How far the cached addresses are speculatively extended beyond
+        // max-index observed.
+        extend_buffer: usize,
+    ) -> impl Iterator<Item = (ShredType, usize)> + '_ {
+        // Move self.index_{code,data} forward until the first missing entry.
+        while matches!(self.code.get(self.index_code), Some(Some(_))) {
+            self.index_code += 1;
+        }
+        while matches!(self.data.get(self.index_data), Some(Some(_))) {
+            self.index_data += 1;
+        }
+        // If the last data shred in the slot is already observed, do not
+        // extend beyond observed max-indices.
+        let extend_buffer = if self.last_shred_in_slot {
+            0
+        } else {
+            extend_buffer
+        };
+        // Find and interleave missing code and data entries in the cache.
+        let code = {
+            // There are at least as many coding shreds as data shreds.
+            let max_index = self.max_index_code.max(self.max_index_data) as usize + extend_buffer;
+            self.index_code..max_index.min(MAX_CODE_SHREDS_PER_SLOT)
+        }
+        .filter(|&k| matches!(self.code.get(k), None | Some(None)))
+        .map(|k| (ShredType::Code, k));
+        let data = {
+            let max_index = self.max_index_data as usize + extend_buffer;
+            self.index_data..max_index.min(MAX_DATA_SHREDS_PER_SLOT)
+        }
+        .filter(|&k| matches!(self.data.get(k), None | Some(None)))
+        .map(|k| (ShredType::Data, k));
+        code.interleave(data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_cache_entry_get_shreds() {
+        let mut entry = CacheEntry::new(/*capacity:*/ 100);
+        assert!(entry.get_shreds(3).eq([
+            (ShredType::Code, 0),
+            (ShredType::Data, 0),
+            (ShredType::Code, 1),
+            (ShredType::Data, 1),
+            (ShredType::Code, 2),
+            (ShredType::Data, 2)
+        ]));
+        assert_eq!(entry.index_code, 0);
+        assert_eq!(entry.index_data, 0);
+
+        entry.put(ShredType::Code, 0, (0, Box::new([])));
+        entry.put(ShredType::Code, 2, (0, Box::new([])));
+        entry.put(ShredType::Data, 1, (0, Box::new([])));
+        assert!(entry.get_shreds(5).eq([
+            (ShredType::Code, 1),
+            (ShredType::Data, 0),
+            (ShredType::Code, 3),
+            (ShredType::Data, 2),
+            (ShredType::Code, 4),
+            (ShredType::Data, 3),
+            (ShredType::Data, 4),
+        ]));
+        assert_eq!(entry.index_code, 1);
+        assert_eq!(entry.index_data, 0);
+
+        entry.put(ShredType::Code, 1, (0, Box::new([])));
+        entry.put(ShredType::Code, 4, (0, Box::new([])));
+        entry.put(ShredType::Data, 0, (0, Box::new([])));
+        entry.put(ShredType::Data, 3, (0, Box::new([])));
+        assert!(entry.get_shreds(5).eq([
+            (ShredType::Code, 3),
+            (ShredType::Data, 2),
+            (ShredType::Data, 4),
+        ]));
+        assert_eq!(entry.index_code, 3);
+        assert_eq!(entry.index_data, 2);
+
+        entry.max_index_code = 4;
+        entry.max_index_data = 3;
+        assert!(entry.get_shreds(4).eq([
+            (ShredType::Code, 3),
+            (ShredType::Data, 2),
+            (ShredType::Code, 5),
+            (ShredType::Data, 4),
+            (ShredType::Code, 6),
+            (ShredType::Data, 5),
+            (ShredType::Code, 7),
+            (ShredType::Data, 6),
+        ]));
+        assert_eq!(entry.index_code, 3);
+        assert_eq!(entry.index_data, 2);
+
+        entry.last_shred_in_slot = true;
+        assert!(entry
+            .get_shreds(7)
+            .eq([(ShredType::Code, 3), (ShredType::Data, 2)]));
+        assert_eq!(entry.index_code, 3);
+        assert_eq!(entry.index_data, 2);
+
+        entry.put(ShredType::Code, 3, (0, Box::new([])));
+        entry.put(ShredType::Data, 2, (0, Box::new([])));
+        assert!(entry.get_shreds(7).eq([]));
+        assert_eq!(entry.index_code, 5);
+        assert_eq!(entry.index_data, 4);
+    }
+
+    #[test]
+    fn test_cache_entry_put_out_of_range() {
+        let mut entry = CacheEntry::new(/*capacity:*/ 100);
+        entry.put(ShredType::Code, u32::MAX, (0, Box::new([])));
+        entry.put(ShredType::Data, u32::MAX, (0, Box::new([])));
+        assert!(entry.get(ShredType::Code, u32::MAX).is_none());
+        assert!(entry.get(ShredType::Data, u32::MAX).is_none());
+        assert!(entry.code.is_empty());
+        assert!(entry.data.is_empty());
+    }
+}
diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index e3604da042290f..b946b59c35790a 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -226,7 +226,7 @@ impl ClusterNodes { shred: &ShredId, fanout: usize, socket_addr_space: &SocketAddrSpace, - ) -> Result<(/*root_distance:*/ usize, Vec), Error> { + ) -> Result<(/*root_distance:*/ u8, Vec), Error> { // Exclude slot leader from list of nodes. 
if slot_leader == &self.pubkey { return Err(Error::Loopback { @@ -596,7 +596,7 @@ pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol { } #[inline] -fn get_root_distance(index: usize, fanout: usize) -> usize { +fn get_root_distance(index: usize, fanout: usize) -> u8 { if index == 0 { 0 } else if index <= fanout { diff --git a/turbine/src/lib.rs b/turbine/src/lib.rs index 2c7f3c9368f416..42694e06dc88b4 100644 --- a/turbine/src/lib.rs +++ b/turbine/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::arithmetic_side_effects)] +mod addr_cache; pub mod broadcast_stage; pub mod cluster_nodes; pub mod quic_endpoint; diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index 70eb1f8dd2c18c..957d433d4c054f 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -1,16 +1,19 @@ //! The `retransmit_stage` retransmits shreds between validators use { - crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS}, + crate::{ + addr_cache::AddrCache, + cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS}, + }, bytes::Bytes, - crossbeam_channel::{Receiver, RecvError}, + crossbeam_channel::{Receiver, RecvError, TryRecvError}, lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, - shred::{self, ShredId}, + shred::{self, ShredFlags, ShredId, ShredType}, }, solana_measure::measure::Measure, solana_perf::deduper::Deduper, @@ -31,6 +34,7 @@ use { }, static_assertions::const_assert_eq, std::{ + borrow::Cow, collections::{HashMap, HashSet}, net::{SocketAddr, UdpSocket}, ops::AddAssign, @@ -55,18 +59,41 @@ const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5); const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); +// Output 
of fn retransmit_shred(...). +struct RetransmitShredOutput { + shred: ShredId, + // If the shred has ShredFlags::LAST_SHRED_IN_SLOT. + last_shred_in_slot: bool, + // This node's distance from the turbine root. + root_distance: u8, + // Number of nodes the shred was retransmitted to. + num_nodes: usize, + // Addresses the shred was sent to if there was a cache miss. + addrs: Option>, +} + #[derive(Default)] -struct RetransmitSlotStats { +pub(crate) struct RetransmitSlotStats { asof: u64, // Latest timestamp struct was updated. outset: u64, // 1st shred retransmit timestamp. + // Maximum code and data indices observed. + pub(crate) max_index_code: u32, + pub(crate) max_index_data: u32, + // If any of the shreds had ShredFlags::LAST_SHRED_IN_SLOT. + pub(crate) last_shred_in_slot: bool, // Number of shreds sent and received at different // distances from the turbine broadcast root. - num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS], + pub(crate) num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS], num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS], + // Root distance and socket-addresses the shreds were sent to if there was + // a cache miss. 
+ pub(crate) addrs: Vec<(ShredId, /*root_distance:*/ u8, Box<[SocketAddr]>)>, } struct RetransmitStats { since: Instant, + addr_cache_hit: AtomicUsize, + addr_cache_miss: AtomicUsize, num_nodes: AtomicUsize, num_addrs_failed: AtomicUsize, num_loopback_errs: AtomicUsize, @@ -115,6 +142,8 @@ impl RetransmitStats { i64 ), ("retransmit_total", *self.retransmit_total.get_mut(), i64), + ("addr_cache_hit", *self.addr_cache_hit.get_mut(), i64), + ("addr_cache_miss", *self.addr_cache_miss.get_mut(), i64), ( "compute_turbine", *self.compute_turbine_peers_total.get_mut(), @@ -190,13 +219,32 @@ fn retransmit( quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>, stats: &mut RetransmitStats, cluster_nodes_cache: &ClusterNodesCache, + addr_cache: &mut AddrCache, shred_deduper: &mut ShredDeduper, max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, slot_status_notifier: Option<&SlotStatusNotifier>, ) -> Result<(), RecvError> { - // wait for something on the channel - let mut shreds = retransmit_receiver.recv()?; + // Try to receive shreds from the channel without blocking. If the channel + // is empty precompute turbine trees speculatively. If no cache updates are + // made then block on the channel until some shreds are received. + let mut shreds = match retransmit_receiver.try_recv() { + Ok(shreds) => shreds, + Err(TryRecvError::Disconnected) => return Err(RecvError), + Err(TryRecvError::Empty) => { + if cache_retransmit_addrs( + thread_pool, + addr_cache, + bank_forks, + leader_schedule_cache, + cluster_info, + cluster_nodes_cache, + ) { + return Ok(()); + } + retransmit_receiver.recv()? 
+ } + }; // now the batch has started let mut timer_start = Measure::start("retransmit"); // drain the channel until it is empty to form a batch @@ -244,29 +292,33 @@ fn retransmit( }) .collect(); let socket_addr_space = cluster_info.socket_addr_space(); - let record = |mut stats: HashMap, - (slot, root_distance, num_nodes)| { + let record = |mut stats: HashMap, out: RetransmitShredOutput| { let now = timestamp(); - let entry = stats.entry(slot).or_default(); - entry.record(now, root_distance, num_nodes); + let entry = stats.entry(out.shred.slot()).or_default(); + entry.record(now, out); stats }; + let retransmit_shred = |shred, socket, stats| { + retransmit_shred( + shred, + &root_bank, + shred_deduper, + &cache, + addr_cache, + socket_addr_space, + socket, + quic_endpoint_sender, + stats, + ) + }; let slot_stats = if shreds.len() < PAR_ITER_MIN_NUM_SHREDS { stats.num_small_batches += 1; shreds .into_iter() .enumerate() .filter_map(|(index, shred)| { - retransmit_shred( - shred, - &root_bank, - shred_deduper, - &cache, - socket_addr_space, - &retransmit_sockets[index % retransmit_sockets.len()], - quic_endpoint_sender, - stats, - ) + let socket = &retransmit_sockets[index % retransmit_sockets.len()]; + retransmit_shred(shred, socket, stats) }) .fold(HashMap::new(), record) } else { @@ -275,16 +327,8 @@ fn retransmit( .into_par_iter() .filter_map(|shred| { let index = thread_pool.current_thread_index().unwrap(); - retransmit_shred( - shred, - &root_bank, - shred_deduper, - &cache, - socket_addr_space, - &retransmit_sockets[index % retransmit_sockets.len()], - quic_endpoint_sender, - stats, - ) + let socket = &retransmit_sockets[index % retransmit_sockets.len()]; + retransmit_shred(shred, socket, stats) }) .fold(HashMap::new, record) .reduce(HashMap::new, RetransmitSlotStats::merge) @@ -293,6 +337,7 @@ fn retransmit( stats.upsert_slot_stats( slot_stats, root_bank.slot(), + addr_cache, rpc_subscriptions, slot_status_notifier, ); @@ -308,17 +353,13 @@ fn 
retransmit_shred( root_bank: &Bank, shred_deduper: &ShredDeduper, cache: &HashMap>)>, + addr_cache: &AddrCache, socket_addr_space: &SocketAddrSpace, socket: &UdpSocket, quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>, stats: &RetransmitStats, -) -> Option<( - Slot, // Shred slot. - usize, // This node's distance from the turbine root. - usize, // Number of nodes the shred was retransmitted to. -)> { +) -> Option { let key = shred::layout::get_shred_id(shred.as_ref())?; - let (slot_leader, cluster_nodes) = cache.get(&key.slot())?; if key.slot() < root_bank.slot() || shred_deduper.dedup(key, shred.as_ref(), MAX_DUPLICATE_COUNT) { @@ -326,29 +367,23 @@ fn retransmit_shred( return None; } let mut compute_turbine_peers = Measure::start("turbine_start"); - let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank); - let (root_distance, addrs) = cluster_nodes - .get_retransmit_addrs(slot_leader, &key, data_plane_fanout, socket_addr_space) - .inspect_err(|err| match err { - Error::Loopback { .. 
} => { - error!("retransmit_shred: {err}"); - stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed); - } - }) - .ok()?; + let (root_distance, addrs) = + get_retransmit_addrs(&key, root_bank, cache, addr_cache, socket_addr_space, stats)?; compute_turbine_peers.stop(); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed); - + let last_shred_in_slot = shred::wire::get_flags(shred.as_ref()) + .map(|flags| flags.contains(ShredFlags::LAST_SHRED_IN_SLOT)) + .unwrap_or_default(); let mut retransmit_time = Measure::start("retransmit_to"); let num_addrs = addrs.len(); let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) { Protocol::QUIC => { let shred = Bytes::from(shred::Payload::unwrap_or_clone(shred)); addrs - .into_iter() - .filter_map(|addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok()) + .iter() + .filter_map(|&addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok()) .count() } Protocol::UDP => match multi_target_send(socket, shred, &addrs) { @@ -370,7 +405,106 @@ fn retransmit_shred( stats .retransmit_total .fetch_add(retransmit_time.as_us(), Ordering::Relaxed); - Some((key.slot(), root_distance, num_nodes)) + Some(RetransmitShredOutput { + shred: key, + last_shred_in_slot, + root_distance, + num_nodes, + addrs: match addrs { + Cow::Owned(addrs) => Some(addrs.into_boxed_slice()), + Cow::Borrowed(_) => None, + }, + }) +} + +fn get_retransmit_addrs<'a>( + shred: &ShredId, + root_bank: &Bank, + cache: &HashMap>)>, + addr_cache: &'a AddrCache, + socket_addr_space: &SocketAddrSpace, + stats: &RetransmitStats, +) -> Option<(/*root_distance:*/ u8, Cow<'a, [SocketAddr]>)> { + if let Some((root_distance, addrs)) = addr_cache.get(shred) { + stats.addr_cache_hit.fetch_add(1, Ordering::Relaxed); + return Some((root_distance, Cow::Borrowed(addrs))); + } + let (slot_leader, cluster_nodes) = cache.get(&shred.slot())?; + let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank); 
+ let (root_distance, addrs) = cluster_nodes + .get_retransmit_addrs(slot_leader, shred, data_plane_fanout, socket_addr_space) + .inspect_err(|err| match err { + Error::Loopback { .. } => { + stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed); + } + }) + .ok()?; + stats.addr_cache_miss.fetch_add(1, Ordering::Relaxed); + Some((root_distance, Cow::Owned(addrs))) +} + +// Speculatively precomputes turbine tree and caches retransmit addresses. +// Returns false if no new addresses were cached. +#[must_use] +fn cache_retransmit_addrs( + thread_pool: &ThreadPool, + addr_cache: &mut AddrCache, + bank_forks: &RwLock, + leader_schedule_cache: &LeaderScheduleCache, + cluster_info: &ClusterInfo, + cluster_nodes_cache: &ClusterNodesCache, +) -> bool { + let shreds = addr_cache.get_shreds(thread_pool.current_num_threads() * 4); + if shreds.is_empty() { + return false; + } + let (working_bank, root_bank) = { + let bank_forks = bank_forks.read().unwrap(); + (bank_forks.working_bank(), bank_forks.root_bank()) + }; + let cache: HashMap = shreds + .iter() + .map(ShredId::slot) + .collect::>() + .into_iter() + .filter_map(|slot: Slot| { + let slot_leader = leader_schedule_cache.slot_leader_at(slot, Some(&working_bank))?; + let cluster_nodes = + cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); + Some((slot, (slot_leader, cluster_nodes))) + }) + .collect(); + if cache.is_empty() { + return false; + } + let socket_addr_space = cluster_info.socket_addr_space(); + let get_retransmit_addrs = |shred: ShredId| { + let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), &root_bank); + let (slot_leader, cluster_nodes) = cache.get(&shred.slot())?; + let (root_distance, addrs) = cluster_nodes + .get_retransmit_addrs(slot_leader, &shred, data_plane_fanout, socket_addr_space) + .ok()?; + Some((shred, (root_distance, addrs.into_boxed_slice()))) + }; + let mut out = false; + if shreds.len() < PAR_ITER_MIN_NUM_SHREDS { + for (shred, entry) in 
shreds.into_iter().filter_map(get_retransmit_addrs) { + addr_cache.put(&shred, entry); + out = true; + } + } else { + let entries: Vec<_> = thread_pool.install(|| { + shreds + .into_par_iter() + .filter_map(get_retransmit_addrs) + .collect() + }); + for (shred, entry) in entries { + addr_cache.put(&shred, entry); + out = true; + } + } + out } /// Service to retransmit messages received from other peers in turbine. @@ -403,16 +537,15 @@ impl RetransmitStage { CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, CLUSTER_NODES_CACHE_TTL, ); - - let mut stats = RetransmitStats::new(Instant::now()); - let mut rng = rand::thread_rng(); + let mut stats = RetransmitStats::new(Instant::now()); + let mut addr_cache = AddrCache::with_capacity(/*capacity:*/ 4); let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS); let thread_pool = { - // Using clamp will panic if less than 8 sockets are provided + // Using clamp will panic if max < min. #[allow(clippy::manual_clamp)] - let num_threads = get_thread_count().min(8).max(retransmit_sockets.len()); + let num_threads = get_thread_count().min(12).max(retransmit_sockets.len()); ThreadPoolBuilder::new() .num_threads(num_threads) .thread_name(|i| format!("solRetransmit{i:02}")) @@ -433,6 +566,7 @@ impl RetransmitStage { &quic_endpoint_sender, &mut stats, &cluster_nodes_cache, + &mut addr_cache, &mut shred_deduper, &max_slots, rpc_subscriptions.as_deref(), @@ -458,15 +592,26 @@ impl AddAssign for RetransmitSlotStats { let Self { asof, outset, + max_index_code, + max_index_data, + last_shred_in_slot, num_shreds_received, num_shreds_sent, + mut addrs, } = other; self.asof = self.asof.max(asof); + self.max_index_code = self.max_index_code.max(max_index_code); + self.max_index_data = self.max_index_data.max(max_index_data); + self.last_shred_in_slot |= last_shred_in_slot; self.outset = if self.outset == 0 { outset } else { self.outset.min(outset) }; + if self.addrs.len() < addrs.len() { + std::mem::swap(&mut self.addrs, &mut addrs); + } + 
self.addrs.append(&mut addrs); for k in 0..MAX_NUM_TURBINE_HOPS { self.num_shreds_received[k] += num_shreds_received[k]; self.num_shreds_sent[k] += num_shreds_sent[k]; @@ -480,6 +625,8 @@ impl RetransmitStats { fn new(now: Instant) -> Self { Self { since: now, + addr_cache_hit: AtomicUsize::default(), + addr_cache_miss: AtomicUsize::default(), num_nodes: AtomicUsize::default(), num_addrs_failed: AtomicUsize::default(), num_loopback_errs: AtomicUsize::default(), @@ -498,38 +645,26 @@ impl RetransmitStats { } } - fn upsert_slot_stats( + fn upsert_slot_stats( &mut self, - feed: I, + feed: impl IntoIterator, root: Slot, + addr_cache: &mut AddrCache, rpc_subscriptions: Option<&RpcSubscriptions>, slot_status_notifier: Option<&SlotStatusNotifier>, - ) where - I: IntoIterator, - { - for (slot, slot_stats) in feed { + ) { + for (slot, mut slot_stats) in feed { + addr_cache.record(slot, &mut slot_stats); match self.slot_stats.get_mut(&slot) { None => { - if let Some(rpc_subscriptions) = rpc_subscriptions { - if slot > root { - let slot_update = SlotUpdate::FirstShredReceived { - slot, - timestamp: slot_stats.outset, - }; - rpc_subscriptions.notify_slot_update(slot_update); - datapoint_info!("retransmit-first-shred", ("slot", slot, i64)); - } - } - - if let Some(slot_status_notifier) = slot_status_notifier { - if slot > root { - slot_status_notifier - .read() - .unwrap() - .notify_first_shred_received(slot); - } + if slot > root { + notify_subscribers( + slot, + slot_stats.outset, + rpc_subscriptions, + slot_status_notifier, + ); } - self.slot_stats.put(slot, slot_stats); } Some(entry) => { @@ -550,15 +685,24 @@ impl RetransmitStats { } impl RetransmitSlotStats { - fn record(&mut self, now: u64, root_distance: usize, num_nodes: usize) { + fn record(&mut self, now: u64, out: RetransmitShredOutput) { self.outset = if self.outset == 0 { now } else { self.outset.min(now) }; self.asof = self.asof.max(now); - self.num_shreds_received[root_distance] += 1; - 
self.num_shreds_sent[root_distance] += num_nodes; + let max_index = match out.shred.shred_type() { + ShredType::Code => &mut self.max_index_code, + ShredType::Data => &mut self.max_index_data, + }; + *max_index = (*max_index).max(out.shred.index()); + self.last_shred_in_slot |= out.last_shred_in_slot; + self.num_shreds_received[usize::from(out.root_distance)] += 1; + self.num_shreds_sent[usize::from(out.root_distance)] += out.num_nodes; + if let Some(addrs) = out.addrs { + self.addrs.push((out.shred, out.root_distance, addrs)); + } } fn merge(mut acc: HashMap, other: HashMap) -> HashMap { @@ -606,6 +750,26 @@ impl RetransmitSlotStats { } } +// Notifies subscribers of shreds received from a new slot. +fn notify_subscribers( + slot: Slot, + timestamp: u64, // When the first shred in the slot was received. + rpc_subscriptions: Option<&RpcSubscriptions>, + slot_status_notifier: Option<&SlotStatusNotifier>, +) { + if let Some(rpc_subscriptions) = rpc_subscriptions { + let slot_update = SlotUpdate::FirstShredReceived { slot, timestamp }; + rpc_subscriptions.notify_slot_update(slot_update); + datapoint_info!("retransmit-first-shred", ("slot", slot, i64)); + } + if let Some(slot_status_notifier) = slot_status_notifier { + slot_status_notifier + .read() + .unwrap() + .notify_first_shred_received(slot); + } +} + #[cfg(test)] mod tests { use {