diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 981c7db3a04660..86a370c316e7c2 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -26,8 +26,7 @@ use { fmt::Debug, num::NonZeroUsize, ops::{ - Bound, - Bound::{Excluded, Included, Unbounded}, + Bound::{self, Unbounded}, Range, RangeBounds, }, path::PathBuf, @@ -483,115 +482,57 @@ pub struct AccountsIndexRootsStats { pub clean_dead_slot_us: u64, } +type RangeItemVec = Vec<(Pubkey, AccountMapEntry)>; + pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From + Into> { account_maps: &'a LockMapTypeSlice, - bin_calculator: &'a PubkeyBinCalculator24, - start_bound: Bound, - end_bound: Bound, + start_bound: Bound<&'a Pubkey>, + end_bound: Bound<&'a Pubkey>, + start_bin: usize, + end_bin_inclusive: usize, is_finished: bool, + bin_range: RangeItemVec, returns_items: AccountsIndexIteratorReturnsItems, } impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { - fn range( - map: &AccountMaps, - range: R, - returns_items: AccountsIndexIteratorReturnsItems, - ) -> Vec<(Pubkey, AccountMapEntry)> - where - R: RangeBounds + std::fmt::Debug, - { - let mut result = map.items(&range); - if returns_items == AccountsIndexIteratorReturnsItems::Sorted { - result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - } - result - } - - fn clone_bound(bound: Bound<&Pubkey>) -> Bound { - match bound { - Unbounded => Unbounded, - Included(k) => Included(*k), - Excluded(k) => Excluded(*k), - } - } - - fn bin_from_bound(&self, bound: &Bound, unbounded_bin: usize) -> usize { - match bound { - Bound::Included(bound) | Bound::Excluded(bound) => { - self.bin_calculator.bin_from_pubkey(bound) - } - Bound::Unbounded => unbounded_bin, - } - } - - fn start_bin(&self) -> usize { - // start in bin where 'start_bound' would exist - self.bin_from_bound(&self.start_bound, 0) - } - - fn end_bin_inclusive(&self) -> usize { - // end in bin where 'end_bound' would exist - self.bin_from_bound(&self.end_bound, usize::MAX) - } - - fn bin_start_and_range(&self) -> (usize, usize) { - let start_bin = self.start_bin(); - // calculate the max range of bins to look in - let end_bin_inclusive = self.end_bin_inclusive(); - let bin_range = if start_bin > end_bin_inclusive { - 0 // empty range - } else if end_bin_inclusive == usize::MAX { - usize::MAX - } else { - // the range is end_inclusive + 1 - start - // end_inclusive could be usize::MAX already if no bound was specified - end_bin_inclusive.saturating_add(1) - start_bin - }; - (start_bin, bin_range) - } - pub fn new( index: &'a AccountsIndex, - range: Option<&R>, + range: &'a Option, returns_items: AccountsIndexIteratorReturnsItems, ) -> Self where R: RangeBounds, { - Self { - start_bound: range - .as_ref() - .map(|r| Self::clone_bound(r.start_bound())) - .unwrap_or(Unbounded), - end_bound: range - .as_ref() - .map(|r| Self::clone_bound(r.end_bound())) - .unwrap_or(Unbounded), - account_maps: &index.account_maps, - is_finished: false, - bin_calculator: &index.bin_calculator, - returns_items, + match range { + Some(range) => { + let (start_bin, end_bin_inclusive) = index.bin_start_end_inclusive(range); + Self { + account_maps: &index.account_maps, + start_bound: range.start_bound(), + end_bound: range.end_bound(), + start_bin, + end_bin_inclusive, + is_finished: false, + bin_range: Vec::new(), + returns_items, + } + } + None => Self { + account_maps: &index.account_maps, + start_bound: Unbounded, + end_bound: Unbounded, + start_bin: 0, + end_bin_inclusive: index.account_maps.len() - 1, + is_finished: false, + bin_range: Vec::new(), + returns_items, + }, } } - - pub fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) - where - R: RangeBounds + Debug + Sync, - { - // forward this hold request ONLY to the bins which contain keys in the specified range - let (start_bin, bin_range) = self.bin_start_and_range(); - // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow - // so, parallelize the bucket loads - thread_pool.install(|| { - (0..bin_range).into_par_iter().for_each(|idx| { - let map = &self.account_maps[idx + start_bin]; - map.hold_range_in_memory(range, start_holding); - }); - }); - } } +/// Implement the Iterator trait for AccountsIndexIterator impl + Into> Iterator for AccountsIndexIterator<'_, T, U> { @@ -600,31 +541,33 @@ impl + Into> Iterator if self.is_finished { return None; } - let (start_bin, bin_range) = self.bin_start_and_range(); - let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - 'outer: for i in self.account_maps.iter().skip(start_bin).take(bin_range) { - for (pubkey, account_map_entry) in - Self::range(&i, (self.start_bound, self.end_bound), self.returns_items) - { - if chunk.len() >= ITER_BATCH_SIZE - && self.returns_items == AccountsIndexIteratorReturnsItems::Sorted - { - break 'outer; - } - let item = (pubkey, account_map_entry); - chunk.push(item); + + while self.bin_range.len() < ITER_BATCH_SIZE { + if self.start_bin > self.end_bin_inclusive { + break; + } + + let bin = self.start_bin; + let map = &self.account_maps[bin]; + let mut range = map.items(&(self.start_bound, self.end_bound)); + if self.returns_items == AccountsIndexIteratorReturnsItems::Sorted { + range.sort_unstable_by(|a, b| a.0.cmp(&b.0)); } + self.bin_range.append(&mut range); + self.start_bin += 1; } - if chunk.is_empty() { - self.is_finished = true; - return None; - } else if self.returns_items == AccountsIndexIteratorReturnsItems::Unsorted { + if self.bin_range.is_empty() { self.is_finished = true; + None + } else { + let chunk = if self.bin_range.len() < ITER_BATCH_SIZE { + self.bin_range.drain(..).collect() + } else { + self.bin_range.drain(0..ITER_BATCH_SIZE).collect() + }; + Some(chunk) } - - self.start_bound = Excluded(chunk.last().unwrap().0); - Some(chunk) } } @@ -760,6 +703,52 @@ impl + Into> AccountsIndex { } } + fn bin_from_pubkey(&self, pubkey: &Pubkey) -> usize { + self.bin_calculator.bin_from_pubkey(pubkey) + } + + fn bin_start_end_inclusive(&self, range: &R) -> (usize, usize) + where + R: RangeBounds, + { + let start_bin = match range.start_bound() { + Bound::Included(start) => self.bin_from_pubkey(start), + Bound::Excluded(start) => { + // TODO: when highest_pubkey is implemented, we can check if + // start == self.account_maps[start_bin].highest_pubkey(), then + // we should return start_bin + 1 + self.bin_from_pubkey(start) + } + Bound::Unbounded => 0, + }; + + let end_bin_inclusive = match range.end_bound() { + Bound::Included(end) => self.bin_from_pubkey(end), + Bound::Excluded(end) => { + // TODO: when lowest_pubkey is implemented, we can check if + // end == self.account_maps[end_bin].lowest_pubkey(), then + // we should return end_bin - 1 + self.bin_from_pubkey(end) + } + Bound::Unbounded => self.account_maps.len() - 1, + }; + + (start_bin, end_bin_inclusive) + } + + fn bin_start_and_range(&self, range: &R) -> (usize, usize) + where + R: RangeBounds + Debug + Sync, + { + let (start_bin, end_bin_inclusive) = self.bin_start_end_inclusive(range); + let bin_range = if start_bin > end_bin_inclusive { + 0 + } else { + end_bin_inclusive - start_bin + 1 + }; + (start_bin, bin_range) + } + fn allocate_accounts_index( config: Option, exit: Arc, @@ -781,11 +770,11 @@ impl + Into> AccountsIndex { (account_maps, bin_calculator, storage) } - fn iter( - &self, - range: Option<&R>, + fn iter<'a, R>( + &'a self, + range: &'a Option, returns_items: AccountsIndexIteratorReturnsItems, - ) -> AccountsIndexIterator + ) -> AccountsIndexIterator<'a, T, U> where R: RangeBounds, { @@ -1076,7 +1065,8 @@ impl + Into> AccountsIndex { let mut read_lock_elapsed = 0; let mut iterator_elapsed = 0; let mut iterator_timer = Measure::start("iterator_elapsed"); - for pubkey_list in self.iter(range.as_ref(), returns_items) { + + for pubkey_list in self.iter(&range, returns_items) { iterator_timer.stop(); iterator_elapsed += iterator_timer.as_us(); for (pubkey, list) in pubkey_list { @@ -1415,8 +1405,15 @@ impl + Into> AccountsIndex { where R: RangeBounds + Debug + Sync, { - let iter = self.iter(Some(range), AccountsIndexIteratorReturnsItems::Unsorted); - iter.hold_range_in_memory(range, start_holding, thread_pool); + let (start_bin, bin_range) = self.bin_start_and_range(range); + // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow + // so, parallelize the bucket loads + thread_pool.install(|| { + (0..bin_range).into_par_iter().for_each(|idx| { + let map = &self.account_maps[idx + start_bin]; + map.hold_range_in_memory(range, start_holding); + }); + }); } /// get stats related to startup @@ -2114,7 +2111,10 @@ pub mod tests { solana_account::{AccountSharedData, WritableAccount}, solana_inline_spl::token::SPL_TOKEN_ACCOUNT_OWNER_OFFSET, solana_pubkey::PUBKEY_BYTES, - std::ops::RangeInclusive, + std::ops::{ + Bound::{Excluded, Included}, + RangeInclusive, + }, }; const SPL_TOKENS: &[Pubkey] = &[ @@ -3062,7 +3062,7 @@ pub mod tests { fn test_accounts_iter_finished() { let (index, _) = setup_accounts_index_keys(0); let mut iter = index.iter( - None::<&Range>, + &None::>, AccountsIndexIteratorReturnsItems::Sorted, ); assert!(iter.next().is_none()); @@ -3903,45 +3903,86 @@ pub mod tests { } #[test] - fn test_bin_start_and_range() { + fn test_account_index_iter_batched() { let index = AccountsIndex::::default_for_tests(); - let iter = AccountsIndexIterator::new( - &index, - None::<&RangeInclusive>, + // Setup an account index for test. + // Two bins. First bin has 2000 accounts, second bin has 0 accounts. + let num_pubkeys = 2 * ITER_BATCH_SIZE; + let pubkeys = std::iter::repeat_with(Pubkey::new_unique) + .take(num_pubkeys) + .collect::>(); + + for key in pubkeys { + let slot = 0; + let value = true; + let mut gc = Vec::new(); + index.upsert( + slot, + slot, + &key, + &AccountSharedData::default(), + &AccountSecondaryIndexes::default(), + value, + &mut gc, + UPSERT_POPULATE_RECLAIMS, + ); + } + + for returns_items in [ AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!((0, usize::MAX), iter.bin_start_and_range()); + AccountsIndexIteratorReturnsItems::Unsorted, + ] { + // Create a sorted iterator for the whole pubkey range. + let mut iter = index.iter( + &None::>, + //AccountsIndexIteratorReturnsItems::Sorted, + returns_items, + ); + // First iter.next() should return the first batch of pubkeys (1000 + // pubkeys) out of the 2000 pubkeys in the first bin. And the remaining + // 1000 pubkeys from the first bin should be cached in + // self.bin_range, so that the second iter.next() don't need to + // load/filter/sort the first bin again. + let x = iter.next().unwrap(); + assert_eq!(x.len(), ITER_BATCH_SIZE); + assert_eq!( + x.is_sorted_by(|a, b| a.0 < b.0), + returns_items == AccountsIndexIteratorReturnsItems::Sorted + ); + assert_eq!(iter.bin_range.len(), ITER_BATCH_SIZE); // Contains the remaining 1000 items. + + // Second iter.next() should return the second batch of pubkeys - the remaining 1000 pubkeys. + let x = iter.next().unwrap(); + assert_eq!( + x.is_sorted_by(|a, b| a.0 < b.0), + returns_items == AccountsIndexIteratorReturnsItems::Sorted + ); + assert_eq!(x.len(), ITER_BATCH_SIZE); // contains the remaining 1000 pubkeys. + assert!(iter.bin_range.is_empty()); // last_bin_range should be empty. + + // Third iter.next() should return None. + assert!(iter.next().is_none()); + } + } + + #[test] + fn test_bin_start_and_range() { + let index = AccountsIndex::::default_for_tests(); + let range = (Unbounded::, Unbounded); + assert_eq!((0, BINS_FOR_TESTING), index.bin_start_and_range(&range)); let key_0 = Pubkey::from([0; 32]); let key_ff = Pubkey::from([0xff; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key_0, key_ff)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let range = (Included(key_0), Included(key_ff)); let bins = index.bins(); - assert_eq!((0, bins), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key_ff, key_0)), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!((bins - 1, 0), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key_0), Unbounded)), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!((0, usize::MAX), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key_ff), Unbounded)), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!((bins - 1, usize::MAX), iter.bin_start_and_range()); - - assert_eq!((0..2).skip(1).take(usize::MAX).collect::>(), vec![1]); + assert_eq!((0, bins), index.bin_start_and_range(&range)); + let range = (Included(key_ff), Included(key_0)); + assert_eq!((bins - 1, 0), index.bin_start_and_range(&range)); + let range = (Included(key_0), Unbounded); + assert_eq!((0, BINS_FOR_TESTING), index.bin_start_and_range(&range)); + let range = (Included(key_ff), Unbounded); + assert_eq!((bins - 1, 1), index.bin_start_and_range(&range)); } #[test] @@ -4182,60 +4223,44 @@ pub mod tests { fn test_start_end_bin() { let index = AccountsIndex::::default_for_tests(); assert_eq!(index.bins(), BINS_FOR_TESTING); - let iter = AccountsIndexIterator::new( - &index, - None::<&RangeInclusive>, - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), 0); // no range, so 0 - assert_eq!(iter.end_bin_inclusive(), usize::MAX); // no range, so max + + let range = (Unbounded::, Unbounded); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, 0); // no range, so 0 + assert_eq!(end, BINS_FOR_TESTING - 1); // no range, so last bin let key = Pubkey::from([0; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key, key)), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 - assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 - assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIterator::new( - &index, - Some(&(Excluded(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 - assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 + let range = RangeInclusive::new(key, key); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, 0); // start at pubkey 0, so 0 + assert_eq!(end, 0); // end at pubkey 0, so 0 + + let range = (Included(key), Excluded(key)); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, 0); // start at pubkey 0, so 0 + assert_eq!(end, 0); // end at pubkey 0, so 0 + + let range = (Excluded(key), Excluded(key)); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, 0); // start at pubkey 0, so 0 + assert_eq!(end, 0); // end at pubkey 0, so 0 let key = Pubkey::from([0xff; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key, key)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let range = RangeInclusive::new(key, key); + let (start, end) = index.bin_start_end_inclusive(&range); let bins = index.bins(); - assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 - assert_eq!(iter.end_bin_inclusive(), bins - 1); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 - assert_eq!(iter.end_bin_inclusive(), bins - 1); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Excluded(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); - assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 - assert_eq!(iter.end_bin_inclusive(), bins - 1); + assert_eq!(start, bins - 1); // start at highest possible pubkey, so bins - 1 + assert_eq!(end, bins - 1); + + let range = (Included(key), Excluded(key)); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, bins - 1); // start at highest possible pubkey, so bins - 1 + assert_eq!(end, bins - 1); + + let range = (Excluded(key), Excluded(key)); + let (start, end) = index.bin_start_end_inclusive(&range); + assert_eq!(start, bins - 1); // start at highest possible pubkey, so bins - 1 + assert_eq!(end, bins - 1); } #[test]