refactor: add LeaderSchedule trait (#4973)
* refactor: add LeaderSchedule trait

* refactor: rename leader schedule index

* refactor: rename get_indices

* identity keyed leader schedule
jstarry authored Mar 4, 2025
1 parent 703677b commit bf1809c
Showing 8 changed files with 251 additions and 209 deletions.
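
For orientation before the hunks: the refactor turns LeaderSchedule from a concrete struct into a boxed trait object, with the old stake-weighted implementation living on as the identity-keyed variant behind a new LeaderScheduleVariant trait. A condensed sketch of the resulting shape, taken from the ledger/src/leader_schedule.rs diff below (signatures only, not a complete listing):

use {
    solana_pubkey::Pubkey,
    std::{collections::HashMap, ops::Index, sync::Arc},
};

pub type LeaderSchedule = Box<dyn LeaderScheduleVariant>;

pub trait LeaderScheduleVariant:
    std::fmt::Debug + Send + Sync + Index<u64, Output = Pubkey>
{
    fn get_slot_leaders(&self) -> &[Pubkey];
    fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>>;

    // Provided methods with default bodies (shown in the diff below):
    // get_leader_upcoming_slots(pubkey, offset) and num_slots().
}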
8 changes: 4 additions & 4 deletions ledger/src/blockstore.rs
@@ -5402,7 +5402,7 @@ pub mod tests {
super::*,
crate::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule},
shred::{max_ticks_per_n_shreds, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
},
assert_matches::assert_matches,
@@ -10508,9 +10508,9 @@ pub mod tests {
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
let fixed_schedule = FixedSchedule {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
leader_schedule: Arc::new(Box::new(IdentityKeyedLeaderSchedule::new_from_schedule(
vec![leader_keypair.pubkey()],
))),
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));

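Because LeaderSchedule is now a type alias for Box<dyn LeaderScheduleVariant> (see the leader_schedule.rs diff below), the fixture has to box the concrete schedule before sharing it behind the Arc. A minimal sketch of the new construction, using a hypothetical helper name (fixed_schedule_for) that is not part of the commit and import paths as they appear in the test above:

use {
    crate::leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule, LeaderSchedule},
    solana_pubkey::Pubkey,
    std::sync::Arc,
};

// Hypothetical helper: builds a single-leader fixed schedule the way the
// updated blockstore test does.
fn fixed_schedule_for(leader: Pubkey) -> FixedSchedule {
    // The concrete identity-keyed schedule is boxed into the trait-object
    // alias first, then wrapped in an Arc for FixedSchedule.
    let schedule: LeaderSchedule =
        Box::new(IdentityKeyedLeaderSchedule::new_from_schedule(vec![leader]));
    FixedSchedule {
        leader_schedule: Arc::new(schedule),
    }
}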
230 changes: 48 additions & 182 deletions ledger/src/leader_schedule.rs
@@ -1,101 +1,40 @@
use {
itertools::Itertools,
rand::distributions::{Distribution, WeightedIndex},
rand_chacha::{rand_core::SeedableRng, ChaChaRng},
solana_pubkey::Pubkey,
solana_sdk::clock::Epoch,
std::{collections::HashMap, convert::identity, ops::Index, sync::Arc},
};

mod identity_keyed;
pub use identity_keyed::LeaderSchedule as IdentityKeyedLeaderSchedule;

// Used for testing
#[derive(Clone, Debug)]
pub struct FixedSchedule {
pub leader_schedule: Arc<LeaderSchedule>,
}

/// Stake-weighted leader schedule for one epoch.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct LeaderSchedule {
slot_leaders: Vec<Pubkey>,
// Inverted index from pubkeys to indices where they are the leader.
index: HashMap<Pubkey, Arc<Vec<usize>>>,
}

impl LeaderSchedule {
// Note: passing in zero stakers will cause a panic.
pub fn new_keyed_by_validator_identity(
epoch_staked_nodes: &HashMap<Pubkey, u64>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Self {
let keyed_stakes: Vec<_> = epoch_staked_nodes
.iter()
.map(|(pubkey, stake)| (pubkey, *stake))
.collect();
let slot_leaders = Self::stake_weighted_slot_leaders(keyed_stakes, epoch, len, repeat);
Self::new_from_schedule(slot_leaders)
}

// Note: passing in zero stakers will cause a panic.
fn stake_weighted_slot_leaders(
mut keyed_stakes: Vec<(&Pubkey, u64)>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Vec<Pubkey> {
sort_stakes(&mut keyed_stakes);
let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
let weighted_index = WeightedIndex::new(stakes).unwrap();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let rng = &mut ChaChaRng::from_seed(seed);
let mut current_slot_leader = Pubkey::default();
(0..len)
.map(|i| {
if i % repeat == 0 {
current_slot_leader = keys[weighted_index.sample(rng)];
}
current_slot_leader
})
.collect()
}

pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
Self {
index: Self::index_from_slot_leaders(&slot_leaders),
slot_leaders,
}
}

fn index_from_slot_leaders(slot_leaders: &[Pubkey]) -> HashMap<Pubkey, Arc<Vec<usize>>> {
slot_leaders
.iter()
.enumerate()
.map(|(i, pk)| (*pk, i))
.into_group_map()
.into_iter()
.map(|(k, v)| (k, Arc::new(v)))
.collect()
}

pub fn get_slot_leaders(&self) -> &[Pubkey] {
&self.slot_leaders
}
pub type LeaderSchedule = Box<dyn LeaderScheduleVariant>;

pub fn num_slots(&self) -> usize {
self.slot_leaders.len()
}
pub trait LeaderScheduleVariant:
std::fmt::Debug + Send + Sync + Index<u64, Output = Pubkey>
{
fn get_slot_leaders(&self) -> &[Pubkey];
fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>>;

/// 'offset' is an index into the leader schedule. The function returns an
/// iterator of indices i >= offset where the given pubkey is the leader.
pub(crate) fn get_indices(
fn get_leader_upcoming_slots(
&self,
pubkey: &Pubkey,
offset: usize, // Starting index.
) -> impl Iterator<Item = usize> {
let index = self.index.get(pubkey).cloned().unwrap_or_default();
let num_slots = self.slot_leaders.len();
) -> Box<dyn Iterator<Item = usize>> {
let index = self
.get_leader_slots_map()
.get(pubkey)
.cloned()
.unwrap_or_default();
let num_slots = self.num_slots();
let size = index.len();
#[allow(clippy::reversed_empty_ranges)]
let range = if index.is_empty() {
@@ -111,18 +50,38 @@ impl LeaderSchedule {
// for LeaderSchedule, where the schedule keeps repeating endlessly.
// The '%' returns where in a cycle we are and the '/' returns how many
// times the schedule is repeated.
range.map(move |k| index[k % size] + k / size * num_slots)
Box::new(range.map(move |k| index[k % size] + k / size * num_slots))
}
}

impl Index<u64> for LeaderSchedule {
type Output = Pubkey;
fn index(&self, index: u64) -> &Pubkey {
let index = index as usize;
&self.slot_leaders[index % self.slot_leaders.len()]
fn num_slots(&self) -> usize {
self.get_slot_leaders().len()
}
}

// Note: passing in zero stakers will cause a panic.
fn stake_weighted_slot_leaders(
mut keyed_stakes: Vec<(&Pubkey, u64)>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Vec<Pubkey> {
sort_stakes(&mut keyed_stakes);
let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
let weighted_index = WeightedIndex::new(stakes).unwrap();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let rng = &mut ChaChaRng::from_seed(seed);
let mut current_slot_leader = Pubkey::default();
(0..len)
.map(|i| {
if i % repeat == 0 {
current_slot_leader = keys[weighted_index.sample(rng)];
}
current_slot_leader
})
.collect()
}

fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
@@ -141,118 +100,25 @@ fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {

#[cfg(test)]
mod tests {
use {super::*, rand::Rng, std::iter::repeat_with};

#[test]
fn test_leader_schedule_index() {
let pubkey0 = solana_pubkey::new_rand();
let pubkey1 = solana_pubkey::new_rand();
let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
assert_eq!(leader_schedule[0], pubkey0);
assert_eq!(leader_schedule[1], pubkey1);
assert_eq!(leader_schedule[2], pubkey0);
}

#[test]
fn test_leader_schedule_basic() {
let num_keys = 10;
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let epoch: Epoch = rand::random();
let len = num_keys * 10;
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
let leader_schedule2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
assert_eq!(leader_schedule.num_slots() as u64, len);
// Check that the same schedule is reproducibly generated
assert_eq!(leader_schedule, leader_schedule2);
}

#[test]
fn test_repeated_leader_schedule() {
let num_keys = 10;
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let epoch = rand::random::<Epoch>();
let len = num_keys * 10;
let repeat = 8;
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, repeat);
assert_eq!(leader_schedule.num_slots() as u64, len);
let mut leader_node = Pubkey::default();
for (i, node) in leader_schedule.slot_leaders.iter().enumerate() {
if i % repeat as usize == 0 {
leader_node = *node;
} else {
assert_eq!(leader_node, *node);
}
}
}

#[test]
fn test_repeated_leader_schedule_specific() {
let alice_pubkey = solana_pubkey::new_rand();
let bob_pubkey = solana_pubkey::new_rand();
let stakes: HashMap<_, _> = [(alice_pubkey, 2), (bob_pubkey, 1)].into_iter().collect();

let epoch = 0;
let len = 8;
// What the schedule looks like without any repeats
let leaders1 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1).slot_leaders;

// What the schedule looks like with repeats
let leaders2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 2).slot_leaders;
assert_eq!(leaders1.len(), leaders2.len());

let leaders1_expected = vec![
alice_pubkey,
alice_pubkey,
alice_pubkey,
bob_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
];
let leaders2_expected = vec![
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
bob_pubkey,
bob_pubkey,
];

assert_eq!(leaders1, leaders1_expected);
assert_eq!(leaders2, leaders2_expected);
}
use {super::*, itertools::Itertools, rand::Rng, std::iter::repeat_with};

#[test]
fn test_get_indices() {
fn test_get_leader_upcoming_slots() {
const NUM_SLOTS: usize = 97;
let mut rng = rand::thread_rng();
let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(4).collect();
let schedule: Vec<_> = repeat_with(|| pubkeys[rng.gen_range(0..3)])
.take(19)
.collect();
let schedule = LeaderSchedule::new_from_schedule(schedule);
let schedule = IdentityKeyedLeaderSchedule::new_from_schedule(schedule);
let leaders = (0..NUM_SLOTS)
.map(|i| (schedule[i as u64], i))
.into_group_map();
for pubkey in &pubkeys {
let index = leaders.get(pubkey).cloned().unwrap_or_default();
for offset in 0..NUM_SLOTS {
let schedule: Vec<_> = schedule
.get_indices(pubkey, offset)
.get_leader_upcoming_slots(pubkey, offset)
.take_while(|s| *s < NUM_SLOTS)
.collect();
let index: Vec<_> = index.iter().copied().skip_while(|s| *s < offset).collect();
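The new ledger/src/leader_schedule/identity_keyed.rs is among the eight changed files but is not shown on this page. A minimal sketch of how such a module could satisfy the trait, reconstructed from the struct, indexing impl, and index-building code that the diff above removes from leader_schedule.rs; the real module may differ in detail:

use {
    crate::leader_schedule::LeaderScheduleVariant,
    itertools::Itertools,
    solana_pubkey::Pubkey,
    std::{collections::HashMap, ops::Index, sync::Arc},
};

// Sketch only: concrete schedule keyed by validator identity, mirroring the
// code removed from leader_schedule.rs above.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct LeaderSchedule {
    slot_leaders: Vec<Pubkey>,
    // Inverted index from pubkeys to the slot indices where they are the leader.
    index: HashMap<Pubkey, Arc<Vec<usize>>>,
}

impl LeaderSchedule {
    pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
        let index = slot_leaders
            .iter()
            .enumerate()
            .map(|(i, pubkey)| (*pubkey, i))
            .into_group_map()
            .into_iter()
            .map(|(pubkey, slots)| (pubkey, Arc::new(slots)))
            .collect();
        Self {
            slot_leaders,
            index,
        }
    }
}

impl LeaderScheduleVariant for LeaderSchedule {
    fn get_slot_leaders(&self) -> &[Pubkey] {
        &self.slot_leaders
    }

    fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>> {
        &self.index
    }
}

impl Index<u64> for LeaderSchedule {
    type Output = Pubkey;

    fn index(&self, index: u64) -> &Pubkey {
        let index = index as usize;
        &self.slot_leaders[index % self.slot_leaders.len()]
    }
}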

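And a hypothetical usage sketch of the renamed accessor; this snippet is not in the commit and assumes the offset handling in the elided hunk still skips to the first leader slot at or after the offset, which is what the renamed test asserts:

use {
    crate::leader_schedule::{
        IdentityKeyedLeaderSchedule, LeaderSchedule, LeaderScheduleVariant,
    },
    solana_pubkey::Pubkey,
};

fn upcoming_slots_example() {
    let alice = Pubkey::new_unique();
    let bob = Pubkey::new_unique();
    // Alice leads slots 0 and 2 of every three-slot cycle.
    let schedule: LeaderSchedule = Box::new(IdentityKeyedLeaderSchedule::new_from_schedule(
        vec![alice, bob, alice],
    ));

    // Indexing wraps around the schedule length, as before.
    assert_eq!(schedule[3], alice);

    // Starting from offset 2, Alice's upcoming leader slots are 2, 3, 5, 6, ...
    let upcoming: Vec<usize> = schedule
        .get_leader_upcoming_slots(&alice, 2)
        .take(4)
        .collect();
    assert_eq!(upcoming, vec![2, 3, 5, 6]);
}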