Skip to content

Commit

Permalink
feat(server): Add hooks for HttpListener and HttpsListener to be star…
Browse files Browse the repository at this point in the history
…ted from existing listeners.

This allows Servers to be started on existing TcpListeners.
  • Loading branch information
reem committed Oct 19, 2015
1 parent 292b4e6 commit fa0848d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
37 changes: 24 additions & 13 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
pub trait NetworkStream: Read + Write + Any + Send + Typeable {
/// Get the remote address of the underlying connection.
fn peer_addr(&mut self) -> io::Result<SocketAddr>;

/// Set the maximum time to wait for a read to complete.
#[cfg(feature = "timeouts")]
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

/// Set the maximum time to wait for a write to complete.
#[cfg(feature = "timeouts")]
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

/// This will be called when Stream should no longer be kept alive.
#[inline]
fn close(&mut self, _how: Shutdown) -> io::Result<()> {
Expand All @@ -66,9 +69,8 @@ pub trait NetworkStream: Read + Write + Any + Send + Typeable {
// Unsure about name and implementation...

#[doc(hidden)]
fn set_previous_response_expected_no_content(&mut self, _expected: bool) {

}
fn set_previous_response_expected_no_content(&mut self, _expected: bool) { }

#[doc(hidden)]
fn previous_response_expected_no_content(&self) -> bool {
false
Expand All @@ -79,6 +81,7 @@ pub trait NetworkStream: Read + Write + Any + Send + Typeable {
pub trait NetworkConnector {
/// Type of Stream to create
type Stream: Into<Box<NetworkStream + Send>>;

/// Connect to a remote address.
fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<Self::Stream>;
}
Expand Down Expand Up @@ -215,13 +218,17 @@ impl Clone for HttpListener {
}
}

impl HttpListener {
impl From<TcpListener> for HttpListener {
fn from(listener: TcpListener) -> HttpListener {
HttpListener(listener)
}
}

impl HttpListener {
/// Start listening to an address over HTTP.
pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
Ok(HttpListener(try!(TcpListener::bind(addr))))
}

}

impl NetworkListener for HttpListener {
Expand Down Expand Up @@ -382,17 +389,17 @@ impl NetworkConnector for HttpConnector {
/// A closure as a connector used to generate TcpStreams per request
///
/// # Example
///
///
/// Basic example:
///
///
/// ```norun
/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
/// TcpStream::connect(&(addr, port))
/// });
/// ```
///
///
/// Example using TcpBuilder from the net2 crate if you want to configure your source socket:
///
///
/// ```norun
/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
/// let b = try!(TcpBuilder::new_v4());
Expand Down Expand Up @@ -499,7 +506,6 @@ pub struct HttpsListener<S: Ssl> {
}

impl<S: Ssl> HttpsListener<S> {

/// Start listening to an address over HTTPS.
pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> ::Result<HttpsListener<S>> {
HttpListener::new(addr).map(|l| HttpsListener {
Expand All @@ -508,6 +514,13 @@ impl<S: Ssl> HttpsListener<S> {
})
}

/// Construct an HttpsListener from a bound `TcpListener`.
pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
HttpsListener {
listener: listener,
ssl: ssl
}
}
}

impl<S: Ssl + Clone> NetworkListener for HttpsListener<S> {
Expand Down Expand Up @@ -576,7 +589,6 @@ mod openssl {
use openssl::x509::X509FileType;
use super::{NetworkStream, HttpStream};


/// An implementation of `Ssl` for OpenSSL.
///
/// # Example
Expand Down Expand Up @@ -678,7 +690,6 @@ mod tests {

let mock = stream.downcast::<MockStream>().ok().unwrap();
assert_eq!(mock, Box::new(MockStream::new()));

}

#[test]
Expand All @@ -688,6 +699,6 @@ mod tests {

let mock = unsafe { stream.downcast_unchecked::<MockStream>() };
assert_eq!(mock, Box::new(MockStream::new()));

}
}

2 changes: 1 addition & 1 deletion src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {

thread::spawn(move || {
let _sentinel = Sentinel::new(supervisor, ());

Expand Down Expand Up @@ -77,3 +76,4 @@ impl<T: Send + 'static> Drop for Sentinel<T> {
let _ = self.supervisor.send(self.value.take().unwrap());
}
}

9 changes: 2 additions & 7 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ impl<L: NetworkListener> Server<L> {
pub fn set_write_timeout(&mut self, dur: Option<Duration>) {
self.timeouts.write = dur;
}


}

impl Server<HttpListener> {
Expand All @@ -219,6 +217,7 @@ impl<L: NetworkListener + Send + 'static> Server<L> {
pub fn handle<H: Handler + 'static>(self, handler: H) -> ::Result<Listening> {
self.handle_threads(handler, num_cpus::get() * 5 / 4)
}

/// Binds to a socket and starts handling connections with the provided
/// number of threads.
pub fn handle_threads<H: Handler + 'static>(self, handler: H,
Expand All @@ -228,8 +227,7 @@ impl<L: NetworkListener + Send + 'static> Server<L> {
}

fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
where H: Handler + 'static,
L: NetworkListener + Send + 'static {
where H: Handler + 'static, L: NetworkListener + Send + 'static {
let socket = try!(server.listener.local_addr());

debug!("threads = {:?}", threads);
Expand All @@ -251,7 +249,6 @@ struct Worker<H: Handler + 'static> {
}

impl<H: Handler + 'static> Worker<H> {

fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
Worker {
handler: handler,
Expand Down Expand Up @@ -299,7 +296,6 @@ impl<H: Handler + 'static> Worker<H> {
self.set_write_timeout(s, self.timeouts.write)
}


#[cfg(not(feature = "timeouts"))]
fn set_write_timeout(&self, _s: &NetworkStream, _timeout: Option<Duration>) -> io::Result<()> {
Ok(())
Expand Down Expand Up @@ -339,7 +335,6 @@ impl<H: Handler + 'static> Worker<H> {
}
};


if !self.handle_expect(&req, wrt) {
return false;
}
Expand Down

0 comments on commit fa0848d

Please sign in to comment.