Skip to content

Commit

Permalink
Refactor: optimize snapshot memory usage with reference sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
rongma7 committed Feb 21, 2025
1 parent a823e01 commit 8df6e2f
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 105 deletions.
13 changes: 8 additions & 5 deletions src/example.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
use std::sync::Arc;

use crate::{
backends::{InMemoryDatabase, VersionedKVName},
errors::Result,
middlewares::{table_schema::VersionedKeyValueSchema, VersionedStore, VersionedStoreCache},
traits::KeyValueStoreManager,
};
use ethereum_types::H256;
use parking_lot::Mutex;
use static_assertions::assert_impl_all;

pub struct Storage {
backend: InMemoryDatabase,
cache: VersionedStoreCache<FlatKeyValue>,
cache: Arc<Mutex<VersionedStoreCache<FlatKeyValue>>>,
}

impl Storage {
pub fn new() -> Self {
Self {
backend: InMemoryDatabase::empty(),
cache: VersionedStoreCache::new_empty(),
cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())),
}
}

pub fn as_manager(&mut self) -> Result<VersionedStore<'_, '_, FlatKeyValue>> {
VersionedStore::new(&self.backend, &mut self.cache)
pub fn as_manager(&self) -> Result<VersionedStore<'_, FlatKeyValue>> {
VersionedStore::new(&self.backend, self.cache.clone())
}
}

assert_impl_all!(VersionedStore<'_, '_, FlatKeyValue>: KeyValueStoreManager<Box<[u8]>, Box<[u8]>, H256>);
assert_impl_all!(VersionedStore<'_, FlatKeyValue>: KeyValueStoreManager<Box<[u8]>, Box<[u8]>, H256>);

#[derive(Clone, Copy, Debug)]
pub struct FlatKeyValue;
Expand Down
35 changes: 21 additions & 14 deletions src/lvmt/example.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use parking_lot::Mutex;

use crate::{
backends::DatabaseTrait,
errors::Result,
Expand All @@ -17,25 +19,25 @@ use super::{

pub struct LvmtStorage<D: DatabaseTrait> {
backend: D,
key_value_cache: VersionedStoreCache<FlatKeyValue>,
amt_node_cache: VersionedStoreCache<AmtNodes>,
slot_alloc_cache: VersionedStoreCache<SlotAllocations>,
key_value_cache: Arc<Mutex<VersionedStoreCache<FlatKeyValue>>>,
amt_node_cache: Arc<Mutex<VersionedStoreCache<AmtNodes>>>,
slot_alloc_cache: Arc<Mutex<VersionedStoreCache<SlotAllocations>>>,
}

impl<D: DatabaseTrait> LvmtStorage<D> {
pub fn new(backend: D) -> Result<Self> {
Ok(Self {
backend,
key_value_cache: VersionedStoreCache::new_empty(),
amt_node_cache: VersionedStoreCache::new_empty(),
slot_alloc_cache: VersionedStoreCache::new_empty(),
key_value_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())),
amt_node_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())),
slot_alloc_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())),
})
}

pub fn as_manager(&mut self) -> Result<LvmtStore<'_, '_>> {
let key_value_store = VersionedStore::new(&self.backend, &mut self.key_value_cache)?;
let amt_node_store = VersionedStore::new(&self.backend, &mut self.amt_node_cache)?;
let slot_alloc_store = VersionedStore::new(&self.backend, &mut self.slot_alloc_cache)?;
pub fn as_manager(&self) -> Result<LvmtStore<'_>> {
let key_value_store = VersionedStore::new(&self.backend, self.key_value_cache.clone())?;
let amt_node_store = VersionedStore::new(&self.backend, self.amt_node_cache.clone())?;
let slot_alloc_store = VersionedStore::new(&self.backend, self.slot_alloc_cache.clone())?;
let auth_changes =
KeyValueStoreBulks::new(Arc::new(self.backend.view::<AuthChangeTable>()?));

Expand All @@ -52,13 +54,18 @@ impl<D: DatabaseTrait> LvmtStorage<D> {
}

pub fn confirmed_pending_to_history(
&mut self,
&self,
new_root_commit_id: CommitID,
write_schema: &D::WriteSchema,
) -> Result<()> {
let key_value_confirmed_path = self.key_value_cache.change_root(new_root_commit_id)?;
let amt_node_confirmed_path = self.amt_node_cache.change_root(new_root_commit_id)?;
let slot_alloc_confirmed_path = self.slot_alloc_cache.change_root(new_root_commit_id)?;
let mut key_value_cache_guard = self.key_value_cache.lock();
let mut amt_node_cache_guard = self.amt_node_cache.lock();
let mut slot_alloc_confirmed_guard = self.slot_alloc_cache.lock();

let key_value_confirmed_path = key_value_cache_guard.change_root(new_root_commit_id)?;
let amt_node_confirmed_path = amt_node_cache_guard.change_root(new_root_commit_id)?;
let slot_alloc_confirmed_path =
slot_alloc_confirmed_guard.change_root(new_root_commit_id)?;

assert!(key_value_confirmed_path.is_same_path(&amt_node_confirmed_path));
assert!(key_value_confirmed_path.is_same_path(&slot_alloc_confirmed_path));
Expand Down
2 changes: 1 addition & 1 deletion src/lvmt/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct LvmtSnapshot<'db> {
key_value_view: Box<KeyValueSnapshotRead<'db, FlatKeyValue>>,
}

impl<'cache, 'db> LvmtStore<'cache, 'db> {
impl<'db> LvmtStore<'db> {
pub fn get_state(&self, commit: CommitID) -> Result<LvmtSnapshot> {
let key_value_view = self.get_key_value_store().get_versioned_store(&commit)?;

Expand Down
24 changes: 12 additions & 12 deletions src/lvmt/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ use crate::{
utils::hash::blake2s,
};

pub struct LvmtStore<'cache, 'db> {
key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>,
amt_node_store: VersionedStore<'cache, 'db, AmtNodes>,
slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>,
pub struct LvmtStore<'db> {
key_value_store: VersionedStore<'db, FlatKeyValue>,
amt_node_store: VersionedStore<'db, AmtNodes>,
slot_alloc_store: VersionedStore<'db, SlotAllocations>,
auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>,
}

const ALLOC_START_VERSION: u64 = 1;

impl<'cache, 'db> LvmtStore<'cache, 'db> {
impl<'db> LvmtStore<'db> {
pub fn new(
key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>,
amt_node_store: VersionedStore<'cache, 'db, AmtNodes>,
slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>,
key_value_store: VersionedStore<'db, FlatKeyValue>,
amt_node_store: VersionedStore<'db, AmtNodes>,
slot_alloc_store: VersionedStore<'db, SlotAllocations>,
auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>,
) -> Self {
Self {
Expand Down Expand Up @@ -203,18 +203,18 @@ fn allocate_version_slot(
}
}

impl<'cache, 'db> LvmtStore<'cache, 'db> {
pub fn get_key_value_store(&self) -> &VersionedStore<'cache, 'db, FlatKeyValue> {
impl<'db> LvmtStore<'db> {
pub fn get_key_value_store(&self) -> &VersionedStore<'db, FlatKeyValue> {
&self.key_value_store
}

#[cfg(test)]
pub fn get_amt_node_store(&self) -> &VersionedStore<'cache, 'db, AmtNodes> {
pub fn get_amt_node_store(&self) -> &VersionedStore<'db, AmtNodes> {
&self.amt_node_store
}

#[cfg(test)]
pub fn get_slot_alloc_store(&self) -> &VersionedStore<'cache, 'db, SlotAllocations> {
pub fn get_slot_alloc_store(&self) -> &VersionedStore<'db, SlotAllocations> {
&self.slot_alloc_store
}
}
4 changes: 2 additions & 2 deletions src/lvmt/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ fn test_lvmt_store_inmemory() {
test_lvmt_store::<InMemoryDatabase>(backend, 100000);
}

impl<'cache, 'db> LvmtStore<'cache, 'db> {
pub fn check_consistency(&mut self, commit: CommitID, pp: &AmtParams<PE>) -> Result<()> {
impl<'db> LvmtStore<'db> {
pub fn check_consistency(&self, commit: CommitID, pp: &AmtParams<PE>) -> Result<()> {
use std::collections::BTreeSet;

use ark_ec::CurveGroup;
Expand Down
112 changes: 67 additions & 45 deletions src/middlewares/versioned_flat_key_value/manager_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{borrow::Borrow, collections::BTreeMap};
use std::{borrow::Borrow, sync::Arc};

use parking_lot::Mutex;

use crate::{
backends::TableReader,
Expand All @@ -7,18 +9,23 @@ use crate::{
traits::{
IsCompleted, KeyValueStoreBulksTrait, KeyValueStoreManager, KeyValueStoreRead, NeedNext,
},
types::ValueEntry,
StorageError,
};

#[cfg(test)]
use crate::types::ValueEntry;
#[cfg(test)]
use std::collections::BTreeMap;

use super::{
get_versioned_key,
pending_part::{pending_schema::PendingKeyValueConfig, VersionedMap},
table_schema::{HistoryChangeTable, HistoryIndicesTable, VersionedKeyValueSchema},
HistoryIndexKey, PendingError, VersionedStore,
};

pub struct SnapshotView<'db, T: VersionedKeyValueSchema> {
pending_updates: Option<BTreeMap<T::Key, ValueEntry<T::Value>>>,
pending_updates: Option<SnapshotPending<T>>,
history: Option<SnapshotHistorical<'db, T>>,
}

Expand Down Expand Up @@ -94,7 +101,11 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> {
pub fn iter(&self) -> Result<impl Iterator<Item = (T::Key, ValueEntry<T::Value>)>> {
let mut map = self.iter_history()?;

if let Some(ref pending_map) = self.pending_updates {
if let Some(ref pending_updates) = self.pending_updates {
let pending_map = pending_updates
.pending_part
.lock()
.get_versioned_store(pending_updates.commit_id)?;
for (k, v) in pending_map {
map.insert(k.clone(), v.clone());
}
Expand All @@ -104,16 +115,27 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> {
}
}

pub struct SnapshotHistorical<'db, T: VersionedKeyValueSchema> {
struct SnapshotPending<T: VersionedKeyValueSchema> {
commit_id: CommitID,
pending_part: Arc<Mutex<VersionedMap<PendingKeyValueConfig<T, CommitID>>>>,
}

struct SnapshotHistorical<'db, T: VersionedKeyValueSchema> {
history_number: HistoryNumber,
history_index_table: TableReader<'db, HistoryIndicesTable<T>>,
change_history_table: KeyValueStoreBulks<'db, HistoryChangeTable<T>>,
}

impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead<T::Key, T::Value> for SnapshotView<'db, T> {
fn get(&self, key: &T::Key) -> Result<Option<T::Value>> {
if let Some(opt_v) = self.pending_updates.as_ref().and_then(|u| u.get(key)) {
return Ok(opt_v.to_option());
if let Some(pending_updates) = &self.pending_updates {
let pending_optv = pending_updates
.pending_part
.lock()
.get_versioned_key(&pending_updates.commit_id, key)?;
if let Some(pending_v) = pending_optv {
return Ok(pending_v.into_option());
}
}

if let Some(history) = &self.history {
Expand Down Expand Up @@ -141,41 +163,39 @@ impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead<T::Key, T::Value>
}
}

impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager<T::Key, T::Value, CommitID>
for VersionedStore<'cache, 'db, T>
impl<'db, T: VersionedKeyValueSchema> KeyValueStoreManager<T::Key, T::Value, CommitID>
for VersionedStore<'db, T>
{
type Store = SnapshotView<'db, T>;
fn get_versioned_store(&self, commit: &CommitID) -> Result<Self::Store> {
let pending_res = self.pending_part.get_versioned_store(*commit);
match pending_res {
Ok(pending_map) => {
let history = if let Some(history_commit) = self.pending_part.get_parent_of_root() {
Some(SnapshotHistorical {
history_number: self.get_history_number_by_commit_id(history_commit)?,
history_index_table: self.history_index_table.clone(),
change_history_table: self.change_history_table.clone(),
})
} else {
None
};
Ok(SnapshotView {
pending_updates: Some(pending_map),
history,
})
}
Err(PendingError::CommitIDNotFound(target_commit_id)) => {
assert_eq!(target_commit_id, *commit);
let history = SnapshotHistorical {
history_number: self.get_history_number_by_commit_id(*commit)?,
let pending_guard = self.pending_part.lock();
if pending_guard.contains_commit_id(commit) {
let history = if let Some(history_commit) = pending_guard.get_parent_of_root() {
Some(SnapshotHistorical {
history_number: self.get_history_number_by_commit_id(history_commit)?,
history_index_table: self.history_index_table.clone(),
change_history_table: self.change_history_table.clone(),
};
Ok(SnapshotView {
pending_updates: None,
history: Some(history),
})
}
Err(other_err) => Err(StorageError::PendingError(other_err)),
} else {
None
};
Ok(SnapshotView {
pending_updates: Some(SnapshotPending {
commit_id: *commit,
pending_part: self.pending_part.clone(),
}),
history,
})
} else {
let history = SnapshotHistorical {
history_number: self.get_history_number_by_commit_id(*commit)?,
history_index_table: self.history_index_table.clone(),
change_history_table: self.change_history_table.clone(),
};
Ok(SnapshotView {
pending_updates: None,
history: Some(history),
})
}
}

Expand All @@ -185,13 +205,12 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager<T::Key, T::Va
commit_id: &CommitID,
key: &T::Key,
) -> Result<IsCompleted> {
let pending_res = self
.pending_part
.iter_historical_changes(&mut accept, commit_id, key);
let pending_guard = self.pending_part.lock();
let pending_res = pending_guard.iter_historical_changes(&mut accept, commit_id, key);
match pending_res {
Ok(false) => Ok(false),
Ok(true) => {
if let Some(history_commit) = self.pending_part.get_parent_of_root() {
if let Some(history_commit) = pending_guard.get_parent_of_root() {
self.iter_historical_changes_history_part(&mut accept, &history_commit, key)
} else {
Ok(true)
Expand All @@ -206,22 +225,25 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager<T::Key, T::Va
}

fn discard(&mut self, commit: CommitID) -> Result<()> {
let mut pending_guard = self.pending_part.lock();

if self.commit_id_table.get(&commit)?.is_some() {
return Ok(());
}

Ok(self.pending_part.discard(commit)?)
Ok(pending_guard.discard(commit)?)
}

fn get_versioned_key(&self, commit: &CommitID, key: &T::Key) -> Result<Option<T::Value>> {
// let pending_res = self.pending_part.get_versioned_key_with_checkout(commit, key); // this will checkout_current
let pending_res = self.pending_part.get_versioned_key(commit, key);
let pending_guard = self.pending_part.lock();
// let pending_res = pending_guard.get_versioned_key_with_checkout(commit, key); // this will checkout_current
let pending_res = pending_guard.get_versioned_key(commit, key);
let history_commit = match pending_res {
Ok(Some(value)) => {
return Ok(value.into_option());
}
Ok(None) => {
if let Some(commit) = self.pending_part.get_parent_of_root() {
if let Some(commit) = pending_guard.get_parent_of_root() {
commit
} else {
return Ok(None);
Expand All @@ -242,7 +264,7 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager<T::Key, T::Va
}

// Helper methods used in trait implementations
impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> {
impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T> {
fn iter_historical_changes_history_part(
&self,
mut accept: impl FnMut(&CommitID, &T::Key, Option<&T::Value>) -> NeedNext,
Expand Down
Loading

0 comments on commit 8df6e2f

Please sign in to comment.