-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix filewatching setup cookie path #6183
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,8 @@ use std::{ | |
time::Duration, | ||
}; | ||
|
||
// windows -> no recursive watch, watch ancestors | ||
// linux -> recursive watch, watch ancestors | ||
// macos -> custom watcher impl in fsevents, no recursive watch, no watching ancestors | ||
#[cfg(target_os = "macos")] | ||
use fsevent::FsEventWatcher; | ||
|
@@ -21,11 +23,7 @@ use notify::{Event, EventHandler, RecursiveMode, Watcher}; | |
use thiserror::Error; | ||
use tokio::sync::{broadcast, mpsc}; | ||
use tracing::{debug, warn}; | ||
// windows -> no recursive watch, watch ancestors | ||
// linux -> recursive watch, watch ancestors | ||
#[cfg(feature = "watch_ancestors")] | ||
use turbopath::PathRelation; | ||
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; | ||
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; | ||
#[cfg(feature = "manual_recursive_watch")] | ||
use { | ||
notify::{ | ||
|
@@ -86,10 +84,30 @@ pub struct FileSystemWatcher { | |
// dropping the other sender for the broadcast channel, causing all receivers | ||
// to be notified of a close. | ||
_exit_ch: tokio::sync::oneshot::Sender<()>, | ||
cookie_dir: AbsoluteSystemPathBuf, | ||
} | ||
|
||
impl FileSystemWatcher { | ||
pub async fn new(root: &AbsoluteSystemPath) -> Result<Self, WatchError> { | ||
pub async fn new_with_default_cookie_dir( | ||
root: &AbsoluteSystemPath, | ||
) -> Result<Self, WatchError> { | ||
// We already store logs in .turbo and recommend it be gitignore'd. | ||
// Watchman uses .git, but we can't guarantee that git is present _or_ | ||
// that the turbo root is the same as the git root. | ||
Self::new(root, &root.join_components(&[".turbo", "cookies"])).await | ||
} | ||
|
||
pub async fn new( | ||
root: &AbsoluteSystemPath, | ||
cookie_dir: &AbsoluteSystemPath, | ||
) -> Result<Self, WatchError> { | ||
if root.relation_to_path(cookie_dir) != PathRelation::Parent { | ||
return Err(WatchError::Setup(format!( | ||
"Invalid cookie directory: {} does not contain {}", | ||
root, cookie_dir | ||
))); | ||
} | ||
setup_cookie_dir(cookie_dir)?; | ||
let (sender, _) = broadcast::channel(1024); | ||
let (send_file_events, mut recv_file_events) = mpsc::channel(1024); | ||
let watch_root = root.to_owned(); | ||
|
@@ -99,7 +117,7 @@ impl FileSystemWatcher { | |
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); | ||
// Ensure we are ready to receive new events, not events for existing state | ||
debug!("waiting for initial filesystem cookie"); | ||
wait_for_cookie(root, &mut recv_file_events).await?; | ||
wait_for_cookie(cookie_dir, &mut recv_file_events).await?; | ||
tokio::task::spawn(watch_events( | ||
watcher, | ||
watch_root, | ||
|
@@ -111,12 +129,31 @@ impl FileSystemWatcher { | |
Ok(Self { | ||
sender, | ||
_exit_ch: exit_ch, | ||
cookie_dir: cookie_dir.to_owned(), | ||
}) | ||
} | ||
|
||
pub fn subscribe(&self) -> broadcast::Receiver<Result<Event, NotifyError>> { | ||
self.sender.subscribe() | ||
} | ||
|
||
pub fn cookie_dir(&self) -> &AbsoluteSystemPath { | ||
&self.cookie_dir | ||
} | ||
} | ||
|
||
fn setup_cookie_dir(cookie_dir: &AbsoluteSystemPath) -> Result<(), WatchError> { | ||
// We need to ensure that the cookie directory is cleared out first so | ||
// that we can start over with cookies. | ||
if cookie_dir.exists() { | ||
cookie_dir.remove_dir_all().map_err(|e| { | ||
WatchError::Setup(format!("failed to clear cookie dir {}: {}", cookie_dir, e)) | ||
})?; | ||
} | ||
cookie_dir.create_dir_all().map_err(|e| { | ||
WatchError::Setup(format!("failed to setup cookie dir {}: {}", cookie_dir, e)) | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
#[cfg(not(any(feature = "watch_ancestors", feature = "manual_recursive_watch")))] | ||
|
@@ -333,10 +370,13 @@ fn make_watcher<F: EventHandler>(event_handler: F) -> Result<Backend, notify::Er | |
/// This ensures that we are ready to receive *new* filesystem events, rather | ||
/// than receiving events from existing state, which some backends can do. | ||
async fn wait_for_cookie( | ||
root: &AbsoluteSystemPath, | ||
cookie_dir: &AbsoluteSystemPath, | ||
recv: &mut mpsc::Receiver<EventResult>, | ||
) -> Result<(), WatchError> { | ||
let cookie_path = root.join_component(".turbo-cookie"); | ||
// TODO: should this be passed in? Currently the caller guarantees that the | ||
// directory is empty, but it could be the responsibility of the | ||
// filewatcher... | ||
let cookie_path = cookie_dir.join_component(".turbo-cookie"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe out of scope here, but can we call this something more descriptive than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are other cookies (one per RPC), that are numbered serially. |
||
cookie_path.create_with_contents("cookie").map_err(|e| { | ||
WatchError::Setup(format!("failed to write cookie to {}: {}", cookie_path, e)) | ||
})?; | ||
|
@@ -441,7 +481,9 @@ mod test { | |
let sibling_path = parent_path.join_component("sibling"); | ||
sibling_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
|
||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
@@ -499,7 +541,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
|
||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
@@ -541,7 +585,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -570,7 +616,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -602,7 +650,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -633,7 +683,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -674,7 +726,9 @@ mod test { | |
let symlink_path = repo_root.join_component("symlink"); | ||
symlink_path.symlink_to_dir(child_path.as_str()).unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -713,7 +767,9 @@ mod test { | |
let symlink_path = repo_root.join_component("symlink"); | ||
symlink_path.symlink_to_dir(child_path.as_str()).unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -757,7 +813,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -795,7 +853,9 @@ mod test { | |
let child_path = parent_path.join_component("child"); | ||
child_path.create_dir_all().unwrap(); | ||
|
||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
let mut recv = watcher.subscribe(); | ||
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; | ||
|
||
|
@@ -816,7 +876,9 @@ mod test { | |
let mut recv = { | ||
// create and immediately drop the watcher, which should trigger the exit | ||
// channel | ||
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap(); | ||
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) | ||
.await | ||
.unwrap(); | ||
watcher.subscribe() | ||
}; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need a new method instead of just inlining into
new()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filewatcher is somewhat generic so I didn't want to fully hard-code where the cookie files should go.