From 3528fb9b015a0959268452d5b42d5544c7b98a6a Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Fri, 13 Feb 2015 23:08:13 -0800 Subject: [PATCH] feat(server): Rewrite the accept loop into a custom thread pool. This is a modified and specialized thread pool meant for managing an acceptor in a multi-threaded way. A single handler is provided which will be invoked on each stream. Unlike the old thread pool, this returns a join guard which will block until the acceptor closes, enabling friendly behavior for the listening guard. The task pool itself is also faster as it only pays for message passing if sub-threads panic. In the optimistic case where there are few panics, this saves using channels for any other communication. This improves performance by around 15%, all the way to 105k req/sec on my machine, which usually gets about 90k. BREAKING_CHANGE: server::Listening::await is removed. --- examples/hello.rs | 3 +- examples/server.rs | 3 +- src/lib.rs | 10 +++- src/server/acceptor.rs | 95 ++++++++++++++++++++++++++++++ src/server/mod.rs | 127 +++++++++++++++++------------------------ 5 files changed, 158 insertions(+), 80 deletions(-) create mode 100644 src/server/acceptor.rs diff --git a/examples/hello.rs b/examples/hello.rs index b9ba116a14..8ed34e2bdd 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -13,6 +13,7 @@ fn hello(_: Request, res: Response) { } fn main() { - hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap(); + let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000) + .listen(hello).unwrap(); println!("Listening on http://127.0.0.1:3000"); } diff --git a/examples/server.rs b/examples/server.rs index ca270dfc76..17c580bc84 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -51,7 +51,6 @@ fn echo(mut req: Request, mut res: Response) { fn main() { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); - let mut listening = server.listen(echo).unwrap(); + let _guard = server.listen(echo).unwrap(); println!("Listening on http://127.0.0.1:1337"); - listening.await(); } diff --git a/src/lib.rs b/src/lib.rs index 592c8d6ef0..4af33e7d07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![feature(core, collections, hash, io, os, path, std_misc, - slicing_syntax, box_syntax)] + slicing_syntax, box_syntax, unsafe_destructor)] #![deny(missing_docs)] #![cfg_attr(test, deny(warnings))] #![cfg_attr(test, feature(alloc, test))] @@ -130,12 +130,16 @@ extern crate "rustc-serialize" as serialize; extern crate time; extern crate url; extern crate openssl; -#[macro_use] extern crate log; -#[cfg(test)] extern crate test; extern crate "unsafe-any" as uany; extern crate cookie; extern crate unicase; +#[macro_use] +extern crate log; + +#[cfg(test)] +extern crate test; + pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port}; pub use mimewrapper::mime; pub use url::Url; diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs new file mode 100644 index 0000000000..3c0e0b5e15 --- /dev/null +++ b/src/server/acceptor.rs @@ -0,0 +1,95 @@ +use std::thread::{Thread, JoinGuard}; +use std::sync::Arc; +use std::sync::mpsc; +use net::NetworkAcceptor; + +pub struct AcceptorPool { + acceptor: A +} + +impl AcceptorPool { + /// Create a thread pool to manage the acceptor. + pub fn new(acceptor: A) -> AcceptorPool { + AcceptorPool { acceptor: acceptor } + } + + /// Runs the acceptor pool. Blocks until the acceptors are closed. + /// + /// ## Panics + /// + /// Panics if threads == 0. + pub fn accept(self, + work: F, + threads: usize) -> JoinGuard<'static, ()> { + assert!(threads != 0, "Can't accept on 0 threads."); + + // Replace with &F when Send changes land. + let work = Arc::new(work); + + let (super_tx, supervisor_rx) = mpsc::channel(); + + let spawn = + move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); + + // Go + for _ in 0..threads { spawn() } + + // Spawn the supervisor + Thread::scoped(move || for () in supervisor_rx.iter() { spawn() }) + } +} + +fn spawn_with(supervisor: mpsc::Sender<()>, work: Arc, mut acceptor: A) +where A: NetworkAcceptor, + F: Fn(::Stream) + Send + Sync { + use std::old_io::EndOfFile; + + Thread::spawn(move || { + let sentinel = Sentinel::new(supervisor, ()); + + loop { + match acceptor.accept() { + Ok(stream) => work(stream), + Err(ref e) if e.kind == EndOfFile => { + debug!("Server closed."); + sentinel.cancel(); + return; + }, + + Err(e) => { + error!("Connection failed: {}", e); + } + } + } + }); +} + +struct Sentinel { + value: Option, + supervisor: mpsc::Sender, + active: bool +} + +impl Sentinel { + fn new(channel: mpsc::Sender, data: T) -> Sentinel { + Sentinel { + value: Some(data), + supervisor: channel, + active: true + } + } + + fn cancel(mut self) { self.active = false; } +} + +#[unsafe_destructor] +impl Drop for Sentinel { + fn drop(&mut self) { + // If we were cancelled, get out of here. + if !self.active { return; } + + // Respawn ourselves + let _ = self.supervisor.send(self.value.take().unwrap()); + } +} + diff --git a/src/server/mod.rs b/src/server/mod.rs index 43061e7676..01fa9184bd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,10 +1,8 @@ //! HTTP Server -use std::old_io::{Listener, EndOfFile, BufferedReader, BufferedWriter}; +use std::old_io::{Listener, BufferedReader, BufferedWriter}; use std::old_io::net::ip::{IpAddr, Port, SocketAddr}; use std::os; -use std::sync::{Arc, TaskPool}; -use std::thread::{Builder, JoinGuard}; - +use std::thread::JoinGuard; pub use self::request::Request; pub use self::response::Response; @@ -19,9 +17,13 @@ use net::{NetworkListener, NetworkStream, NetworkAcceptor, HttpAcceptor, HttpListener}; use version::HttpVersion::{Http10, Http11}; +use self::acceptor::AcceptorPool; + pub mod request; pub mod response; +mod acceptor; + /// A server can listen on a TCP socket. /// /// Once listening, it will create a `Request`/`Response` pair for each @@ -71,71 +73,14 @@ S: NetworkStream + Clone + Send> Server { let acceptor = try!(self.listener.listen((self.ip, self.port))); let socket = try!(acceptor.socket_name()); - let mut captured = acceptor.clone(); - let guard = Builder::new().name("hyper acceptor".to_string()).scoped(move || { - let handler = Arc::new(handler); - debug!("threads = {:?}", threads); - let pool = TaskPool::new(threads); - for conn in captured.incoming() { - match conn { - Ok(mut stream) => { - debug!("Incoming stream"); - let handler = handler.clone(); - pool.execute(move || { - let addr = match stream.peer_name() { - Ok(addr) => addr, - Err(e) => { - error!("Peer Name error: {:?}", e); - return; - } - }; - let mut rdr = BufferedReader::new(stream.clone()); - let mut wrt = BufferedWriter::new(stream); - - let mut keep_alive = true; - while keep_alive { - let mut res = Response::new(&mut wrt); - let req = match Request::new(&mut rdr, addr) { - Ok(req) => req, - Err(e@HttpIoError(_)) => { - debug!("ioerror in keepalive loop = {:?}", e); - return; - } - Err(e) => { - //TODO: send a 400 response - error!("request error = {:?}", e); - return; - } - }; - - keep_alive = match (req.version, req.headers.get::()) { - (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, - (Http11, Some(conn)) if conn.contains(&Close) => false, - _ => true - }; - res.version = req.version; - handler.handle(req, res); - debug!("keep_alive = {:?}", keep_alive); - } - - }); - }, - Err(ref e) if e.kind == EndOfFile => { - debug!("server closed"); - break; - }, - Err(e) => { - error!("Connection failed: {}", e); - continue; - } - } - } - }); + debug!("threads = {:?}", threads); + let pool = AcceptorPool::new(acceptor.clone()); + let work = move |stream| handle_connection(stream, &handler); Ok(Listening { - acceptor: acceptor, - guard: Some(guard), + _guard: pool.accept(work, threads), socket: socket, + acceptor: acceptor }) } @@ -146,22 +91,56 @@ S: NetworkStream + Clone + Send> Server { } +fn handle_connection(mut stream: S, handler: &H) +where S: NetworkStream + Clone, H: Handler { + debug!("Incoming stream"); + let addr = match stream.peer_name() { + Ok(addr) => addr, + Err(e) => { + error!("Peer Name error: {:?}", e); + return; + } + }; + + let mut rdr = BufferedReader::new(stream.clone()); + let mut wrt = BufferedWriter::new(stream); + + let mut keep_alive = true; + while keep_alive { + let mut res = Response::new(&mut wrt); + let req = match Request::new(&mut rdr, addr) { + Ok(req) => req, + Err(e@HttpIoError(_)) => { + debug!("ioerror in keepalive loop = {:?}", e); + return; + } + Err(e) => { + //TODO: send a 400 response + error!("request error = {:?}", e); + return; + } + }; + + keep_alive = match (req.version, req.headers.get::()) { + (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, + (Http11, Some(conn)) if conn.contains(&Close) => false, + _ => true + }; + res.version = req.version; + handler.handle(req, res); + debug!("keep_alive = {:?}", keep_alive); + } +} + /// A listening server, which can later be closed. pub struct Listening { acceptor: A, - guard: Option>, + _guard: JoinGuard<'static, ()>, /// The socket addresses that the server is bound to. pub socket: SocketAddr, } impl Listening { - /// Causes the current thread to wait for this listening to complete. - pub fn await(&mut self) { - if let Some(guard) = self.guard.take() { - let _ = guard.join(); - } - } - /// Stop the server from listening to its socket address. pub fn close(&mut self) -> HttpResult<()> { debug!("closing server");