diff --git a/src/daemon.rs b/src/daemon.rs index 07d19aa5..56039067 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -3,7 +3,6 @@ use crate::build_loop::{BuildLoop, Event}; use crate::nix::options::NixOptions; use crate::ops::error::ExitError; -use crate::project::Project; use crate::socket::SocketPath; use crate::NixFile; use crossbeam_channel as chan; @@ -47,13 +46,10 @@ pub struct IndicateActivity { struct Handler { tx: chan::Sender<()>, - _handle: std::thread::JoinHandle<()>, } /// Keeps all state of the running `lorri daemon` service, watches nix files and runs builds. pub struct Daemon { - /// A thread for each `BuildLoop`, keyed by the nix files listened on. - handler_threads: HashMap, /// Sending end that we pass to every `BuildLoop` the daemon controls. // TODO: this needs to transmit information to identify the builder with build_events_tx: chan::Sender, @@ -72,7 +68,6 @@ impl Daemon { let (mon_tx, mon_rx) = chan::unbounded(); ( Daemon { - handler_threads: HashMap::new(), build_events_tx, build_events_rx, mon_tx, @@ -84,102 +79,149 @@ impl Daemon { /// Serve the daemon's RPC endpoint. 
pub fn serve( - mut self, + &mut self, socket_path: SocketPath, gc_root_dir: PathBuf, cas: crate::cas::ContentAddressable, ) -> Result<(), ExitError> { - let (activity_tx, activity_rx) = chan::unbounded(); + let (activity_tx, activity_rx): ( + chan::Sender, + chan::Receiver, + ) = chan::unbounded(); - let server = rpc::Server::new(socket_path, activity_tx, self.build_events_tx.clone())?; let mut pool = crate::thread::Pool::new(); - pool.spawn("accept-loop", move || { - server - .serve() - .expect("failed to serve daemon server endpoint"); + let build_events_tx = self.build_events_tx.clone(); + + let server = + rpc::Server::new(socket_path.clone(), activity_tx, build_events_tx).map_err(|e| { + ExitError::temporary(format!( + "unable to bind to the server socket at {}: {:?}", + socket_path.0.display(), + e + )) + })?; + + pool.spawn("accept-loop", || { + server.serve().expect("varlink error"); })?; + let build_events_rx = self.build_events_rx.clone(); let mon_tx = self.mon_tx.clone(); - pool.spawn("build-loop", move || { - let mut project_states: HashMap = HashMap::new(); - let mut event_listeners: Vec> = Vec::new(); - - for msg in build_events_rx { - mon_tx - .send(msg.clone()) - .expect("listener still to be there"); - match &msg { - LoopHandlerEvent::BuildEvent(ev) => match ev { - Event::SectionEnd => (), - Event::Started { nix_file, .. } - | Event::Completed { nix_file, .. } - | Event::Failure { nix_file, .. 
} => { - project_states.insert(nix_file.clone(), ev.clone()); - event_listeners.retain(|tx| { - let keep = tx.send(ev.clone()).is_ok(); - debug!("Sent"; "event" => ?ev, "keep" => keep); - keep - }) - } - }, - LoopHandlerEvent::NewListener(tx) => { - debug!("adding listener"); - let keep = project_states.values().all(|event| { - let keeping = tx.send(event.clone()).is_ok(); - debug!("Sent snapshot"; "event" => ?&event, "keep" => keeping); - keeping - }); - debug!("Finished snapshot"; "keep" => keep); - if keep { - event_listeners.push(tx.clone()); - } + pool.spawn("build-loop", || Self::build_loop(build_events_rx, mon_tx))?; + + let build_events_tx = self.build_events_tx.clone(); + let extra_nix_options = self.extra_nix_options.clone(); + pool.spawn("foo", || { + Self::build_instruction_handler( + build_events_tx, + extra_nix_options, + activity_rx, + gc_root_dir, + cas, + ) + })?; + + pool.join_all_or_panic(); + + Ok(()) + } + + fn build_loop( + build_events_rx: chan::Receiver, + mon_tx: chan::Sender, + ) { + let mut project_states: HashMap = HashMap::new(); + let mut event_listeners: Vec> = Vec::new(); + + for msg in build_events_rx { + mon_tx + .send(msg.clone()) + .expect("listener still to be there"); + match &msg { + LoopHandlerEvent::BuildEvent(ev) => match ev { + Event::SectionEnd => (), + Event::Started { nix_file, .. } + | Event::Completed { nix_file, .. } + | Event::Failure { nix_file, .. 
} => { + project_states.insert(nix_file.clone(), ev.clone()); event_listeners.retain(|tx| { - let keep = tx.send(Event::SectionEnd).is_ok(); - debug!("Sent new listener sectionend"; "keep" => keep); + let keep = tx.send(ev.clone()).is_ok(); + debug!("Sent"; "event" => ?ev, "keep" => keep); keep }) } + }, + LoopHandlerEvent::NewListener(tx) => { + debug!("adding listener"); + let keep = project_states.values().all(|event| { + let keeping = tx.send(event.clone()).is_ok(); + debug!("Sent snapshot"; "event" => ?&event, "keep" => keeping); + keeping + }); + debug!("Finished snapshot"; "keep" => keep); + if keep { + event_listeners.push(tx.clone()); + } + event_listeners.retain(|tx| { + let keep = tx.send(Event::SectionEnd).is_ok(); + debug!("Sent new listener sectionend"; "keep" => keep); + keep + }) } } - })?; - pool.spawn("build-instruction-handler", move || { - // For each build instruction, add the corresponding file - // to the watch list. - for start_build in activity_rx { - let project = - crate::project::Project::new(start_build.nix_file, &gc_root_dir, cas.clone()) - // TODO: the project needs to create its gc root dir - .unwrap(); - self.add(project) - } - })?; - pool.join_all_or_panic(); - - Ok(()) + } } - /// Add nix file to the set of files this daemon watches - /// & build if they change. - pub fn add(&mut self, project: Project) { - let (tx, rx) = chan::unbounded(); - let build_events_tx = self.build_events_tx.clone(); - let extra_nix_options = self.extra_nix_options.clone(); + fn build_instruction_handler( + // TODO: use the pool here + // pool: &mut crate::thread::Pool, + build_events_tx: chan::Sender, + extra_nix_options: NixOptions, + activity_rx: chan::Receiver, + gc_root_dir: PathBuf, + cas: crate::cas::ContentAddressable, + ) { + // A thread for each `BuildLoop`, keyed by the nix files listened on. + let mut handler_threads: HashMap = HashMap::new(); + + // For each build instruction, add the corresponding file + // to the watch list. 
+ for start_build in activity_rx { + let project = + crate::project::Project::new(start_build.nix_file, &gc_root_dir, cas.clone()) + // TODO: the project needs to create its gc root dir + .unwrap(); + + // Add nix file to the set of files this daemon watches + // & build if they change. + let (tx, rx) = chan::unbounded(); + // cloning the tx means the daemon’s rx gets all + // messages from all builders. + let build_events_tx = build_events_tx.clone(); + let extra_nix_options = extra_nix_options.clone(); + + handler_threads + .entry(project.nix_file.clone()) + .or_insert_with(|| { + // TODO: how to use the pool here? + // We cannot just spawn new threads once messages come in, + // because then the pool object is stuck in this loop + // and will never start to wait for joins, which means + // we don’t catch panics as they happen! + // If we can get the pool to “wait for join but also spawn new + // thread when you get a message” that could work! + // pool.spawn(format!("build_loop for {}", nix_file.display()), + let _ = std::thread::spawn(move || { + let mut build_loop = BuildLoop::new(&project, extra_nix_options); - self.handler_threads - .entry(project.nix_file.clone()) - .or_insert_with(|| Handler { - tx, - _handle: std::thread::spawn(move || { - let mut build_loop = BuildLoop::new(&project, extra_nix_options); - - // cloning the tx means the daemon’s rx gets all - // messages from all builders.
- build_loop.forever(build_events_tx, rx); - }), - }) - // Notify the handler, whether or not it was newly added - .tx - .send(()) - .unwrap(); + build_loop.forever(build_events_tx, rx); + }); + Handler { tx } + }) + // Notify the handler, whether or not it was newly added + .tx + .send(()) + .unwrap(); + } } } diff --git a/src/daemon/rpc.rs b/src/daemon/rpc.rs index 18ff038d..b1646216 100644 --- a/src/daemon/rpc.rs +++ b/src/daemon/rpc.rs @@ -4,7 +4,6 @@ use super::IndicateActivity; use super::LoopHandlerEvent; use crate::build_loop::Event; use crate::error; -use crate::ops::error::ExitError; use crate::rpc; use crate::socket::{BindLock, SocketPath}; use crate::watch; @@ -29,7 +28,7 @@ impl Server { socket_path: SocketPath, activity_tx: chan::Sender, build_tx: chan::Sender, - ) -> Result { + ) -> Result { let lock = socket_path.lock()?; Ok(Server { socket_path, @@ -40,7 +39,7 @@ impl Server { } /// Serve the daemon endpoint. - pub fn serve(self) -> Result<(), ExitError> { + pub fn serve(self) -> Result<(), varlink::error::Error> { let address = &self.socket_path.address(); let service = varlink::VarlinkService::new( /* vendor */ "com.target", @@ -59,7 +58,6 @@ impl Server { max_worker_threads, idle_timeout, ) - .map_err(|e| ExitError::temporary(format!("{}", e))) } } diff --git a/src/lib.rs b/src/lib.rs index cea0eb6e..a6e13953 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,11 +56,20 @@ pub enum NixFile { Services(PathBuf), } -impl From<&NixFile> for PathBuf { - fn from(p: &NixFile) -> PathBuf { - match p { - NixFile::Shell(p) => p.to_path_buf(), - NixFile::Services(p) => p.to_path_buf(), +impl NixFile { + /// Underlying `Path`. 
+ pub fn as_path(&self) -> &Path { + match self { + Self::Shell(ref path) => path, + Self::Services(ref path) => path, + } + } + + /// Display the underlying path + pub fn display(&self) -> std::path::Display { + match self { + Self::Shell(path) => path.display(), + Self::Services(path) => path.display(), } } } @@ -72,7 +81,7 @@ impl slog::Value for NixFile { key: slog::Key, serializer: &mut dyn slog::Serializer, ) -> slog::Result { - serializer.emit_arguments(key, &format_args!("{}", PathBuf::from(self).display())) + serializer.emit_arguments(key, &format_args!("{}", self.as_path().display())) } } diff --git a/src/nix.rs b/src/nix.rs index ea3b0c4e..c0adb929 100644 --- a/src/nix.rs +++ b/src/nix.rs @@ -534,7 +534,7 @@ mod tests { "--expr", "my-cool-expression", ] - .into_iter() + .iter() .map(OsStr::new) .collect(); assert_eq!(exp, nix.command_arguments()); @@ -565,7 +565,7 @@ mod tests { "--", "/my-cool-file.nix", ] - .into_iter() + .iter() .map(OsStr::new) .collect(); assert_eq!(exp2, nix2.command_arguments()); diff --git a/src/ops/daemon.rs b/src/ops/daemon.rs index a34eb691..7603cc06 100644 --- a/src/ops/daemon.rs +++ b/src/ops/daemon.rs @@ -17,7 +17,7 @@ pub fn main(opts: crate::cli::DaemonOptions) -> OpResult { }, }; - let (daemon, build_rx) = Daemon::new(extra_nix_options); + let (mut daemon, build_rx) = Daemon::new(extra_nix_options); let build_handle = std::thread::spawn(|| { for msg in build_rx { info!("build status"; "message" => ?msg); diff --git a/src/ops/shell.rs b/src/ops/shell.rs index e6d464ed..c63b2fdd 100644 --- a/src/ops/shell.rs +++ b/src/ops/shell.rs @@ -61,7 +61,9 @@ pub fn main(project: Project, opts: ShellOptions) -> OpResult { .to_str() .expect("lorri executable path not UTF-8 clean"), &shell, - &PathBuf::from(&project.nix_file) + project + .nix_file + .as_path() .to_str() .expect("Nix file path not UTF-8 clean"), ]) diff --git a/src/project.rs b/src/project.rs index aa2624e8..d650cf81 100644 --- a/src/project.rs +++ b/src/project.rs @@ 
-36,7 +36,7 @@ impl Project { ) -> std::io::Result { let hash = format!( "{:x}", - md5::compute(PathBuf::from(&nix_file).as_os_str().as_bytes()) + md5::compute(nix_file.as_path().as_os_str().as_bytes()) ); let project_gc_root = gc_root_dir.join(&hash).join("gc_root"); diff --git a/src/socket.rs b/src/socket.rs index ee572f88..fd4331ed 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -4,7 +4,8 @@ use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; /// Small wrapper that makes sure lorri sockets are handled correctly. -pub struct SocketPath(PathBuf); +#[derive(Clone)] +pub struct SocketPath(pub PathBuf); /// Binding to the socket failed. #[derive(Debug)] @@ -17,12 +18,6 @@ pub enum BindError { Unix(nix::Error), } -impl From for crate::ops::error::ExitError { - fn from(e: BindError) -> crate::ops::error::ExitError { - crate::ops::error::ExitError::temporary(format!("Bind error: {:?}", e)) - } -} - impl From for BindError { fn from(e: std::io::Error) -> BindError { BindError::Io(e) diff --git a/src/thread.rs b/src/thread.rs index d5288cdf..8c4a01e7 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -8,28 +8,32 @@ //! already. use crossbeam_channel as chan; +use std::any::Any; use std::collections::HashMap; use std::thread; use std::thread::ThreadId; -struct DeathCertificate { - tx: chan::Sender, +struct Thread { + name: String, + join_handle: thread::JoinHandle<()>, } -impl Drop for DeathCertificate { - fn drop(&mut self) { - self.tx - .send(thread::current().id()) - .expect("failed to send thread shut-down message!"); - } +struct Dead { + thread_id: ThreadId, + cause: Cause, +} + +enum Cause { + Natural, + Paniced(Box), } /// A thread pool for joining many threads at once, panicking /// if any of the threads panicked. 
pub struct Pool { - threads: HashMap>, - tx: chan::Sender, - rx: chan::Receiver, + threads: HashMap, + tx: chan::Sender, + rx: chan::Receiver, } impl Default for Pool { @@ -58,24 +62,33 @@ impl Pool { /// Spawn a sub-thread which is joined at the same time as all the /// remaining threads. - pub fn spawn(&mut self, name: N, f: F) -> Result<(), std::io::Error> + pub fn spawn(&mut self, name: N, f: F) -> Result<(), std::io::Error> where N: Into, - F: FnOnce() -> T, + F: FnOnce() -> () + std::panic::UnwindSafe, F: Send + 'static, - T: Send + 'static, { - let builder = thread::Builder::new().name(name.into()); + let name = name.into(); + let builder = thread::Builder::new().name(name.clone()); let tx = self.tx.clone(); let handle = builder.spawn(move || { - let certificate = DeathCertificate { tx }; - - f(); - drop(certificate); + let thread_id = thread::current().id(); + let cause = match std::panic::catch_unwind(|| f()) { + Ok(()) => Cause::Natural, + Err(panic) => Cause::Paniced(panic), + }; + tx.send(Dead { thread_id, cause }) + .expect("failed to send thread shut-down message!") })?; - self.threads.insert(handle.thread().id(), handle); + self.threads.insert( + handle.thread().id(), + Thread { + name, + join_handle: handle, + }, + ); Ok(()) } @@ -88,19 +101,34 @@ impl Pool { return; } - let thread_id = self + let death = self .rx .recv() - .expect("Failed to receive a ThreadResult, even though there are more threads."); + .expect("thread pool: Failed to receive a ThreadResult, even though there are more threads."); - let handle = self + let thread = self .threads - .remove(&thread_id) - .expect("Failed to find thread ID in handle table"); + .remove(&death.thread_id) + .expect("thread pool: Failed to find thread ID in handle table"); - handle + let name = thread.name; + thread + .join_handle .join() - .expect("Failed to join thread, despite catch_unwind!"); + // If the thread panics without an unwindable panic, + // there’s nothing we can do here. 
+ // Otherwise the stack is unrolled via Cause::Paniced + .unwrap_or_else(|_any| { + panic!( + "thread pool: thread {} paniced and we were unable to unwind it", + name + ) + }); + + match death.cause { + Cause::Natural => {} + Cause::Paniced(panic) => std::panic::resume_unwind(panic), + } } } } diff --git a/tests/daemon/main.rs b/tests/daemon/main.rs index a7e08f1f..62600cbc 100644 --- a/tests/daemon/main.rs +++ b/tests/daemon/main.rs @@ -29,7 +29,7 @@ pub fn start_job_with_ping() -> std::io::Result<()> { let gc_root_dir = tempdir.path().join("gc_root").to_path_buf(); // The daemon knows how to build stuff - let (daemon, build_rx) = Daemon::new(NixOptions::empty()); + let (mut daemon, build_rx) = Daemon::new(NixOptions::empty()); let accept_handle = thread::spawn(move || { daemon .serve(socket_path, gc_root_dir, cas) diff --git a/tests/shell/main.rs b/tests/shell/main.rs index aebb97d8..35550149 100644 --- a/tests/shell/main.rs +++ b/tests/shell/main.rs @@ -35,10 +35,7 @@ fn loads_env() { .args(&[ "shell", "--shell-file", - PathBuf::from(&project.nix_file) - .as_os_str() - .to_str() - .unwrap(), + project.nix_file.as_path().as_os_str().to_str().unwrap(), ]) .current_dir(&tempdir) .output()