diff --git a/crates/turborepo-globwatch/src/lib.rs b/crates/turborepo-globwatch/src/lib.rs index 0d4ea2bc2892c1..c889f7b2963771 100644 --- a/crates/turborepo-globwatch/src/lib.rs +++ b/crates/turborepo-globwatch/src/lib.rs @@ -235,6 +235,8 @@ pub enum ConfigError { ServerStopped, /// Watch error WatchError(Vec), + /// The server has already been consumed. + WatchingAlready, } impl WatchConfig { @@ -287,6 +289,24 @@ impl WatchConfig { .map_err(ConfigError::WatchError) } + /// Register a single path to be included by the watcher. + pub async fn include_path(&self, path: &Path) -> Result<(), ConfigError> { + trace!("watching {:?}", path); + // Windows doesn't create an event when a watched directory itself is deleted + // we watch the parent directory instead. + // More information at https://github.com/notify-rs/notify/issues/403 + #[cfg(windows)] + let watched_path = path.parent().expect("turbo is unusable at filesytem root"); + #[cfg(not(windows))] + let watched_path = path; + + self.watcher + .lock() + .expect("watcher lock poisoned") + .watch(watched_path, notify::RecursiveMode::NonRecursive) + .map_err(|e| ConfigError::WatchError(vec![e])) + } + /// Register a glob to be excluded by the watcher. #[tracing::instrument(skip(self))] pub async fn exclude(&self, relative_to: &Path, glob: &str) { diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index 7ddd0ed84c4667..e1a26c8cc3559c 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -195,7 +195,15 @@ impl DaemonServer { None => CloseReason::ServerClosed, } }, - _ = watcher_fut => CloseReason::WatcherClosed, + watch_res = watcher_fut => { + match watch_res { + Ok(()) => CloseReason::WatcherClosed, + Err(e) => { + error!("Globwatch config error: {:?}", e); + CloseReason::WatcherClosed + }, + } + }, } // here the stop token is dropped, and the pid lock is dropped diff --git a/crates/turborepo-lib/src/globwatcher/mod.rs b/crates/turborepo-lib/src/globwatcher/mod.rs index cc3863a1158c55..235ddaf5a8f81a 100644 --- a/crates/turborepo-lib/src/globwatcher/mod.rs +++ b/crates/turborepo-lib/src/globwatcher/mod.rs @@ -8,7 +8,7 @@ use std::{ use futures::{stream::iter, StreamExt}; use globwatch::{ConfigError, GlobWatcher, StopToken, WatchConfig, Watcher}; use itertools::Itertools; -use notify::RecommendedWatcher; +use notify::{EventKind, RecommendedWatcher}; use tokio::time::timeout; use tracing::{trace, warn}; use turbopath::AbsoluteSystemPathBuf; @@ -66,7 +66,7 @@ impl HashGlobWatcher { /// Watches a given path, using the flush_folder as temporary storage to /// make sure that file events are handled in the appropriate order. #[tracing::instrument(skip(self, token))] - pub async fn watch(&self, token: StopToken) { + pub async fn watch(&self, token: StopToken) -> Result<(), ConfigError> { let start_globs = { let lock = self.hash_globs.lock().expect("only fails if poisoned"); lock.iter() @@ -79,17 +79,25 @@ impl HashGlobWatcher { Some(watcher) => watcher.into_stream(token), None => { warn!("watcher already consumed"); - return; + return Err(ConfigError::WatchingAlready); } }; + // watch the root of the repo to shut down if the folder is deleted + self.config.include_path(&self.relative_to).await?; + // watch all the globs currently in the map for glob in start_globs { self.config.include(&self.relative_to, &glob).await.ok(); } while let Some(Ok(event)) = stream.next().await { - trace!("processing event: {:?}", event); + if event.paths.contains(&self.relative_to) && matches!(event.kind, EventKind::Remove(_)) + { + // if the root of the repo is deleted, we shut down + trace!("repo root was removed, shutting down"); + break; + } let repo_relative_paths = event .paths @@ -115,6 +123,8 @@ impl HashGlobWatcher { self.config.exclude(&self.relative_to, &glob).await; } } + + Ok(()) } /// registers a hash with a set of globs to watch for changes @@ -335,9 +345,10 @@ fn clear_hash_globs( #[cfg(test)] mod test { - use std::{fs::File, sync::Arc}; + use std::{fs::File, sync::Arc, time::Duration}; use globwatch::StopSource; + use tokio::time::timeout; use turbopath::AbsoluteSystemPathBuf; fn setup() -> tempdir::TempDir { @@ -657,4 +668,36 @@ mod test { watcher.glob_statuses.lock().unwrap() ); } + + #[tokio::test] + #[tracing_test::traced_test] + async fn delete_root_kill_daemon() { + let dir = setup(); + let flush = tempdir::TempDir::new("globwatch-flush").unwrap(); + let watcher = Arc::new( + super::HashGlobWatcher::new( + AbsoluteSystemPathBuf::new(dir.path()).unwrap(), + flush.path().to_path_buf(), + ) + .unwrap(), + ); + + let stop = StopSource::new(); + + let task_watcher = watcher.clone(); + let token = stop.token(); + + // dropped when the test ends + let task = tokio::task::spawn(async move { task_watcher.watch(token).await }); + + watcher.config.flush().await.unwrap(); + std::fs::remove_dir_all(dir.path()).unwrap(); + + // it should shut down + match timeout(Duration::from_secs(60), task).await { + Err(e) => panic!("test timed out: {e}"), + Ok(Err(e)) => panic!("expected task to finish when root is deleted: {e}"), + _ => (), + } + } }