Skip to content

Commit

Permalink
refactor to separate accountindexiter into sorted and unsorted
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi committed Feb 14, 2025
1 parent 70c050f commit 56078ca
Showing 1 changed file with 176 additions and 43 deletions.
219 changes: 176 additions & 43 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,32 +487,16 @@ pub struct AccountsIndexRootsStats {

type RangeItemVec<T> = Vec<(Pubkey, AccountMapEntry<T>)>;

pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
struct AccountsIndexIteratorInner<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
account_maps: &'a LockMapTypeSlice<T, U>,
bin_calculator: &'a PubkeyBinCalculator24,
start_bound: Bound<Pubkey>,
end_bound: Bound<Pubkey>,
is_finished: bool,
returns_items: AccountsIndexIteratorReturnsItems,
last_bin_range: Option<(usize, RangeItemVec<T>)>,
}

impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIterator<'a, T, U> {
fn range<R>(
map: &AccountMaps<T, U>,
range: R,
returns_items: AccountsIndexIteratorReturnsItems,
) -> RangeItemVec<T>
where
R: RangeBounds<Pubkey> + std::fmt::Debug,
{
let mut result = map.items(&range);
if returns_items == AccountsIndexIteratorReturnsItems::Sorted {
result.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
result
}

impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
AccountsIndexIteratorInner<'a, T, U>
{
fn clone_bound(bound: Bound<&Pubkey>) -> Bound<Pubkey> {
match bound {
Unbounded => Unbounded,
Expand Down Expand Up @@ -556,11 +540,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIter
(start_bin, bin_range)
}

pub fn new<R>(
index: &'a AccountsIndex<T, U>,
range: Option<&R>,
returns_items: AccountsIndexIteratorReturnsItems,
) -> Self
pub fn new<R>(index: &'a AccountsIndex<T, U>, range: Option<&R>) -> Self
where
R: RangeBounds<Pubkey>,
{
Expand All @@ -574,10 +554,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIter
.map(|r| Self::clone_bound(r.end_bound()))
.unwrap_or(Unbounded),
account_maps: &index.account_maps,
is_finished: false,
bin_calculator: &index.bin_calculator,
returns_items,
last_bin_range: None,
}
}

Expand All @@ -598,17 +575,82 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIter
}
}

pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
inner: AccountsIndexIteratorInner<'a, T, U>,
is_finished: bool,
last_bin_range: Option<(usize, RangeItemVec<T>)>,
}

impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
AccountsIndexIteratorSorted<'a, T, U>
{
fn range<R>(map: &AccountMaps<T, U>, range: R) -> RangeItemVec<T>
where
R: RangeBounds<Pubkey> + std::fmt::Debug,
{
let mut result = map.items(&range);
result.sort_unstable_by(|a, b| a.0.cmp(&b.0));
result
}

pub fn new<R>(index: &'a AccountsIndex<T, U>, range: Option<&R>) -> Self
where
R: RangeBounds<Pubkey>,
{
let inner = AccountsIndexIteratorInner::new(index, range);
Self {
inner,
is_finished: false,
last_bin_range: None,
}
}
}

pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
inner: AccountsIndexIteratorInner<'a, T, U>,
}

impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
AccountsIndexIteratorUnsorted<'a, T, U>
{
fn range<R>(map: &AccountMaps<T, U>, range: R) -> RangeItemVec<T>
where
R: RangeBounds<Pubkey> + std::fmt::Debug,
{
map.items(&range)
}

pub fn new<R>(index: &'a AccountsIndex<T, U>, range: Option<&R>) -> Self
where
R: RangeBounds<Pubkey>,
{
let inner = AccountsIndexIteratorInner::new(index, range);
Self { inner }
}

pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool)
where
R: RangeBounds<Pubkey> + Debug + Sync,
{
self.inner
.hold_range_in_memory(range, start_holding, thread_pool)
}
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Iterator
for AccountsIndexIterator<'_, T, U>
for AccountsIndexIteratorSorted<'_, T, U>
{
type Item = Vec<(Pubkey, AccountMapEntry<T>)>;
fn next(&mut self) -> Option<Self::Item> {
let inner = &mut self.inner;

if self.is_finished {
return None;
}
let (start_bin, bin_range) = self.bin_start_and_range();

let (start_bin, bin_range) = inner.bin_start_and_range();
let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE);
'outer: for (i, map) in self
'outer: for (i, map) in inner
.account_maps
.iter()
.skip(start_bin)
Expand All @@ -623,11 +665,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Iterator
}
_ => {
// else load the new bin
Self::range(&map, (self.start_bound, self.end_bound), self.returns_items)
Self::range(&map, (inner.start_bound, inner.end_bound))
}
};
for (count, (pubkey, account_map_entry)) in range.iter().enumerate() {
if chunk.len() >= ITER_BATCH_SIZE && self.returns_items.is_sorted() {
if chunk.len() >= ITER_BATCH_SIZE {
range.drain(0..count);
self.last_bin_range = Some((bin, range));
break 'outer;
Expand All @@ -639,13 +681,92 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Iterator

if chunk.is_empty() {
self.is_finished = true;
return None;
} else if self.returns_items == AccountsIndexIteratorReturnsItems::Unsorted {
self.is_finished = true;
None
} else {
inner.start_bound = Excluded(chunk.last().unwrap().0);
Some(chunk)
}
}
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Iterator
for AccountsIndexIteratorUnsorted<'_, T, U>
{
type Item = Vec<(Pubkey, AccountMapEntry<T>)>;
fn next(&mut self) -> Option<Self::Item> {
let inner = &mut self.inner;
let (start_bin, bin_range) = inner.bin_start_and_range();
let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE);
for map in inner.account_maps.iter().skip(start_bin).take(bin_range) {
let mut range = Self::range(&map, (inner.start_bound, inner.end_bound));
chunk.append(&mut range);
}
if chunk.is_empty() {
None
} else {
Some(chunk)
}
}
}

pub enum AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
Sorted(AccountsIndexIteratorSorted<'a, T, U>),
Unsorted(AccountsIndexIteratorUnsorted<'a, T, U>),
}

impl<'a, T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIterator<'a, T, U> {
pub fn new<R>(
index: &'a AccountsIndex<T, U>,
range: Option<&R>,
returns_items: AccountsIndexIteratorReturnsItems,
) -> Self
where
R: RangeBounds<Pubkey>,
{
match returns_items {
AccountsIndexIteratorReturnsItems::Sorted => {
AccountsIndexIterator::Sorted(AccountsIndexIteratorSorted::new(index, range))
}
AccountsIndexIteratorReturnsItems::Unsorted => {
AccountsIndexIterator::Unsorted(AccountsIndexIteratorUnsorted::new(index, range))
}
}
}
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Iterator
for AccountsIndexIterator<'_, T, U>
{
type Item = Vec<(Pubkey, AccountMapEntry<T>)>;
fn next(&mut self) -> Option<Self::Item> {
match self {
AccountsIndexIterator::Sorted(iter) => iter.next(),
AccountsIndexIterator::Unsorted(iter) => iter.next(),
}
}
}

#[cfg(feature = "dev-context-only-utils")]
impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexIterator<'_, T, U> {
pub fn bin_start_and_range(&self) -> (usize, usize) {
match self {
AccountsIndexIterator::Sorted(iter) => iter.inner.bin_start_and_range(),
AccountsIndexIterator::Unsorted(iter) => iter.inner.bin_start_and_range(),
}
}

pub fn start_bin(&self) -> usize {
match self {
AccountsIndexIterator::Sorted(iter) => iter.inner.start_bin(),
AccountsIndexIterator::Unsorted(iter) => iter.inner.start_bin(),
}
}

self.start_bound = Excluded(chunk.last().unwrap().0);
Some(chunk)
pub fn end_bin_inclusive(&self) -> usize {
match self {
AccountsIndexIterator::Sorted(iter) => iter.inner.end_bin_inclusive(),
AccountsIndexIterator::Unsorted(iter) => iter.inner.end_bin_inclusive(),
}
}
}

Expand Down Expand Up @@ -819,6 +940,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
AccountsIndexIterator::new(self, range, returns_items)
}

#[cfg(feature = "dev-context-only-utils")]
pub fn sorted_iter<R>(&self, range: Option<&R>) -> AccountsIndexIteratorSorted<T, U>
where
R: RangeBounds<Pubkey>,
{
AccountsIndexIteratorSorted::new(self, range)
}

fn unsorted_iter<R>(&self, range: Option<&R>) -> AccountsIndexIteratorUnsorted<T, U>
where
R: RangeBounds<Pubkey>,
{
AccountsIndexIteratorUnsorted::new(self, range)
}

/// is the accounts index using disk as a backing store
pub fn is_disk_index_enabled(&self) -> bool {
self.storage.storage.is_disk_index_enabled()
Expand Down Expand Up @@ -1442,7 +1578,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
where
R: RangeBounds<Pubkey> + Debug + Sync,
{
let iter = self.iter(Some(range), AccountsIndexIteratorReturnsItems::Unsorted);
let iter = self.unsorted_iter(Some(range));
iter.hold_range_in_memory(range, start_holding, thread_pool);
}

Expand Down Expand Up @@ -3956,10 +4092,7 @@ pub mod tests {
}

// Create an iterator for the whole pubkey range.
let mut iter = index.iter(
None::<&Range<Pubkey>>,
AccountsIndexIteratorReturnsItems::Sorted,
);
let mut iter = index.sorted_iter(None::<&Range<Pubkey>>);
// First iter.next() should return the first batch of pubkeys (1000
// pubkeys) out of the 2000 pubkeys in the first bin. And the remaining
// 1000 pubkeys from the first bin should be cached in
Expand Down

0 comments on commit 56078ca

Please sign in to comment.