-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add debouncer #286
Merged
Add debouncer #286
Changes from 5 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
ed242d1
Add debouncer
0xpr03 e9fbc59
cleanup
0xpr03 86a1203
remove smart event kind handler in debouncer
0xpr03 baa22c8
send errors in debouncer, add example, name thread
0xpr03 2e3a2ac
Allow selecting a tick_rate for the debouncer
0xpr03 74333bb
move debouncer to own crate and fix audit failures
0xpr03 7e19d21
allow more than channels and make crossbeam optional
0xpr03 00c7cba
add docs
0xpr03 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
use std::{path::Path, time::Duration}; | ||
|
||
use notify::{new_debouncer, RecursiveMode, Watcher}; | ||
|
||
fn main() { | ||
std::thread::spawn(|| { | ||
let path = Path::new("test.txt"); | ||
let _ = std::fs::remove_file(&path); | ||
loop { | ||
std::fs::write(&path, b"Lorem ipsum").unwrap(); | ||
std::thread::sleep(Duration::from_millis(250)); | ||
} | ||
}); | ||
|
||
let (rx, mut watcher) = new_debouncer(Duration::from_secs(2), None).unwrap(); | ||
|
||
watcher | ||
.watch(Path::new("."), RecursiveMode::Recursive) | ||
.unwrap(); | ||
|
||
for events in rx { | ||
for e in events { | ||
println!("{:?}", e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
//! Debouncer & access code | ||
#[cfg(feature = "serde")] | ||
use serde::{Deserialize, Serialize}; | ||
use std::{ | ||
collections::HashMap, | ||
path::PathBuf, | ||
sync::{ | ||
mpsc::{self, Receiver}, | ||
Arc, Mutex, | ||
}, | ||
time::{Duration, Instant}, | ||
}; | ||
|
||
use crate::{Error, ErrorKind, Event, RecommendedWatcher, Watcher}; | ||
|
||
/// Deduplicate event data entry | ||
struct EventData { | ||
/// Insertion Time | ||
insert: Instant, | ||
/// Last Update | ||
update: Instant, | ||
} | ||
|
||
impl EventData { | ||
fn new_any() -> Self { | ||
let time = Instant::now(); | ||
Self { | ||
insert: time.clone(), | ||
update: time, | ||
} | ||
} | ||
} | ||
|
||
type DebounceChannelType = Result<Vec<DebouncedEvent>, Vec<Error>>; | ||
|
||
/// A debounced event kind. | ||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] | ||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] | ||
#[non_exhaustive] | ||
pub enum DebouncedEventKind { | ||
/// When precise events are disabled for files | ||
Any, | ||
/// Event but debounce timed out (for example continuous writes) | ||
AnyContinuous, | ||
} | ||
|
||
/// A debounced event. | ||
/// | ||
/// Does not emit any specific event type on purpose, only distinguishes between an any event and a continuous any event. | ||
#[derive(Clone, Debug, Eq, Hash, PartialEq)] | ||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] | ||
pub struct DebouncedEvent { | ||
/// Event path | ||
pub path: PathBuf, | ||
/// Event kind | ||
pub kind: DebouncedEventKind, | ||
} | ||
|
||
impl DebouncedEvent { | ||
fn new(path: PathBuf, kind: DebouncedEventKind) -> Self { | ||
Self { path, kind } | ||
} | ||
} | ||
|
||
type DebounceData = Arc<Mutex<DebounceDataInner>>; | ||
|
||
#[derive(Default)] | ||
struct DebounceDataInner { | ||
d: HashMap<PathBuf, EventData>, | ||
timeout: Duration, | ||
e: Vec<crate::Error>, | ||
} | ||
|
||
impl DebounceDataInner { | ||
/// Retrieve a vec of debounced events, removing them if not continuous | ||
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> { | ||
let mut events_expired = Vec::with_capacity(self.d.len()); | ||
let mut data_back = HashMap::with_capacity(self.d.len()); | ||
// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618 | ||
for (k, v) in self.d.drain() { | ||
if v.update.elapsed() >= self.timeout { | ||
println!("normal timeout"); | ||
events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any)); | ||
} else if v.insert.elapsed() >= self.timeout { | ||
println!("continuous"); | ||
data_back.insert(k.clone(), v); | ||
events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous)); | ||
} else { | ||
data_back.insert(k, v); | ||
} | ||
} | ||
self.d = data_back; | ||
events_expired | ||
} | ||
|
||
/// Returns all currently stored errors | ||
pub fn errors(&mut self) -> Vec<Error> { | ||
let mut v = Vec::new(); | ||
std::mem::swap(&mut v, &mut self.e); | ||
v | ||
} | ||
|
||
/// Add an error entry to re-send later on | ||
pub fn add_error(&mut self, e: crate::Error) { | ||
self.e.push(e); | ||
} | ||
|
||
/// Add new event to debouncer cache | ||
pub fn add_event(&mut self, e: Event) { | ||
for path in e.paths.into_iter() { | ||
if let Some(v) = self.d.get_mut(&path) { | ||
v.update = Instant::now(); | ||
println!("Exists"); | ||
} else { | ||
self.d.insert(path, EventData::new_any()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Creates a new debounced watcher. | ||
/// | ||
/// Timeout is the amount of time after which a debounced event is emitted or a Continuous event is send, if there still are events incoming for the specific path. | ||
/// | ||
/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout. | ||
pub fn new_debouncer( | ||
timeout: Duration, | ||
tick_rate: Option<Duration>, | ||
) -> Result<(Receiver<DebounceChannelType>, RecommendedWatcher), Error> { | ||
let data = DebounceData::default(); | ||
|
||
let tick_div = 4; | ||
let tick = match tick_rate { | ||
Some(v) => { | ||
if v > timeout { | ||
return Err(Error::new(ErrorKind::Generic(format!( | ||
"Invalid tick_rate, tick rate {:?} > {:?} timeout!", | ||
v, timeout | ||
)))); | ||
} | ||
v | ||
} | ||
None => timeout.checked_div(tick_div).ok_or_else(|| { | ||
Error::new(ErrorKind::Generic(format!( | ||
"Failed to calculate tick as {:?}/{}!", | ||
timeout, tick_div | ||
))) | ||
})?, | ||
}; | ||
|
||
{ | ||
let mut data_w = data.lock().unwrap(); | ||
data_w.timeout = timeout; | ||
} | ||
|
||
let (tx, rx) = mpsc::channel(); | ||
|
||
let data_c = data.clone(); | ||
|
||
std::thread::Builder::new() | ||
.name("notify-rs debouncer loop".to_string()) | ||
.spawn(move || { | ||
loop { | ||
std::thread::sleep(tick); | ||
let send_data; | ||
let errors: Vec<crate::Error>; | ||
{ | ||
let mut lock = data_c.lock().expect("Can't lock debouncer data!"); | ||
send_data = lock.debounced_events(); | ||
errors = lock.errors(); | ||
} | ||
if send_data.len() > 0 { | ||
// channel shut down | ||
if tx.send(Ok(send_data)).is_err() { | ||
break; | ||
} | ||
} | ||
if errors.len() > 0 { | ||
// channel shut down | ||
if tx.send(Err(errors)).is_err() { | ||
break; | ||
} | ||
} | ||
} | ||
})?; | ||
|
||
let watcher = RecommendedWatcher::new(move |e: Result<Event, Error>| { | ||
let mut lock = data.lock().expect("Can't lock debouncer data!"); | ||
|
||
match e { | ||
Ok(e) => lock.add_event(e), | ||
// can't have multiple TX, so we need to pipe that through our debouncer | ||
Err(e) => lock.add_error(e), | ||
} | ||
})?; | ||
|
||
Ok((rx, watcher)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd like to manage this value as const and declare it on the module scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
understandable, had a moment of wrangling with myself whether I'd want that as top module declaration