diff --git a/fs-index/Cargo.toml b/fs-index/Cargo.toml index b7284d44..230c1e61 100644 --- a/fs-index/Cargo.toml +++ b/fs-index/Cargo.toml @@ -21,6 +21,8 @@ lazy_static = "1.4.0" canonical-path = "2.0.2" pathdiff = "0.2.1" itertools = "0.10.5" +notify = "6.1" +futures = "0.3" [dev-dependencies] uuid = { version = "1.6.1", features = ["v4"] } @@ -30,6 +32,8 @@ rstest = '0.18.2' criterion = { version = "0.5", features = ["html_reports"] } pprof = { version = "0.13", features = ["criterion", "flamegraph"] } rand = "0.8" +# Examples +tokio = { version = "1.33", features = ["full"] } [[bench]] name = "index_build_benchmark" diff --git a/fs-index/examples/index_watch.rs b/fs-index/examples/index_watch.rs new file mode 100644 index 00000000..03528008 --- /dev/null +++ b/fs-index/examples/index_watch.rs @@ -0,0 +1,25 @@ +use anyhow::Result; +use fs_index::watch::watch_index; +use std::{path::Path, thread}; + +/// Example demonstrating how to use fs_index to watch a directory for changes in a separate thread. +/// This automatically updates the index when changes are detected. +fn main() -> Result<()> { + let root = Path::new("test-assets"); + + let thread_handle = thread::spawn(move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + if let Err(err) = watch_index(root).await { + eprintln!("Error in watching index: {:?}", err); + } + }); + }); + + thread_handle + .join() + .expect("Failed to join thread"); + + Ok(()) +} diff --git a/fs-index/src/lib.rs b/fs-index/src/lib.rs index 4259bdd1..a0149c48 100644 --- a/fs-index/src/lib.rs +++ b/fs-index/src/lib.rs @@ -6,6 +6,7 @@ extern crate canonical_path; use data_error::{ArklibError, Result}; pub mod index; +pub mod watch; pub use fs_atomic_versions::atomic::{modify, modify_json, AtomicFile}; pub use fs_storage::{ARK_FOLDER, INDEX_PATH}; diff --git a/fs-index/src/watch.rs b/fs-index/src/watch.rs new file mode 100644 index 00000000..004c83f4 --- /dev/null +++ b/fs-index/src/watch.rs @@ -0,0 +1,69 @@ +use crate::{ResourceIndex, ARK_FOLDER}; +use anyhow::Result; +use futures::{ + channel::mpsc::{channel, Receiver}, + SinkExt, StreamExt, +}; +use log::info; +use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher}; +use std::{fs, path::Path}; + +/// Watch the root path for changes and update the index +pub async fn watch_index>(root_path: P) -> Result<()> { + log::debug!( + "Attempting to watch index at root path: {:?}", + root_path.as_ref() + ); + + let root_path = fs::canonicalize(root_path.as_ref())?; + let mut index = ResourceIndex::provide(&root_path)?; + + let (mut watcher, mut rx) = async_watcher()?; + info!("Watching directory: {:?}", root_path); + let config = Config::default(); + watcher.configure(config)?; + watcher.watch(root_path.as_ref(), RecursiveMode::Recursive)?; + info!("Started watcher with config: \n\t{:?}", config); + + while let Some(res) = rx.next().await { + match res { + Ok(event) => { + // If the event is a change in .ark folder, ignore it + let ark_folder = root_path.join(ARK_FOLDER); + if event + .paths + .iter() + .any(|p| p.starts_with(&ark_folder)) + { + continue; + } + + info!("Detected event: {:?}", event); + index.update_all()?; + index.store()?; + info!("Index updated and stored"); + } + Err(e) => log::error!("Error in watcher: {:?}", e), + } + } + + Ok(()) +} + +fn async_watcher( +) -> notify::Result<(RecommendedWatcher, Receiver>)> { + let (mut tx, rx) = channel(1); + + let watcher = RecommendedWatcher::new( + move |res| { + futures::executor::block_on(async { + if let Err(err) = tx.send(res).await { + log::error!("Error sending event: {:?}", err); + } + }) + }, + Config::default(), + )?; + + Ok((watcher, rx)) +}