diff --git a/src/example.rs b/src/example.rs index 11d1fe3..7ff2ea1 100644 --- a/src/example.rs +++ b/src/example.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ backends::{InMemoryDatabase, VersionedKVName}, errors::Result, @@ -5,27 +7,28 @@ use crate::{ traits::KeyValueStoreManager, }; use ethereum_types::H256; +use parking_lot::Mutex; use static_assertions::assert_impl_all; pub struct Storage { backend: InMemoryDatabase, - cache: VersionedStoreCache, + cache: Arc>>, } impl Storage { pub fn new() -> Self { Self { backend: InMemoryDatabase::empty(), - cache: VersionedStoreCache::new_empty(), + cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())), } } - pub fn as_manager(&mut self) -> Result> { - VersionedStore::new(&self.backend, &mut self.cache) + pub fn as_manager(&self) -> Result> { + VersionedStore::new(&self.backend, self.cache.clone()) } } -assert_impl_all!(VersionedStore<'_, '_, FlatKeyValue>: KeyValueStoreManager, Box<[u8]>, H256>); +assert_impl_all!(VersionedStore<'_, FlatKeyValue>: KeyValueStoreManager, Box<[u8]>, H256>); #[derive(Clone, Copy, Debug)] pub struct FlatKeyValue; diff --git a/src/lvmt/example.rs b/src/lvmt/example.rs index 64af8a9..acc0f27 100644 --- a/src/lvmt/example.rs +++ b/src/lvmt/example.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use parking_lot::Mutex; + use crate::{ backends::DatabaseTrait, errors::Result, @@ -17,25 +19,25 @@ use super::{ pub struct LvmtStorage { backend: D, - key_value_cache: VersionedStoreCache, - amt_node_cache: VersionedStoreCache, - slot_alloc_cache: VersionedStoreCache, + key_value_cache: Arc>>, + amt_node_cache: Arc>>, + slot_alloc_cache: Arc>>, } impl LvmtStorage { pub fn new(backend: D) -> Result { Ok(Self { backend, - key_value_cache: VersionedStoreCache::new_empty(), - amt_node_cache: VersionedStoreCache::new_empty(), - slot_alloc_cache: VersionedStoreCache::new_empty(), + key_value_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())), + amt_node_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())), + slot_alloc_cache: Arc::new(Mutex::new(VersionedStoreCache::new_empty())), }) } - pub fn as_manager(&mut self) -> Result> { - let key_value_store = VersionedStore::new(&self.backend, &mut self.key_value_cache)?; - let amt_node_store = VersionedStore::new(&self.backend, &mut self.amt_node_cache)?; - let slot_alloc_store = VersionedStore::new(&self.backend, &mut self.slot_alloc_cache)?; + pub fn as_manager(&self) -> Result> { + let key_value_store = VersionedStore::new(&self.backend, self.key_value_cache.clone())?; + let amt_node_store = VersionedStore::new(&self.backend, self.amt_node_cache.clone())?; + let slot_alloc_store = VersionedStore::new(&self.backend, self.slot_alloc_cache.clone())?; let auth_changes = KeyValueStoreBulks::new(Arc::new(self.backend.view::()?)); @@ -52,13 +54,18 @@ impl LvmtStorage { } pub fn confirmed_pending_to_history( - &mut self, + &self, new_root_commit_id: CommitID, write_schema: &D::WriteSchema, ) -> Result<()> { - let key_value_confirmed_path = self.key_value_cache.change_root(new_root_commit_id)?; - let amt_node_confirmed_path = self.amt_node_cache.change_root(new_root_commit_id)?; - let slot_alloc_confirmed_path = self.slot_alloc_cache.change_root(new_root_commit_id)?; + let mut key_value_cache_guard = self.key_value_cache.lock(); + let mut amt_node_cache_guard = self.amt_node_cache.lock(); + let mut slot_alloc_confirmed_guard = self.slot_alloc_cache.lock(); + + let key_value_confirmed_path = key_value_cache_guard.change_root(new_root_commit_id)?; + let amt_node_confirmed_path = amt_node_cache_guard.change_root(new_root_commit_id)?; + let slot_alloc_confirmed_path = + slot_alloc_confirmed_guard.change_root(new_root_commit_id)?; assert!(key_value_confirmed_path.is_same_path(&amt_node_confirmed_path)); assert!(key_value_confirmed_path.is_same_path(&slot_alloc_confirmed_path)); diff --git a/src/lvmt/snapshot.rs b/src/lvmt/snapshot.rs index 2e876ad..c8ca888 100644 --- a/src/lvmt/snapshot.rs +++ b/src/lvmt/snapshot.rs @@ -13,7 +13,7 @@ pub struct LvmtSnapshot<'db> { key_value_view: Box>, } -impl<'cache, 'db> LvmtStore<'cache, 'db> { +impl<'db> LvmtStore<'db> { pub fn get_state(&self, commit: CommitID) -> Result { let key_value_view = self.get_key_value_store().get_versioned_store(&commit)?; diff --git a/src/lvmt/storage.rs b/src/lvmt/storage.rs index c20aca1..43ed090 100644 --- a/src/lvmt/storage.rs +++ b/src/lvmt/storage.rs @@ -23,20 +23,20 @@ use crate::{ utils::hash::blake2s, }; -pub struct LvmtStore<'cache, 'db> { - key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>, - amt_node_store: VersionedStore<'cache, 'db, AmtNodes>, - slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>, +pub struct LvmtStore<'db> { + key_value_store: VersionedStore<'db, FlatKeyValue>, + amt_node_store: VersionedStore<'db, AmtNodes>, + slot_alloc_store: VersionedStore<'db, SlotAllocations>, auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>, } const ALLOC_START_VERSION: u64 = 1; -impl<'cache, 'db> LvmtStore<'cache, 'db> { +impl<'db> LvmtStore<'db> { pub fn new( - key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>, - amt_node_store: VersionedStore<'cache, 'db, AmtNodes>, - slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>, + key_value_store: VersionedStore<'db, FlatKeyValue>, + amt_node_store: VersionedStore<'db, AmtNodes>, + slot_alloc_store: VersionedStore<'db, SlotAllocations>, auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>, ) -> Self { Self { @@ -203,18 +203,18 @@ fn allocate_version_slot( } } -impl<'cache, 'db> LvmtStore<'cache, 'db> { - pub fn get_key_value_store(&self) -> &VersionedStore<'cache, 'db, FlatKeyValue> { +impl<'db> LvmtStore<'db> { + pub fn get_key_value_store(&self) -> &VersionedStore<'db, FlatKeyValue> { &self.key_value_store } #[cfg(test)] - pub fn get_amt_node_store(&self) -> &VersionedStore<'cache, 'db, AmtNodes> { + pub fn get_amt_node_store(&self) -> &VersionedStore<'db, AmtNodes> { &self.amt_node_store } #[cfg(test)] - pub fn get_slot_alloc_store(&self) -> &VersionedStore<'cache, 'db, SlotAllocations> { + pub fn get_slot_alloc_store(&self) -> &VersionedStore<'db, SlotAllocations> { &self.slot_alloc_store } } diff --git a/src/lvmt/tests.rs b/src/lvmt/tests.rs index 227c540..af371c1 100644 --- a/src/lvmt/tests.rs +++ b/src/lvmt/tests.rs @@ -150,8 +150,8 @@ fn test_lvmt_store_inmemory() { test_lvmt_store::(backend, 100000); } -impl<'cache, 'db> LvmtStore<'cache, 'db> { - pub fn check_consistency(&mut self, commit: CommitID, pp: &AmtParams) -> Result<()> { +impl<'db> LvmtStore<'db> { + pub fn check_consistency(&self, commit: CommitID, pp: &AmtParams) -> Result<()> { use std::collections::BTreeSet; use ark_ec::CurveGroup; diff --git a/src/middlewares/versioned_flat_key_value/manager_impl.rs b/src/middlewares/versioned_flat_key_value/manager_impl.rs index b9339c9..89aa72c 100644 --- a/src/middlewares/versioned_flat_key_value/manager_impl.rs +++ b/src/middlewares/versioned_flat_key_value/manager_impl.rs @@ -1,4 +1,6 @@ -use std::{borrow::Borrow, collections::BTreeMap}; +use std::{borrow::Borrow, sync::Arc}; + +use parking_lot::Mutex; use crate::{ backends::TableReader, @@ -7,18 +9,23 @@ use crate::{ traits::{ IsCompleted, KeyValueStoreBulksTrait, KeyValueStoreManager, KeyValueStoreRead, NeedNext, }, - types::ValueEntry, StorageError, }; +#[cfg(test)] +use crate::types::ValueEntry; +#[cfg(test)] +use std::collections::BTreeMap; + use super::{ get_versioned_key, + pending_part::{pending_schema::PendingKeyValueConfig, VersionedMap}, table_schema::{HistoryChangeTable, HistoryIndicesTable, VersionedKeyValueSchema}, HistoryIndexKey, PendingError, VersionedStore, }; pub struct SnapshotView<'db, T: VersionedKeyValueSchema> { - pending_updates: Option>>, + pending_updates: Option>, history: Option>, } @@ -94,7 +101,11 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> { pub fn iter(&self) -> Result)>> { let mut map = self.iter_history()?; - if let Some(ref pending_map) = self.pending_updates { + if let Some(ref pending_updates) = self.pending_updates { + let pending_map = pending_updates + .pending_part + .lock() + .get_versioned_store(pending_updates.commit_id)?; for (k, v) in pending_map { map.insert(k.clone(), v.clone()); } @@ -104,7 +115,12 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> { } } -pub struct SnapshotHistorical<'db, T: VersionedKeyValueSchema> { +struct SnapshotPending { + commit_id: CommitID, + pending_part: Arc>>>, +} + +struct SnapshotHistorical<'db, T: VersionedKeyValueSchema> { history_number: HistoryNumber, history_index_table: TableReader<'db, HistoryIndicesTable>, change_history_table: KeyValueStoreBulks<'db, HistoryChangeTable>, @@ -112,8 +128,14 @@ pub struct SnapshotHistorical<'db, T: VersionedKeyValueSchema> { impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead for SnapshotView<'db, T> { fn get(&self, key: &T::Key) -> Result> { - if let Some(opt_v) = self.pending_updates.as_ref().and_then(|u| u.get(key)) { - return Ok(opt_v.to_option()); + if let Some(pending_updates) = &self.pending_updates { + let pending_optv = pending_updates + .pending_part + .lock() + .get_versioned_key(&pending_updates.commit_id, key)?; + if let Some(pending_v) = pending_optv { + return Ok(pending_v.into_option()); + } } if let Some(history) = &self.history { @@ -141,41 +163,39 @@ impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead } } -impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager - for VersionedStore<'cache, 'db, T> +impl<'db, T: VersionedKeyValueSchema> KeyValueStoreManager + for VersionedStore<'db, T> { type Store = SnapshotView<'db, T>; fn get_versioned_store(&self, commit: &CommitID) -> Result { - let pending_res = self.pending_part.get_versioned_store(*commit); - match pending_res { - Ok(pending_map) => { - let history = if let Some(history_commit) = self.pending_part.get_parent_of_root() { - Some(SnapshotHistorical { - history_number: self.get_history_number_by_commit_id(history_commit)?, - history_index_table: self.history_index_table.clone(), - change_history_table: self.change_history_table.clone(), - }) - } else { - None - }; - Ok(SnapshotView { - pending_updates: Some(pending_map), - history, - }) - } - Err(PendingError::CommitIDNotFound(target_commit_id)) => { - assert_eq!(target_commit_id, *commit); - let history = SnapshotHistorical { - history_number: self.get_history_number_by_commit_id(*commit)?, + let pending_guard = self.pending_part.lock(); + if pending_guard.contains_commit_id(commit) { + let history = if let Some(history_commit) = pending_guard.get_parent_of_root() { + Some(SnapshotHistorical { + history_number: self.get_history_number_by_commit_id(history_commit)?, history_index_table: self.history_index_table.clone(), change_history_table: self.change_history_table.clone(), - }; - Ok(SnapshotView { - pending_updates: None, - history: Some(history), }) - } - Err(other_err) => Err(StorageError::PendingError(other_err)), + } else { + None + }; + Ok(SnapshotView { + pending_updates: Some(SnapshotPending { + commit_id: *commit, + pending_part: self.pending_part.clone(), + }), + history, + }) + } else { + let history = SnapshotHistorical { + history_number: self.get_history_number_by_commit_id(*commit)?, + history_index_table: self.history_index_table.clone(), + change_history_table: self.change_history_table.clone(), + }; + Ok(SnapshotView { + pending_updates: None, + history: Some(history), + }) } } @@ -185,13 +205,12 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager Result { - let pending_res = self - .pending_part - .iter_historical_changes(&mut accept, commit_id, key); + let pending_guard = self.pending_part.lock(); + let pending_res = pending_guard.iter_historical_changes(&mut accept, commit_id, key); match pending_res { Ok(false) => Ok(false), Ok(true) => { - if let Some(history_commit) = self.pending_part.get_parent_of_root() { + if let Some(history_commit) = pending_guard.get_parent_of_root() { self.iter_historical_changes_history_part(&mut accept, &history_commit, key) } else { Ok(true) @@ -206,22 +225,25 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager Result<()> { + let mut pending_guard = self.pending_part.lock(); + if self.commit_id_table.get(&commit)?.is_some() { return Ok(()); } - Ok(self.pending_part.discard(commit)?) + Ok(pending_guard.discard(commit)?) } fn get_versioned_key(&self, commit: &CommitID, key: &T::Key) -> Result> { - // let pending_res = self.pending_part.get_versioned_key_with_checkout(commit, key); // this will checkout_current - let pending_res = self.pending_part.get_versioned_key(commit, key); + let pending_guard = self.pending_part.lock(); + // let pending_res = pending_guard.get_versioned_key_with_checkout(commit, key); // this will checkout_current + let pending_res = pending_guard.get_versioned_key(commit, key); let history_commit = match pending_res { Ok(Some(value)) => { return Ok(value.into_option()); } Ok(None) => { - if let Some(commit) = self.pending_part.get_parent_of_root() { + if let Some(commit) = pending_guard.get_parent_of_root() { commit } else { return Ok(None); @@ -242,7 +264,7 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> KeyValueStoreManager VersionedStore<'cache, 'db, T> { +impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T> { fn iter_historical_changes_history_part( &self, mut accept: impl FnMut(&CommitID, &T::Key, Option<&T::Value>) -> NeedNext, diff --git a/src/middlewares/versioned_flat_key_value/mod.rs b/src/middlewares/versioned_flat_key_value/mod.rs index dc08f7b..4401a5d 100644 --- a/src/middlewares/versioned_flat_key_value/mod.rs +++ b/src/middlewares/versioned_flat_key_value/mod.rs @@ -9,6 +9,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::Arc; +use parking_lot::Mutex; pub use pending_part::PendingError; #[cfg(test)] @@ -43,18 +44,18 @@ impl HistoryIndices { } } -pub struct VersionedStore<'cache, 'db, T: VersionedKeyValueSchema> { - pending_part: &'cache mut VersionedMap>, +pub struct VersionedStore<'db, T: VersionedKeyValueSchema> { + pending_part: Arc>>>, history_index_table: TableReader<'db, HistoryIndicesTable>, commit_id_table: TableReader<'db, CommitIDSchema>, history_number_table: TableReader<'db, HistoryNumberSchema>, change_history_table: KeyValueStoreBulks<'db, HistoryChangeTable>, } -impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> { +impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T> { pub fn new( db: &'db D, - pending_part: &'cache mut VersionedMap>, + pending_part: Arc>>>, ) -> Result { let history_index_table = Arc::new(db.view::>()?); let commit_id_table = Arc::new(db.view::()?); @@ -74,16 +75,18 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> { } pub fn add_to_pending_part( - &mut self, + &self, parent_commit: Option, commit: CommitID, updates: BTreeMap>, ) -> Result<()> { + let mut pending_guard = self.pending_part.lock(); + if self.commit_id_table.get(&commit)?.is_some() { return Err(StorageError::CommitIdAlreadyExistsInHistory); } - Ok(self.pending_part.add_node(updates, commit, parent_commit)?) + Ok(pending_guard.add_node(updates, commit, parent_commit)?) } fn get_history_number_by_commit_id(&self, commit: CommitID) -> Result { @@ -138,11 +141,12 @@ fn get_versioned_key<'db, T: VersionedKeyValueSchema>( pub fn confirmed_pending_to_history( db: &D, - pending_part: &mut VersionedMap>, + pending_part: Arc>>>, new_root_commit_id: CommitID, write_schema: &D::WriteSchema, ) -> Result<()> { - let confirmed_path = pending_part.change_root(new_root_commit_id)?; + let mut pending_guard = pending_part.lock(); + let confirmed_path = pending_guard.change_root(new_root_commit_id)?; confirm_ids_to_history::( db, diff --git a/src/middlewares/versioned_flat_key_value/pending_part/versioned_map.rs b/src/middlewares/versioned_flat_key_value/pending_part/versioned_map.rs index 8c689e0..52346c8 100644 --- a/src/middlewares/versioned_flat_key_value/pending_part/versioned_map.rs +++ b/src/middlewares/versioned_flat_key_value/pending_part/versioned_map.rs @@ -195,6 +195,10 @@ impl VersionedMap { } } + pub fn contains_commit_id(&self, commit_id: &S::CommitId) -> bool { + self.tree.contains_commit_id(commit_id) + } + pub fn get_versioned_store(&self, commit_id: S::CommitId) -> PendResult, S> { // let query node to be self.current let mut guard = self.current.write(); diff --git a/src/middlewares/versioned_flat_key_value/tests.rs b/src/middlewares/versioned_flat_key_value/tests.rs index fc9c0f0..10773ff 100644 --- a/src/middlewares/versioned_flat_key_value/tests.rs +++ b/src/middlewares/versioned_flat_key_value/tests.rs @@ -1,4 +1,5 @@ use ethereum_types::H256; +use parking_lot::Mutex; use super::{ pending_part::pending_schema::PendingKeyValueConfig, table_schema::VersionedKeyValueSchema, @@ -19,14 +20,17 @@ use crate::{ traits::{IsCompleted, KeyValueStoreManager, KeyValueStoreRead, NeedNext}, StorageError, }; -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::{ + collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + sync::Arc, +}; use rand_chacha::{ rand_core::{RngCore, SeedableRng}, ChaChaRng, }; -impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> { +impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T> { #[cfg(test)] pub fn check_consistency(&self) -> Result<()> { if self.check_consistency_inner().is_err() { @@ -42,7 +46,9 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> { height_to_history_number, history_number_to_height, }; - if let Some(parent) = self.pending_part.get_parent_of_root() { + let pending_guard = self.pending_part.lock(); + + if let Some(parent) = pending_guard.get_parent_of_root() { let parent_history_number = if let Some(parent_history_number) = self.commit_id_table.get(&parent)? { parent_history_number.into_owned() @@ -87,9 +93,7 @@ impl<'cache, 'db, T: VersionedKeyValueSchema> VersionedStore<'cache, 'db, T> { return Err(StorageError::ConsistencyCheckFailure); } - if !self - .pending_part - .check_consistency(history_number_to_height(parent_history_number + 1)) + if !pending_guard.check_consistency(history_number_to_height(parent_history_number + 1)) { return Err(StorageError::ConsistencyCheckFailure); } @@ -765,20 +769,20 @@ fn gen_key(rng: &mut ChaChaRng, existing_keys: Vec) -> (KeyType, u64) { } } -struct VersionedStoreProxy<'a, 'b, 'c, 'cache, 'db, T: VersionedKeyValueSchema> { +struct VersionedStoreProxy<'a, 'b, 'c, 'db, T: VersionedKeyValueSchema> { mock_store: &'a mut MockVersionedStore, - real_store: &'b mut VersionedStore<'cache, 'db, T>, + real_store: &'b mut VersionedStore<'db, T>, all_keys: &'c mut BTreeSet, } -impl<'a, 'b, 'c, 'cache, 'db, T: VersionedKeyValueSchema> - VersionedStoreProxy<'a, 'b, 'c, 'cache, 'db, T> +impl<'a, 'b, 'c, 'db, T: VersionedKeyValueSchema> + VersionedStoreProxy<'a, 'b, 'c, 'db, T> where T::Value: PartialEq, { fn new( mock_store: &'a mut MockVersionedStore, - real_store: &'b mut VersionedStore<'cache, 'db, T>, + real_store: &'b mut VersionedStore<'db, T>, all_keys: &'c mut BTreeSet, ) -> Self { Self { @@ -929,8 +933,8 @@ where } } -impl<'a, 'b, 'c, 'cache, 'db, T: VersionedKeyValueSchema> - VersionedStoreProxy<'a, 'b, 'c, 'cache, 'db, T> +impl<'a, 'b, 'c, 'db, T: VersionedKeyValueSchema> + VersionedStoreProxy<'a, 'b, 'c, 'db, T> { fn get_versioned_store( &self, @@ -1155,7 +1159,7 @@ fn test_versioned_store( // init history part let write_schema = D::write_schema(); - let (history_cids, history_updates, mut pending_part) = gen_init( + let (history_cids, history_updates, pending_part) = gen_init( db, num_history, &mut rng, @@ -1164,13 +1168,14 @@ fn test_versioned_store( &mut all_keys, &write_schema, ); + let pending_part = Arc::new(Mutex::new(pending_part)); db.commit(write_schema).unwrap(); // build proxy let mut mock_versioned_store = MockVersionedStore::build(history_cids.clone(), history_updates.clone()); - let mut real_versioned_store = VersionedStore::new(db, &mut pending_part).unwrap(); + let mut real_versioned_store = VersionedStore::new(db, pending_part.clone()).unwrap(); real_versioned_store.check_consistency().unwrap(); let mut versioned_store_proxy = VersionedStoreProxy::new( @@ -1230,11 +1235,15 @@ fn test_versioned_store( drop(real_versioned_store); let write_schema = D::write_schema(); - let real_res = - confirmed_pending_to_history(db, &mut pending_part, commit_id, &write_schema); + let real_res = confirmed_pending_to_history( + db, + pending_part.clone(), + commit_id, + &write_schema, + ); db.commit(write_schema).unwrap(); - real_versioned_store = VersionedStore::new(db, &mut pending_part).unwrap(); + real_versioned_store = VersionedStore::new(db, pending_part.clone()).unwrap(); real_versioned_store.check_consistency().unwrap(); versioned_store_proxy = VersionedStoreProxy::new(