Skip to content

Commit

Permalink
fix(daemon): kill daemon when root is removed (#5038)
Browse files Browse the repository at this point in the history
### Description

Fixes #5025

In Go code we would shut down the daemon whenever the repository root
was removed. This PR adds that logic back by watching the root and
exiting the daemon if the root is removed.

### Testing Instructions

Added unit test. Follow instructions in reproduction repository in
`uncurated-tests/turborepo-netflix-cache-repro`

---------

Co-authored-by: Alexander Lyon <arlyon@me.com>
Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
2 people authored and Greg Soltis committed May 22, 2023
1 parent 77277af commit b0b9c43
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 6 deletions.
20 changes: 20 additions & 0 deletions crates/turborepo-globwatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub enum ConfigError {
ServerStopped,
/// Watch error
WatchError(Vec<notify::Error>),
/// The server has already been consumed.
WatchingAlready,
}

impl<T: Watcher> WatchConfig<T> {
Expand Down Expand Up @@ -287,6 +289,24 @@ impl<T: Watcher> WatchConfig<T> {
.map_err(ConfigError::WatchError)
}

/// Register a single path to be included by the watcher.
pub async fn include_path(&self, path: &Path) -> Result<(), ConfigError> {
trace!("watching {:?}", path);
// Windows doesn't create an event when a watched directory itself is deleted
// we watch the parent directory instead.
// More information at https://github.com/notify-rs/notify/issues/403
#[cfg(windows)]
let watched_path = path.parent().expect("turbo is unusable at filesytem root");
#[cfg(not(windows))]
let watched_path = path;

self.watcher
.lock()
.expect("watcher lock poisoned")
.watch(watched_path, notify::RecursiveMode::NonRecursive)
.map_err(|e| ConfigError::WatchError(vec![e]))
}

/// Register a glob to be excluded by the watcher.
#[tracing::instrument(skip(self))]
pub async fn exclude(&self, relative_to: &Path, glob: &str) {
Expand Down
10 changes: 9 additions & 1 deletion crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,15 @@ impl<T: Watcher + Send + 'static> DaemonServer<T> {
None => CloseReason::ServerClosed,
}
},
_ = watcher_fut => CloseReason::WatcherClosed,
watch_res = watcher_fut => {
match watch_res {
Ok(()) => CloseReason::WatcherClosed,
Err(e) => {
error!("Globwatch config error: {:?}", e);
CloseReason::WatcherClosed
},
}
},
}

// here the stop token is dropped, and the pid lock is dropped
Expand Down
53 changes: 48 additions & 5 deletions crates/turborepo-lib/src/globwatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use futures::{stream::iter, StreamExt};
use globwatch::{ConfigError, GlobWatcher, StopToken, WatchConfig, Watcher};
use itertools::Itertools;
use notify::RecommendedWatcher;
use notify::{EventKind, RecommendedWatcher};
use tokio::time::timeout;
use tracing::{trace, warn};
use turbopath::AbsoluteSystemPathBuf;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<T: Watcher> HashGlobWatcher<T> {
/// Watches a given path, using the flush_folder as temporary storage to
/// make sure that file events are handled in the appropriate order.
#[tracing::instrument(skip(self, token))]
pub async fn watch(&self, token: StopToken) {
pub async fn watch(&self, token: StopToken) -> Result<(), ConfigError> {
let start_globs = {
let lock = self.hash_globs.lock().expect("only fails if poisoned");
lock.iter()
Expand All @@ -79,17 +79,25 @@ impl<T: Watcher> HashGlobWatcher<T> {
Some(watcher) => watcher.into_stream(token),
None => {
warn!("watcher already consumed");
return;
return Err(ConfigError::WatchingAlready);
}
};

// watch the root of the repo to shut down if the folder is deleted
self.config.include_path(&self.relative_to).await?;

// watch all the globs currently in the map
for glob in start_globs {
self.config.include(&self.relative_to, &glob).await.ok();
}

while let Some(Ok(event)) = stream.next().await {
trace!("processing event: {:?}", event);
if event.paths.contains(&self.relative_to) && matches!(event.kind, EventKind::Remove(_))
{
// if the root of the repo is deleted, we shut down
trace!("repo root was removed, shutting down");
break;
}

let repo_relative_paths = event
.paths
Expand All @@ -115,6 +123,8 @@ impl<T: Watcher> HashGlobWatcher<T> {
self.config.exclude(&self.relative_to, &glob).await;
}
}

Ok(())
}

/// registers a hash with a set of globs to watch for changes
Expand Down Expand Up @@ -335,9 +345,10 @@ fn clear_hash_globs(

#[cfg(test)]
mod test {
use std::{fs::File, sync::Arc};
use std::{fs::File, sync::Arc, time::Duration};

use globwatch::StopSource;
use tokio::time::timeout;
use turbopath::AbsoluteSystemPathBuf;

fn setup() -> tempdir::TempDir {
Expand Down Expand Up @@ -657,4 +668,36 @@ mod test {
watcher.glob_statuses.lock().unwrap()
);
}

#[tokio::test]
#[tracing_test::traced_test]
async fn delete_root_kill_daemon() {
let dir = setup();
let flush = tempdir::TempDir::new("globwatch-flush").unwrap();
let watcher = Arc::new(
super::HashGlobWatcher::new(
AbsoluteSystemPathBuf::new(dir.path()).unwrap(),
flush.path().to_path_buf(),
)
.unwrap(),
);

let stop = StopSource::new();

let task_watcher = watcher.clone();
let token = stop.token();

// dropped when the test ends
let task = tokio::task::spawn(async move { task_watcher.watch(token).await });

watcher.config.flush().await.unwrap();
std::fs::remove_dir_all(dir.path()).unwrap();

// it should shut down
match timeout(Duration::from_secs(60), task).await {
Err(e) => panic!("test timed out: {e}"),
Ok(Err(e)) => panic!("expected task to finish when root is deleted: {e}"),
_ => (),
}
}
}

0 comments on commit b0b9c43

Please sign in to comment.