Skip to content

Commit

Permalink
feat(server): Rewrite the accept loop into a custom thread pool.
Browse files Browse the repository at this point in the history
This is a modified and specialized thread pool meant for
managing an acceptor in a multi-threaded way. A single handler
is provided which will be invoked on each stream.

Unlike the old thread pool, this returns a join guard which
will block until the acceptor closes, enabling friendly behavior
for the listening guard.

The task pool itself is also faster as it only pays for message passing
if sub-threads panic. In the optimistic case where there are few panics,
this saves using channels for any other communication.

This improves performance by around 15%, all the way to 105k req/sec
on my machine, which usually gets about 90k.

BREAKING_CHANGE: server::Listening::await is removed.
  • Loading branch information
reem committed Feb 14, 2015
1 parent f554c09 commit 3528fb9
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 80 deletions.
3 changes: 2 additions & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fn hello(_: Request, res: Response) {
}

fn main() {
hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap();
let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000)
.listen(hello).unwrap();
println!("Listening on http://127.0.0.1:3000");
}
3 changes: 1 addition & 2 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ fn echo(mut req: Request, mut res: Response) {

fn main() {
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337);
let mut listening = server.listen(echo).unwrap();
let _guard = server.listen(echo).unwrap();
println!("Listening on http://127.0.0.1:1337");
listening.await();
}
10 changes: 7 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![feature(core, collections, hash, io, os, path, std_misc,
slicing_syntax, box_syntax)]
slicing_syntax, box_syntax, unsafe_destructor)]
#![deny(missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![cfg_attr(test, feature(alloc, test))]
Expand Down Expand Up @@ -130,12 +130,16 @@ extern crate "rustc-serialize" as serialize;
extern crate time;
extern crate url;
extern crate openssl;
#[macro_use] extern crate log;
#[cfg(test)] extern crate test;
extern crate "unsafe-any" as uany;
extern crate cookie;
extern crate unicase;

#[macro_use]
extern crate log;

#[cfg(test)]
extern crate test;

pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port};
pub use mimewrapper::mime;
pub use url::Url;
Expand Down
95 changes: 95 additions & 0 deletions src/server/acceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::thread::{Thread, JoinGuard};
use std::sync::Arc;
use std::sync::mpsc;
use net::NetworkAcceptor;

pub struct AcceptorPool<A: NetworkAcceptor> {
acceptor: A
}

impl<A: NetworkAcceptor> AcceptorPool<A> {
/// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> AcceptorPool<A> {
AcceptorPool { acceptor: acceptor }
}

/// Runs the acceptor pool. Blocks until the acceptors are closed.
///
/// ## Panics
///
/// Panics if threads == 0.
pub fn accept<F: Fn(A::Stream) + Send + Sync>(self,
work: F,
threads: usize) -> JoinGuard<'static, ()> {
assert!(threads != 0, "Can't accept on 0 threads.");

// Replace with &F when Send changes land.
let work = Arc::new(work);

let (super_tx, supervisor_rx) = mpsc::channel();

let spawn =
move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());

// Go
for _ in 0..threads { spawn() }

// Spawn the supervisor
Thread::scoped(move || for () in supervisor_rx.iter() { spawn() })
}
}

fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkAcceptor,
F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync {
use std::old_io::EndOfFile;

Thread::spawn(move || {
let sentinel = Sentinel::new(supervisor, ());

loop {
match acceptor.accept() {
Ok(stream) => work(stream),
Err(ref e) if e.kind == EndOfFile => {
debug!("Server closed.");
sentinel.cancel();
return;
},

Err(e) => {
error!("Connection failed: {}", e);
}
}
}
});
}

struct Sentinel<T: Send> {
value: Option<T>,
supervisor: mpsc::Sender<T>,
active: bool
}

impl<T: Send> Sentinel<T> {
fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
Sentinel {
value: Some(data),
supervisor: channel,
active: true
}
}

fn cancel(mut self) { self.active = false; }
}

#[unsafe_destructor]
impl<T: Send> Drop for Sentinel<T> {
fn drop(&mut self) {
// If we were cancelled, get out of here.
if !self.active { return; }

// Respawn ourselves
let _ = self.supervisor.send(self.value.take().unwrap());
}
}

127 changes: 53 additions & 74 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
//! HTTP Server
use std::old_io::{Listener, EndOfFile, BufferedReader, BufferedWriter};
use std::old_io::{Listener, BufferedReader, BufferedWriter};
use std::old_io::net::ip::{IpAddr, Port, SocketAddr};
use std::os;
use std::sync::{Arc, TaskPool};
use std::thread::{Builder, JoinGuard};

use std::thread::JoinGuard;

pub use self::request::Request;
pub use self::response::Response;
Expand All @@ -19,9 +17,13 @@ use net::{NetworkListener, NetworkStream, NetworkAcceptor,
HttpAcceptor, HttpListener};
use version::HttpVersion::{Http10, Http11};

use self::acceptor::AcceptorPool;

pub mod request;
pub mod response;

mod acceptor;

/// A server can listen on a TCP socket.
///
/// Once listening, it will create a `Request`/`Response` pair for each
Expand Down Expand Up @@ -71,71 +73,14 @@ S: NetworkStream + Clone + Send> Server<L> {
let acceptor = try!(self.listener.listen((self.ip, self.port)));
let socket = try!(acceptor.socket_name());

let mut captured = acceptor.clone();
let guard = Builder::new().name("hyper acceptor".to_string()).scoped(move || {
let handler = Arc::new(handler);
debug!("threads = {:?}", threads);
let pool = TaskPool::new(threads);
for conn in captured.incoming() {
match conn {
Ok(mut stream) => {
debug!("Incoming stream");
let handler = handler.clone();
pool.execute(move || {
let addr = match stream.peer_name() {
Ok(addr) => addr,
Err(e) => {
error!("Peer Name error: {:?}", e);
return;
}
};
let mut rdr = BufferedReader::new(stream.clone());
let mut wrt = BufferedWriter::new(stream);

let mut keep_alive = true;
while keep_alive {
let mut res = Response::new(&mut wrt);
let req = match Request::new(&mut rdr, addr) {
Ok(req) => req,
Err(e@HttpIoError(_)) => {
debug!("ioerror in keepalive loop = {:?}", e);
return;
}
Err(e) => {
//TODO: send a 400 response
error!("request error = {:?}", e);
return;
}
};

keep_alive = match (req.version, req.headers.get::<Connection>()) {
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
(Http11, Some(conn)) if conn.contains(&Close) => false,
_ => true
};
res.version = req.version;
handler.handle(req, res);
debug!("keep_alive = {:?}", keep_alive);
}

});
},
Err(ref e) if e.kind == EndOfFile => {
debug!("server closed");
break;
},
Err(e) => {
error!("Connection failed: {}", e);
continue;
}
}
}
});
debug!("threads = {:?}", threads);
let pool = AcceptorPool::new(acceptor.clone());
let work = move |stream| handle_connection(stream, &handler);

Ok(Listening {
acceptor: acceptor,
guard: Some(guard),
_guard: pool.accept(work, threads),
socket: socket,
acceptor: acceptor
})
}

Expand All @@ -146,22 +91,56 @@ S: NetworkStream + Clone + Send> Server<L> {

}

fn handle_connection<S, H>(mut stream: S, handler: &H)
where S: NetworkStream + Clone, H: Handler {
debug!("Incoming stream");
let addr = match stream.peer_name() {
Ok(addr) => addr,
Err(e) => {
error!("Peer Name error: {:?}", e);
return;
}
};

let mut rdr = BufferedReader::new(stream.clone());
let mut wrt = BufferedWriter::new(stream);

let mut keep_alive = true;
while keep_alive {
let mut res = Response::new(&mut wrt);
let req = match Request::new(&mut rdr, addr) {
Ok(req) => req,
Err(e@HttpIoError(_)) => {
debug!("ioerror in keepalive loop = {:?}", e);
return;
}
Err(e) => {
//TODO: send a 400 response
error!("request error = {:?}", e);
return;
}
};

keep_alive = match (req.version, req.headers.get::<Connection>()) {
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
(Http11, Some(conn)) if conn.contains(&Close) => false,
_ => true
};
res.version = req.version;
handler.handle(req, res);
debug!("keep_alive = {:?}", keep_alive);
}
}

/// A listening server, which can later be closed.
pub struct Listening<A = HttpAcceptor> {
acceptor: A,
guard: Option<JoinGuard<'static, ()>>,
_guard: JoinGuard<'static, ()>,
/// The socket addresses that the server is bound to.
pub socket: SocketAddr,
}

impl<A: NetworkAcceptor> Listening<A> {
/// Causes the current thread to wait for this listening to complete.
pub fn await(&mut self) {
if let Some(guard) = self.guard.take() {
let _ = guard.join();
}
}

/// Stop the server from listening to its socket address.
pub fn close(&mut self) -> HttpResult<()> {
debug!("closing server");
Expand Down

0 comments on commit 3528fb9

Please sign in to comment.