From 192b4dc1213b23c5eb6e5242f1da6b4e8438fe7f Mon Sep 17 00:00:00 2001
From: Greg Soltis
Date: Fri, 2 Feb 2024 11:24:59 -0800
Subject: [PATCH] Route through existing wait_for_filewatching call

---
 .../src/package_watcher.rs                   |  2 +-
 crates/turborepo-lib/src/daemon/mod.rs       |  2 +-
 crates/turborepo-lib/src/daemon/server.rs    | 74 ++++++++-----------
 .../src/run/package_discovery/mod.rs         | 45 +----------
 .../src/package_graph/builder.rs             |  1 -
 5 files changed, 33 insertions(+), 91 deletions(-)

diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs
index 36acfb1d23097..2bb6ebb0b2ef4 100644
--- a/crates/turborepo-filewatch/src/package_watcher.rs
+++ b/crates/turborepo-filewatch/src/package_watcher.rs
@@ -372,7 +372,7 @@ mod test {
         let (tx, rx) = broadcast::channel(10);
         let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel();
 
-        let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap();
+        let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap();
         let manager = PackageManager::Yarn;
 
         let package_data = vec![
diff --git a/crates/turborepo-lib/src/daemon/mod.rs b/crates/turborepo-lib/src/daemon/mod.rs
index 1aebb1f1930a7..d8c2e19d09fb0 100644
--- a/crates/turborepo-lib/src/daemon/mod.rs
+++ b/crates/turborepo-lib/src/daemon/mod.rs
@@ -7,7 +7,7 @@ mod server;
 
 pub use client::{DaemonClient, DaemonError};
 pub use connector::{DaemonConnector, DaemonConnectorError};
-pub use server::{CloseReason, FileWatching, TurboGrpcService};
+pub use server::{CloseReason, TurboGrpcService};
 
 pub(crate) mod proto {
diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs
index 82d06576b481d..150ee0861dfe3 100644
--- a/crates/turborepo-lib/src/daemon/server.rs
+++ b/crates/turborepo-lib/src/daemon/server.rs
@@ -26,7 +26,7 @@ use semver::Version;
 use thiserror::Error;
 use tokio::{
     select,
-    sync::{mpsc, oneshot, watch, Mutex as AsyncMutex},
+    sync::{mpsc, oneshot, watch},
 };
 use tonic::transport::{NamedService, Server};
 use tower::ServiceBuilder;
@@ -39,7 +39,7 @@ use turborepo_filewatch::{
     FileSystemWatcher, WatchError,
 };
 use turborepo_repository::discovery::{
-    LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
+    DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
 };
 use super::{
@@ -47,10 +47,7 @@ use super::{
     endpoint::SocketOpenError,
     proto::{self},
 };
-use crate::{
-    daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket},
-    run::package_discovery::WatchingPackageDiscovery,
-};
+use crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket};
 
 #[derive(Debug)]
 #[allow(dead_code)]
@@ -123,7 +120,7 @@ async fn start_filewatching(
 /// Timeout for every RPC the server handles
 const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
 
-pub struct TurboGrpcService<S, PDA, PDB> {
+pub struct TurboGrpcService<S, PDB> {
     watcher_tx: watch::Sender<Option<Arc<FileWatching>>>,
     watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
     repo_root: AbsoluteSystemPathBuf,
@@ -132,11 +129,10 @@ pub struct TurboGrpcService<S, PDA, PDB> {
     timeout: Duration,
     external_shutdown: S,
 
-    package_discovery: PDA,
     package_discovery_backup: PDB,
 }
 
-impl<S> TurboGrpcService<S, WatchingPackageDiscovery, LocalPackageDiscoveryBuilder>
+impl<S> TurboGrpcService<S, LocalPackageDiscoveryBuilder>
 where
     S: Future<Output = CloseReason>,
 {
@@ -155,7 +151,6 @@ where
     ) -> Self {
         let (watcher_tx, watcher_rx) = watch::channel(None);
 
-        let package_discovery = WatchingPackageDiscovery::new(watcher_rx.clone());
         let package_discovery_backup =
             LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None);
 
@@ -170,16 +165,14 @@ where
             log_file,
             timeout,
             external_shutdown,
-            package_discovery,
             package_discovery_backup,
         }
     }
 }
 
-impl<S, PDA, PDB> TurboGrpcService<S, PDA, PDB>
+impl<S, PDB> TurboGrpcService<S, PDB>
 where
     S: Future<Output = CloseReason>,
-    PDA: PackageDiscovery + Send + 'static,
     PDB: PackageDiscoveryBuilder,
     PDB::Output: PackageDiscovery + Send + 'static,
 {
@@ -188,9 +181,8 @@ where
     pub fn with_package_discovery_backup<PDB2: PackageDiscoveryBuilder>(
         self,
         package_discovery_backup: PDB2,
-    ) -> TurboGrpcService<S, PDA, PDB2> {
+    ) -> TurboGrpcService<S, PDB2> {
         TurboGrpcService {
-            package_discovery: self.package_discovery,
            daemon_root: self.daemon_root,
             external_shutdown: self.external_shutdown,
             log_file: self.log_file,
@@ -211,7 +203,6 @@ where
             log_file,
             repo_root,
             timeout,
-            package_discovery,
             package_discovery_backup,
         } = self;
 
@@ -270,7 +261,6 @@ where
         // so we use a private struct with just the pieces of state needed to handle
         // RPCs.
         let service = TurboGrpcServiceInner {
-            package_discovery: AsyncMutex::new(package_discovery),
             shutdown: trigger_shutdown,
             watcher_rx,
             times_saved: Arc::new(Mutex::new(HashMap::new())),
@@ -313,17 +303,15 @@ where
     }
 }
 
-struct TurboGrpcServiceInner<PD> {
-    //shutdown: Arc<Mutex<Option<Sender<()>>>>,
+struct TurboGrpcServiceInner {
     shutdown: mpsc::Sender<()>,
     watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
     times_saved: Arc<Mutex<HashMap<String, u64>>>,
     start_time: Instant,
     log_file: AbsoluteSystemPathBuf,
-    package_discovery: AsyncMutex<PD>,
 }
 
-impl<PD> TurboGrpcServiceInner<PD> {
+impl TurboGrpcServiceInner {
     async fn trigger_shutdown(&self) {
         info!("triggering shutdown");
         let _ = self.shutdown.send(()).await;
@@ -364,6 +352,14 @@ impl<PD> TurboGrpcServiceInner<PD> {
         let changed_globs = fw.glob_watcher.get_changed_globs(hash, candidates).await?;
         Ok((changed_globs, time_saved))
     }
+
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, RpcError> {
+        let fw = self.wait_for_filewatching().await?;
+        Ok(DiscoveryResponse {
+            workspaces: fw.package_watcher.get_package_data().await,
+            package_manager: fw.package_watcher.get_package_manager().await,
+        })
+    }
 }
 
 async fn wait_for_filewatching(
@@ -422,10 +418,7 @@ async fn watch_root(
 }
 
 #[tonic::async_trait]
-impl<PD> proto::turbod_server::Turbod for TurboGrpcServiceInner<PD>
-where
-    PD: PackageDiscovery + Send + 'static,
-{
+impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
     async fn hello(
         &self,
         request: tonic::Request<proto::HelloRequest>,
@@ -513,25 +506,18 @@ where
         &self,
         _request: tonic::Request<proto::DiscoverPackagesRequest>,
     ) -> Result<tonic::Response<proto::DiscoverPackagesResponse>, tonic::Status> {
-        self.package_discovery
-            .lock()
-            .await
-            .discover_packages()
-            .await
-            .map(|packages| {
-                tonic::Response::new(proto::DiscoverPackagesResponse {
-                    package_files: packages
-                        .workspaces
-                        .into_iter()
-                        .map(|d| proto::PackageFiles {
-                            package_json: d.package_json.to_string(),
-                            turbo_json: d.turbo_json.map(|t| t.to_string()),
-                        })
-                        .collect(),
-                    package_manager: proto::PackageManager::from(packages.package_manager).into(),
+        let resp = self.discover_packages().await?;
+        Ok(tonic::Response::new(proto::DiscoverPackagesResponse {
+            package_files: resp
+                .workspaces
+                .into_iter()
+                .map(|d| proto::PackageFiles {
+                    package_json: d.package_json.to_string(),
+                    turbo_json: d.turbo_json.map(|t| t.to_string()),
                 })
-            })
-            .map_err(|e| tonic::Status::internal(format!("{}", e)))
+                .collect(),
+            package_manager: proto::PackageManager::from(resp.package_manager).into(),
+        }))
     }
 }
 
@@ -558,7 +544,7 @@ fn compare_versions(client: Version, server: Version, constraint: proto::Version
     }
 }
 
-impl<PD> NamedService for TurboGrpcServiceInner<PD> {
+impl NamedService for TurboGrpcServiceInner {
     const NAME: &'static str = "turborepo.Daemon";
 }
 
diff --git a/crates/turborepo-lib/src/run/package_discovery/mod.rs b/crates/turborepo-lib/src/run/package_discovery/mod.rs
index b05a46caeb950..cc9983edebbbd 100644
--- a/crates/turborepo-lib/src/run/package_discovery/mod.rs
+++ b/crates/turborepo-lib/src/run/package_discovery/mod.rs
@@ -1,10 +1,7 @@
-use std::sync::Arc;
-
-use tokio::sync::watch::Receiver;
 use turbopath::AbsoluteSystemPathBuf;
 use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery, WorkspaceData};
 
-use crate::daemon::{proto::PackageManager, DaemonClient, FileWatching};
+use crate::daemon::{proto::PackageManager, DaemonClient};
 
 #[derive(Debug)]
 pub struct DaemonPackageDiscovery<'a, C: Clone> {
@@ -44,43 +41,3 @@ impl<'a, C: Clone + Send> PackageDiscovery for DaemonPackageDiscovery<'a, C> {
         })
     }
 }
-
-/// A package discovery strategy that watches the file system for changes. Basic
-/// idea:
-/// - Set up a watcher on file changes on the relevant workspace file for the
-///   package manager
-/// - When the workspace globs change, re-discover the workspace
-/// - When a package.json changes, re-discover the workspace
-/// - Keep an in-memory cache of the workspace
-pub struct WatchingPackageDiscovery {
-    /// file watching may not be ready yet so we store a watcher
-    /// through which we can get the file watching stack
-    watcher: Receiver<Option<Arc<FileWatching>>>,
-}
-
-impl WatchingPackageDiscovery {
-    pub fn new(watcher: Receiver<Option<Arc<FileWatching>>>) -> Self {
-        Self { watcher }
-    }
-}
-
-impl PackageDiscovery for WatchingPackageDiscovery {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
-        tracing::debug!("discovering packages using watcher implementation");
-
-        // need to clone and drop the Ref before we can await
-        let watcher = {
-            let watcher = self
-                .watcher
-                .wait_for(|opt| opt.is_some())
-                .await
-                .map_err(|e| Error::Failed(Box::new(e)))?;
-            watcher.as_ref().expect("guaranteed some above").clone()
-        };
-
-        Ok(DiscoveryResponse {
-            workspaces: watcher.package_watcher.get_package_data().await,
-            package_manager: watcher.package_watcher.get_package_manager().await,
-        })
-    }
-}
diff --git a/crates/turborepo-repository/src/package_graph/builder.rs b/crates/turborepo-repository/src/package_graph/builder.rs
index 3974405f5a8ae..f6aff36a135d0 100644
--- a/crates/turborepo-repository/src/package_graph/builder.rs
+++ b/crates/turborepo-repository/src/package_graph/builder.rs
@@ -103,7 +103,6 @@ impl<'a, P> PackageGraphBuilder<'a, P> {
     /// Set the package discovery strategy to use. Note that whatever strategy
     /// selected here will be wrapped in a `CachingPackageDiscovery` to
     /// prevent unnecessary work during building.
-    #[allow(dead_code)]
     pub fn with_package_discovery<P2>(
         self,
         discovery: P2,