From 9f21ce1eed11cc85a75164475317e1124759098c Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Wed, 9 Aug 2023 16:11:05 +0200 Subject: [PATCH] Enable sys::unix::pipe when using pipe based Waker This also adds a new function sys::unix::pipe::new_raw that creates a pipe, returning the raw fds. This is used by the Waker implementation and of course sys::unix::pipe::new (which is exposed as mio::unix::pipe::new). --- src/sys/unix/mod.rs | 16 ++++- src/sys/unix/pipe.rs | 145 ++++++++++++++++++++++-------------------- src/sys/unix/waker.rs | 8 +-- 3 files changed, 94 insertions(+), 75 deletions(-) diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 53b579df3..eb268b9f4 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -94,9 +94,19 @@ cfg_os_poll! { pub(crate) use self::selector::IoSourceState; } - cfg_os_ext! { - pub(crate) mod pipe; - } + #[cfg(any( + // For the public `pipe` module, must match `cfg_os_ext` macro. + feature = "os-ext", + // For the `Waker` type based on a pipe. + mio_unsupported_force_waker_pipe, + target_os = "aix", + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ))] + pub(crate) mod pipe; } cfg_not_os_poll! { diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index 5cb8d58e4..8e92dd37d 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -2,9 +2,81 @@ //! //! See the [`new`] function for documentation. +use std::io; +use std::os::unix::io::RawFd; + +pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "illumos", + target_os = "redox", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any( + target_os = "aix", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + target_os = "espidf", + ))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle closing error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "aix", + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "illumos", + target_os = "ios", + target_os = "linux", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + target_os = "tvos", + target_os = "watchos", + target_os = "espidf", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + Ok(fds) +} + +cfg_os_ext! { use std::fs::File; -use std::io::{self, IoSlice, IoSliceMut, Read, Write}; -use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::io::{IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::process::{ChildStderr, ChildStdin, ChildStdout}; use crate::io_source::IoSource; @@ -145,74 +217,10 @@ use crate::{event, Interest, Registry, Token}; /// # } /// ``` pub fn new() -> io::Result<(Sender, Receiver)> { - let mut fds: [RawFd; 2] = [-1, -1]; - - #[cfg(any( - target_os = "android", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - target_os = "illumos", - target_os = "redox", - ))] - unsafe { - if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { - return Err(io::Error::last_os_error()); - } - } - - #[cfg(any( - target_os = "aix", - target_os = "ios", - target_os = "macos", - target_os = "tvos", - target_os = "watchos", - target_os = "espidf", - ))] - unsafe { - // For platforms that don't have `pipe2(2)` we need to manually set the - // correct flags on the file descriptor. - if libc::pipe(fds.as_mut_ptr()) != 0 { - return Err(io::Error::last_os_error()); - } - - for fd in &fds { - if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 - || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 - { - let err = io::Error::last_os_error(); - // Don't leak file descriptors. Can't handle closing error though. - let _ = libc::close(fds[0]); - let _ = libc::close(fds[1]); - return Err(err); - } - } - } - - #[cfg(not(any( - target_os = "aix", - target_os = "android", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "illumos", - target_os = "ios", - target_os = "linux", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox", - target_os = "tvos", - target_os = "watchos", - target_os = "espidf", - )))] - compile_error!("unsupported target for `mio::unix::pipe`"); - - // SAFETY: we just initialised the `fds` above. + let fds = new_raw()?; + // SAFETY: `new_raw` initialised the `fds` above. let r = unsafe { Receiver::from_raw_fd(fds[0]) }; let w = unsafe { Sender::from_raw_fd(fds[1]) }; - Ok((w, r)) } @@ -579,3 +587,4 @@ fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { Ok(()) } +} // `cfg_os_ext!`. diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 37a5f1656..4b73df32d 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -207,7 +207,7 @@ pub use self::kqueue::Waker; target_os = "redox", ))] mod pipe { - use crate::unix::pipe::new as new_pipe; + use crate::sys::unix::pipe; use std::fs::File; use std::io::{self, Read, Write}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; @@ -224,9 +224,9 @@ mod pipe { impl WakerInternal { pub fn new() -> io::Result { - let (sender, receiver) = new_pipe()?; - let sender = unsafe { File::from_raw_fd(sender.as_raw_fd()) }; - let receiver = unsafe { File::from_raw_fd(receiver.as_raw_fd()) }; + let [sender, receiver] = pipe::new_raw()?; + let sender = unsafe { File::from_raw_fd(sender) }; + let receiver = unsafe { File::from_raw_fd(receiver) }; Ok(WakerInternal { sender, receiver }) }