diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index 7d3aa33aae853..d3642a939db04 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -68,7 +68,7 @@ pub fn run(lib_path: &str, input: Option<~str>) -> Option { let env = env.clone().append(target_env(lib_path, prog).as_slice()); - let mut opt_process = Process::configure(ProcessConfig { + let opt_process = Process::configure(ProcessConfig { program: prog, args: args, env: Some(env.as_slice()), @@ -76,11 +76,12 @@ pub fn run(lib_path: &str, }); match opt_process { - Ok(ref mut process) => { + Ok(mut process) => { for input in input.iter() { process.stdin.get_mut_ref().write(input.as_bytes()).unwrap(); } - let ProcessOutput { status, output, error } = process.wait_with_output(); + let ProcessOutput { status, output, error } = + process.wait_with_output().unwrap(); Some(Result { status: status, diff --git a/src/compiletest/runtest.rs b/src/compiletest/runtest.rs index cea440afd9028..31deb12872d53 100644 --- a/src/compiletest/runtest.rs +++ b/src/compiletest/runtest.rs @@ -482,7 +482,7 @@ fn run_debuginfo_lldb_test(config: &config, props: &TestProps, testfile: &Path) let args = &[lldb_batchmode_script, test_executable_str, debugger_script_str]; let env = &[("PYTHONPATH".to_owned(), config.lldb_python_dir.clone().unwrap())]; - let mut opt_process = Process::configure(ProcessConfig { + let opt_process = Process::configure(ProcessConfig { program: "python", args: args, env: Some(env), @@ -490,8 +490,9 @@ fn run_debuginfo_lldb_test(config: &config, props: &TestProps, testfile: &Path) }); let (status, out, err) = match opt_process { - Ok(ref mut process) => { - let ProcessOutput { status, output, error } = process.wait_with_output(); + Ok(process) => { + let ProcessOutput { status, output, error } = + process.wait_with_output().unwrap(); (status, str::from_utf8(output.as_slice()).unwrap().to_owned(), diff --git a/src/libcore/str.rs b/src/libcore/str.rs index e677c4880b58d..1481759297868 100644 --- a/src/libcore/str.rs +++ b/src/libcore/str.rs @@ -1725,6 +1725,7 @@ impl<'a> StrSlice<'a> for &'a str { #[inline] fn is_char_boundary(&self, index: uint) -> bool { if index == self.len() { return true; } + if index > self.len() { return false; } let b = self[index]; return b < 128u8 || b >= 192u8; } diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index e2b647f87a55d..3d8f565b6cf13 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -173,7 +173,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK}; #[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS}; #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE}; -#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG}; +#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN}; #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX}; #[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone}; @@ -2461,8 +2461,6 @@ pub mod consts { pub static CLOCK_REALTIME: c_int = 0; pub static CLOCK_MONOTONIC: c_int = 1; - - pub static WNOHANG: c_int = 1; } pub mod posix08 { } @@ -2912,8 +2910,6 @@ pub mod consts { pub static CLOCK_REALTIME: c_int = 0; pub static CLOCK_MONOTONIC: c_int = 4; - - pub static WNOHANG: c_int = 1; } pub mod posix08 { } @@ -3301,8 +3297,6 @@ pub mod consts { pub static PTHREAD_CREATE_JOINABLE: c_int = 1; pub static PTHREAD_CREATE_DETACHED: c_int = 2; pub static PTHREAD_STACK_MIN: size_t = 8192; - - pub static WNOHANG: c_int = 1; } pub mod posix08 { } @@ -3968,16 +3962,6 @@ pub mod funcs { } } - pub mod wait { - use types::os::arch::c95::{c_int}; - use types::os::arch::posix88::{pid_t}; - - extern { - pub fn waitpid(pid: pid_t, status: *mut c_int, options: c_int) - -> pid_t; - } - } - pub mod glob { use types::os::arch::c95::{c_char, c_int}; use types::os::common::posix01::{glob_t}; diff --git a/src/libnative/io/c_unix.rs b/src/libnative/io/c_unix.rs index abb22476e5240..767090a10cda2 100644 --- a/src/libnative/io/c_unix.rs +++ b/src/libnative/io/c_unix.rs @@ -10,7 +10,12 @@ //! C definitions used by libnative that don't belong in liblibc +#![allow(dead_code)] + pub use self::select::fd_set; +pub use self::signal::{sigaction, siginfo, sigset_t}; +pub use self::signal::{SA_ONSTACK, SA_RESTART, SA_RESETHAND, SA_NOCLDSTOP}; +pub use self::signal::{SA_NODEFER, SA_NOCLDWAIT, SA_SIGINFO, SIGCHLD}; use libc; @@ -34,6 +39,8 @@ pub static MSG_DONTWAIT: libc::c_int = 0x80; #[cfg(target_os = "android")] pub static MSG_DONTWAIT: libc::c_int = 0x40; +pub static WNOHANG: libc::c_int = 1; + extern { pub fn gettimeofday(timeval: *mut libc::timeval, tzp: *libc::c_void) -> libc::c_int; @@ -49,6 +56,17 @@ extern { optlen: *mut libc::socklen_t) -> libc::c_int; pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int; + + pub fn waitpid(pid: libc::pid_t, status: *mut libc::c_int, + options: libc::c_int) -> libc::pid_t; + + pub fn sigaction(signum: libc::c_int, + act: *sigaction, + oldact: *mut sigaction) -> libc::c_int; + + pub fn sigaddset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int; + pub fn sigdelset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int; + pub fn sigemptyset(set: *mut sigset_t) -> libc::c_int; } #[cfg(target_os = "macos")] @@ -81,3 +99,94 @@ mod select { set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS); } } + +#[cfg(target_os = "linux")] +#[cfg(target_os = "android")] +mod signal { + use libc; + + pub static SA_NOCLDSTOP: libc::c_ulong = 0x00000001; + pub static SA_NOCLDWAIT: libc::c_ulong = 0x00000002; + pub static SA_NODEFER: libc::c_ulong = 0x40000000; + pub static SA_ONSTACK: libc::c_ulong = 0x08000000; + pub static SA_RESETHAND: libc::c_ulong = 0x80000000; + pub static SA_RESTART: libc::c_ulong = 0x10000000; + pub static SA_SIGINFO: libc::c_ulong = 0x00000004; + pub static SIGCHLD: libc::c_int = 17; + + // This definition is not as accurate as it could be, {pid, uid, status} is + // actually a giant union. Currently we're only interested in these fields, + // however. + pub struct siginfo { + si_signo: libc::c_int, + si_errno: libc::c_int, + si_code: libc::c_int, + pub pid: libc::pid_t, + pub uid: libc::uid_t, + pub status: libc::c_int, + } + + pub struct sigaction { + pub sa_handler: extern fn(libc::c_int), + pub sa_mask: sigset_t, + pub sa_flags: libc::c_ulong, + sa_restorer: *mut libc::c_void, + } + + #[cfg(target_word_size = "32")] + pub struct sigset_t { + __val: [libc::c_ulong, ..32], + } + #[cfg(target_word_size = "64")] + pub struct sigset_t { + __val: [libc::c_ulong, ..16], + } +} + +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +mod signal { + use libc; + + pub static SA_ONSTACK: libc::c_int = 0x0001; + pub static SA_RESTART: libc::c_int = 0x0002; + pub static SA_RESETHAND: libc::c_int = 0x0004; + pub static SA_NOCLDSTOP: libc::c_int = 0x0008; + pub static SA_NODEFER: libc::c_int = 0x0010; + pub static SA_NOCLDWAIT: libc::c_int = 0x0020; + pub static SA_SIGINFO: libc::c_int = 0x0040; + pub static SIGCHLD: libc::c_int = 20; + + #[cfg(target_os = "macos")] + pub type sigset_t = u32; + #[cfg(target_os = "freebsd")] + pub struct sigset_t { + bits: [u32, ..4], + } + + // This structure has more fields, but we're not all that interested in + // them. + pub struct siginfo { + pub si_signo: libc::c_int, + pub si_errno: libc::c_int, + pub si_code: libc::c_int, + pub pid: libc::pid_t, + pub uid: libc::uid_t, + pub status: libc::c_int, + } + + #[cfg(target_os = "macos")] + pub struct sigaction { + pub sa_handler: extern fn(libc::c_int), + sa_tramp: *mut libc::c_void, + pub sa_mask: sigset_t, + pub sa_flags: libc::c_int, + } + + #[cfg(target_os = "freebsd")] + pub struct sigaction { + pub sa_handler: extern fn(libc::c_int), + pub sa_flags: libc::c_int, + pub sa_mask: sigset_t, + } +} diff --git a/src/libnative/io/helper_thread.rs b/src/libnative/io/helper_thread.rs new file mode 100644 index 0000000000000..63dabed77e159 --- /dev/null +++ b/src/libnative/io/helper_thread.rs @@ -0,0 +1,204 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Implementation of the helper thread for the timer module +//! +//! This module contains the management necessary for the timer worker thread. +//! This thread is responsible for performing the send()s on channels for timers +//! that are using channels instead of a blocking call. +//! +//! The timer thread is lazily initialized, and it's shut down via the +//! `shutdown` function provided. It must be maintained as an invariant that +//! `shutdown` is only called when the entire program is finished. No new timers +//! can be created in the future and there must be no active timers at that +//! time. + +#![macro_escape] + +use std::mem; +use std::rt::bookkeeping; +use std::rt; +use std::ty::Unsafe; +use std::unstable::mutex::StaticNativeMutex; + +use task; + +/// A structure for management of a helper thread. +/// +/// This is generally a static structure which tracks the lifetime of a helper +/// thread. +/// +/// The fields of this helper are all public, but they should not be used, this +/// is for static initialization. +pub struct Helper { + /// Internal lock which protects the remaining fields + pub lock: StaticNativeMutex, + + // You'll notice that the remaining fields are Unsafe, and this is + // because all helper thread operations are done through &self, but we need + // these to be mutable (once `lock` is held). + + /// Lazily allocated channel to send messages to the helper thread. + pub chan: Unsafe<*mut Sender>, + + /// OS handle used to wake up a blocked helper thread + pub signal: Unsafe, + + /// Flag if this helper thread has booted and been initialized yet. + pub initialized: Unsafe, +} + +macro_rules! helper_init( (static mut $name:ident: Helper<$m:ty>) => ( + static mut $name: Helper<$m> = Helper { + lock: ::std::unstable::mutex::NATIVE_MUTEX_INIT, + chan: ::std::ty::Unsafe { + value: 0 as *mut Sender<$m>, + marker1: ::std::kinds::marker::InvariantType, + }, + signal: ::std::ty::Unsafe { + value: 0, + marker1: ::std::kinds::marker::InvariantType, + }, + initialized: ::std::ty::Unsafe { + value: false, + marker1: ::std::kinds::marker::InvariantType, + }, + }; +) ) + +impl Helper { + /// Lazily boots a helper thread, becoming a no-op if the helper has already + /// been spawned. + /// + /// This function will check to see if the thread has been initialized, and + /// if it has it returns quickly. If initialization has not happened yet, + /// the closure `f` will be run (inside of the initialization lock) and + /// passed to the helper thread in a separate task. + /// + /// This function is safe to be called many times. + pub fn boot(&'static self, + f: || -> T, + helper: fn(imp::signal, Receiver, T)) { + unsafe { + let _guard = self.lock.lock(); + if !*self.initialized.get() { + let (tx, rx) = channel(); + *self.chan.get() = mem::transmute(box tx); + let (receive, send) = imp::new(); + *self.signal.get() = send as uint; + + let t = f(); + task::spawn(proc() { + bookkeeping::decrement(); + helper(receive, rx, t); + self.lock.lock().signal() + }); + + rt::at_exit(proc() { self.shutdown() }); + *self.initialized.get() = true; + } + } + } + + /// Sends a message to a spawned worker thread. + /// + /// This is only valid if the worker thread has previously booted + pub fn send(&'static self, msg: M) { + unsafe { + let _guard = self.lock.lock(); + + // Must send and *then* signal to ensure that the child receives the + // message. Otherwise it could wake up and go to sleep before we + // send the message. + assert!(!self.chan.get().is_null()); + (**self.chan.get()).send(msg); + imp::signal(*self.signal.get() as imp::signal); + } + } + + fn shutdown(&'static self) { + unsafe { + // Shut down, but make sure this is done inside our lock to ensure + // that we'll always receive the exit signal when the thread + // returns. + let guard = self.lock.lock(); + + // Close the channel by destroying it + let chan: Box> = mem::transmute(*self.chan.get()); + *self.chan.get() = 0 as *mut Sender; + drop(chan); + imp::close(*self.signal.get() as imp::signal); + + // Wait for the child to exit + guard.wait(); + drop(guard); + + // Clean up after ourselves + self.lock.destroy(); + *self.signal.get() = 0; + } + } +} + +#[cfg(unix)] +mod imp { + use libc; + use std::os; + + use io::file::FileDesc; + + pub type signal = libc::c_int; + + pub fn new() -> (signal, signal) { + let pipe = os::pipe(); + (pipe.input, pipe.out) + } + + pub fn signal(fd: libc::c_int) { + FileDesc::new(fd, false).inner_write([0]).unwrap(); + } + + pub fn close(fd: libc::c_int) { + let _fd = FileDesc::new(fd, true); + } +} + +#[cfg(windows)] +mod imp { + use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle}; + use std::ptr; + use libc; + + pub type signal = HANDLE; + + pub fn new() -> (HANDLE, HANDLE) { + unsafe { + let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE, + ptr::null()); + (handle, handle) + } + } + + pub fn signal(handle: HANDLE) { + assert!(unsafe { SetEvent(handle) != 0 }); + } + + pub fn close(handle: HANDLE) { + assert!(unsafe { CloseHandle(handle) != 0 }); + } + + extern "system" { + fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCSTR) -> HANDLE; + fn SetEvent(hEvent: HANDLE) -> BOOL; + } +} diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index f2c2c66e1425f..a9aca656319ef 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -40,6 +40,8 @@ use ai = std::io::net::addrinfo; pub use self::file::FileDesc; pub use self::process::Process; +mod helper_thread; + // Native I/O implementations pub mod addrinfo; pub mod net; @@ -75,8 +77,6 @@ pub mod pipe; #[cfg(unix)] #[path = "c_unix.rs"] mod c; #[cfg(windows)] #[path = "c_win32.rs"] mod c; -mod timer_helper; - pub type IoResult = Result; fn unimpl() -> IoError { diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs index 81c76bba7a0eb..cb8cbfa63e79b 100644 --- a/src/libnative/io/process.rs +++ b/src/libnative/io/process.rs @@ -8,20 +8,27 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::io; use libc::{pid_t, c_void, c_int}; use libc; +use std::io; +use std::mem; use std::os; use std::ptr; use std::rt::rtio; use p = std::io::process; + use super::IoResult; use super::file; +use super::util; -#[cfg(windows)] use std::mem; #[cfg(windows)] use std::strbuf::StrBuf; -#[cfg(not(windows))] use super::retry; +#[cfg(unix)] use super::c; +#[cfg(unix)] use super::retry; +#[cfg(unix)] use io::helper_thread::Helper; + +#[cfg(unix)] +helper_init!(static mut HELPER: Helper) /** * A value representing a child process. @@ -44,6 +51,14 @@ pub struct Process { /// Manually delivered signal exit_signal: Option, + + /// Deadline after which wait() will return + deadline: u64, +} + +#[cfg(unix)] +enum Req { + NewChild(libc::pid_t, Sender, u64), } impl Process { @@ -116,6 +131,7 @@ impl Process { handle: res.handle, exit_code: None, exit_signal: None, + deadline: 0, }, ret_io)) } @@ -131,11 +147,15 @@ impl Process { impl rtio::RtioProcess for Process { fn id(&self) -> pid_t { self.pid } - fn wait(&mut self) -> p::ProcessExit { + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + } + + fn wait(&mut self) -> IoResult { match self.exit_code { - Some(code) => code, + Some(code) => Ok(code), None => { - let code = waitpid(self.pid); + let code = try!(waitpid(self.pid, self.deadline)); // On windows, waitpid will never return a signal. If a signal // was successfully delivered to the process, however, we can // consider it as having died via a signal. @@ -145,7 +165,7 @@ impl rtio::RtioProcess for Process { Some(..) => code, }; self.exit_code = Some(code); - code + Ok(code) } } } @@ -758,61 +778,301 @@ fn translate_status(status: c_int) -> p::ProcessExit { * operate on a none-existent process or, even worse, on a newer process * with the same id. */ -fn waitpid(pid: pid_t) -> p::ProcessExit { - return waitpid_os(pid); - - #[cfg(windows)] - fn waitpid_os(pid: pid_t) -> p::ProcessExit { - use libc::types::os::arch::extra::DWORD; - use libc::consts::os::extra::{ - SYNCHRONIZE, - PROCESS_QUERY_INFORMATION, - FALSE, - STILL_ACTIVE, - INFINITE, - WAIT_FAILED - }; - use libc::funcs::extra::kernel32::{ - OpenProcess, - GetExitCodeProcess, - CloseHandle, - WaitForSingleObject - }; +#[cfg(windows)] +fn waitpid(pid: pid_t, deadline: u64) -> IoResult { + use libc::types::os::arch::extra::DWORD; + use libc::consts::os::extra::{ + SYNCHRONIZE, + PROCESS_QUERY_INFORMATION, + FALSE, + STILL_ACTIVE, + INFINITE, + WAIT_TIMEOUT, + WAIT_OBJECT_0, + }; + use libc::funcs::extra::kernel32::{ + OpenProcess, + GetExitCodeProcess, + CloseHandle, + WaitForSingleObject, + }; - unsafe { + unsafe { + let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, + FALSE, + pid as DWORD); + if process.is_null() { + return Err(super::last_error()) + } - let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, - FALSE, - pid as DWORD); - if process.is_null() { - fail!("failure in OpenProcess: {}", os::last_os_error()); + loop { + let mut status = 0; + if GetExitCodeProcess(process, &mut status) == FALSE { + let err = Err(super::last_error()); + assert!(CloseHandle(process) != 0); + return err; } - - loop { - let mut status = 0; - if GetExitCodeProcess(process, &mut status) == FALSE { - assert!(CloseHandle(process) != 0); - fail!("failure in GetExitCodeProcess: {}", os::last_os_error()); - } - if status != STILL_ACTIVE { + if status != STILL_ACTIVE { + assert!(CloseHandle(process) != 0); + return Ok(p::ExitStatus(status as int)); + } + let interval = if deadline == 0 { + INFINITE + } else { + let now = ::io::timer::now(); + if deadline < now {0} else {(deadline - now) as u32} + }; + match WaitForSingleObject(process, interval) { + WAIT_OBJECT_0 => {} + WAIT_TIMEOUT => { assert!(CloseHandle(process) != 0); - return p::ExitStatus(status as int); + return Err(util::timeout("process wait timed out")) } - if WaitForSingleObject(process, INFINITE) == WAIT_FAILED { + _ => { + let err = Err(super::last_error()); assert!(CloseHandle(process) != 0); - fail!("failure in WaitForSingleObject: {}", os::last_os_error()); + return err } } } } +} - #[cfg(unix)] - fn waitpid_os(pid: pid_t) -> p::ProcessExit { - use libc::funcs::posix01::wait; - let mut status = 0 as c_int; - match retry(|| unsafe { wait::waitpid(pid, &mut status, 0) }) { +#[cfg(unix)] +fn waitpid(pid: pid_t, deadline: u64) -> IoResult { + use std::cmp; + use std::comm; + + static mut WRITE_FD: libc::c_int = 0; + + let mut status = 0 as c_int; + if deadline == 0 { + return match retry(|| unsafe { c::waitpid(pid, &mut status, 0) }) { -1 => fail!("unknown waitpid error: {}", super::last_error()), - _ => translate_status(status), + _ => Ok(translate_status(status)), + } + } + + // On unix, wait() and its friends have no timeout parameters, so there is + // no way to time out a thread in wait(). From some googling and some + // thinking, it appears that there are a few ways to handle timeouts in + // wait(), but the only real reasonable one for a multi-threaded program is + // to listen for SIGCHLD. + // + // With this in mind, the waiting mechanism with a timeout barely uses + // waitpid() at all. There are a few times that waitpid() is invoked with + // WNOHANG, but otherwise all the necessary blocking is done by waiting for + // a SIGCHLD to arrive (and that blocking has a timeout). Note, however, + // that waitpid() is still used to actually reap the child. + // + // Signal handling is super tricky in general, and this is no exception. Due + // to the async nature of SIGCHLD, we use the self-pipe trick to transmit + // data out of the signal handler to the rest of the application. The first + // idea would be to have each thread waiting with a timeout to read this + // output file descriptor, but a write() is akin to a signal(), not a + // broadcast(), so it would only wake up one thread, and possibly the wrong + // thread. Hence a helper thread is used. + // + // The helper thread here is responsible for farming requests for a + // waitpid() with a timeout, and then processing all of the wait requests. + // By guaranteeing that only this helper thread is reading half of the + // self-pipe, we're sure that we'll never lose a SIGCHLD. This helper thread + // is also responsible for select() to wait for incoming messages or + // incoming SIGCHLD messages, along with passing an appropriate timeout to + // select() to wake things up as necessary. + // + // The ordering of the following statements is also very purposeful. First, + // we must be guaranteed that the helper thread is booted and available to + // receive SIGCHLD signals, and then we must also ensure that we do a + // nonblocking waitpid() at least once before we go ask the sigchld helper. + // This prevents the race where the child exits, we boot the helper, and + // then we ask for the child's exit status (never seeing a sigchld). + // + // The actual communication between the helper thread and this thread is + // quite simple, just a channel moving data around. + + unsafe { HELPER.boot(register_sigchld, waitpid_helper) } + + match waitpid_nowait(pid) { + Some(ret) => return Ok(ret), + None => {} + } + + let (tx, rx) = channel(); + unsafe { HELPER.send(NewChild(pid, tx, deadline)); } + return match rx.recv_opt() { + Ok(e) => Ok(e), + Err(()) => Err(util::timeout("wait timed out")), + }; + + // Register a new SIGCHLD handler, returning the reading half of the + // self-pipe plus the old handler registered (return value of sigaction). + fn register_sigchld() -> (libc::c_int, c::sigaction) { + unsafe { + let mut old: c::sigaction = mem::init(); + let mut new: c::sigaction = mem::init(); + new.sa_handler = sigchld_handler; + new.sa_flags = c::SA_NOCLDSTOP; + assert_eq!(c::sigaction(c::SIGCHLD, &new, &mut old), 0); + + let mut pipes = [0, ..2]; + assert_eq!(libc::pipe(pipes.as_mut_ptr()), 0); + util::set_nonblocking(pipes[0], true).unwrap(); + util::set_nonblocking(pipes[1], true).unwrap(); + WRITE_FD = pipes[1]; + (pipes[0], old) + } + } + + // Helper thread for processing SIGCHLD messages + fn waitpid_helper(input: libc::c_int, + messages: Receiver, + (read_fd, old): (libc::c_int, c::sigaction)) { + util::set_nonblocking(input, true).unwrap(); + let mut set: c::fd_set = unsafe { mem::init() }; + let mut tv: libc::timeval; + let mut active = Vec::<(libc::pid_t, Sender, u64)>::new(); + let max = cmp::max(input, read_fd) + 1; + + 'outer: loop { + // Figure out the timeout of our syscall-to-happen. If we're waiting + // for some processes, then they'll have a timeout, otherwise we + // wait indefinitely for a message to arrive. + // + // FIXME: sure would be nice to not have to scan the entire array + let min = active.iter().map(|a| *a.ref2()).enumerate().min_by(|p| { + p.val1() + }); + let (p, idx) = match min { + Some((idx, deadline)) => { + let now = ::io::timer::now(); + let ms = if now < deadline {deadline - now} else {0}; + tv = util::ms_to_timeval(ms); + (&tv as *_, idx) + } + None => (ptr::null(), -1), + }; + + // Wait for something to happen + c::fd_set(&mut set, input); + c::fd_set(&mut set, read_fd); + match unsafe { c::select(max, &set, ptr::null(), ptr::null(), p) } { + // interrupted, retry + -1 if os::errno() == libc::EINTR as int => continue, + + // We read something, break out and process + 1 | 2 => {} + + // Timeout, the pending request is removed + 0 => { + drop(active.remove(idx)); + continue + } + + n => fail!("error in select {} ({})", os::errno(), n), + } + + // Process any pending messages + if drain(input) { + loop { + match messages.try_recv() { + Ok(NewChild(pid, tx, deadline)) => { + active.push((pid, tx, deadline)); + } + Err(comm::Disconnected) => { + assert!(active.len() == 0); + break 'outer; + } + Err(comm::Empty) => break, + } + } + } + + // If a child exited (somehow received SIGCHLD), then poll all + // children to see if any of them exited. + // + // We also attempt to be responsible netizens when dealing with + // SIGCHLD by invoking any previous SIGCHLD handler instead of just + // ignoring any previous SIGCHLD handler. Note that we don't provide + // a 1:1 mapping of our handler invocations to the previous handler + // invocations because we drain the `read_fd` entirely. This is + // probably OK because the kernel is already allowed to coalesce + // simultaneous signals, we're just doing some extra coalescing. + // + // Another point of note is that this likely runs the signal handler + // on a different thread than the one that received the signal. I + // *think* this is ok at this time. + // + // The main reason for doing this is to allow stdtest to run native + // tests as well. Both libgreen and libnative are running around + // with process timeouts, but libgreen should get there first + // (currently libuv doesn't handle old signal handlers). + if drain(read_fd) { + let i: uint = unsafe { mem::transmute(old.sa_handler) }; + if i != 0 { + assert!(old.sa_flags & c::SA_SIGINFO == 0); + (old.sa_handler)(c::SIGCHLD); + } + + // FIXME: sure would be nice to not have to scan the entire + // array... + active.retain(|&(pid, ref tx, _)| { + match waitpid_nowait(pid) { + Some(msg) => { tx.send(msg); false } + None => true, + } + }); + } + } + + // Once this helper thread is done, we re-register the old sigchld + // handler and close our intermediate file descriptors. + unsafe { + assert_eq!(c::sigaction(c::SIGCHLD, &old, ptr::mut_null()), 0); + let _ = libc::close(read_fd); + let _ = libc::close(WRITE_FD); + WRITE_FD = -1; + } + } + + // Drain all pending data from the file descriptor, returning if any data + // could be drained. This requires that the file descriptor is in + // nonblocking mode. + fn drain(fd: libc::c_int) -> bool { + let mut ret = false; + loop { + let mut buf = [0u8, ..1]; + match unsafe { + libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as libc::size_t) + } { + n if n > 0 => { ret = true; } + 0 => return true, + -1 if util::wouldblock() => return ret, + n => fail!("bad read {} ({})", os::last_os_error(), n), + } + } + } + + // Signal handler for SIGCHLD signals, must be async-signal-safe! + // + // This function will write to the writing half of the "self pipe" to wake + // up the helper thread if it's waiting. Note that this write must be + // nonblocking because if it blocks and the reader is the thread we + // interrupted, then we'll deadlock. + // + // When writing, if the write returns EWOULDBLOCK then we choose to ignore + // it. At that point we're guaranteed that there's something in the pipe + // which will wake up the other end at some point, so we just allow this + // signal to be coalesced with the pending signals on the pipe. + extern fn sigchld_handler(_signum: libc::c_int) { + let mut msg = 1; + match unsafe { + libc::write(WRITE_FD, &mut msg as *mut _ as *libc::c_void, 1) + } { + 1 => {} + -1 if util::wouldblock() => {} // see above comments + n => fail!("bad error on write fd: {} {}", n, os::errno()), } } } @@ -826,10 +1086,9 @@ fn waitpid_nowait(pid: pid_t) -> Option { #[cfg(unix)] fn waitpid_os(pid: pid_t) -> Option { - use libc::funcs::posix01::wait; let mut status = 0 as c_int; match retry(|| unsafe { - wait::waitpid(pid, &mut status, libc::WNOHANG) + c::waitpid(pid, &mut status, c::WNOHANG) }) { n if n == pid => Some(translate_status(status)), 0 => None, diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs deleted file mode 100644 index 95b2620f3c798..0000000000000 --- a/src/libnative/io/timer_helper.rs +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Implementation of the helper thread for the timer module -//! -//! This module contains the management necessary for the timer worker thread. -//! This thread is responsible for performing the send()s on channels for timers -//! that are using channels instead of a blocking call. -//! -//! The timer thread is lazily initialized, and it's shut down via the -//! `shutdown` function provided. It must be maintained as an invariant that -//! `shutdown` is only called when the entire program is finished. No new timers -//! can be created in the future and there must be no active timers at that -//! time. - -use std::mem; -use std::rt::bookkeeping; -use std::rt; -use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - -use io::timer::{Req, Shutdown}; -use task; - -// You'll note that these variables are *not* protected by a lock. These -// variables are initialized with a Once before any Timer is created and are -// only torn down after everything else has exited. This means that these -// variables are read-only during use (after initialization) and both of which -// are safe to use concurrently. -static mut HELPER_CHAN: *mut Sender = 0 as *mut Sender; -static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal; - -static mut TIMER_HELPER_EXIT: StaticNativeMutex = NATIVE_MUTEX_INIT; - -pub fn boot(helper: fn(imp::signal, Receiver)) { - static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - static mut INITIALIZED: bool = false; - - unsafe { - let mut _guard = LOCK.lock(); - if !INITIALIZED { - let (tx, rx) = channel(); - // promote this to a shared channel - drop(tx.clone()); - HELPER_CHAN = mem::transmute(box tx); - let (receive, send) = imp::new(); - HELPER_SIGNAL = send; - - task::spawn(proc() { - bookkeeping::decrement(); - helper(receive, rx); - TIMER_HELPER_EXIT.lock().signal() - }); - - rt::at_exit(proc() { shutdown() }); - INITIALIZED = true; - } - } -} - -pub fn send(req: Req) { - unsafe { - assert!(!HELPER_CHAN.is_null()); - (*HELPER_CHAN).send(req); - imp::signal(HELPER_SIGNAL); - } -} - -fn shutdown() { - // Request a shutdown, and then wait for the task to exit - unsafe { - let guard = TIMER_HELPER_EXIT.lock(); - send(Shutdown); - guard.wait(); - drop(guard); - TIMER_HELPER_EXIT.destroy(); - } - - - // Clean up after ther helper thread - unsafe { - imp::close(HELPER_SIGNAL); - let _chan: Box> = mem::transmute(HELPER_CHAN); - HELPER_CHAN = 0 as *mut Sender; - HELPER_SIGNAL = 0 as imp::signal; - } -} - -#[cfg(unix)] -mod imp { - use libc; - use std::os; - - use io::file::FileDesc; - - pub type signal = libc::c_int; - - pub fn new() -> (signal, signal) { - let pipe = os::pipe(); - (pipe.input, pipe.out) - } - - pub fn signal(fd: libc::c_int) { - FileDesc::new(fd, false).inner_write([0]).unwrap(); - } - - pub fn close(fd: libc::c_int) { - let _fd = FileDesc::new(fd, true); - } -} - -#[cfg(windows)] -mod imp { - use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle}; - use std::ptr; - use libc; - - pub type signal = HANDLE; - - pub fn new() -> (HANDLE, HANDLE) { - unsafe { - let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE, - ptr::null()); - (handle, handle) - } - } - - pub fn signal(handle: HANDLE) { - assert!(unsafe { SetEvent(handle) != 0 }); - } - - pub fn close(handle: HANDLE) { - assert!(unsafe { CloseHandle(handle) != 0 }); - } - - extern "system" { - fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, - bManualReset: BOOL, - bInitialState: BOOL, - lpName: LPCSTR) -> HANDLE; - fn SetEvent(hEvent: HANDLE) -> BOOL; - } -} diff --git a/src/libnative/io/timer_unix.rs b/src/libnative/io/timer_unix.rs index e008e6fb9e905..2c5b798482777 100644 --- a/src/libnative/io/timer_unix.rs +++ b/src/libnative/io/timer_unix.rs @@ -52,11 +52,14 @@ use std::os; use std::ptr; use std::rt::rtio; use std::sync::atomics; +use std::comm; use io::IoResult; use io::c; use io::file::FileDesc; -use io::timer_helper; +use io::helper_thread::Helper; + +helper_init!(static mut HELPER: Helper) pub struct Timer { id: uint, @@ -79,9 +82,6 @@ pub enum Req { // Remove a timer based on its id and then send it back on the channel // provided RemoveTimer(uint, Sender>), - - // Shut down the loop and then ACK this channel once it's shut down - Shutdown, } // returns the current time (in milliseconds) @@ -93,7 +93,7 @@ pub fn now() -> u64 { } } -fn helper(input: libc::c_int, messages: Receiver) { +fn helper(input: libc::c_int, messages: Receiver, _: ()) { let mut set: c::fd_set = unsafe { mem::init() }; let mut fd = FileDesc::new(input, true); @@ -163,7 +163,7 @@ fn helper(input: libc::c_int, messages: Receiver) { 1 => { loop { match messages.try_recv() { - Ok(Shutdown) => { + Err(comm::Disconnected) => { assert!(active.len() == 0); break 'outer; } @@ -202,7 +202,7 @@ fn helper(input: libc::c_int, messages: Receiver) { impl Timer { pub fn new() -> IoResult { - timer_helper::boot(helper); + unsafe { HELPER.boot(|| {}, helper); } static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; let id = unsafe { ID.fetch_add(1, atomics::Relaxed) }; @@ -235,7 +235,7 @@ impl Timer { Some(i) => i, None => { let (tx, rx) = channel(); - timer_helper::send(RemoveTimer(self.id, tx)); + unsafe { HELPER.send(RemoveTimer(self.id, tx)); } rx.recv() } } @@ -261,7 +261,7 @@ impl rtio::RtioTimer for Timer { inner.interval = msecs; inner.target = now + msecs; - timer_helper::send(NewTimer(inner)); + unsafe { HELPER.send(NewTimer(inner)); } return rx; } @@ -275,7 +275,7 @@ impl rtio::RtioTimer for Timer { inner.interval = msecs; inner.target = now + msecs; - timer_helper::send(NewTimer(inner)); + unsafe { HELPER.send(NewTimer(inner)); } return rx; } } diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs index 15e6e62421a5b..e7130de05c26d 100644 --- a/src/libnative/io/timer_win32.rs +++ b/src/libnative/io/timer_win32.rs @@ -23,10 +23,13 @@ use libc; use std::ptr; use std::rt::rtio; +use std::comm; -use io::timer_helper; +use io::helper_thread::Helper; use io::IoResult; +helper_init!(static mut HELPER: Helper) + pub struct Timer { obj: libc::HANDLE, on_worker: bool, @@ -35,10 +38,9 @@ pub struct Timer { pub enum Req { NewTimer(libc::HANDLE, Sender<()>, bool), RemoveTimer(libc::HANDLE, Sender<()>), - Shutdown, } -fn helper(input: libc::HANDLE, messages: Receiver) { +fn helper(input: libc::HANDLE, messages: Receiver, _: ()) { let mut objs = vec![input]; let mut chans = vec![]; @@ -67,12 +69,12 @@ fn helper(input: libc::HANDLE, messages: Receiver) { None => {} } } - Ok(Shutdown) => { + Err(comm::Disconnected) => { assert_eq!(objs.len(), 1); assert_eq!(chans.len(), 0); break 'outer; } - _ => break + Err(..) => break } } } else { @@ -102,7 +104,7 @@ pub fn now() -> u64 { impl Timer { pub fn new() -> IoResult { - timer_helper::boot(helper); + unsafe { HELPER.boot(|| {}, helper) } let obj = unsafe { imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null()) @@ -124,7 +126,7 @@ impl Timer { if !self.on_worker { return } let (tx, rx) = channel(); - timer_helper::send(RemoveTimer(self.obj, tx)); + unsafe { HELPER.send(RemoveTimer(self.obj, tx)) } rx.recv(); self.on_worker = false; @@ -157,7 +159,7 @@ impl rtio::RtioTimer for Timer { ptr::mut_null(), 0) }, 1); - timer_helper::send(NewTimer(self.obj, tx, true)); + unsafe { HELPER.send(NewTimer(self.obj, tx, true)) } self.on_worker = true; return rx; } @@ -173,7 +175,7 @@ impl rtio::RtioTimer for Timer { ptr::null(), ptr::mut_null(), 0) }, 1); - timer_helper::send(NewTimer(self.obj, tx, false)); + unsafe { HELPER.send(NewTimer(self.obj, tx, false)) } self.on_worker = true; return rx; diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index 0df45f7d5a0a9..198b5d440974a 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -54,6 +54,7 @@ // NB this crate explicitly does *not* allow glob imports, please seriously // consider whether they're needed before adding that feature here (the // answer is that you don't need them) +#![feature(macro_rules)] extern crate libc; diff --git a/src/librustc/back/archive.rs b/src/librustc/back/archive.rs index a0b38700ecf0a..29ab3d75eae6f 100644 --- a/src/librustc/back/archive.rs +++ b/src/librustc/back/archive.rs @@ -54,8 +54,8 @@ fn run_ar(sess: &Session, args: &str, cwd: Option<&Path>, cwd: cwd.map(|a| &*a), .. ProcessConfig::new() }) { - Ok(mut prog) => { - let o = prog.wait_with_output(); + Ok(prog) => { + let o = prog.wait_with_output().unwrap(); if !o.status.success() { sess.err(format!("{} {} failed with: {}", ar, args.connect(" "), o.status)); diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index d671e20868c5c..7afac6801519b 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -19,7 +19,8 @@ use std::rt::task::BlockedTask; use homing::{HomingIO, HomeHandle}; use pipe::PipeWatcher; use super::{UvHandle, UvError, uv_error_to_io_error, - wait_until_woken_after, wakeup}; + wait_until_woken_after, wakeup, Loop}; +use timer::TimerWatcher; use uvio::UvIoFactory; use uvll; @@ -32,6 +33,16 @@ pub struct Process { /// Collected from the exit_cb exit_status: Option, + + /// Lazily initialized timeout timer + timer: Option>, + timeout_state: TimeoutState, +} + +enum TimeoutState { + NoTimeout, + TimeoutPending, + TimeoutElapsed, } impl Process { @@ -92,6 +103,8 @@ impl Process { home: io_loop.make_handle(), to_wake: None, exit_status: None, + timer: None, + timeout_state: NoTimeout, }; match unsafe { uvll::uv_spawn(io_loop.uv_loop(), handle, &options) @@ -223,21 +236,71 @@ impl RtioProcess for Process { } } - fn wait(&mut self) -> process::ProcessExit { + fn wait(&mut self) -> Result { // Make sure (on the home scheduler) that we have an exit status listed let _m = self.fire_homing_missile(); match self.exit_status { - Some(..) => {} - None => { - // If there's no exit code previously listed, then the - // process's exit callback has yet to be invoked. We just - // need to deschedule ourselves and wait to be reawoken. + Some(status) => return Ok(status), + None => {} + } + + // If there's no exit code previously listed, then the process's exit + // callback has yet to be invoked. We just need to deschedule ourselves + // and wait to be reawoken. + match self.timeout_state { + NoTimeout | TimeoutPending => { wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {}); - assert!(self.exit_status.is_some()); } + TimeoutElapsed => {} + } + + // If there's still no exit status listed, then we timed out, and we + // need to return. + match self.exit_status { + Some(status) => Ok(status), + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } + } + + fn set_timeout(&mut self, timeout: Option) { + let _m = self.fire_homing_missile(); + self.timeout_state = NoTimeout; + let ms = match timeout { + Some(ms) => ms, + None => { + match self.timer { + Some(ref mut timer) => timer.stop(), + None => {} + } + return + } + }; + if self.timer.is_none() { + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(self.uv_handle()) + }); + let mut timer = box TimerWatcher::new_home(&loop_, self.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *Process); + } + self.timer = Some(timer); } - self.exit_status.unwrap() + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + self.timeout_state = TimeoutPending; + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let p: &mut Process = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut Process) + }; + p.timeout_state = TimeoutElapsed; + match p.to_wake.take() { + Some(task) => { let _t = task.wake().map(|t| t.reawaken()); } + None => {} + } + } } } diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index 3babef6126e8f..1f6a6159d981b 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -10,6 +10,8 @@ //! Bindings for executing child processes +#![allow(experimental)] + use prelude::*; use fmt; @@ -50,7 +52,7 @@ use rt::rtio::{RtioProcess, IoFactory, LocalIo}; /// }; /// /// let contents = child.stdout.get_mut_ref().read_to_end(); -/// assert!(child.wait().success()); +/// assert!(child.wait().unwrap().success()); /// ``` pub struct Process { handle: Box, @@ -283,7 +285,7 @@ impl Process { /// println!("stderr: {}", str::from_utf8_lossy(output.error.as_slice())); /// ``` pub fn output(prog: &str, args: &[~str]) -> IoResult { - Process::new(prog, args).map(|mut p| p.wait_with_output()) + Process::new(prog, args).and_then(|p| p.wait_with_output()) } /// Executes a child process and collects its exit status. This will block @@ -302,7 +304,7 @@ impl Process { /// println!("process exited with: {}", status); /// ``` pub fn status(prog: &str, args: &[~str]) -> IoResult { - Process::new(prog, args).map(|mut p| p.wait()) + Process::new(prog, args).and_then(|mut p| p.wait()) } /// Creates a new process with the specified configuration. @@ -377,17 +379,72 @@ impl Process { /// after it has been called at least once. /// /// The stdin handle to the child process will be closed before waiting. - pub fn wait(&mut self) -> ProcessExit { + /// + /// # Errors + /// + /// This function can fail if a timeout was previously specified via + /// `set_timeout` and the timeout expires before the child exits. + pub fn wait(&mut self) -> IoResult { drop(self.stdin.take()); self.handle.wait() } + /// Sets a timeout, in milliseconds, for future calls to wait(). + /// + /// The argument specified is a relative distance into the future, in + /// milliseconds, after which any call to wait() will return immediately + /// with a timeout error, and all future calls to wait() will not block. + /// + /// A value of `None` will clear any previous timeout, and a value of `Some` + /// will override any previously set timeout. + /// + /// # Example + /// + /// ```no_run + /// # #![allow(experimental)] + /// use std::io::process::{Process, ProcessExit}; + /// use std::io::IoResult; + /// + /// fn run_gracefully(prog: &str) -> IoResult { + /// let mut p = try!(Process::new("long-running-process", [])); + /// + /// // give the process 10 seconds to finish completely + /// p.set_timeout(Some(10_000)); + /// match p.wait() { + /// Ok(status) => return Ok(status), + /// Err(..) => {} + /// } + /// + /// // Attempt to exit gracefully, but don't wait for it too long + /// try!(p.signal_exit()); + /// p.set_timeout(Some(1_000)); + /// match p.wait() { + /// Ok(status) => return Ok(status), + /// Err(..) => {} + /// } + /// + /// // Well, we did our best, forcefully kill the process + /// try!(p.signal_kill()); + /// p.set_timeout(None); + /// p.wait() + /// } + /// ``` + #[experimental = "the type of the timeout is likely to change"] + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.handle.set_timeout(timeout_ms) + } + /// Simultaneously wait for the child to exit and collect all remaining /// output on the stdout/stderr handles, returning a `ProcessOutput` /// instance. /// /// The stdin handle to the child is closed before waiting. - pub fn wait_with_output(&mut self) -> ProcessOutput { + /// + /// # Errors + /// + /// This function can fail for any of the same reasons that `wait()` can + /// fail. + pub fn wait_with_output(mut self) -> IoResult { drop(self.stdin.take()); fn read(stream: Option) -> Receiver>> { let (tx, rx) = channel(); @@ -403,11 +460,13 @@ impl Process { let stdout = read(self.stdout.take()); let stderr = read(self.stderr.take()); - let status = self.wait(); + let status = try!(self.wait()); - ProcessOutput { status: status, - output: stdout.recv().ok().unwrap_or(Vec::new()), - error: stderr.recv().ok().unwrap_or(Vec::new()) } + Ok(ProcessOutput { + status: status, + output: stdout.recv().ok().unwrap_or(Vec::new()), + error: stderr.recv().ok().unwrap_or(Vec::new()), + }) } } @@ -420,7 +479,8 @@ impl Drop for Process { drop(self.stderr.take()); drop(mem::replace(&mut self.extra_io, Vec::new())); - self.wait(); + self.set_timeout(None); + let _ = self.wait().unwrap(); } } @@ -440,7 +500,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(not(target_os="android"))] @@ -464,7 +524,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - assert!(p.wait().matches_exit_status(1)); + assert!(p.wait().unwrap().matches_exit_status(1)); drop(p.wait().clone()); }) @@ -478,7 +538,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - match p.wait() { + match p.wait().unwrap() { process::ExitSignal(1) => {}, result => fail!("not terminated by signal 1 (instead, {})", result), } @@ -494,7 +554,7 @@ mod tests { let mut p = p.unwrap(); assert!(p.stdout.is_some()); let ret = read_all(p.stdout.get_mut_ref() as &mut Reader); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); return ret; } @@ -535,7 +595,7 @@ mod tests { p.stdin.get_mut_ref().write("foobar".as_bytes()).unwrap(); drop(p.stdin.take()); let out = read_all(p.stdout.get_mut_ref() as &mut Reader); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); assert_eq!(out, "foobar\n".to_owned()); }) @@ -547,7 +607,7 @@ mod tests { .. ProcessConfig::new() }; let mut p = Process::configure(args).unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(windows)] @@ -571,7 +631,7 @@ mod tests { .. ProcessConfig::new() }; let mut p = Process::configure(args).unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(unix, not(target_os="android"))] @@ -634,36 +694,21 @@ mod tests { #[cfg(not(target_os="android"))] iotest!(fn test_finish_once() { let mut prog = Process::new("false", []).unwrap(); - assert!(prog.wait().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); }) #[cfg(not(target_os="android"))] iotest!(fn test_finish_twice() { let mut prog = Process::new("false", []).unwrap(); - assert!(prog.wait().matches_exit_status(1)); - assert!(prog.wait().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); }) #[cfg(not(target_os="android"))] iotest!(fn test_wait_with_output_once() { - let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap(); - let ProcessOutput {status, output, error} = prog.wait_with_output(); - let output_str = str::from_utf8(output.as_slice()).unwrap(); - - assert!(status.success()); - assert_eq!(output_str.trim().to_owned(), "hello".to_owned()); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, Vec::new()); - } - }) - - #[cfg(not(target_os="android"))] - iotest!(fn test_wait_with_output_twice() { - let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap(); - let ProcessOutput {status, output, error} = prog.wait_with_output(); - + let prog = Process::new("echo", ["hello".to_owned()]).unwrap(); + let ProcessOutput {status, output, error} = prog.wait_with_output().unwrap(); let output_str = str::from_utf8(output.as_slice()).unwrap(); assert!(status.success()); @@ -672,15 +717,6 @@ mod tests { if !running_on_valgrind() { assert_eq!(error, Vec::new()); } - - let ProcessOutput {status, output, error} = prog.wait_with_output(); - - assert!(status.success()); - assert_eq!(output, Vec::new()); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, Vec::new()); - } }) #[cfg(unix,not(target_os="android"))] @@ -713,9 +749,10 @@ mod tests { iotest!(fn test_keep_current_working_dir() { use os; - let mut prog = run_pwd(None); + let prog = run_pwd(None); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let parent_dir = os::getcwd(); let child_dir = Path::new(output.trim()); @@ -731,9 +768,10 @@ mod tests { // test changing to the parent of os::getcwd() because we know // the path exists (and os::getcwd() is not expected to be root) let parent_dir = os::getcwd().dir_path(); - let mut prog = run_pwd(Some(&parent_dir)); + let prog = run_pwd(Some(&parent_dir)); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let child_dir = Path::new(output.trim()); let parent_stat = parent_dir.stat().unwrap(); @@ -776,8 +814,9 @@ mod tests { use os; if running_on_valgrind() { return; } - let mut prog = run_env(None); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let prog = run_env(None); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let r = os::env(); for &(ref k, ref v) in r.iter() { @@ -790,7 +829,7 @@ mod tests { use os; if running_on_valgrind() { return; } - let mut prog = run_env(None); + let prog = run_env(None); let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); let r = os::env(); @@ -806,8 +845,8 @@ mod tests { iotest!(fn test_add_to_env() { let new_env = box [("RUN_TEST_NEW_ENV".to_owned(), "123".to_owned())]; - let mut prog = run_env(Some(new_env)); - let result = prog.wait_with_output(); + let prog = run_env(Some(new_env)); + let result = prog.wait_with_output().unwrap(); let output = str::from_utf8_lossy(result.output.as_slice()).into_owned(); assert!(output.contains("RUN_TEST_NEW_ENV=123"), @@ -829,14 +868,14 @@ mod tests { iotest!(fn test_kill() { let mut p = sleeper(); Process::kill(p.id(), PleaseExitSignal).unwrap(); - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); }) iotest!(fn test_exists() { let mut p = sleeper(); assert!(Process::kill(p.id(), 0).is_ok()); p.signal_kill().unwrap(); - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); }) iotest!(fn test_zero() { @@ -844,11 +883,42 @@ mod tests { p.signal_kill().unwrap(); for _ in range(0, 20) { if p.signal(0).is_err() { - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); return } timer::sleep(100); } fail!("never saw the child go away"); }) + + iotest!(fn wait_timeout() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + p.set_timeout(None); + assert!(p.wait().is_ok()); + }) + + iotest!(fn wait_timeout2() { + let (tx, rx) = channel(); + let tx2 = tx.clone(); + spawn(proc() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + tx.send(()); + }); + spawn(proc() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + tx2.send(()); + }); + rx.recv(); + rx.recv(); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index bc3a483f30d2b..4b9a623e33ce8 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -274,7 +274,8 @@ pub trait RtioFileStream { pub trait RtioProcess { fn id(&self) -> libc::pid_t; fn kill(&mut self, signal: int) -> IoResult<()>; - fn wait(&mut self) -> ProcessExit; + fn wait(&mut self) -> IoResult; + fn set_timeout(&mut self, timeout: Option); } pub trait RtioPipe { diff --git a/src/test/run-pass/backtrace.rs b/src/test/run-pass/backtrace.rs index 989453d8570d1..53fe91cff3745 100644 --- a/src/test/run-pass/backtrace.rs +++ b/src/test/run-pass/backtrace.rs @@ -50,7 +50,7 @@ fn runtest(me: &str) { env: Some(env.as_slice()), .. ProcessConfig::new() }).unwrap(); - let out = p.wait_with_output(); + let out = p.wait_with_output().unwrap(); assert!(!out.status.success()); let s = str::from_utf8(out.error.as_slice()).unwrap(); assert!(s.contains("stack backtrace") && s.contains("foo::h"), @@ -62,7 +62,7 @@ fn runtest(me: &str) { args: ["fail".to_owned()], .. ProcessConfig::new() }).unwrap(); - let out = p.wait_with_output(); + let out = p.wait_with_output().unwrap(); assert!(!out.status.success()); let s = str::from_utf8(out.error.as_slice()).unwrap(); assert!(!s.contains("stack backtrace") && !s.contains("foo::h"), @@ -74,7 +74,7 @@ fn runtest(me: &str) { args: ["double-fail".to_owned()], .. ProcessConfig::new() }).unwrap(); - let out = p.wait_with_output(); + let out = p.wait_with_output().unwrap(); assert!(!out.status.success()); let s = str::from_utf8(out.error.as_slice()).unwrap(); assert!(s.contains("stack backtrace") && s.contains("double::h"), @@ -87,7 +87,7 @@ fn runtest(me: &str) { env: Some(env.as_slice()), .. ProcessConfig::new() }).unwrap(); - let out = p.wait_with_output(); + let out = p.wait_with_output().unwrap(); assert!(!out.status.success()); let s = str::from_utf8(out.error.as_slice()).unwrap(); let mut i = 0; diff --git a/src/test/run-pass/core-run-destroy.rs b/src/test/run-pass/core-run-destroy.rs index 83d3b51f74a4f..01a71d862b4ff 100644 --- a/src/test/run-pass/core-run-destroy.rs +++ b/src/test/run-pass/core-run-destroy.rs @@ -120,7 +120,7 @@ pub fn test_destroy_actually_kills(force: bool) { () = rx1.recv() => {} } }); - match p.wait() { + match p.wait().unwrap() { ExitStatus(..) => fail!("expected a signal"), ExitSignal(..) => tx.send(()), } diff --git a/src/test/run-pass/issue-13304.rs b/src/test/run-pass/issue-13304.rs index f66b943d85f64..fc1825d22cd94 100644 --- a/src/test/run-pass/issue-13304.rs +++ b/src/test/run-pass/issue-13304.rs @@ -52,7 +52,7 @@ fn parent(flavor: ~str) { let args = args.as_slice(); let mut p = io::Process::new(args[0].as_slice(), ["child".to_owned(), flavor]).unwrap(); p.stdin.get_mut_ref().write_str("test1\ntest2\ntest3").unwrap(); - let out = p.wait_with_output(); + let out = p.wait_with_output().unwrap(); assert!(out.status.success()); let s = str::from_utf8(out.output.as_slice()).unwrap(); assert_eq!(s, "test1\n\ntest2\n\ntest3\n"); diff --git a/src/test/run-pass/logging-separate-lines.rs b/src/test/run-pass/logging-separate-lines.rs index a5e632b94a288..f87c22bdb57c3 100644 --- a/src/test/run-pass/logging-separate-lines.rs +++ b/src/test/run-pass/logging-separate-lines.rs @@ -36,7 +36,7 @@ fn main() { env: Some(env.as_slice()), ..ProcessConfig::new() }; - let p = Process::configure(config).unwrap().wait_with_output(); + let p = Process::configure(config).unwrap().wait_with_output().unwrap(); assert!(p.status.success()); let mut lines = str::from_utf8(p.error.as_slice()).unwrap().lines(); assert!(lines.next().unwrap().contains("foo")); diff --git a/src/test/run-pass/process-detach.rs b/src/test/run-pass/process-detach.rs index 2a814956631d4..f41f2619032bc 100644 --- a/src/test/run-pass/process-detach.rs +++ b/src/test/run-pass/process-detach.rs @@ -54,7 +54,7 @@ fn main() { // Wait for the child process to die (terminate it's stdin and the read // should fail). drop(p.stdin.take()); - match p.wait() { + match p.wait().unwrap() { process::ExitStatus(..) => {} process::ExitSignal(..) => fail!() } diff --git a/src/test/run-pass/sigpipe-should-be-ignored.rs b/src/test/run-pass/sigpipe-should-be-ignored.rs index 34d1f5e66c678..2b42e3ada5428 100644 --- a/src/test/run-pass/sigpipe-should-be-ignored.rs +++ b/src/test/run-pass/sigpipe-should-be-ignored.rs @@ -31,5 +31,5 @@ fn main() { } let mut p = Process::new(args[0], ["test".to_owned()]).unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }