From 3f2358848061398004b35e170c6ec85b7f2df8c5 Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Tue, 2 Aug 2022 21:50:33 +0200 Subject: [PATCH] allow more than channels and make crossbeam optional --- examples/debounced.rs | 7 +- examples/debounced_full_custom.rs | 31 ++++++ notify-debouncer-mini/Cargo.toml | 15 ++- notify-debouncer-mini/README.md | 10 ++ notify-debouncer-mini/src/lib.rs | 169 +++++++++++++++++++++++------- 5 files changed, 194 insertions(+), 38 deletions(-) create mode 100644 examples/debounced_full_custom.rs create mode 100644 notify-debouncer-mini/README.md diff --git a/examples/debounced.rs b/examples/debounced.rs index c3192558..8e091e52 100644 --- a/examples/debounced.rs +++ b/examples/debounced.rs @@ -13,9 +13,12 @@ fn main() { } }); - let (rx, mut watcher) = new_debouncer(Duration::from_secs(2), None).unwrap(); + let (tx, rx) = std::sync::mpsc::channel(); - watcher + let mut debouncer = new_debouncer(Duration::from_secs(2), None, tx).unwrap(); + + debouncer + .watcher() .watch(Path::new("."), RecursiveMode::Recursive) .unwrap(); diff --git a/examples/debounced_full_custom.rs b/examples/debounced_full_custom.rs new file mode 100644 index 00000000..9aa79ef7 --- /dev/null +++ b/examples/debounced_full_custom.rs @@ -0,0 +1,31 @@ +use std::{path::Path, time::Duration}; + +use notify::{RecursiveMode, Watcher}; +use notify_debouncer_mini::new_debouncer; + +/// Debouncer with custom backend and waiting for exit +fn main() { + std::thread::spawn(|| { + let path = Path::new("test.txt"); + let _ = std::fs::remove_file(&path); + loop { + std::fs::write(&path, b"Lorem ipsum").unwrap(); + std::thread::sleep(Duration::from_millis(250)); + } + }); + + let (tx, rx) = std::sync::mpsc::channel(); + + let mut debouncer = new_debouncer_opt::<_,notify::PollWatcher>(Duration::from_secs(2), None, tx).unwrap(); + + debouncer + .watcher() + .watch(Path::new("."), RecursiveMode::Recursive) + .unwrap(); + + for events in rx { + for e in events { + println!("{:?}", e); + } + } +} diff --git a/notify-debouncer-mini/Cargo.toml b/notify-debouncer-mini/Cargo.toml index 05768180..da55df69 100644 --- a/notify-debouncer-mini/Cargo.toml +++ b/notify-debouncer-mini/Cargo.toml @@ -2,7 +2,15 @@ name = "notify-debouncer-mini" version = "0.1.0" edition = "2021" +rust-version = "1.56" description = "notify mini debouncer for events" +documentation = "https://docs.rs/notify_debouncer_mini" +homepage = "https://github.com/notify-rs/notify" +repository = "https://github.com/notify-rs/notify.git" +authors = ["Aron Heinecke "] +keywords = ["events", "filesystem", "notify", "watch"] +license = "CC0-1.0 OR Artistic-2.0" +readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,5 +18,10 @@ description = "notify mini debouncer for events" name = "notify_debouncer_mini" path = "src/lib.rs" +[features] +default = ["crossbeam"] + [dependencies] -notify = "5.0.0-pre.15" \ No newline at end of file +notify = "5.0.0-pre.15" +crossbeam-channel = { version = "0.5", optional = true } +serde = { version = "1.0.89", features = ["derive"], optional = true } \ No newline at end of file diff --git a/notify-debouncer-mini/README.md b/notify-debouncer-mini/README.md new file mode 100644 index 00000000..f5bbfaa5 --- /dev/null +++ b/notify-debouncer-mini/README.md @@ -0,0 +1,10 @@ +# Notify debouncer + +Tiny debouncer for notify. Filters incoming events and emits only one event per timeframe per file. + +## Features + +- `crossbeam` enabled by default, for crossbeam channel support. +This may create problems used in tokio environments. See [#380](https://github.com/notify-rs/notify/issues/380). +Use someting like `notify-debouncer-mini = { version = "*", default-features = false }` to disable it. +- `serde` for serde support of event types, off by default \ No newline at end of file diff --git a/notify-debouncer-mini/src/lib.rs b/notify-debouncer-mini/src/lib.rs index 3d0e6ebb..ced8eeec 100644 --- a/notify-debouncer-mini/src/lib.rs +++ b/notify-debouncer-mini/src/lib.rs @@ -5,7 +5,7 @@ use std::{ collections::HashMap, path::PathBuf, sync::{ - mpsc::{self, Receiver}, + atomic::{AtomicBool, Ordering}, Arc, Mutex, }, time::{Duration, Instant}, @@ -13,6 +13,51 @@ use std::{ use notify::{Error, ErrorKind, Event, RecommendedWatcher, Watcher}; +/// The set of requirements for watcher debounce event handling functions. +/// +/// # Example implementation +/// +/// ```no_run +/// use notify::{Event, Result, EventHandler}; +/// +/// /// Prints received events +/// struct EventPrinter; +/// +/// impl EventHandler for EventPrinter { +/// fn handle_event(&mut self, event: Result) { +/// if let Ok(event) = event { +/// println!("Event: {:?}", event); +/// } +/// } +/// } +/// ``` +pub trait DebounceEventHandler: Send + 'static { + /// Handles an event. + fn handle_event(&mut self, event: DebouncedEvents); +} + +impl DebounceEventHandler for F +where + F: FnMut(DebouncedEvents) + Send + 'static, +{ + fn handle_event(&mut self, event: DebouncedEvents) { + (self)(event); + } +} + +#[cfg(feature = "crossbeam")] +impl DebounceEventHandler for crossbeam_channel::Sender { + fn handle_event(&mut self, event: DebouncedEvents) { + let _ = self.send(event); + } +} + +impl DebounceEventHandler for std::sync::mpsc::Sender { + fn handle_event(&mut self, event: DebouncedEvents) { + let _ = self.send(event); + } +} + /// Deduplicate event data entry struct EventData { /// Insertion Time @@ -31,7 +76,7 @@ impl EventData { } } -type DebounceChannelType = Result, Vec>; +type DebouncedEvents = Result, Vec>; /// A debounced event kind. #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] @@ -118,17 +163,59 @@ impl DebounceDataInner { } } -/// Creates a new debounced watcher. -/// +/// Debouncer guard, stops the debouncer on drop +pub struct Debouncer { + stop: Arc, + watcher: T, + debouncer_thread: Option>, +} + +impl Debouncer { + /// Stop the debouncer, waits for the event thread to finish. + /// May block for the duration of one tick_rate. + pub fn stop(mut self) { + self.set_stop(); + if let Some(t) = self.debouncer_thread.take() { + let _ = t.join(); + } + } + + /// Stop the debouncer, does not wait for the event thread to finish. + pub fn stop_nonblocking(self) { + self.set_stop(); + } + + fn set_stop(&self) { + self.stop.store(true, Ordering::Relaxed); + } + + /// Access to the internally used notify Watcher backend + pub fn watcher(&mut self) -> &mut dyn Watcher { + &mut self.watcher + } +} + +impl Drop for Debouncer { + fn drop(&mut self) { + // don't imitate c++ async futures and block on drop + self.set_stop(); + } +} + +/// Creates a new debounced watcher with custom configuration. +/// /// Timeout is the amount of time after which a debounced event is emitted or a Continuous event is send, if there still are events incoming for the specific path. -/// +/// /// If tick_rate is None, notify will select a tick rate that is less than the provided timeout. -pub fn new_debouncer( +pub fn new_debouncer_opt( timeout: Duration, tick_rate: Option, -) -> Result<(Receiver, RecommendedWatcher), Error> { + mut event_handler: F, +) -> Result, Error> { let data = DebounceData::default(); + let stop = Arc::new(AtomicBool::new(false)); + let tick_div = 4; let tick = match tick_rate { Some(v) => { @@ -153,38 +240,31 @@ pub fn new_debouncer( data_w.timeout = timeout; } - let (tx, rx) = mpsc::channel(); - let data_c = data.clone(); - - std::thread::Builder::new() + let stop_c = stop.clone(); + let thread = std::thread::Builder::new() .name("notify-rs debouncer loop".to_string()) - .spawn(move || { - loop { - std::thread::sleep(tick); - let send_data; - let errors: Vec; - { - let mut lock = data_c.lock().expect("Can't lock debouncer data!"); - send_data = lock.debounced_events(); - errors = lock.errors(); - } - if send_data.len() > 0 { - // channel shut down - if tx.send(Ok(send_data)).is_err() { - break; - } - } - if errors.len() > 0 { - // channel shut down - if tx.send(Err(errors)).is_err() { - break; - } - } + .spawn(move || loop { + if stop_c.load(Ordering::Acquire) { + break; + } + std::thread::sleep(tick); + let send_data; + let errors: Vec; + { + let mut lock = data_c.lock().expect("Can't lock debouncer data!"); + send_data = lock.debounced_events(); + errors = lock.errors(); + } + if send_data.len() > 0 { + event_handler.handle_event(Ok(send_data)); + } + if errors.len() > 0 { + event_handler.handle_event(Err(errors)); } })?; - let watcher = RecommendedWatcher::new(move |e: Result| { + let watcher = T::new(move |e: Result| { let mut lock = data.lock().expect("Can't lock debouncer data!"); match e { @@ -194,5 +274,24 @@ pub fn new_debouncer( } })?; - Ok((rx, watcher)) + let guard = Debouncer { + watcher, + debouncer_thread: Some(thread), + stop, + }; + + Ok(guard) +} + +/// Short function to create a new debounced watcher with the recommended debouncer. +/// +/// Timeout is the amount of time after which a debounced event is emitted or a Continuous event is send, if there still are events incoming for the specific path. +/// +/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout. +pub fn new_debouncer( + timeout: Duration, + tick_rate: Option, + event_handler: F, +) -> Result, Error> { + new_debouncer_opt::(timeout, tick_rate, event_handler) }