From 19a573ac9a4dd28587af7989f82e8db9490df501 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 1 Nov 2018 13:21:09 -0700 Subject: [PATCH] feat(server): change `NewService` to `MakeService` with connection context This adjusts the way `Service`s are created for a `hyper::Server`. The `MakeService` trait allows receiving an argument when creating a `Service`. The implementation for `hyper::Server` expects to pass a reference to the accepted transport (so, `&Incoming::Item`). The user can inspect the transport before making a `Service`. In practice, this allows for things like getting the remote socket address, or the TLS certification, or similar. To prevent a breaking change, there is a blanket implementation of `MakeService` for any `NewService`. Besides implementing `MakeService` directly, there is also added `hyper::service::make_service_fn`. Closes #1650 --- examples/hello.rs | 24 ++++------ src/server/conn.rs | 95 +++++++++++++++++++++++++++--------- src/server/mod.rs | 18 +++---- src/server/shutdown.rs | 6 +-- src/server/tcp.rs | 3 +- src/service/make_service.rs | 96 +++++++++++++++++++++++++++++++++++++ src/service/mod.rs | 16 ++++--- src/service/new_service.rs | 19 +++++++- 8 files changed, 219 insertions(+), 58 deletions(-) create mode 100644 src/service/make_service.rs diff --git a/examples/hello.rs b/examples/hello.rs index 3a1d865a35..60d5e6a3a8 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -2,29 +2,23 @@ extern crate hyper; extern crate pretty_env_logger; -use hyper::{Body, Response, Server}; +use hyper::{Body, Request, Response, Server}; use hyper::service::service_fn_ok; use hyper::rt::{self, Future}; -static PHRASE: &'static [u8] = b"Hello World!"; - fn main() { pretty_env_logger::init(); let addr = ([127, 0, 0, 1], 3000).into(); - // new_service is run for each connection, creating a 'service' - // to handle requests for that specific connection. - let new_service = || { - // This is the `Service` that will handle the connection. - // `service_fn_ok` is a helper to convert a function that - // returns a Response into a `Service`. - service_fn_ok(|_| { - Response::new(Body::from(PHRASE)) - }) - }; - let server = Server::bind(&addr) - .serve(new_service) + .serve(|| { + // This is the `Service` that will handle the connection. + // `service_fn_ok` is a helper to convert a function that + // returns a Response into a `Service`. + service_fn_ok(move |_: Request| { + Response::new(Body::from("Hello World!")) + }) + }) .map_err(|e| eprintln!("server error: {}", e)); println!("Listening on http://{}", addr); diff --git a/src/server/conn.rs b/src/server/conn.rs index e4aa424ffc..2902e7cd02 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -25,15 +25,16 @@ use common::exec::{Exec, H2Exec, NewSvcExec}; use common::io::Rewind; use error::{Kind, Parse}; use proto; -use service::{NewService, Service}; +use service::Service; use upgrade::Upgraded; +pub(super) use self::make_service::MakeServiceRef; pub(super) use self::spawn_all::NoopWatcher; use self::spawn_all::NewSvcTask; pub(super) use self::spawn_all::Watcher; pub(super) use self::upgrades::UpgradeableConnection; -#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming; +#[cfg(feature = "runtime")] pub use super::tcp::{AddrIncoming, AddrStream}; /// A lower-level configuration of the HTTP protocol. /// @@ -69,13 +70,13 @@ enum ConnectionMode { #[derive(Debug)] pub struct Serve { incoming: I, - new_service: S, + make_service: S, protocol: Http, } /// A future building a new `Service` to a `Connection`. /// -/// Wraps the future returned from `NewService` into one that returns +/// Wraps the future returned from `MakeService` into one that returns /// a `Connection`. #[must_use = "futures do nothing unless polled"] #[derive(Debug)] @@ -349,12 +350,16 @@ impl Http { /// /// This method will bind the `addr` provided with a new TCP listener ready /// to accept connections. Each connection will be processed with the - /// `new_service` object provided, creating a new service per + /// `make_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> + pub fn serve_addr(&self, addr: &SocketAddr, make_service: S) -> ::Result> where - S: NewService, + S: MakeServiceRef< + AddrStream, + ReqBody=Body, + ResBody=Bd, + >, S::Error: Into>, Bd: Payload, E: H2Exec<::Future, Bd>, @@ -363,19 +368,23 @@ impl Http { if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } - Ok(self.serve_incoming(incoming, new_service)) + Ok(self.serve_incoming(incoming, make_service)) } /// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve) /// /// This method will bind the `addr` provided with a new TCP listener ready /// to accept connections. Each connection will be processed with the - /// `new_service` object provided, creating a new service per + /// `make_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result> where - S: NewService, + S: MakeServiceRef< + AddrStream, + ReqBody=Body, + ResBody=Bd, + >, S::Error: Into>, Bd: Payload, E: H2Exec<::Future, Bd>, @@ -384,23 +393,27 @@ impl Http { if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } - Ok(self.serve_incoming(incoming, new_service)) + Ok(self.serve_incoming(incoming, make_service)) } - /// Bind the provided stream of incoming IO objects with a `NewService`. - pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve + /// Bind the provided stream of incoming IO objects with a `MakeService`. + pub fn serve_incoming(&self, incoming: I, make_service: S) -> Serve where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite, - S: NewService, + S: MakeServiceRef< + I::Item, + ReqBody=Body, + ResBody=Bd, + >, S::Error: Into>, Bd: Payload, E: H2Exec<::Future, Bd>, { Serve { - incoming: incoming, - new_service: new_service, + incoming, + make_service, protocol: self.clone(), } } @@ -604,8 +617,9 @@ where I: Stream, I::Item: AsyncRead + AsyncWrite, I::Error: Into>, - S: NewService, - S::Error: Into>, + S: MakeServiceRef, + //S::Error2: Into>, + //SME: Into>, B: Payload, E: H2Exec<::Future, B>, { @@ -614,7 +628,7 @@ where fn poll(&mut self) -> Poll, Self::Error> { if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { - let new_fut = self.new_service.new_service(); + let new_fut = self.make_service.make_service_ref(&io); Ok(Async::Ready(Some(Connecting { future: new_fut, io: Some(io), @@ -666,8 +680,11 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, - S::Error: Into>, + S: MakeServiceRef< + I::Item, + ReqBody=Body, + ResBody=B, + >, B: Payload, E: H2Exec<::Future, B>, { @@ -873,3 +890,37 @@ mod upgrades { } } +pub(crate) mod make_service { + use std::error::Error as StdError; + + pub trait MakeServiceRef { + type Error: Into>; + type ReqBody: ::body::Payload; + type ResBody: ::body::Payload; + type Service: ::service::Service; + type Future: ::futures::Future; + + fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future; + } + + impl MakeServiceRef for T + where + T: for<'a> ::service::MakeService<&'a Ctx, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>, + E: Into>, + ME: Into>, + S: ::service::Service, + F: ::futures::Future, + IB: ::body::Payload, + OB: ::body::Payload, + { + type Error = E; + type Service = S; + type ReqBody = IB; + type ResBody = OB; + type Future = F; + + fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future { + self.make_service(ctx) + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index a02afc4557..a0e0b0184e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -11,7 +11,7 @@ //! # Server //! //! The [`Server`](Server) is main way to start listening for HTTP requests. -//! It wraps a listener with a [`NewService`](::service), and then should +//! It wraps a listener with a [`MakeService`](::service), and then should //! be executed to start serving requests. //! //! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default. @@ -30,8 +30,8 @@ //! // Construct our SocketAddr to listen on... //! let addr = ([127, 0, 0, 1], 3000).into(); //! -//! // And a NewService to handle each connection... -//! let new_service = || { +//! // And a MakeService to handle each connection... +//! let make_service = || { //! service_fn_ok(|_req| { //! Response::new(Body::from("Hello World")) //! }) @@ -39,7 +39,7 @@ //! //! // Then bind and serve... //! let server = Server::bind(&addr) -//! .serve(new_service); +//! .serve(make_service); //! //! // Finally, spawn `server` onto an Executor... //! hyper::rt::run(server.map_err(|e| { @@ -65,10 +65,10 @@ use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; use common::exec::{Exec, H2Exec, NewSvcExec}; -use service::{NewService, Service}; +use service::Service; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... -use self::conn::{Http as Http_, NoopWatcher, SpawnAll}; +use self::conn::{Http as Http_, MakeServiceRef, NoopWatcher, SpawnAll}; use self::shutdown::{Graceful, GracefulWatcher}; #[cfg(feature = "runtime")] use self::tcp::AddrIncoming; @@ -144,7 +144,7 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, @@ -203,7 +203,7 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, @@ -332,7 +332,7 @@ impl Builder { I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 6d6c4db3a6..167b3def96 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -4,8 +4,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; use common::drain::{self, Draining, Signal, Watch, Watching}; use common::exec::{H2Exec, NewSvcExec}; -use service::{Service, NewService}; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; +use service::Service; +use super::conn::{MakeServiceRef, SpawnAll, UpgradeableConnection, Watcher}; #[allow(missing_debug_implementations)] pub struct Graceful { @@ -40,7 +40,7 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, + S: MakeServiceRef, S::Service: 'static, S::Error: Into>, B: Payload, diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 804425c3f5..1538986648 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -8,7 +8,7 @@ use tokio_reactor::Handle; use tokio_tcp::TcpListener; use tokio_timer::Delay; -use self::addr_stream::AddrStream; +pub use self::addr_stream::AddrStream; /// A stream of connections from binding to an address. #[must_use = "streams do nothing unless polled"] @@ -194,6 +194,7 @@ mod addr_stream { use tokio_io::{AsyncRead, AsyncWrite}; + /// A transport returned yieled by `AddrIncoming`. #[derive(Debug)] pub struct AddrStream { inner: TcpStream, diff --git a/src/service/make_service.rs b/src/service/make_service.rs new file mode 100644 index 0000000000..b6182d07ea --- /dev/null +++ b/src/service/make_service.rs @@ -0,0 +1,96 @@ +use std::error::Error as StdError; +use std::fmt; + +use futures::{Future, IntoFuture}; + +use body::Payload; +use super::Service; + +/// An asynchronous constructor of `Service`s. +pub trait MakeService { + /// The `Payload` body of the `http::Request`. + type ReqBody: Payload; + + /// The `Payload` body of the `http::Response`. + type ResBody: Payload; + + /// The error type that can be returned by `Service`s. + type Error: Into>; + + /// The resolved `Service` from `new_service()`. + type Service: Service< + ReqBody=Self::ReqBody, + ResBody=Self::ResBody, + Error=Self::Error, + >; + + /// The future returned from `new_service` of a `Service`. + type Future: Future; + + /// The error type that can be returned when creating a new `Service`. + type MakeError: Into>; + + /// Create a new `Service`. + fn make_service(&mut self, ctx: Ctx) -> Self::Future; +} + + +/// Create a `MakeService` from a function. +/// +/// # Example +/// +/// ```rust +/// use std::net::TcpStream; +/// use hyper::{Body, Request, Response}; +/// use hyper::service::{make_service_fn, service_fn_ok}; +/// +/// let make_svc = make_service_fn(|socket: &TcpStream| { +/// let remote_addr = socket.peer_addr().unwrap(); +/// service_fn_ok(move |_: Request| { +/// Response::new(Body::from(format!("Hello, {}", remote_addr))) +/// }) +/// }); +/// ``` +pub fn make_service_fn(f: F) -> MakeServiceFn +where + F: Fn(Ctx) -> Ret, + Ret: IntoFuture, +{ + MakeServiceFn { + f, + } +} + +// Not exported from crate as this will likely be replaced with `impl Service`. +pub struct MakeServiceFn { + f: F, +} + +impl MakeService for MakeServiceFn +where + F: Fn(Ctx) -> Ret, + Ret: IntoFuture, + Ret::Item: Service, + Ret::Error: Into>, + ReqBody: Payload, + ResBody: Payload, +{ + type ReqBody = ReqBody; + type ResBody = ResBody; + type Error = ::Error; + type Service = Ret::Item; + type Future = Ret::Future; + type MakeError = Ret::Error; + + fn make_service(&mut self, ctx: Ctx) -> Self::Future { + (self.f)(ctx).into_future() + } +} + +impl fmt::Debug for MakeServiceFn { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MakeServiceFn") + .finish() + } +} + diff --git a/src/service/mod.rs b/src/service/mod.rs index 534519df9f..95b2fba510 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,9 +1,9 @@ -//! Services and NewServices +//! Services and MakeServices //! //! - A [`Service`](Service) is a trait representing an asynchronous function //! of a request to a response. It's similar to //! `async fn(Request) -> Result`. -//! - A [`NewService`](NewService) is a trait creating specific instances of a +//! - A [`MakeService`](MakeService) is a trait creating specific instances of a //! `Service`. //! //! These types are conceptually similar to those in @@ -19,17 +19,21 @@ //! [`service_fn`](service_fn) and [`service_fn_ok`](service_fn_ok) should be //! sufficient for most cases. //! -//! # NewService +//! # MakeService //! //! Since a `Service` is bound to a single connection, a [`Server`](::Server) //! needs a way to make them as it accepts connections. This is what a -//! `NewService` does. +//! `MakeService` does. //! //! Resources that need to be shared by all `Service`s can be put into a -//! `NewService`, and then passed to individual `Service`s when `new_service` +//! `MakeService`, and then passed to individual `Service`s when `make_service` //! is called. + +mod make_service; mod new_service; mod service; -pub use self::new_service::{NewService}; +pub use self::make_service::{make_service_fn, MakeService}; +#[doc(hidden)] +pub use self::new_service::NewService; pub use self::service::{service_fn, service_fn_ok, Service}; diff --git a/src/service/new_service.rs b/src/service/new_service.rs index 37a7dbe625..80f0e6bb9e 100644 --- a/src/service/new_service.rs +++ b/src/service/new_service.rs @@ -3,7 +3,7 @@ use std::error::Error as StdError; use futures::{Future, IntoFuture}; use body::Payload; -use super::Service; +use super::{MakeService, Service}; /// An asynchronous constructor of `Service`s. pub trait NewService { @@ -47,9 +47,24 @@ where type Future = R::Future; type InitError = R::Error; - fn new_service(&self) -> Self::Future { (*self)().into_future() } } +impl MakeService for N +where + N: NewService, +{ + type ReqBody = N::ReqBody; + type ResBody = N::ResBody; + type Error = N::Error; + type Service = N::Service; + type Future = N::Future; + type MakeError = N::InitError; + + fn make_service(&mut self, _: Ctx) -> Self::Future { + self.new_service() + } +} +