Skip to content

Commit

Permalink
std: Add timeouts to unix connect/accept
Browse files Browse the repository at this point in the history
This adds support for connecting to a unix socket with a timeout (a named pipe
on windows), and accepting a connection with a timeout. The goal is to bring
unix pipes/named sockets back in line with TCP support for timeouts.

Similarly to the TCP sockets, all methods are marked #[experimental] due to
uncertainty about the type of the timeout argument.

This internally involved a good bit of refactoring to share as much code as
possible between TCP servers and pipe servers, but the core implementation did
not change drastically as part of this commit.

cc rust-lang#13523
  • Loading branch information
alexcrichton committed Apr 23, 2014
1 parent 6672810 commit f93cdba
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 348 deletions.
2 changes: 1 addition & 1 deletion src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
Expand Down
2 changes: 2 additions & 0 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ extern "system" {
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;

pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
}
6 changes: 4 additions & 2 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub use self::process::Process;
pub mod addrinfo;
pub mod net;
pub mod process;
mod util;

#[cfg(unix)]
#[path = "file_unix.rs"]
Expand Down Expand Up @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
}
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
Expand Down
128 changes: 6 additions & 122 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ use std::cast;
use std::io::net::ip;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use super::{IoResult, retry, keep_going};
use super::c;
use super::util;

////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
Expand Down Expand Up @@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
}
}

fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
Expand All @@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
super::last_error()
}

fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}

fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}

#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }

Expand Down Expand Up @@ -270,7 +254,7 @@ impl TcpStream {
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
try!(util::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
Expand All @@ -282,84 +266,6 @@ impl TcpStream {
}
}

// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout_ms: u64) -> IoResult<()> {
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;

// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));

let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}

-1 => Err(last_error()),
_ => Ok(()),
};

// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;

#[cfg(unix)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}

#[cfg(unix)]
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}

pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
Expand Down Expand Up @@ -533,7 +439,7 @@ impl TcpAcceptor {

pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(self.accept_deadline());
try!(util::accept_deadline(self.fd(), self.deadline));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
Expand All @@ -550,25 +456,6 @@ impl TcpAcceptor {
}
}
}

fn accept_deadline(&mut self) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, self.fd());

match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if self.deadline > now {0} else {self.deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {self.fd() + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}
}

impl rtio::RtioSocket for TcpAcceptor {
Expand All @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = match timeout {
None => 0,
Some(t) => ::io::timer::now() + t,
};
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}

Expand Down
61 changes: 31 additions & 30 deletions src/libnative/io/pipe_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use libc;
use std::c_str::CString;
use std::cast;
use std::intrinsics;
use std::io;
use libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::intrinsics;

use super::{IoResult, retry, keep_going};
use super::util;
use super::file::fd_t;

fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
Expand Down Expand Up @@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
return Ok((storage, len));
}

fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
len: uint) -> IoResult<CString> {
match storage.ss_family as libc::c_int {
libc::AF_UNIX => {
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
let storage: &libc::sockaddr_un = unsafe {
cast::transmute(storage)
};
unsafe {
Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
}
}
_ => Err(io::standard_error(io::InvalidInput))
}
}

struct Inner {
fd: fd_t,
}
Expand All @@ -76,16 +61,24 @@ impl Drop for Inner {
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
}

fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
fn connect(addr: &CString, ty: libc::c_int,
timeout: Option<u64>) -> IoResult<Inner> {
let (addr, len) = try!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: try!(unix_socket(ty)) };
let addrp = &addr as *libc::sockaddr_storage;
match retry(|| unsafe {
libc::connect(inner.fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
-1 => Err(super::last_error()),
_ => Ok(inner)
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;

match timeout {
None => {
match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
-1 => Err(super::last_error()),
_ => Ok(inner)
}
}
Some(timeout_ms) => {
try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
Ok(inner)
}
}
}

Expand All @@ -110,8 +103,9 @@ pub struct UnixStream {
}

impl UnixStream {
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM).map(|inner| {
pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
UnixStream { inner: UnsafeArc::new(inner) }
})
}
Expand Down Expand Up @@ -176,25 +170,29 @@ impl UnixListener {
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self })
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
}
}
}

impl rtio::RtioUnixListener for UnixListener {
fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor:Send> {
self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor:Send)
self.native_listen(1).map(|a| ~a as ~rtio::RtioUnixAcceptor:Send)
}
}

pub struct UnixAcceptor {
listener: UnixListener,
deadline: u64,
}

impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }

pub fn native_accept(&mut self) -> IoResult<UnixStream> {
if self.deadline != 0 {
try!(util::accept_deadline(self.fd(), self.deadline));
}
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
Expand All @@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}

impl Drop for UnixListener {
Expand Down
Loading

0 comments on commit f93cdba

Please sign in to comment.