Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(daemon): kill daemon when root is removed #5038

Merged
merged 7 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/turborepo-globwatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub enum ConfigError {
ServerStopped,
/// Watch error
WatchError(Vec<notify::Error>),
/// The server has already been consumed.
WatchingAlready,
}

impl<T: Watcher> WatchConfig<T> {
Expand Down Expand Up @@ -287,6 +289,16 @@ impl<T: Watcher> WatchConfig<T> {
.map_err(ConfigError::WatchError)
}

/// Register a single path to be included by the watcher.
pub async fn include_path(&self, path: &Path) -> Result<(), ConfigError> {
trace!("watching {:?}", path);
self.watcher
.lock()
.expect("watcher lock poisoned")
.watch(path, notify::RecursiveMode::NonRecursive)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to be NonRecursive as I don't think we want to recursively watch the repo root: @arlyon

.map_err(|e| ConfigError::WatchError(vec![e]))
}

/// Register a glob to be excluded by the watcher.
#[tracing::instrument(skip(self))]
pub async fn exclude(&self, relative_to: &Path, glob: &str) {
Expand Down
50 changes: 46 additions & 4 deletions crates/turborepo-lib/src/globwatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use futures::{stream::iter, StreamExt};
use globwatch::{ConfigError, GlobWatcher, StopToken, WatchConfig, Watcher};
use itertools::Itertools;
use notify::RecommendedWatcher;
use notify::{EventKind, RecommendedWatcher};
use tokio::time::timeout;
use tracing::{trace, warn};
use turbopath::AbsoluteSystemPathBuf;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<T: Watcher> HashGlobWatcher<T> {
/// Watches a given path, using the flush_folder as temporary storage to
/// make sure that file events are handled in the appropriate order.
#[tracing::instrument(skip(self, token))]
pub async fn watch(&self, token: StopToken) {
pub async fn watch(&self, token: StopToken) -> Result<(), ConfigError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do other callers need to handle this error as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, the only caller happens in a macro that ended up suppressing the lint.

let start_globs = {
let lock = self.hash_globs.lock().expect("only fails if poisoned");
lock.iter()
Expand All @@ -79,17 +79,25 @@ impl<T: Watcher> HashGlobWatcher<T> {
Some(watcher) => watcher.into_stream(token),
None => {
warn!("watcher already consumed");
return;
return Err(ConfigError::WatchingAlready);
}
};

// watch the root of the repo to shut down if the folder is deleted
self.config.include_path(&self.relative_to).await?;

// watch all the globs currently in the map
for glob in start_globs {
self.config.include(&self.relative_to, &glob).await.ok();
}

while let Some(Ok(event)) = stream.next().await {
trace!("processing event: {:?}", event);
if event.paths.contains(&self.relative_to) && matches!(event.kind, EventKind::Remove(_))
{
// if the root of the repo is deleted, we shut down
trace!("repo root was removed, shutting down");
break;
}

let repo_relative_paths = event
.paths
Expand All @@ -115,6 +123,8 @@ impl<T: Watcher> HashGlobWatcher<T> {
self.config.exclude(&self.relative_to, &glob).await;
}
}

Ok(())
}

/// registers a hash with a set of globs to watch for changes
Expand Down Expand Up @@ -657,4 +667,36 @@ mod test {
watcher.glob_statuses.lock().unwrap()
);
}

#[tokio::test]
#[tracing_test::traced_test]
async fn delete_root_kill_daemon() {
let dir = setup();
let flush = tempdir::TempDir::new("globwatch-flush").unwrap();
let watcher = Arc::new(
super::HashGlobWatcher::new(
AbsoluteSystemPathBuf::new(dir.path()).unwrap(),
flush.path().to_path_buf(),
)
.unwrap(),
);

let stop = StopSource::new();

let task_watcher = watcher.clone();
let token = stop.token();

// dropped when the test ends
let task = tokio::task::spawn(async move { task_watcher.watch(token).await });

watcher.config.flush().await.unwrap();
std::fs::remove_dir_all(dir.path()).unwrap();

// it should shut down
let finish = task.await;
assert!(
finish.is_ok(),
"expected task to finish when root is deleted"
);
}
}