Skip to content
This repository has been archived by the owner on Feb 3, 2025. It is now read-only.

Commit

Permalink
Delay batched remote storage for fedimint
Browse files Browse the repository at this point in the history
  • Loading branch information
AnthonyRonning committed Jan 29, 2024
1 parent f236ccd commit 790852a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 3 deletions.
4 changes: 2 additions & 2 deletions mutiny-core/src/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,9 @@ impl<'a, S: MutinyStorage> IRawDatabaseTransaction for IndexedDBPseudoTransactio
version,
value: serde_json::to_value(hex_serialized_data).unwrap(),
};
// TODO await on persisting remotely
self.storage
.set_data(key_id(&self.federation_id), value, Some(version))?;
.set_data_async_queue_remote(key_id(&self.federation_id), value, version)
.await?;

Ok(())
}
Expand Down
92 changes: 91 additions & 1 deletion mutiny-core/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::ldkstorage::CHANNEL_MANAGER_KEY;
use crate::logging::MutinyLogger;
use crate::nodemanager::{NodeStorage, DEVICE_LOCK_INTERVAL_SECS};
use crate::utils::{now, spawn};
Expand All @@ -11,11 +10,13 @@ use crate::{
error::{MutinyError, MutinyStorageError},
event::PaymentInfo,
};
use crate::{ldkstorage::CHANNEL_MANAGER_KEY, utils::sleep};
use async_trait::async_trait;
use bdk::chain::{Append, PersistBackend};
use bip39::Mnemonic;
use bitcoin::hashes::hex::ToHex;
use bitcoin::hashes::Hash;
use futures_util::lock::Mutex;
use hex::FromHex;
use lightning::{ln::PaymentHash, util::logger::Logger};
use lightning::{log_error, log_trace};
Expand All @@ -40,6 +41,25 @@ pub(crate) const EXPECTED_NETWORK_KEY: &str = "network";
const PAYMENT_INBOUND_PREFIX_KEY: &str = "payment_inbound/";
const PAYMENT_OUTBOUND_PREFIX_KEY: &str = "payment_outbound/";
pub const LAST_DM_SYNC_TIME_KEY: &str = "last_dm_sync_time";
const DELAYED_WRITE_MS: i32 = 50;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DelayedKeyValueItem {
pub key: String,
pub value: Value,
pub version: u32,
pub write_time: u128,
}

impl From<DelayedKeyValueItem> for VssKeyValueItem {
fn from(item: DelayedKeyValueItem) -> Self {
VssKeyValueItem {
key: item.key,
value: item.value,
version: item.version,
}
}
}

fn needs_encryption(key: &str) -> bool {
match key {
Expand Down Expand Up @@ -213,6 +233,66 @@ pub trait MutinyStorage: Clone + Sized + Send + Sync + 'static {
Ok(())
}

fn get_delayed_objects(&self) -> Arc<Mutex<HashMap<String, DelayedKeyValueItem>>>;

/// Set a value to persist in local storage, queues remote save
/// The function will encrypt the value if needed
async fn set_data_async_queue_remote<T>(
&self,
key: String,
value: T,
version: u32,
) -> Result<(), MutinyError>
where
T: Serialize + Send,
{
let data = serde_json::to_value(value).map_err(|e| MutinyError::PersistenceFailed {
source: MutinyStorageError::SerdeError { source: e },
})?;

// encrypt value in async block so it can be done in parallel
// with the VSS call
let local_data = data.clone();
let key_clone = key.clone();
let json: Value = encrypt_value(key_clone.clone(), local_data, self.cipher())?;
self.set_async(key_clone, json).await?;

// save to VSS if it is enabled
// queue up keys to persist later
if let Some(vss) = self.vss_client() {
let initial_write_time = now().as_millis();
let item = DelayedKeyValueItem {
key: key.clone(),
value: data,
version,
write_time: initial_write_time,
};

let delayed_lock = self.get_delayed_objects();
let mut delayed_keys = delayed_lock.lock().await;
delayed_keys.insert(key.clone(), item.clone());
drop(delayed_keys);

let delayed_keys_ref = self.get_delayed_objects();
let original_item = item.clone();
spawn(async move {
sleep(DELAYED_WRITE_MS).await;

let threaded_keys = delayed_keys_ref.lock().await;

if let Some(key_to_check) = threaded_keys.get(&key) {
if key_to_check.write_time == initial_write_time {
drop(threaded_keys);

let _ = vss.put_objects(vec![original_item.into()]).await;
}
}
});
}

Ok(())
}

/// Get a value from the storage, use get_data if you want the value to be decrypted
fn get<T>(&self, key: impl AsRef<str>) -> Result<Option<T>, MutinyError>
where
Expand Down Expand Up @@ -481,6 +561,7 @@ pub struct MemoryStorage {
pub cipher: Option<Cipher>,
pub memory: Arc<RwLock<HashMap<String, Value>>>,
pub vss_client: Option<Arc<MutinyVssClient>>,
delayed_keys: Arc<Mutex<HashMap<String, DelayedKeyValueItem>>>,
}

impl MemoryStorage {
Expand All @@ -494,6 +575,7 @@ impl MemoryStorage {
password,
memory: Arc::new(RwLock::new(HashMap::new())),
vss_client,
delayed_keys: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -629,6 +711,10 @@ impl MutinyStorage for MemoryStorage {
async fn fetch_device_lock(&self) -> Result<Option<DeviceLock>, MutinyError> {
self.get_device_lock()
}

fn get_delayed_objects(&self) -> Arc<Mutex<HashMap<String, DelayedKeyValueItem>>> {
self.delayed_keys.clone()
}
}

// Dummy implementation for testing or if people want to ignore persistence
Expand Down Expand Up @@ -695,6 +781,10 @@ impl MutinyStorage for () {
async fn fetch_device_lock(&self) -> Result<Option<DeviceLock>, MutinyError> {
self.get_device_lock()
}

fn get_delayed_objects(&self) -> Arc<Mutex<HashMap<String, DelayedKeyValueItem>>> {
Arc::new(Mutex::new(HashMap::new()))
}
}

fn payment_key(inbound: bool, payment_hash: &[u8; 32]) -> String {
Expand Down
7 changes: 7 additions & 0 deletions mutiny-wasm/src/indexed_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::anyhow;
use async_trait::async_trait;
use bip39::Mnemonic;
use futures::lock::Mutex;
use gloo_utils::format::JsValueSerdeExt;
use lightning::util::logger::Logger;
use lightning::{log_debug, log_error};
Expand Down Expand Up @@ -43,6 +44,7 @@ pub struct IndexedDbStorage {
pub(crate) indexed_db: Arc<RwLock<RexieContainer>>,
vss: Option<Arc<MutinyVssClient>>,
logger: Arc<MutinyLogger>,
delayed_keys: Arc<Mutex<HashMap<String, DelayedKeyValueItem>>>,
}

impl IndexedDbStorage {
Expand Down Expand Up @@ -73,6 +75,7 @@ impl IndexedDbStorage {
indexed_db,
vss,
logger,
delayed_keys: Arc::new(Mutex::new(HashMap::new())),
})
}

Expand Down Expand Up @@ -786,6 +789,10 @@ impl MutinyStorage for IndexedDbStorage {
}
}
}

fn get_delayed_objects(&self) -> Arc<Mutex<HashMap<String, DelayedKeyValueItem>>> {
self.delayed_keys.clone()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 790852a

Please sign in to comment.