Skip to content

Commit

Permalink
feat(Turborepo): Enable rust daemon (#5964)
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Soltis authored and Zertsov committed Sep 27, 2023
1 parent df02f73 commit 30d4218
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"preLaunchTask": "prepare turbo",
"program": "${workspaceRoot}/target/debug/turbo",
"args": ["daemon"],
"args": ["--skip-infer", "daemon"],
"cwd": "${workspaceRoot}"
}
]
Expand Down
9 changes: 5 additions & 4 deletions cli/internal/daemon/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ func (c *Connector) getOrStartDaemon() (int, error) {
lockFile := c.lockFile()
daemonProcess, getDaemonProcessErr := lockFile.GetOwner()
if getDaemonProcessErr != nil {
// If we're in a clean state this isn't an "error" per se.
// We attempt to start a daemon.
if errors.Is(getDaemonProcessErr, fs.ErrNotExist) {
// We expect the daemon to write the pid file, so a non-existent or stale
// pid file is fine. The daemon will write its own, after verifying that it
// doesn't exist or is stale.
if errors.Is(getDaemonProcessErr, fs.ErrNotExist) || errors.Is(getDaemonProcessErr, lockfile.ErrDeadOwner) {
if c.Opts.DontStart {
return 0, ErrDaemonNotRunning
}
Expand Down Expand Up @@ -339,7 +340,7 @@ func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodC
case codes.FailedPrecondition:
return ErrVersionMismatch
case codes.Unavailable:
return errConnectionFailure
return errUnavailable
default:
return err
}
Expand Down
21 changes: 18 additions & 3 deletions cli/internal/daemonclient/daemonclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package daemonclient

import (
"context"
"path/filepath"

"github.com/vercel/turbo/cli/internal/daemon/connector"
"github.com/vercel/turbo/cli/internal/fs"
Expand Down Expand Up @@ -33,9 +34,14 @@ func New(client *connector.Client) *DaemonClient {

// GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs
func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) {
// The daemon expects globs to be unix paths
var outputGlobs []string
for _, outputGlob := range repoRelativeOutputGlobs {
outputGlobs = append(outputGlobs, filepath.ToSlash(outputGlob))
}
resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs,
OutputGlobs: outputGlobs,
})
if err != nil {
return nil, 0, err
Expand All @@ -45,10 +51,19 @@ func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoR

// NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten
func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error {
// The daemon expects globs to be unix paths
var inclusions []string
var exclusions []string
for _, inclusion := range repoRelativeOutputGlobs.Inclusions {
inclusions = append(inclusions, filepath.ToSlash(inclusion))
}
for _, exclusion := range repoRelativeOutputGlobs.Exclusions {
exclusions = append(exclusions, filepath.ToSlash(exclusion))
}
_, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs.Inclusions,
OutputExclusionGlobs: repoRelativeOutputGlobs.Exclusions,
OutputGlobs: inclusions,
OutputExclusionGlobs: exclusions,
TimeSaved: uint64(timeSaved),
})
return err
Expand Down
4 changes: 3 additions & 1 deletion crates/turborepo-filewatch/src/cookie_jar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{broadcast, oneshot},
time::error::Elapsed,
};
use tracing::debug;
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::NotifyError;
Expand Down Expand Up @@ -101,6 +101,7 @@ impl CookieJar {
opts.truncate(true).create(true).write(true);
{
// dropping the resulting file closes the handle
trace!("writing cookie {}", cookie_path);
_ = cookie_path.open_with_options(opts)?;
}
// ??? -> timeout, recv failure, actual cookie failure
Expand Down Expand Up @@ -129,6 +130,7 @@ async fn watch_cookies(
.try_into()
.expect("Non-absolute path from filewatching");
if root.relation_to_path(abs_path) == PathRelation::Parent {
trace!("saw cookie: {}", abs_path);
if let Some(responder) = watches.cookies.remove(&path) {
if responder.send(Ok(())).is_err() {
// Note that cookie waiters will time out if they don't get a
Expand Down
42 changes: 35 additions & 7 deletions crates/turborepo-filewatch/src/globwatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{HashMap, HashSet},
fmt::Display,
future::IntoFuture,
str::FromStr,
};
Expand All @@ -24,26 +25,53 @@ pub struct GlobSet {
exclude: Any<'static>,
}

#[derive(Debug, Error)]
pub struct GlobError {
// Boxed to minimize error size
underlying: Box<wax::BuildError>,
raw_glob: String,
}

impl Display for GlobError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.underlying, self.raw_glob)
}
}

fn compile_glob(raw: &str) -> Result<Glob<'static>, GlobError> {
Glob::from_str(raw)
.map(|g| g.to_owned())
.map_err(|e| GlobError {
underlying: Box::new(e),
raw_glob: raw.to_owned(),
})
}

impl GlobSet {
pub fn from_raw(
raw_includes: Vec<String>,
raw_excludes: Vec<String>,
) -> Result<Self, wax::BuildError> {
) -> Result<Self, GlobError> {
let include = raw_includes
.into_iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
let glob = compile_glob(&raw_glob)?;
Ok((raw_glob, glob))
})
.collect::<Result<HashMap<_, _>, wax::BuildError>>()?;
.collect::<Result<HashMap<_, _>, GlobError>>()?;
let excludes = raw_excludes
.into_iter()
.iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
let glob = compile_glob(raw_glob)?;
Ok(glob)
})
.collect::<Result<Vec<_>, wax::BuildError>>()?;
let exclude = wax::any(excludes)?.to_owned();
.collect::<Result<Vec<_>, GlobError>>()?;
let exclude = wax::any(excludes)
.map_err(|e| GlobError {
underlying: Box::new(e),
raw_glob: format!("{{{}}}", raw_excludes.join(",")),
})?
.to_owned();
Ok(Self { include, exclude })
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use notify::{Config, RecommendedWatcher};
use notify::{Event, EventHandler, RecursiveMode, Watcher};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use tracing::{debug, warn};
// windows -> no recursive watch, watch ancestors
// linux -> recursive watch, watch ancestors
#[cfg(feature = "watch_ancestors")]
Expand All @@ -33,6 +33,7 @@ use {
ErrorKind,
},
std::io,
tracing::trace,
walkdir::WalkDir,
};

Expand Down Expand Up @@ -93,9 +94,11 @@ impl FileSystemWatcher {
let (send_file_events, mut recv_file_events) = mpsc::channel(1024);
let watch_root = root.to_owned();
let broadcast_sender = sender.clone();
debug!("starting filewatcher");
let watcher = run_watcher(&watch_root, send_file_events)?;
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
// Ensure we are ready to receive new events, not events for existing state
debug!("waiting for initial filesystem cookie");
wait_for_cookie(root, &mut recv_file_events).await?;
tokio::task::spawn(watch_events(
watcher,
Expand All @@ -104,6 +107,7 @@ impl FileSystemWatcher {
exit_signal,
broadcast_sender,
));
debug!("filewatching ready");
Ok(Self {
sender,
_exit_ch: exit_ch,
Expand Down Expand Up @@ -273,6 +277,7 @@ fn manually_add_recursive_watches(
for dir in WalkDir::new(root).follow_links(false).into_iter() {
let dir = dir?;
if dir.file_type().is_dir() {
trace!("manually watching {}", dir.path().display());
match watcher.watch(dir.path(), RecursiveMode::NonRecursive) {
Ok(()) => {}
// If we try to watch a non-existent path, we can just skip
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MPL-2.0"
[features]
# Allows configuring a specific tls backend for reqwest.
# See top level Cargo.toml for more details.
default = ["rustls-tls", "go-daemon"]
default = ["rustls-tls"]
native-tls = ["turborepo-api-client/native-tls", "turbo-updater/native-tls"]
rustls-tls = ["turborepo-api-client/rustls-tls", "turbo-updater/rustls-tls"]
run-stub = []
Expand Down
25 changes: 20 additions & 5 deletions crates/turborepo-lib/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
let client = connector.connect().await?;
client.restart().await?;
}
// connector.connect will have already started the daemon if needed,
// so this is a no-op
DaemonCommand::Start => {}
DaemonCommand::Start => {
// We don't care about the client, but we do care that we can connect
// which ensures that daemon is started if it wasn't already.
let _ = connector.connect().await?;
println!("Daemon is running");
}
DaemonCommand::Stop => {
let client = connector.connect().await?;
client.stop().await?;
Expand Down Expand Up @@ -171,8 +174,20 @@ pub async fn daemon_server(
}
CloseReason::Interrupt
});
// TODO: be more methodical about this choice:
let cookie_dir = base.repo_root.join_component(".git");
// We already store logs in .turbo and recommend it be gitignore'd.
// Watchman uses .git, but we can't guarantee that git is present _or_
// that the turbo root is the same as the git root.
let cookie_dir = base.repo_root.join_components(&[".turbo", "cookies"]);
// We need to ensure that the cookie directory is cleared out first so
// that we can start over with cookies.
if cookie_dir.exists() {
cookie_dir
.remove_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
}
cookie_dir
.create_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
let reason = crate::daemon::serve(
&base.repo_root,
cookie_dir,
Expand Down
6 changes: 6 additions & 0 deletions crates/turborepo-lib/src/daemon/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::io;

use thiserror::Error;
use tonic::{Code, Status};
use tracing::info;
use turbopath::AbsoluteSystemPathBuf;

use self::proto::turbod_client::TurbodClient;
use super::{
Expand Down Expand Up @@ -160,6 +163,9 @@ pub enum DaemonError {

#[error("unable to complete daemon clean")]
CleanFailed,

#[error("failed to setup cookie dir {1}: {0}")]
CookieDir(io::Error, AbsoluteSystemPathBuf),
}

impl From<Status> for DaemonError {
Expand Down
15 changes: 11 additions & 4 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use tokio::{
};
use tonic::transport::{NamedService, Server};
use tower::ServiceBuilder;
use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
use turborepo_filewatch::{
cookie_jar::CookieJar,
globwatcher::{Error as GlobWatcherError, GlobSet, GlobWatcher},
globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
FileSystemWatcher, WatchError,
};

Expand Down Expand Up @@ -68,7 +68,7 @@ enum RpcError {
#[error("deadline exceeded")]
DeadlineExceeded,
#[error("invalid glob: {0}")]
InvalidGlob(#[from] wax::BuildError),
InvalidGlob(#[from] GlobError),
#[error("globwatching failed: {0}")]
GlobWatching(#[from] GlobWatcherError),
#[error("filewatching unavailable")]
Expand Down Expand Up @@ -105,6 +105,9 @@ async fn start_filewatching(
Ok(())
}

/// Timeout for every RPC the server handles
const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

/// run a gRPC server providing the Turbod interface. external_shutdown
/// can be used to deliver a signal to shutdown the server. This is expected
/// to be wired to signal handling.
Expand Down Expand Up @@ -144,6 +147,7 @@ where
error!("filewatching failed to start: {}", e);
let _ = fw_shutdown.send(()).await;
}
info!("filewatching started");
});
// exit_root_watch delivers a signal to the root watch loop to exit.
// In the event that the server shuts down via some other mechanism, this
Expand Down Expand Up @@ -187,13 +191,15 @@ where
));

Server::builder()
// set a max timeout for RPCs
.timeout(REQUEST_TIMEOUT)
.add_service(service)
.serve_with_incoming_shutdown(stream, shutdown_fut)
};
// Wait for the server to exit.
// This can be triggered by timeout, root watcher, or an RPC
let _ = server_fut.await;
trace!("gRPC server exited");
info!("gRPC server exited");
// Ensure our timer will exit
running.store(false, Ordering::SeqCst);
// We expect to have a signal from the grpc server on what triggered the exit
Expand Down Expand Up @@ -223,6 +229,7 @@ struct TurboGrpcService {

impl TurboGrpcService {
async fn trigger_shutdown(&self) {
info!("triggering shutdown");
let _ = self.shutdown.send(()).await;
}

Expand Down
Loading

0 comments on commit 30d4218

Please sign in to comment.