Skip to content

Commit

Permalink
feat(fs-index): add filesystem monitoring using notify-rs
Browse files Browse the repository at this point in the history
This commit adds the `watch_index` function to monitor changes in the root path and update the index accordingly.

Additionally, it adds an example in `fs-index/examples/index_watch.rs` to illustrate the function's usage.

Signed-off-by: Tarek <[email protected]>
  • Loading branch information
tareknaser committed Apr 22, 2024
1 parent 9a2240d commit 51c2ee5
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
4 changes: 4 additions & 0 deletions fs-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ lazy_static = "1.4.0"
canonical-path = "2.0.2"
pathdiff = "0.2.1"
itertools = "0.10.5"
notify = "6.1"
futures = "0.3"

[dev-dependencies]
uuid = { version = "1.6.1", features = ["v4"] }
Expand All @@ -30,6 +32,8 @@ rstest = '0.18.2'
criterion = { version = "0.5", features = ["html_reports"] }
pprof = { version = "0.13", features = ["criterion", "flamegraph"] }
rand = "0.8"
# Examples
tokio = { version = "1.33", features = ["full"] }

[[bench]]
name = "index_build_benchmark"
Expand Down
25 changes: 25 additions & 0 deletions fs-index/examples/index_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use anyhow::Result;
use fs_index::watch::watch_index;
use std::{path::Path, thread};

/// Example demonstrating how to use fs_index to watch a directory for changes in a separate thread.
/// This automatically updates the index when changes are detected.
fn main() -> Result<()> {
let root = Path::new("test-assets");

let thread_handle = thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
if let Err(err) = watch_index(root).await {
eprintln!("Error in watching index: {:?}", err);
}
});
});

thread_handle
.join()
.expect("Failed to join thread");

Ok(())
}
1 change: 1 addition & 0 deletions fs-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern crate canonical_path;
use data_error::{ArklibError, Result};

pub mod index;
pub mod watch;

pub use fs_atomic_versions::atomic::{modify, modify_json, AtomicFile};
pub use fs_storage::{ARK_FOLDER, INDEX_PATH};
Expand Down
69 changes: 69 additions & 0 deletions fs-index/src/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::{ResourceIndex, ARK_FOLDER};
use anyhow::Result;
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use log::info;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::{fs, path::Path};

/// Watch the root path for changes and update the index
pub async fn watch_index<P: AsRef<Path>>(root_path: P) -> Result<()> {
log::debug!(
"Attempting to watch index at root path: {:?}",
root_path.as_ref()
);

let root_path = fs::canonicalize(root_path.as_ref())?;
let mut index = ResourceIndex::provide(&root_path)?;

let (mut watcher, mut rx) = async_watcher()?;
info!("Watching directory: {:?}", root_path);
let config = Config::default();
watcher.configure(config)?;
watcher.watch(root_path.as_ref(), RecursiveMode::Recursive)?;
info!("Started watcher with config: \n\t{:?}", config);

while let Some(res) = rx.next().await {
match res {
Ok(event) => {
// If the event is a change in .ark folder, ignore it
let ark_folder = root_path.join(ARK_FOLDER);
if event
.paths
.iter()
.any(|p| p.starts_with(&ark_folder))
{
continue;
}

info!("Detected event: {:?}", event);
index.update_all()?;
index.store()?;
info!("Index updated and stored");
}
Err(e) => log::error!("Error in watcher: {:?}", e),
}
}

Ok(())
}

fn async_watcher(
) -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);

let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(err) = tx.send(res).await {
log::error!("Error sending event: {:?}", err);
}
})
},
Config::default(),
)?;

Ok((watcher, rx))
}

0 comments on commit 51c2ee5

Please sign in to comment.