diff --git a/cas_client/src/local_shard_client.rs b/cas_client/src/local_shard_client.rs index b3961e3..d40942f 100644 --- a/cas_client/src/local_shard_client.rs +++ b/cas_client/src/local_shard_client.rs @@ -34,7 +34,11 @@ impl LocalShardClient { } // This loads and cleans all the shards in the session directory; no need to do it explicitly - let shard_manager = ShardFileManager::load_dir(&shard_directory, download_only_mode).await?; + let shard_manager = ShardFileManager::builder(&shard_directory) + .with_chunk_dedup(!download_only_mode) + .with_expired_shard_cleanup(true) + .build() + .await?; let global_dedup = DiskBasedGlobalDedupTable::open_or_create(cas_directory.join("ddb").join("chunk2shard.db"))?; diff --git a/data/src/remote_shard_interface.rs b/data/src/remote_shard_interface.rs index 7a1066e..01a2c1e 100644 --- a/data/src/remote_shard_interface.rs +++ b/data/src/remote_shard_interface.rs @@ -317,8 +317,12 @@ impl RemoteShardInterface { if !file_type.is_file() || !is_shard_file(&file_path) { continue; } + let dest_shard_name = cache_dir.join(file_path.file_name().unwrap()); - std::fs::rename(&file_path, cache_dir.join(file_path.file_name().unwrap()))?; + std::fs::rename(&file_path, &dest_shard_name)?; + + // Register this in any existing shard manager + ShardFileManager::register_shard_in_existing_managers(&dest_shard_name).await?; } Ok(()) diff --git a/data/src/shard_interface.rs b/data/src/shard_interface.rs index bcead6a..2fea4d3 100644 --- a/data/src/shard_interface.rs +++ b/data/src/shard_interface.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use cas_client::{HttpShardClient, LocalShardClient, ShardClientInterface}; use mdb_shard::ShardFileManager; -use tracing::{debug, warn}; +use tracing::debug; use super::configurations::Endpoint::*; use super::configurations::StorageConfig; @@ -16,21 +16,29 @@ pub async fn create_shard_manager( .staging_directory .as_ref() .expect("Need shard staging directory to create ShardFileManager"); + let shard_cache_directory 
= &shard_storage_config .cache_config .as_ref() .expect("Need shard cache directory to create ShardFileManager") .cache_directory; - let shard_manager = ShardFileManager::load_dir(shard_session_directory, download_only_mode).await?; - - if shard_cache_directory.exists() { - shard_manager.register_shards_by_path(&[shard_cache_directory]).await?; - } else { - warn!("Merkle DB Cache path {:?} does not exist, skipping registration.", shard_cache_directory); - } - - Ok(shard_manager) + let cache_shard_manager = ShardFileManager::builder(shard_cache_directory) + .with_chunk_dedup(!download_only_mode) + .with_expired_shard_cleanup(true) + .from_global_manager_cache(true) + .build() + .await?; + + let session_shard_manager = ShardFileManager::builder(shard_session_directory) + .with_chunk_dedup(!download_only_mode) + .with_expired_shard_cleanup(false) + .from_global_manager_cache(false) + .with_upstream_manager(cache_shard_manager) + .build() + .await?; + + Ok(session_shard_manager) } pub async fn create_shard_client( diff --git a/mdb_shard/src/session_directory.rs b/mdb_shard/src/session_directory.rs index db25b9b..7d5532e 100644 --- a/mdb_shard/src/session_directory.rs +++ b/mdb_shard/src/session_directory.rs @@ -22,7 +22,7 @@ pub fn consolidate_shards_in_directory( session_directory: &Path, target_max_size: u64, ) -> Result>> { - let mut shards = MDBShardFile::load_all(session_directory)?; + let mut shards = MDBShardFile::load_all_valid(session_directory)?; shards.sort_unstable_by_key(|si| si.last_modified_time); diff --git a/mdb_shard/src/shard_benchmark.rs b/mdb_shard/src/shard_benchmark.rs index 11fd1dd..9fcabf5 100644 --- a/mdb_shard/src/shard_benchmark.rs +++ b/mdb_shard/src/shard_benchmark.rs @@ -77,7 +77,7 @@ async fn run_shard_benchmark( // Now, spawn tasks to let counter = Arc::new(AtomicUsize::new(0)); - let mdb = Arc::new(ShardFileManager::load_dir(dir, false).await?); + let mdb = Arc::new(ShardFileManager::new(dir).await?); let start_time = Instant::now(); 
diff --git a/mdb_shard/src/shard_file_handle.rs b/mdb_shard/src/shard_file_handle.rs index e393f82..7f87c0b 100644 --- a/mdb_shard/src/shard_file_handle.rs +++ b/mdb_shard/src/shard_file_handle.rs @@ -5,11 +5,12 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; use merklehash::{compute_data_hash, HMACKey, HashedWrite, MerkleHash}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::cas_structs::CASChunkSequenceHeader; use crate::error::{MDBShardError, Result}; use crate::file_structs::{FileDataSequenceEntry, MDBFileInfo}; +use crate::shard_file::current_timestamp; use crate::shard_format::MDBShardInfo; use crate::utils::{parse_shard_filename, shard_file_name, temp_shard_file_name, truncate_hash}; @@ -128,37 +129,69 @@ impl MDBShardFile { } } - pub fn load_all(path: &Path) -> Result>> { + pub fn load_all_valid(path: impl AsRef) -> Result>> { + Self::load_all(path, false) + } + + pub fn load_all(path: impl AsRef, load_expired: bool) -> Result>> { + let current_time = current_timestamp(); + let mut ret = Vec::new(); + Self::scan_impl(path, |s| { + if load_expired || current_time <= s.shard.metadata.shard_key_expiry { + ret.push(s); + } + + Ok(()) + })?; + + Ok(ret) + } + + pub fn clean_expired_shards(path: impl AsRef, expiration_buffer_secs: u64) -> Result<()> { + let current_time = current_timestamp(); + + Self::scan_impl(path, |s| { + if s.shard.metadata.shard_key_expiry.saturating_add(expiration_buffer_secs) <= current_time { + info!("Deleting expired shard {:?}", &s.path); + let _ = std::fs::remove_file(&s.path); + } + + Ok(()) + })?; + + Ok(()) + } + + #[inline] + fn scan_impl(path: impl AsRef, mut callback: impl FnMut(Arc) -> Result<()>) -> Result<()> { + let path = path.as_ref(); + if path.is_dir() { for entry in std::fs::read_dir(path)? 
{ let entry = entry?; if let Some(file_name) = entry.file_name().to_str() { if let Some(h) = parse_shard_filename(file_name) { - ret.push(Self::load_from_hash_and_path(h, &path.join(file_name))?); + let s = Self::load_from_hash_and_path(h, &path.join(file_name))?; + s.verify_shard_integrity_debug_only(); + callback(s)?; + debug!("Registered shard file '{file_name:?}'."); } - debug!("Found shard file '{file_name:?}'."); } } } else if let Some(file_name) = path.to_str() { if let Some(h) = parse_shard_filename(file_name) { - ret.push(Self::load_from_hash_and_path(h, &path.join(file_name))?); + let s = Self::load_from_hash_and_path(h, &path.join(file_name))?; + s.verify_shard_integrity_debug_only(); + callback(s)?; debug!("Registerd shard file '{file_name:?}'."); } else { return Err(MDBShardError::BadFilename(format!("Filename {file_name} not valid shard file name."))); } } - #[cfg(debug_assertions)] - { - // In debug mode, verify all shards on loading to catch errors earlier. - for s in ret.iter() { - s.verify_shard_integrity_debug_only(); - } - } - - Ok(ret) + Ok(()) } /// Write out the current shard, re-keyed with an hmac key, to the output directory in question, returning diff --git a/mdb_shard/src/shard_file_manager.rs b/mdb_shard/src/shard_file_manager.rs index a6d68a8..26544f3 100644 --- a/mdb_shard/src/shard_file_manager.rs +++ b/mdb_shard/src/shard_file_manager.rs @@ -12,7 +12,6 @@ use crate::cas_structs::*; use crate::constants::{MDB_SHARD_EXPIRATION_BUFFER_SECS, MDB_SHARD_MIN_TARGET_SIZE}; use crate::error::{MDBShardError, Result}; use crate::file_structs::*; -use crate::shard_file::current_timestamp; use crate::shard_file_handle::MDBShardFile; use crate::shard_file_reconstructor::FileReconstructor; use crate::shard_in_memory::MDBInMemoryShard; @@ -27,6 +26,11 @@ lazy_static! { .unwrap_or(CHUNK_INDEX_TABLE_DEFAULT_MAX_SIZE); } +// The shard manager cache +lazy_static::lazy_static! 
{ + static ref MDB_SHARD_FILE_MANAGER_CACHE: RwLock>> = RwLock::new(HashMap::default()); +} + // The structure used as the target for the dedup lookup #[repr(Rust, packed)] struct ChunkCacheElement { @@ -75,15 +79,17 @@ impl ShardBookkeeper { } } -pub struct SFMBuildParameters { +pub struct ShardFileManagerOptions { shard_directory: PathBuf, - clean_expired_shards: bool, target_shard_size: u64, chunk_dedup_enabled: bool, + cache_shard_manager: bool, + clean_expired_shards: bool, shard_expiration_delete_buffer_secs: u64, + upstream_manager: Option>, } -impl SFMBuildParameters { +impl ShardFileManagerOptions { pub fn new(shard_directory: impl AsRef) -> Self { Self { shard_directory: shard_directory.as_ref().to_path_buf(), @@ -91,6 +97,8 @@ impl SFMBuildParameters { target_shard_size: MDB_SHARD_MIN_TARGET_SIZE, chunk_dedup_enabled: true, shard_expiration_delete_buffer_secs: MDB_SHARD_EXPIRATION_BUFFER_SECS, + cache_shard_manager: false, + upstream_manager: None, } } @@ -103,18 +111,32 @@ impl SFMBuildParameters { self } + /// Makes a fallback manager that unresolved queries are queried against. + /// + /// To be used to combine a session shard manager with a cache shard manager. 
+ pub fn with_upstream_manager(mut self, upstream_manager: Arc) -> Self { + self.upstream_manager = Some(upstream_manager); + self + } + + pub fn with_custom_expired_shard_cleanup_window(mut self, deletion_buffer_window: u64) -> Self { + self.shard_expiration_delete_buffer_secs = deletion_buffer_window; + self + } + pub fn with_expired_shard_cleanup(mut self, cleanup: bool) -> Self { self.clean_expired_shards = cleanup; + self.shard_expiration_delete_buffer_secs = MDB_SHARD_EXPIRATION_BUFFER_SECS; self } - pub fn with_shard_expiration_delete_buffer(mut self, nsecs: u64) -> Self { - self.shard_expiration_delete_buffer_secs = nsecs; + pub fn from_global_manager_cache(mut self, cache_shard_manager: bool) -> Self { + self.cache_shard_manager = cache_shard_manager; self } pub async fn build(self) -> Result> { - Ok(ShardFileManager::new_from_builder(self).await?.0) + ShardFileManager::new_from_builder(self).await } } @@ -122,6 +144,7 @@ pub struct ShardFileManager { shard_bookkeeper: RwLock, current_state: RwLock, + upstream_manager: Option>, shard_directory: PathBuf, target_shard_min_size: u64, chunk_dedup_enabled: bool, @@ -145,65 +168,111 @@ pub struct ShardFileManager { /// /// // new_shards is the list of new shards for this session. 
impl ShardFileManager { - pub fn builder(shard_directory: impl AsRef) -> SFMBuildParameters { - SFMBuildParameters::new(shard_directory) + pub fn builder(shard_directory: impl AsRef) -> ShardFileManagerOptions { + ShardFileManagerOptions::new(shard_directory) } - /// Construct a new shard file manager that uses session_directory as the temporary dumping - pub async fn load_dir(shard_directory: &Path, disable_chunk_dedup: bool) -> Result> { - Self::builder(shard_directory) - .with_chunk_dedup(!disable_chunk_dedup) - .build() - .await + pub async fn new(shard_directory: impl AsRef) -> Result> { + ShardFileManagerOptions::new(shard_directory).build().await } - pub async fn new_from_builder(sbp: SFMBuildParameters) -> Result<(Arc, Vec>)> { - let shard_directory = &sbp.shard_directory; + async fn new_from_builder(sbp: ShardFileManagerOptions) -> Result> { + let shard_directory = std::path::absolute(sbp.shard_directory)?; - let mut new_shards = Vec::new(); + // Make sure the shard directory exists; create it if not. 
+ if !shard_directory.exists() { + std::fs::create_dir_all(&shard_directory)?; + } - let current_time = current_timestamp(); + // Clean up old shards if needed + if sbp.clean_expired_shards { + MDBShardFile::clean_expired_shards(&shard_directory, sbp.shard_expiration_delete_buffer_secs)?; + } + + let create_new_sfm = || { + Arc::new(Self { + shard_bookkeeper: RwLock::new(ShardBookkeeper::new()), + current_state: RwLock::new(MDBInMemoryShard::default()), + upstream_manager: sbp.upstream_manager, + shard_directory: shard_directory.clone(), + target_shard_min_size: sbp.target_shard_size, + chunk_dedup_enabled: sbp.chunk_dedup_enabled, + }) + }; - let mut deletion_candidates = Vec::new(); + let sfm = 'load_sfm: { + if !sbp.cache_shard_manager { + break 'load_sfm create_new_sfm(); + } + + let key = (shard_directory.clone(), sbp.chunk_dedup_enabled); - let shard_files = MDBShardFile::load_all(shard_directory)?; + { + let ro_lg = MDB_SHARD_FILE_MANAGER_CACHE.read().await; - // Now, go through and filter out the ones that can't be used any more, and also filter out the ones that - // can't be - new_shards.extend(shard_files.into_iter().filter_map(|s| { - let expiry_time = s.shard.metadata.shard_key_expiry; - if current_time < expiry_time { - Some(s) - } else { - if sbp.clean_expired_shards && expiry_time + sbp.shard_expiration_delete_buffer_secs <= current_time { - deletion_candidates.push(s.path.clone()); + if let Some(sfm) = ro_lg.get(&key) { + sfm.refresh_shard_dir().await?; + return Ok(sfm.clone()); } + } - None + // Now, create and insert it. 
+ let mut rw_lg = MDB_SHARD_FILE_MANAGER_CACHE.write().await; + let sfm_entry = rw_lg.entry(key); + + // See if it's in there; insert otherwise + match sfm_entry { + std::collections::hash_map::Entry::Vacant(sfm_slot) => { + let sfm = create_new_sfm(); + sfm_slot.insert(sfm.clone()); + sfm + }, + std::collections::hash_map::Entry::Occupied(sfm) => sfm.get().clone(), } - })); + }; + + sfm.refresh_shard_dir().await?; + + Ok(sfm) + } + + pub async fn refresh_shard_dir(&self) -> Result<()> { + let mut shard_files = MDBShardFile::load_all_valid(&self.shard_directory)?; - for p in deletion_candidates { - // silently delete expired shard files, swallowing errors if need be. - let _ = std::fs::remove_file(p); + { + let shard_read_guard = self.shard_bookkeeper.read().await; + shard_files.retain(|s| !shard_read_guard.shard_lookup_by_shard_hash.contains_key(&s.shard_hash)); } - let s = Self { - shard_bookkeeper: RwLock::new(ShardBookkeeper::new()), - current_state: RwLock::new(MDBInMemoryShard::default()), - shard_directory: sbp.shard_directory, - target_shard_min_size: sbp.target_shard_size, - chunk_dedup_enabled: sbp.chunk_dedup_enabled, + self.register_shards(&shard_files).await?; + + Ok(()) + } + + /// If there are existing cached managers, register this shard with them. 
+ pub async fn register_shard_in_existing_managers(shard_path: impl AsRef) -> Result<()> { + let shard_path = std::path::absolute(shard_path)?; + + let ro_lg = MDB_SHARD_FILE_MANAGER_CACHE.read().await; + + let Some(dir_path) = shard_path.parent() else { + return Ok(()); }; - s.register_shards(&new_shards).await?; + for download_only in [false, true] { + let key = (dir_path.to_path_buf(), download_only); + if let Some(sfm) = ro_lg.get(&key) { + sfm.register_shards_by_path(&[&shard_path]).await?; + } + } - Ok((Arc::new(s), new_shards)) + Ok(()) } pub async fn register_shards_by_path>(&self, new_shards: &[P]) -> Result<()> { let new_shards: Vec> = new_shards.iter().try_fold(Vec::new(), |mut acc, p| { - acc.extend(MDBShardFile::load_all(p.as_ref())?); + acc.extend(MDBShardFile::load_all_valid(p)?); + Result::Ok(acc) })?; @@ -223,6 +292,9 @@ impl ShardFileManager { for s in new_shards { s.verify_shard_integrity_debug_only(); + // Make sure the shard is in the shard directory + debug_assert!(s.path.starts_with(&self.shard_directory)); + if sbkp_lg.shard_lookup_by_shard_hash.contains_key(&s.shard_hash) { continue; } @@ -300,6 +372,16 @@ impl ShardFileManager { .shard_lookup_by_shard_hash .contains_key(shard_hash) } + + pub async fn registered_shard_list(&self) -> Result>> { + let shards = self.shard_bookkeeper.read().await; + + Ok(shards + .shard_lookup_by_shard_hash + .values() + .map(|&(i, j)| shards.shard_collections[i].shard_list[j].clone()) + .collect()) + } } #[async_trait] @@ -335,7 +417,12 @@ impl FileReconstructor for ShardFileManager { } } - Ok(None) + // See if the base shard manager has this information. + if let Some(parent_sfm) = &self.upstream_manager { + Box::pin(parent_sfm.get_file_reconstruction_info(file_hash)).await + } else { + Ok(None) + } } } @@ -385,7 +472,12 @@ impl ShardFileManager { } } - Ok(None) + // See if the base shard manager has this information. 
+ if let Some(parent_sfm) = &self.upstream_manager { + Box::pin(parent_sfm.chunk_hash_dedup_query(query_hashes)).await + } else { + Ok(None) + } } /// Add CAS info to the in-memory state. @@ -555,7 +647,7 @@ mod tests { let mut rng = StdRng::seed_from_u64(seed); let shard_dir = shard_dir.as_ref(); - let sfm = ShardFileManager::load_dir(shard_dir, false).await?; + let sfm = ShardFileManager::builder(shard_dir).build().await?; let mut reference_shard = MDBInMemoryShard::default(); for _ in 0..n_shards { @@ -686,7 +778,11 @@ mod tests { } async fn sfm_with_target_shard_size(path: impl AsRef, target_size: u64) -> Result> { - ShardFileManager::builder(path).with_target_size(target_size).build().await + ShardFileManager::builder(path) + .from_global_manager_cache(false) + .with_target_size(target_size) + .build() + .await } #[tokio::test] @@ -695,7 +791,10 @@ mod tests { let mut mdb_in_mem = MDBInMemoryShard::default(); { - let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = ShardFileManager::builder(tmp_dir.path()) + .from_global_manager_cache(false) + .build() + .await?; fill_with_specific_shard(&mdb, &mut mdb_in_mem, &[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])]).await?; @@ -711,13 +810,16 @@ mod tests { } { // Now, make sure that this happens if this directory is opened up - let mut mdb2 = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb2 = ShardFileManager::builder(tmp_dir.path()) + .from_global_manager_cache(false) + .build() + .await?; // Make sure it's all in there this round. 
verify_mdb_shards_match(&mdb2, &mdb_in_mem, true).await?; // Now add some more, based on this directory - fill_with_random_shard(&mut mdb2, &mut mdb_in_mem, 0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6]).await?; + fill_with_random_shard(&mdb2, &mut mdb_in_mem, 0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6]).await?; verify_mdb_shards_match(&mdb2, &mdb_in_mem, true).await?; @@ -740,10 +842,10 @@ mod tests { async fn test_larger_simulated() -> Result<()> { let tmp_dir = TempDir::new("gitxet_shard_test_2")?; let mut mdb_in_mem = MDBInMemoryShard::default(); - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = ShardFileManager::new(tmp_dir.path()).await?; for i in 0..10 { - fill_with_random_shard(&mut mdb, &mut mdb_in_mem, i, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6]).await?; + fill_with_random_shard(&mdb, &mut mdb_in_mem, i, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6]).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; @@ -759,7 +861,10 @@ mod tests { mdb.flush().await?; // Now, make sure that this happens if this directory is opened up - let mdb2 = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb2 = ShardFileManager::builder(tmp_dir.path()) + .from_global_manager_cache(false) + .build() + .await?; // Make sure it's all in there this round. 
verify_mdb_shards_match(&mdb2, &mdb_in_mem, true).await?; @@ -775,7 +880,10 @@ mod tests { for sesh in 0..3 { for i in 0..10 { { - let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await.unwrap(); + let mdb = ShardFileManager::builder(tmp_dir.path()) + .from_global_manager_cache(false) + .build() + .await?; fill_with_random_shard(&mdb, &mut mdb_in_mem, 100 * sesh + i, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6]) .await .unwrap(); @@ -809,7 +917,10 @@ mod tests { { // Now, make sure that this happens if this directory is opened up - let mdb2 = ShardFileManager::load_dir(tmp_dir.path(), false).await.unwrap(); + let mdb2 = ShardFileManager::builder(tmp_dir.path()) + .from_global_manager_cache(false) + .build() + .await?; verify_mdb_shards_match(&mdb2, &mdb_in_mem, true).await.unwrap(); } @@ -843,7 +954,7 @@ mod tests { // Reload and verify { - let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = ShardFileManager::new(tmp_dir.path()).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; } @@ -861,7 +972,7 @@ mod tests { // Reload and verify { - let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = ShardFileManager::new(tmp_dir.path()).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; } @@ -936,7 +1047,7 @@ mod tests { // First, load all of these with a shard file manager and check them. { - let shard_file_manager = ShardFileManager::load_dir(tmp_dir_path, false).await?; + let shard_file_manager = ShardFileManager::new(tmp_dir_path).await?; verify_mdb_shards_match(&shard_file_manager, &ref_shard, true).await?; } @@ -986,7 +1097,7 @@ mod tests { } // Now, verify that everything still works great. 
- let shard_file_manager = ShardFileManager::load_dir(tmp_dir_path_keyed, false).await?; + let shard_file_manager = ShardFileManager::new(tmp_dir_path_keyed).await?; verify_mdb_shards_match(&shard_file_manager, &ref_shard, include_info).await?; } @@ -995,11 +1106,14 @@ mod tests { } async fn shard_list_with_timestamp_filtering(path: &Path, del_buffer: u64) -> Result>> { - let build_params = ShardFileManager::builder(path) + Ok(ShardFileManager::builder(path) .with_expired_shard_cleanup(true) - .with_shard_expiration_delete_buffer(del_buffer); - - Ok(ShardFileManager::new_from_builder(build_params).await?.1) + .with_custom_expired_shard_cleanup_window(del_buffer) + .from_global_manager_cache(false) + .build() + .await? + .registered_shard_list() + .await?) } #[tokio::test] @@ -1032,8 +1146,9 @@ mod tests { assert_eq!(loaded_shards[0].shard_hash, out.shard_hash) } - // Sleep for 1.25 seconds to make sure at least a second has passed - std::thread::sleep(Duration::new(1, 250000000)); + // Sleep for 2.01 seconds to make sure at least a second has passed for the +1 and a second to handle the <= + // part of the equality, + std::thread::sleep(Duration::new(2, 10_000_000)); { let loaded_shards = shard_list_with_timestamp_filtering(tmp_dir_path_keyed, 100).await?;