diff --git a/crates/turborepo-lib/src/cli.rs b/crates/turborepo-lib/src/cli.rs index 1e10f3d405555..30dd0b44e57c5 100644 --- a/crates/turborepo-lib/src/cli.rs +++ b/crates/turborepo-lib/src/cli.rs @@ -770,8 +770,8 @@ pub async fn run( if args.experimental_rust_codepath { use crate::commands::run; - run::run(base).await?; - Ok(Payload::Rust(Ok(0))) + let exit_code = run::run(base).await?; + Ok(Payload::Rust(Ok(exit_code))) } else { Ok(Payload::Go(Box::new(base))) } diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index 812f6f5759d5a..022b1b0441a7b 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -3,13 +3,13 @@ use tracing::{debug, error}; use crate::{commands::CommandBase, run::Run}; -pub async fn run(base: CommandBase) -> Result<()> { +pub async fn run(base: CommandBase) -> Result { let mut run = Run::new(&base); debug!("using the experimental rust codepath"); debug!("configured run struct: {:?}", run); match run.run().await { - Ok(_code) => Ok(()), + Ok(code) => Ok(code), Err(err) => { error!("run failed: {}", err); Err(err) diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index b213ef9a045f9..e1b6cafb1b318 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -9,7 +9,7 @@ use std::{ }; pub use builder::EngineBuilder; -pub use execute::{ExecuteError, ExecutionOptions, Message}; +pub use execute::{ExecuteError, ExecutionOptions, Message, StopExecution}; use petgraph::Graph; use crate::{ diff --git a/crates/turborepo-lib/src/opts.rs b/crates/turborepo-lib/src/opts.rs index c81634550e867..eed8e3c1b0854 100644 --- a/crates/turborepo-lib/src/opts.rs +++ b/crates/turborepo-lib/src/opts.rs @@ -62,7 +62,7 @@ pub struct RunOpts<'a> { // Whether or not to infer the framework for each workspace. pub(crate) framework_inference: bool, pub profile: Option<&'a str>, - continue_on_error: bool, + pub(crate) continue_on_error: bool, pub(crate) pass_through_args: &'a [String], pub(crate) only: bool, pub(crate) dry_run: bool, diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 4c808a5223ac4..a10d3a2e96812 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -23,6 +23,7 @@ use std::{ use command_group::AsyncCommandGroup; use futures::future::try_join3; +use itertools::Itertools; pub use tokio::process::Command; use tokio::{ io::{AsyncBufReadExt, AsyncRead, BufReader}, @@ -148,6 +149,7 @@ pub struct Child { stdin: Arc>>, stdout: Arc>>, stderr: Arc>>, + label: String, } #[derive(Debug)] @@ -177,6 +179,18 @@ impl Child { /// Start a child process, returning a handle that can be used to interact /// with it. The command will be started immediately. pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result { + let label = { + let cmd = command.as_std(); + format!( + "({}) {} {}", + cmd.get_current_dir() + .map(|dir| dir.to_string_lossy()) + .unwrap_or_default(), + cmd.get_program().to_string_lossy(), + cmd.get_args().map(|s| s.to_string_lossy()).join(" ") + ) + }; + let group = command.group().spawn()?; let gid = group.id(); @@ -266,6 +280,7 @@ impl Child { stdin: Arc::new(Mutex::new(stdin)), stdout: Arc::new(Mutex::new(stdout)), stderr: Arc::new(Mutex::new(stderr)), + label, }) } @@ -381,6 +396,10 @@ impl Child { Ok(exit) } + + pub fn label(&self) -> &str { + &self.label + } } #[cfg(test)] diff --git a/crates/turborepo-lib/src/process/mod.rs b/crates/turborepo-lib/src/process/mod.rs index 796e83d05a6f5..11d25ba6e9f1e 100644 --- a/crates/turborepo-lib/src/process/mod.rs +++ b/crates/turborepo-lib/src/process/mod.rs @@ -22,7 +22,7 @@ use futures::Future; use tokio::task::JoinSet; use tracing::{debug, trace}; -use self::child::{Child, ChildExit}; +pub use self::child::{Child, ChildExit}; /// A process manager that is responsible for spawning and managing child /// processes. When the manager is Open, new child processes can be spawned diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 3a46f937ea7f7..bd8fa9f5dbdb0 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -6,7 +6,7 @@ mod scope; mod summary; pub mod task_id; use std::{ - io::{BufWriter, IsTerminal}, + io::{BufWriter, IsTerminal, Write}, sync::Arc, }; @@ -62,7 +62,7 @@ impl<'a> Run<'a> { self.base.args().try_into() } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self) -> Result { let start_at = Local::now(); let package_json_path = self.base.repo_root.join_component("package.json"); let root_package_json = @@ -206,7 +206,7 @@ impl<'a> Run<'a> { engine.dot_graph(std::io::stdout(), opts.run_opts.single_package)? } } - return Ok(()); + return Ok(0); } let root_workspace = pkg_dep_graph @@ -279,9 +279,22 @@ impl<'a> Run<'a> { global_env_mode, self.base.ui, false, + self.processes.clone(), + &self.base.repo_root, ); - visitor.visit(engine.clone()).await?; + let errors = visitor.visit(engine.clone()).await?; + + let exit_code = errors + .iter() + .filter_map(|err| err.exit_code()) + .max() + // We hit some error, it shouldn't be exit code 0 + .unwrap_or(if errors.is_empty() { 0 } else { 1 }); + + for err in &errors { + writeln!(std::io::stderr(), "{err}").ok(); + } let pass_through_env = global_hash_inputs.pass_through_env.unwrap_or_default(); let resolved_pass_through_env_vars = @@ -312,6 +325,6 @@ impl<'a> Run<'a> { run_summary.close(0, &pkg_dep_graph, self.base.ui)?; - Ok(()) + Ok(exit_code) } } diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 39118c4ea16df..27243f2fbd9f1 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -1,22 +1,28 @@ use std::{ borrow::Cow, io::Write, - sync::{Arc, OnceLock}, + process::Stdio, + sync::{Arc, Mutex, OnceLock}, + time::Duration, }; use console::{Style, StyledObject}; use futures::{stream::FuturesUnordered, StreamExt}; use regex::Regex; -use tokio::sync::mpsc; +use tokio::{process::Command, sync::mpsc}; use tracing::{debug, error}; +use turbopath::AbsoluteSystemPath; use turborepo_env::{EnvironmentVariableMap, ResolvedEnvMode}; -use turborepo_ui::{ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI}; +use turborepo_ui::{ + ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, PrefixedWriter, UI, +}; use crate::{ cli::EnvMode, - engine::{Engine, ExecutionOptions}, + engine::{Engine, ExecutionOptions, StopExecution}, opts::Opts, package_graph::{PackageGraph, WorkspaceName}, + process::{ChildExit, ProcessManager}, run::{ task_id::{self, TaskId}, RunCache, @@ -34,6 +40,8 @@ pub struct Visitor<'a> { sink: OutputSink, color_cache: ColorSelector, ui: UI, + manager: ProcessManager, + repo_root: &'a AbsoluteSystemPath, } #[derive(Debug, thiserror::Error)] @@ -70,6 +78,8 @@ impl<'a> Visitor<'a> { global_env_mode: EnvMode, ui: UI, silent: bool, + manager: ProcessManager, + repo_root: &'a AbsoluteSystemPath, ) -> Self { let task_hasher = TaskHasher::new( package_inputs_hashes, @@ -89,10 +99,12 @@ impl<'a> Visitor<'a> { sink, color_cache, ui, + manager, + repo_root, } } - pub async fn visit(&self, engine: Arc) -> Result<(), Error> { + pub async fn visit(&self, engine: Arc) -> Result, Error> { let concurrency = self.opts.run_opts.concurrency as usize; let (node_sender, mut node_stream) = mpsc::channel(concurrency); let engine_handle = { @@ -100,6 +112,7 @@ impl<'a> Visitor<'a> { tokio::spawn(engine.execute(ExecutionOptions::new(false, concurrency), node_sender)) }; let mut tasks = FuturesUnordered::new(); + let errors = Arc::new(Mutex::new(Vec::new())); while let Some(message) = node_stream.recv().await { let crate::engine::Message { info, callback } = message; @@ -177,18 +190,117 @@ impl<'a> Visitor<'a> { // hashing so that downstream tasks can count on the hash existing // // bail if the script doesn't exist - let Some(command) = command else { continue }; + let Some(_command) = command else { continue }; let output_client = self.output_client(); + let continue_on_error = self.opts.run_opts.continue_on_error; let prefix = self.prefix(&info); let pretty_prefix = self.color_cache.prefix_with_color(&task_hash, &prefix); let ui = self.ui; + let manager = self.manager.clone(); + let package_manager = self.package_graph.package_manager().clone(); + let workspace_directory = self.repo_root.resolve(workspace_dir); + let errors = errors.clone(); + let task_id_for_display = self.display_task_id(&info); tasks.push(tokio::spawn(async move { + let task_id = info; let _task_cache = task_cache; let mut prefixed_ui = - Self::prefixed_ui(ui, is_github_actions, &output_client, pretty_prefix); - prefixed_ui.output(command); + Self::prefixed_ui(ui, is_github_actions, &output_client, pretty_prefix.clone()); + + let mut cmd = Command::new(package_manager.to_string()); + cmd.args(["run", task_id.task()]); + cmd.current_dir(workspace_directory.as_path()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + let mut process = match manager.spawn(cmd, Duration::from_millis(500)) { + Some(Ok(child)) => child, + // Turbo was unable to spawn a process + Some(Err(e)) => { + // Note: we actually failed to spawn, but this matches the Go output + prefixed_ui.error(format!("command finished with error: {e}")); + errors + .lock() + .expect("lock poisoned") + .push(TaskError::from_spawn(task_id_for_display.clone(), e)); + callback + .send(if continue_on_error { + Ok(()) + } else { + Err(StopExecution) + }) + .ok(); + return; + } + // Turbo is shutting down + None => { + callback.send(Ok(())).ok(); + return; + } + }; + + let exit_status = match process + .wait_with_piped_outputs( + PrefixedWriter::new(ui, pretty_prefix.clone(), output_client.stdout()), + PrefixedWriter::new(ui, pretty_prefix.clone(), output_client.stderr()), + ) + .await + { + Ok(Some(exit_status)) => exit_status, + Err(e) => { + error!("unable to pipe outputs from command: {e}"); + callback.send(Err(StopExecution)).ok(); + manager.stop().await; + return; + } + Ok(None) => { + // TODO: how can this happen? we only update the + // exit status with Some and it is only initialized with + // None. Is it still running? + error!("unable to determine why child exited"); + callback.send(Err(StopExecution)).ok(); + return; + } + }; + + match exit_status { + // The task was successful, nothing special needs to happen. + ChildExit::Finished(Some(0)) => (), + ChildExit::Finished(Some(code)) => { + let error = + TaskErrorCause::from_execution(process.label().to_string(), code); + if continue_on_error { + prefixed_ui.warn("command finished with error, but continuing..."); + callback.send(Ok(())).ok(); + } else { + prefixed_ui.error(format!("command finished with error: {error}")); + callback.send(Err(StopExecution)).ok(); + manager.stop().await; + } + errors.lock().expect("lock poisoned").push(TaskError { + task_id: task_id_for_display.clone(), + cause: error, + }); + return; + } + // All of these indicate a failure where we don't know how to recover + ChildExit::Finished(None) + | ChildExit::Killed + | ChildExit::KilledExternal + | ChildExit::Failed => { + callback.send(Err(StopExecution)).ok(); + return; + } + } + + if let Err(e) = output_client.finish() { + error!("unable to flush output client: {e}"); + callback.send(Err(StopExecution)).unwrap(); + return; + } + callback.send(Ok(())).unwrap(); })); } @@ -200,7 +312,12 @@ impl<'a> Visitor<'a> { result.expect("task executor panicked"); } - Ok(()) + let errors = Arc::into_inner(errors) + .expect("only one strong reference to errors should remain") + .into_inner() + .expect("mutex poisoned"); + + Ok(errors) } fn sink(opts: &Opts, silent: bool) -> OutputSink { @@ -238,6 +355,14 @@ impl<'a> Visitor<'a> { } } + // Task ID as displayed in error messages + fn display_task_id(&self, task_id: &TaskId) -> String { + match self.opts.run_opts.single_package { + true => task_id.task().to_string(), + false => task_id.to_string(), + } + } + fn prefixed_ui( ui: UI, is_github_actions: bool, @@ -314,3 +439,57 @@ fn turbo_regex() -> &'static Regex { static RE: OnceLock = OnceLock::new(); RE.get_or_init(|| Regex::new(r"(?:^|\s)turbo(?:$|\s)").unwrap()) } + +// Error that comes from the execution of the task +#[derive(Debug, thiserror::Error, Clone)] +#[error("{task_id}: {cause}")] +pub struct TaskError { + task_id: String, + cause: TaskErrorCause, +} + +#[derive(Debug, thiserror::Error, Clone)] +enum TaskErrorCause { + #[error("unable to spawn child process: {msg}")] + // We eagerly serialize this in order to allow us to implement clone + Spawn { msg: String }, + #[error("command {command} exited ({exit_code})")] + Exit { command: String, exit_code: i32 }, +} + +impl TaskError { + pub fn exit_code(&self) -> Option { + match self.cause { + TaskErrorCause::Exit { exit_code, .. } => Some(exit_code), + _ => None, + } + } + + fn from_spawn(task_id: String, err: std::io::Error) -> Self { + Self { + task_id, + cause: TaskErrorCause::Spawn { + msg: err.to_string(), + }, + } + } + + fn from_execution(task_id: String, command: String, exit_code: i32) -> Self { + Self { + task_id, + cause: TaskErrorCause::Exit { command, exit_code }, + } + } +} + +impl TaskErrorCause { + fn from_spawn(err: std::io::Error) -> Self { + TaskErrorCause::Spawn { + msg: err.to_string(), + } + } + + fn from_execution(command: String, exit_code: i32) -> Self { + TaskErrorCause::Exit { command, exit_code } + } +}