Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching system for shard file manager #159

Merged
merged 3 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cas_client/src/local_shard_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ impl LocalShardClient {
}

// This loads and cleans all the shards in the session directory; no need to do it explicitly
let shard_manager = ShardFileManager::load_dir(&shard_directory, download_only_mode).await?;
let shard_manager = ShardFileManager::builder(&shard_directory)
.with_chunk_dedup(!download_only_mode)
.with_expired_shard_cleanup(true)
.build()
.await?;

let global_dedup = DiskBasedGlobalDedupTable::open_or_create(cas_directory.join("ddb").join("chunk2shard.db"))?;

Expand Down
6 changes: 5 additions & 1 deletion data/src/remote_shard_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ impl RemoteShardInterface {
if !file_type.is_file() || !is_shard_file(&file_path) {
continue;
}
let dest_shard_name = cache_dir.join(file_path.file_name().unwrap());

std::fs::rename(&file_path, cache_dir.join(file_path.file_name().unwrap()))?;
std::fs::rename(&file_path, &dest_shard_name)?;

// Register this in any existing shard manager
ShardFileManager::register_shard_in_existing_managers(&dest_shard_name).await?;
}

Ok(())
Expand Down
28 changes: 18 additions & 10 deletions data/src/shard_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use cas_client::{HttpShardClient, LocalShardClient, ShardClientInterface};
use mdb_shard::ShardFileManager;
use tracing::{debug, warn};
use tracing::debug;

use super::configurations::Endpoint::*;
use super::configurations::StorageConfig;
Expand All @@ -16,21 +16,29 @@ pub async fn create_shard_manager(
.staging_directory
.as_ref()
.expect("Need shard staging directory to create ShardFileManager");

let shard_cache_directory = &shard_storage_config
.cache_config
.as_ref()
.expect("Need shard cache directory to create ShardFileManager")
.cache_directory;

let shard_manager = ShardFileManager::load_dir(shard_session_directory, download_only_mode).await?;

if shard_cache_directory.exists() {
shard_manager.register_shards_by_path(&[shard_cache_directory]).await?;
} else {
warn!("Merkle DB Cache path {:?} does not exist, skipping registration.", shard_cache_directory);
}

Ok(shard_manager)
let cache_shard_manager = ShardFileManager::builder(shard_cache_directory)
.with_chunk_dedup(!download_only_mode)
.with_expired_shard_cleanup(true)
.from_global_manager_cache(true)
.build()
.await?;

let session_shard_manager = ShardFileManager::builder(shard_session_directory)
.with_chunk_dedup(!download_only_mode)
.with_expired_shard_cleanup(false)
.from_global_manager_cache(false)
.with_upstream_manager(cache_shard_manager)
.build()
.await?;

Ok(session_shard_manager)
}

pub async fn create_shard_client(
Expand Down
2 changes: 1 addition & 1 deletion mdb_shard/src/session_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn consolidate_shards_in_directory(
session_directory: &Path,
target_max_size: u64,
) -> Result<Vec<Arc<MDBShardFile>>> {
let mut shards = MDBShardFile::load_all(session_directory)?;
let mut shards = MDBShardFile::load_all_valid(session_directory)?;

shards.sort_unstable_by_key(|si| si.last_modified_time);

Expand Down
2 changes: 1 addition & 1 deletion mdb_shard/src/shard_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn run_shard_benchmark(

// Now, spawn tasks to
let counter = Arc::new(AtomicUsize::new(0));
let mdb = Arc::new(ShardFileManager::load_dir(dir, false).await?);
let mdb = Arc::new(ShardFileManager::new(dir).await?);

let start_time = Instant::now();

Expand Down
61 changes: 47 additions & 14 deletions mdb_shard/src/shard_file_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};

use merklehash::{compute_data_hash, HMACKey, HashedWrite, MerkleHash};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use crate::cas_structs::CASChunkSequenceHeader;
use crate::error::{MDBShardError, Result};
use crate::file_structs::{FileDataSequenceEntry, MDBFileInfo};
use crate::shard_file::current_timestamp;
use crate::shard_format::MDBShardInfo;
use crate::utils::{parse_shard_filename, shard_file_name, temp_shard_file_name, truncate_hash};

Expand Down Expand Up @@ -128,37 +129,69 @@ impl MDBShardFile {
}
}

pub fn load_all(path: &Path) -> Result<Vec<Arc<Self>>> {
/// Loads the shards found at `path`, leaving out any whose key has already expired.
///
/// This is a thin wrapper over [`Self::load_all`] with `load_expired` set to `false`.
///
/// # Errors
///
/// Returns whatever error [`Self::load_all`] reports for `path`.
pub fn load_all_valid(path: impl AsRef<Path>) -> Result<Vec<Arc<Self>>> {
    // Named so the call site shows which mode of `load_all` is being selected.
    let load_expired = false;
    Self::load_all(path, load_expired)
}

pub fn load_all(path: impl AsRef<Path>, load_expired: bool) -> Result<Vec<Arc<Self>>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is load_all(.., true) used anywhere? If not we don't need to make this function pub.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was originally used for testing, but that has since changed.

let current_time = current_timestamp();

let mut ret = Vec::new();

Self::scan_impl(path, |s| {
if load_expired || current_time <= s.shard.metadata.shard_key_expiry {
ret.push(s);
}

Ok(())
})?;

Ok(ret)
}

pub fn clean_expired_shards(path: impl AsRef<Path>, expiration_buffer_secs: u64) -> Result<()> {
let current_time = current_timestamp();

Self::scan_impl(path, |s| {
if s.shard.metadata.shard_key_expiry.saturating_add(expiration_buffer_secs) <= current_time {
info!("Deleting expired shard {:?}", &s.path);
let _ = std::fs::remove_file(&s.path);
}

Ok(())
})?;

Ok(())
}

#[inline]
fn scan_impl(path: impl AsRef<Path>, mut callback: impl FnMut(Arc<Self>) -> Result<()>) -> Result<()> {
let path = path.as_ref();

if path.is_dir() {
for entry in std::fs::read_dir(path)? {
let entry = entry?;
if let Some(file_name) = entry.file_name().to_str() {
if let Some(h) = parse_shard_filename(file_name) {
ret.push(Self::load_from_hash_and_path(h, &path.join(file_name))?);
let s = Self::load_from_hash_and_path(h, &path.join(file_name))?;
s.verify_shard_integrity_debug_only();
callback(s)?;
debug!("Registerd shard file '{file_name:?}'.");
}
debug!("Found shard file '{file_name:?}'.");
}
}
} else if let Some(file_name) = path.to_str() {
if let Some(h) = parse_shard_filename(file_name) {
ret.push(Self::load_from_hash_and_path(h, &path.join(file_name))?);
let s = Self::load_from_hash_and_path(h, &path.join(file_name))?;
s.verify_shard_integrity_debug_only();
callback(s)?;
debug!("Registerd shard file '{file_name:?}'.");
} else {
return Err(MDBShardError::BadFilename(format!("Filename {file_name} not valid shard file name.")));
}
}

#[cfg(debug_assertions)]
{
// In debug mode, verify all shards on loading to catch errors earlier.
for s in ret.iter() {
s.verify_shard_integrity_debug_only();
}
}

Ok(ret)
Ok(())
}

/// Write out the current shard, re-keyed with an hmac key, to the output directory in question, returning
Expand Down
Loading