Skip to content

Commit

Permalink
auto merge of #13688 : alexcrichton/rust/accept-timeout, r=brson
Browse files Browse the repository at this point in the history
This adds experimental support for timeouts when accepting sockets through
`TcpAcceptor::accept`. This does not add a separate `accept_timeout` function,
but rather it adds a `set_timeout` function instead. This second function is
intended to be used as a hard deadline after which all accepts will never block
and fail immediately.

This idea was derived from Go's SetDeadline() methods. We do not currently have
a robust time abstraction in the standard library, so I opted to have the
argument be a relative time in millseconds into the future. I believe a more
appropriate argument type is an absolute time, but this concept does not exist
yet (this is also why the function is marked #[experimental]).

The native support is built on select(), similarly to connect_timeout(), and the
green support is based on channel select and a timer.

cc #13523
  • Loading branch information
bors committed Apr 24, 2014
2 parents d910330 + e5d3e51 commit 3d05e7f
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 41 deletions.
6 changes: 3 additions & 3 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ extern "system" {
pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
argp: *mut libc::c_ulong) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
exceptfds: *mut fd_set,
readfds: *fd_set,
writefds: *fd_set,
exceptfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::SOCKET,
level: libc::c_int,
Expand Down
83 changes: 56 additions & 27 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::cast;
use std::io::net::ip;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
Expand Down Expand Up @@ -144,6 +145,21 @@ fn last_error() -> io::IoError {
super::last_error()
}

fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}

fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}

#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }

Expand Down Expand Up @@ -271,8 +287,7 @@ impl TcpStream {
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout: u64) -> IoResult<()> {
use std::os;
timeout_ms: u64) -> IoResult<()> {
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
Expand All @@ -289,12 +304,8 @@ impl TcpStream {
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout) {
0 => Err(io::IoError {
kind: io::TimedOut,
desc: "connection timed out",
detail: None,
}),
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
Expand Down Expand Up @@ -338,22 +349,14 @@ impl TcpStream {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let timeout = timeout - (::io::timer::now() - start);
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
ptr::null(), &tv)
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}

Expand Down Expand Up @@ -467,7 +470,7 @@ impl Drop for Inner {
////////////////////////////////////////////////////////////////////////////////

pub struct TcpListener {
inner: UnsafeArc<Inner>,
inner: Inner,
}

impl TcpListener {
Expand All @@ -477,7 +480,7 @@ impl TcpListener {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = TcpListener { inner: UnsafeArc::new(inner) };
let ret = TcpListener { inner: inner };
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
Expand All @@ -498,15 +501,12 @@ impl TcpListener {
}
}

pub fn fd(&self) -> sock_t {
// This is just a read-only arc so the unsafety is fine
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> sock_t { self.inner.fd }

pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_error()),
_ => Ok(TcpAcceptor { listener: self })
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
}
}
}
Expand All @@ -525,12 +525,16 @@ impl rtio::RtioSocket for TcpListener {

pub struct TcpAcceptor {
listener: TcpListener,
deadline: u64,
}

impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd() }

pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(self.accept_deadline());
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
let storagep = &mut storage as *mut libc::sockaddr_storage;
Expand All @@ -546,6 +550,25 @@ impl TcpAcceptor {
}
}
}

fn accept_deadline(&mut self) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, self.fd());

match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if self.deadline > now {0} else {self.deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}
}

impl rtio::RtioSocket for TcpAcceptor {
Expand All @@ -561,6 +584,12 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {

fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = match timeout {
None => 0,
Some(t) => ::io::timer::now() + t,
};
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
11 changes: 11 additions & 0 deletions src/libnative/io/timer_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
}
}

// returns the current time (in milliseconds)
pub fn now() -> u64 {
let mut ticks_per_s = 0;
assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1);
let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
let mut ticks = 0;
assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1);

return (ticks as u64 * 1000) / (ticks_per_s as u64);
}

impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
Expand Down
88 changes: 86 additions & 2 deletions src/librustuv/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ pub struct TcpListener {

pub struct TcpAcceptor {
listener: ~TcpListener,
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}

// TCP watchers (clients/streams)
Expand Down Expand Up @@ -459,7 +462,12 @@ impl rtio::RtioSocket for TcpListener {
impl rtio::RtioTcpListener for TcpListener {
fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor { listener: self };
let mut acceptor = ~TcpAcceptor {
listener: self,
timer: None,
timeout_tx: None,
timeout_rx: None,
};

let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
Expand Down Expand Up @@ -509,7 +517,37 @@ impl rtio::RtioSocket for TcpAcceptor {

impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
self.listener.incoming.recv()
match self.timeout_rx {
None => self.listener.incoming.recv(),
Some(ref rx) => {
use std::comm::Select;

// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match self.listener.incoming.try_recv() {
Ok(data) => return data,
Err(..) => {}
}

// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(&self.listener.incoming);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
self.listener.incoming.recv()
}
}
}
}

fn accept_simultaneously(&mut self) -> Result<(), IoError> {
Expand All @@ -525,6 +563,52 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
})
}

fn set_timeout(&mut self, ms: Option<u64>) {
// First, if the timeout is none, clear any previous timeout by dropping
// the timer and transmission channels
let ms = match ms {
None => {
return drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
Some(ms) => ms,
};

// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = self.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(self.listener.handle)
});
let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
unsafe {
timer.set_data(self as *mut _ as *TcpAcceptor);
}
self.timer = Some(timer);
}

// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);

extern fn timer_cb(timer: *uvll::uv_timer_t, status: c_int) {
assert_eq!(status, 0);
let acceptor: &mut TcpAcceptor = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 11 additions & 8 deletions src/librustuv/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::rt::rtio::RtioTimer;
use std::rt::task::BlockedTask;

use homing::{HomeHandle, HomingIO};
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after, Loop};
use uvio::UvIoFactory;
use uvll;

Expand All @@ -34,18 +34,21 @@ pub enum NextAction {

impl TimerWatcher {
pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
let handle = io.make_handle();
let me = ~TimerWatcher::new_home(&io.loop_, handle);
me.install()
}

pub fn new_home(loop_: &Loop, home: HomeHandle) -> TimerWatcher {
let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
assert_eq!(unsafe {
uvll::uv_timer_init(io.uv_loop(), handle)
}, 0);
let me = ~TimerWatcher {
assert_eq!(unsafe { uvll::uv_timer_init(loop_.handle, handle) }, 0);
TimerWatcher {
handle: handle,
action: None,
blocker: None,
home: io.make_handle(),
home: home,
id: 0,
};
return me.install();
}
}

pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {
Expand Down
Loading

0 comments on commit 3d05e7f

Please sign in to comment.