From 649485b9782dd1807475fb91f1a4580a8e13fd53 Mon Sep 17 00:00:00 2001 From: Jeb Rosen Date: Sun, 10 Mar 2019 22:06:46 -0700 Subject: [PATCH] Convert core to async and add support for async routes. Minimum rustc bump required for rust-lang/rust#61775 --- core/codegen/src/attribute/catch.rs | 16 +- core/codegen/src/attribute/route.rs | 32 +- core/codegen/src/lib.rs | 3 + core/codegen/tests/route.rs | 2 +- core/http/Cargo.toml | 9 +- core/http/src/cookies.rs | 39 ++- core/http/src/hyper.rs | 6 +- core/http/src/tls.rs | 7 +- core/lib/Cargo.toml | 4 +- core/lib/build.rs | 4 +- core/lib/src/catcher.rs | 15 +- core/lib/src/codegen.rs | 4 +- core/lib/src/config/config.rs | 32 +- core/lib/src/data/data.rs | 117 +++++-- core/lib/src/data/data_stream.rs | 71 ++-- core/lib/src/data/from_data.rs | 101 +++--- core/lib/src/data/mod.rs | 3 +- core/lib/src/data/net_stream.rs | 94 ------ core/lib/src/error.rs | 9 +- core/lib/src/ext.rs | 63 +++- core/lib/src/handler.rs | 15 +- core/lib/src/lib.rs | 1 + core/lib/src/local/request.rs | 56 ++-- core/lib/src/logger.rs | 6 +- core/lib/src/request/form/form.rs | 46 +-- core/lib/src/request/form/from_form.rs | 2 +- core/lib/src/request/form/lenient.rs | 12 +- core/lib/src/request/request.rs | 127 ++++--- core/lib/src/response/responder.rs | 9 +- core/lib/src/response/response.rs | 91 +++-- core/lib/src/response/status.rs | 20 +- core/lib/src/response/stream.rs | 15 +- core/lib/src/rocket.rs | 443 ++++++++++++++----------- core/lib/src/router/mod.rs | 8 +- examples/cookies/src/main.rs | 2 +- examples/errors/src/main.rs | 2 +- examples/form_kitchen_sink/src/main.rs | 2 +- examples/hello_world/src/main.rs | 2 +- 38 files changed, 823 insertions(+), 667 deletions(-) delete mode 100644 core/lib/src/data/net_stream.rs diff --git a/core/codegen/src/attribute/catch.rs b/core/codegen/src/attribute/catch.rs index 372620d955..9b7feb594f 100644 --- a/core/codegen/src/attribute/catch.rs +++ b/core/codegen/src/attribute/catch.rs @@ -51,7 +51,7 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result { let status_code = status.0.code; // Variables names we'll use and reuse. - define_vars_and_mods!(req, catcher, response, Request, Response); + define_vars_and_mods!(req, catcher, Request, Response, ErrorHandlerFuture); // Determine the number of parameters that will be passed in. let (fn_sig, inputs) = match catch.function.decl.inputs.len() { @@ -82,12 +82,14 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result { #user_catcher_fn /// Rocket code generated wrapping catch function. - #vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #response::Result<'_b> { - let __response = #catcher_response; - #Response::build() - .status(#status) - .merge(__response) - .ok() + #vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #ErrorHandlerFuture<'_b> { + Box::pin(async move { + let __response = #catcher_response; + #Response::build() + .status(#status) + .merge(__response) + .ok() + }) } /// Rocket code generated static catcher info. diff --git a/core/codegen/src/attribute/route.rs b/core/codegen/src/attribute/route.rs index 7aa8d945a0..ee97f7449e 100644 --- a/core/codegen/src/attribute/route.rs +++ b/core/codegen/src/attribute/route.rs @@ -175,7 +175,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 { define_vars_and_mods!(req, data, FromData, Outcome, Transform); let span = ident.span().unstable().join(ty.span()).unwrap().into(); quote_spanned! { span => - let __transform = <#ty as #FromData>::transform(#req, #data); + let __transform = <#ty as #FromData>::transform(#req, #data).await; #[allow(unreachable_patterns, unreachable_code)] let __outcome = match __transform { @@ -192,7 +192,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 { }; #[allow(non_snake_case, unreachable_patterns, unreachable_code)] - let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome) { + let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome).await { #Outcome::Success(__d) => __d, #Outcome::Forward(__d) => return #Outcome::Forward(__d), #Outcome::Failure((__c, _)) => return #Outcome::Failure(__c), @@ -365,7 +365,7 @@ fn codegen_route(route: Route) -> Result { } // Gather everything we need. - define_vars_and_mods!(req, data, handler, Request, Data, StaticRouteInfo); + define_vars_and_mods!(req, data, handler, Request, Data, StaticRouteInfo, HandlerFuture); let (vis, user_handler_fn) = (&route.function.vis, &route.function); let user_handler_fn_name = &user_handler_fn.ident; let generated_fn_name = user_handler_fn_name.prepend(ROUTE_FN_PREFIX); @@ -377,6 +377,16 @@ fn codegen_route(route: Route) -> Result { let rank = Optional(route.attribute.rank); let format = Optional(route.attribute.format); + let responder_stmt = if user_handler_fn.asyncness.is_some() { + quote! { + let ___responder = #user_handler_fn_name(#(#parameter_names),*).await; + } + } else { + quote! { + let ___responder = #user_handler_fn_name(#(#parameter_names),*); + } + }; + Ok(quote! { #user_handler_fn @@ -384,13 +394,15 @@ fn codegen_route(route: Route) -> Result { #vis fn #generated_fn_name<'_b>( #req: &'_b #Request, #data: #Data - ) -> #handler::Outcome<'_b> { - #(#req_guard_definitions)* - #(#parameter_definitions)* - #data_stmt - - let ___responder = #user_handler_fn_name(#(#parameter_names),*); - #handler::Outcome::from(#req, ___responder) + ) -> #HandlerFuture<'_b> { + Box::pin(async move { + #(#req_guard_definitions)* + #(#parameter_definitions)* + #data_stmt + + #responder_stmt + #handler::Outcome::from(#req, ___responder) + }) } /// Rocket code generated wrapping URI macro. diff --git a/core/codegen/src/lib.rs b/core/codegen/src/lib.rs index e7fc207c6b..dd159e324a 100644 --- a/core/codegen/src/lib.rs +++ b/core/codegen/src/lib.rs @@ -1,5 +1,6 @@ #![feature(proc_macro_diagnostic, proc_macro_span)] #![feature(crate_visibility_modifier)] +#![feature(async_await)] #![recursion_limit="128"] #![doc(html_root_url = "https://api.rocket.rs/v0.5")] @@ -87,6 +88,8 @@ macro_rules! define_vars_and_mods { (@Data as $v:ident) => (define!(::rocket::Data as $v)); (@StaticRouteInfo as $v:ident) => (define!(::rocket::StaticRouteInfo as $v)); (@SmallVec as $v:ident) => (define!(::rocket::http::private::SmallVec as $v)); + (@HandlerFuture as $v:ident) => (define!(::rocket::handler::HandlerFuture as $v)); + (@ErrorHandlerFuture as $v:ident) => (define!(::rocket::handler::ErrorHandlerFuture as $v)); ($($name:ident),*) => ($(define_vars_and_mods!(@$name as $name);)*) } diff --git a/core/codegen/tests/route.rs b/core/codegen/tests/route.rs index ecc554663e..db583195df 100644 --- a/core/codegen/tests/route.rs +++ b/core/codegen/tests/route.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene, decl_macro)] +#![feature(proc_macro_hygiene, decl_macro, async_await)] // Rocket sometimes generates mangled identifiers that activate the // non_snake_case lint. We deny the lint in this test to ensure that diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index 6e55053885..ce987dff70 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -16,7 +16,7 @@ edition = "2018" [features] default = [] -tls = ["rustls", "hyper-sync-rustls"] +tls = ["tokio-rustls"] private-cookies = ["cookie/secure"] [dependencies] @@ -27,16 +27,11 @@ http = "0.1.17" mime = "0.3.13" time = "0.1" indexmap = "1.0" -rustls = { version = "0.15", optional = true } state = "0.4" +tokio-rustls = { version = "0.9.2", optional = true } cookie = { version = "0.12", features = ["percent-encode"] } pear = "0.1" unicode-xid = "0.1" -[dependencies.hyper-sync-rustls] -version = "=0.3.0-rc.5" -features = ["server"] -optional = true - [dev-dependencies] rocket = { version = "0.5.0-dev", path = "../lib" } diff --git a/core/http/src/cookies.rs b/core/http/src/cookies.rs index caf7f3ba05..15c9562889 100644 --- a/core/http/src/cookies.rs +++ b/core/http/src/cookies.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::cell::RefMut; use crate::Header; use cookie::Delta; @@ -128,7 +127,7 @@ mod key { /// 32`. pub enum Cookies<'a> { #[doc(hidden)] - Jarred(RefMut<'a, CookieJar>, &'a Key), + Jarred(CookieJar, &'a Key, Box), #[doc(hidden)] Empty(CookieJar) } @@ -137,8 +136,8 @@ impl<'a> Cookies<'a> { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[inline] #[doc(hidden)] - pub fn new(jar: RefMut<'a, CookieJar>, key: &'a Key) -> Cookies<'a> { - Cookies::Jarred(jar, key) + pub fn new(jar: CookieJar, key: &'a Key, on_drop: F) -> Cookies<'a> { + Cookies::Jarred(jar, key, Box::new(on_drop)) } /// WARNING: This is unstable! Do not use this method outside of Rocket! @@ -160,7 +159,7 @@ impl<'a> Cookies<'a> { #[inline] #[doc(hidden)] pub fn add_original(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.add_original(cookie) } } @@ -180,7 +179,7 @@ impl<'a> Cookies<'a> { /// ``` pub fn get(&self, name: &str) -> Option<&Cookie<'static>> { match *self { - Cookies::Jarred(ref jar, _) => jar.get(name), + Cookies::Jarred(ref jar, _, _) => jar.get(name), Cookies::Empty(_) => None } } @@ -205,7 +204,7 @@ impl<'a> Cookies<'a> { /// } /// ``` pub fn add(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.add(cookie) } } @@ -231,7 +230,7 @@ impl<'a> Cookies<'a> { /// } /// ``` pub fn remove(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.remove(cookie) } } @@ -252,7 +251,7 @@ impl<'a> Cookies<'a> { /// ``` pub fn iter(&self) -> impl Iterator> { match *self { - Cookies::Jarred(ref jar, _) => jar.iter(), + Cookies::Jarred(ref jar, _, _) => jar.iter(), Cookies::Empty(ref jar) => jar.iter() } } @@ -262,12 +261,22 @@ impl<'a> Cookies<'a> { #[doc(hidden)] pub fn delta(&self) -> Delta<'_> { match *self { - Cookies::Jarred(ref jar, _) => jar.delta(), + Cookies::Jarred(ref jar, _, _) => jar.delta(), Cookies::Empty(ref jar) => jar.delta() } } } +impl<'a> Drop for Cookies<'a> { + fn drop(&mut self) { + if let Cookies::Jarred(ref mut jar, _, ref mut on_drop) = *self { + let jar = std::mem::replace(jar, CookieJar::new()); + let on_drop = std::mem::replace(on_drop, Box::new(|_| {})); + on_drop(jar); + } + } +} + #[cfg(feature = "private-cookies")] impl Cookies<'_> { /// Returns a reference to the `Cookie` inside this collection with the name @@ -290,7 +299,7 @@ impl Cookies<'_> { /// ``` pub fn get_private(&mut self, name: &str) -> Option> { match *self { - Cookies::Jarred(ref mut jar, key) => jar.private(key).get(name), + Cookies::Jarred(ref mut jar, key, _) => jar.private(key).get(name), Cookies::Empty(_) => None } } @@ -326,7 +335,7 @@ impl Cookies<'_> { /// } /// ``` pub fn add_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { Cookies::set_private_defaults(&mut cookie); jar.private(key).add(cookie) } @@ -336,7 +345,7 @@ impl Cookies<'_> { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[doc(hidden)] pub fn add_original_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { Cookies::set_private_defaults(&mut cookie); jar.private(key).add_original(cookie) } @@ -390,7 +399,7 @@ impl Cookies<'_> { /// } /// ``` pub fn remove_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { if cookie.path().is_none() { cookie.set_path("/"); } @@ -403,7 +412,7 @@ impl Cookies<'_> { impl fmt::Debug for Cookies<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - Cookies::Jarred(ref jar, _) => write!(f, "{:?}", jar), + Cookies::Jarred(ref jar, _, _) => write!(f, "{:?}", jar), Cookies::Empty(ref jar) => write!(f, "{:?}", jar) } } diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index 788c55e50e..f6c93421a8 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -4,17 +4,17 @@ //! These types will, with certainty, be removed with time, but they reside here //! while necessary. -#[doc(hidden)] pub use hyper::{Body, Request, Response}; +#[doc(hidden)] pub use hyper::{Body, Request, Response, Server}; #[doc(hidden)] pub use hyper::body::Payload as Payload; #[doc(hidden)] pub use hyper::error::Error; -#[doc(hidden)] pub use hyper::server::Server; #[doc(hidden)] pub use hyper::service::{MakeService, Service}; #[doc(hidden)] pub use hyper::Chunk; +#[doc(hidden)] pub use http::header::HeaderMap; #[doc(hidden)] pub use http::header::HeaderName as HeaderName; #[doc(hidden)] pub use http::header::HeaderValue as HeaderValue; #[doc(hidden)] pub use http::method::Method; -#[doc(hidden)] pub use http::request::Parts; +#[doc(hidden)] pub use http::request::Parts as RequestParts; #[doc(hidden)] pub use http::status::StatusCode; #[doc(hidden)] pub use http::uri::Uri; diff --git a/core/http/src/tls.rs b/core/http/src/tls.rs index b0311be862..97ae1289a0 100644 --- a/core/http/src/tls.rs +++ b/core/http/src/tls.rs @@ -1,2 +1,5 @@ -pub use hyper_sync_rustls::{util, WrappedStream, ServerSession, TlsServer}; -pub use rustls::{Certificate, PrivateKey}; +pub use tokio_rustls::TlsAcceptor; +pub use tokio_rustls::rustls; + +pub use rustls::internal::pemfile; +pub use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig}; diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 2a1b3deadf..16bdb95cfa 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -24,12 +24,12 @@ tls = ["rocket_http/tls"] private-cookies = ["rocket_http/private-cookies"] [dependencies] -futures = "0.1" rocket_codegen = { version = "0.5.0-dev", path = "../codegen" } rocket_http = { version = "0.5.0-dev", path = "../http" } +futures-preview = { version = "0.3.0-alpha.14", features = ["compat", "io-compat"] } tokio = "0.1.16" yansi = "0.5" -log = "0.4" +log = { version = "0.4", features = ["std"] } toml = "0.4.7" num_cpus = "1.0" state = "0.4.1" diff --git a/core/lib/build.rs b/core/lib/build.rs index 0f71316dfe..b6ca3322eb 100644 --- a/core/lib/build.rs +++ b/core/lib/build.rs @@ -3,8 +3,8 @@ use yansi::{Paint, Color::{Red, Yellow, Blue}}; // Specifies the minimum nightly version needed to compile Rocket. -const MIN_DATE: &'static str = "2019-04-05"; -const MIN_VERSION: &'static str = "1.35.0-nightly"; +const MIN_DATE: &'static str = "2019-07-03"; +const MIN_VERSION: &'static str = "1.37.0-nightly"; macro_rules! err { ($version:expr, $date:expr, $msg:expr) => ( diff --git a/core/lib/src/catcher.rs b/core/lib/src/catcher.rs index 76ccf76046..bd6b71ab6b 100644 --- a/core/lib/src/catcher.rs +++ b/core/lib/src/catcher.rs @@ -1,3 +1,5 @@ +use futures::future::Future; + use crate::response; use crate::handler::ErrorHandler; use crate::codegen::StaticCatchInfo; @@ -98,7 +100,7 @@ impl Catcher { } #[inline(always)] - crate fn handle<'r>(&self, req: &'r Request<'_>) -> response::Result<'r> { + crate fn handle<'r>(&self, req: &'r Request<'_>) -> impl Future> { (self.handler)(req) } @@ -149,10 +151,12 @@ macro_rules! default_catchers { let mut map = HashMap::new(); $( - fn $fn_name<'r>(req: &'r Request<'_>) -> response::Result<'r> { - status::Custom(Status::from_code($code).unwrap(), - content::Html(error_page_template!($code, $name, $description)) - ).respond_to(req) + fn $fn_name<'r>(req: &'r Request<'_>) -> std::pin::Pin> + Send + 'r>> { + (async move { + status::Custom(Status::from_code($code).unwrap(), + content::Html(error_page_template!($code, $name, $description)) + ).respond_to(req) + }).boxed() } map.insert($code, Catcher::new_default($code, $fn_name)); @@ -164,6 +168,7 @@ macro_rules! default_catchers { pub mod defaults { use super::Catcher; + use futures::future::FutureExt; use std::collections::HashMap; diff --git a/core/lib/src/codegen.rs b/core/lib/src/codegen.rs index 276eea1a32..894cf85403 100644 --- a/core/lib/src/codegen.rs +++ b/core/lib/src/codegen.rs @@ -1,9 +1,11 @@ +use futures::future::Future; + use crate::{Request, Data}; use crate::handler::{Outcome, ErrorHandler}; use crate::http::{Method, MediaType}; /// Type of a static handler, which users annotate with Rocket's attribute. -pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> Outcome<'r>; +pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> std::pin::Pin> + Send + 'r>>; /// Information generated by the `route` attribute during codegen. pub struct StaticRouteInfo { diff --git a/core/lib/src/config/config.rs b/core/lib/src/config/config.rs index d07e680bd9..b237aa3ea5 100644 --- a/core/lib/src/config/config.rs +++ b/core/lib/src/config/config.rs @@ -516,23 +516,33 @@ impl Config { /// ``` #[cfg(feature = "tls")] pub fn set_tls(&mut self, certs_path: &str, key_path: &str) -> Result<()> { - use crate::http::tls::util::{self, Error}; + use crate::http::tls::pemfile::{certs, rsa_private_keys}; + use std::fs::File; + use std::io::BufReader; let pem_err = "malformed PEM file"; + // TODO.async: Fully copy from hyper-sync-rustls, move to http/src/tls + // Partially extracted from hyper-sync-rustls + // Load the certificates. - let certs = util::load_certs(self.root_relative(certs_path)) - .map_err(|e| match e { - Error::Io(e) => ConfigError::Io(e, "tls.certs"), - _ => self.bad_type("tls", pem_err, "a valid certificates file") - })?; + let certs = match File::open(self.root_relative(certs_path)) { + Ok(file) => certs(&mut BufReader::new(file)).map_err(|_| { + self.bad_type("tls", pem_err, "a valid certificates file") + }), + Err(e) => Err(ConfigError::Io(e, "tls.certs"))?, + }?; // And now the private key. - let key = util::load_private_key(self.root_relative(key_path)) - .map_err(|e| match e { - Error::Io(e) => ConfigError::Io(e, "tls.key"), - _ => self.bad_type("tls", pem_err, "a valid private key file") - })?; + let mut keys = match File::open(self.root_relative(key_path)) { + Ok(file) => rsa_private_keys(&mut BufReader::new(file)).map_err(|_| { + self.bad_type("tls", pem_err, "a valid private key file") + }), + Err(e) => Err(ConfigError::Io(e, "tls.key")), + }?; + + // TODO.async: Proper check for one key + let key = keys.remove(0); self.tls = Some(TlsConfig { certs, key }); Ok(()) diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index d9fd6762ed..0276b4be81 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -1,16 +1,16 @@ -use std::io::{self, Read, Write, Cursor, Chain}; use std::path::Path; -use std::fs::File; -use std::time::Duration; +use std::pin::Pin; -#[cfg(feature = "tls")] use super::net_stream::HttpsStream; +use futures::compat::{Future01CompatExt, Stream01CompatExt, AsyncWrite01CompatExt}; +use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite}; +use futures::future::Future; +use futures::stream::TryStreamExt; -use super::data_stream::{DataStream, /* TODO kill_stream */}; -use super::net_stream::NetStream; -use crate::ext::ReadExt; +use super::data_stream::{DataStream, kill_stream}; -use crate::http::hyper::{self, Payload}; -use futures::{Async, Future}; +use crate::http::hyper; + +use crate::ext::AsyncReadExt; /// The number of bytes to read into the "peek" buffer. const PEEK_BYTES: usize = 512; @@ -48,7 +48,9 @@ const PEEK_BYTES: usize = 512; /// body data. This enables partially or fully reading from a `Data` object /// without consuming the `Data` object. pub struct Data { - body: Vec, + buffer: Vec, + is_complete: bool, + stream: Box, } impl Data { @@ -69,11 +71,15 @@ impl Data { /// } /// ``` pub fn open(mut self) -> DataStream { - // FIXME: Insert a `BufReader` in front of the `NetStream` with capacity - // 4096. We need the new `Chain` methods to get the inner reader to - // actually do this, however. - let stream = ::std::mem::replace(&mut self.body, vec![]); - DataStream(Cursor::new(stream)) + let buffer = std::mem::replace(&mut self.buffer, vec![]); + let stream = std::mem::replace(&mut self.stream, Box::new(&[][..])); + DataStream(buffer, stream) + } + + crate fn from_hyp(body: hyper::Body) -> impl Future { + // TODO.async: This used to also set the read timeout to 5 seconds. + + Data::new(body) } /// Retrieve the `peek` buffer. @@ -94,10 +100,10 @@ impl Data { /// ``` #[inline(always)] pub fn peek(&self) -> &[u8] { - if self.body.len() > PEEK_BYTES { - &self.body[..PEEK_BYTES] + if self.buffer.len() > PEEK_BYTES { + &self.buffer[..PEEK_BYTES] } else { - &self.body + &self.buffer } } @@ -118,8 +124,7 @@ impl Data { /// ``` #[inline(always)] pub fn peek_complete(&self) -> bool { - // TODO self.is_complete - true + self.is_complete } /// A helper method to write the body of the request to any `Write` type. @@ -139,8 +144,11 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to(self, writer: &mut W) -> io::Result { - io::copy(&mut self.open(), writer) + pub fn stream_to<'w, W: AsyncWrite + Unpin>(self, writer: &'w mut W) -> impl Future> + 'w { + Box::pin(async move { + let stream = self.open(); + stream.copy_into(writer).await + }) } /// A helper method to write the body of the request to a file at the path @@ -161,8 +169,11 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to_file>(self, path: P) -> io::Result { - io::copy(&mut self.open(), &mut File::create(path)?) + pub fn stream_to_file + Send + 'static>(self, path: P) -> impl Future> { + Box::pin(async move { + let mut file = tokio::fs::File::create(path).compat().await?.compat(); + self.stream_to(&mut file).await + }) } // Creates a new data object with an internal buffer `buf`, where the cursor @@ -170,8 +181,62 @@ impl Data { // bytes `vec[pos..cap]` are buffered and unread. The remainder of the data // bytes can be read from `stream`. #[inline(always)] - crate fn new(body: Vec) -> Data { - Data { body } + crate fn new(body: hyper::Body) -> Pin + Send>> { + trace_!("Data::new({:?})", body); + + let mut stream = body.compat().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }).into_async_read(); + + Box::pin(async { + let mut peek_buf = vec![0; PEEK_BYTES]; + + let eof = match stream.read_max(&mut peek_buf[..]).await { + Ok(n) => { + trace_!("Filled peek buf with {} bytes.", n); + + // TODO.async: This has not gone away, and I don't entirely + // understand what's happening here + + // We can use `set_len` here instead of `truncate`, but we'll + // take the performance hit to avoid `unsafe`. All of this code + // should go away when we migrate away from hyper 0.10.x. + + peek_buf.truncate(n); + n < PEEK_BYTES + } + Err(e) => { + error_!("Failed to read into peek buffer: {:?}.", e); + // Likewise here as above. + peek_buf.truncate(0); + false + } + }; + + trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES); + Data { buffer: peek_buf, stream: Box::new(stream), is_complete: eof } + }) } + /// This creates a `data` object from a local data source `data`. + #[inline] + crate fn local(data: Vec) -> Data { + Data { + buffer: data, + stream: Box::new(&[][..]), + is_complete: true, + } + } +} + +impl std::borrow::Borrow<()> for Data { + fn borrow(&self) -> &() { + &() + } +} + +impl Drop for Data { + fn drop(&mut self) { + kill_stream(&mut self.stream); + } } diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index fe237edc9d..3aad522d4c 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -1,50 +1,61 @@ -use std::io::{self, Chain, Cursor, Read, Write}; -use std::net::Shutdown; +use std::pin::Pin; -pub type InnerStream = Cursor>; +use futures::io::{AsyncRead, Error as IoError}; +use futures::task::{Poll, Context}; +// TODO.async: Consider storing the real type here instead of a Box to avoid +// the dynamic dispatch /// Raw data stream of a request body. /// /// This stream can only be obtained by calling /// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data /// in the body of the request. It exposes no methods directly. Instead, it must /// be used as an opaque [`Read`] structure. -pub struct DataStream(crate InnerStream); +pub struct DataStream(crate Vec, crate Box); + +// TODO.async: Consider implementing `AsyncBufRead` // TODO: Have a `BufRead` impl for `DataStream`. At the moment, this isn't // possible since Hyper's `HttpReader` doesn't implement `BufRead`. -impl Read for DataStream { +impl AsyncRead for DataStream { #[inline(always)] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace_!("DataStream::read()"); - self.0.read(buf) + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + trace_!("DataStream::poll_read()"); + if self.0.len() > 0 { + let count = std::cmp::min(buf.len(), self.0.len()); + trace_!("Reading peeked {} into dest {} = {} bytes", self.0.len(), buf.len(), count); + let next = self.0.split_off(count); + (&mut buf[..count]).copy_from_slice(&self.0[..]); + self.0 = next; + Poll::Ready(Ok(count)) + } else { + trace_!("Delegating to remaining stream"); + Pin::new(&mut self.1).poll_read(cx, buf) + } } } -/* pub fn kill_stream(stream: &mut BodyReader) { - // Only do the expensive reading if we're not sure we're done. - // TODO use self::HttpReader::*; - match *stream { - SizedReader(_, n) | ChunkedReader(_, Some(n)) if n > 0 => { /* continue */ }, - _ => return - }; - - // Take <= 1k from the stream. If there might be more data, force close. - const FLUSH_LEN: u64 = 1024; - match io::copy(&mut stream.take(FLUSH_LEN), &mut io::sink()) { - Ok(FLUSH_LEN) | Err(_) => { - warn_!("Data left unread. Force closing network stream."); - let (_, network) = stream.get_mut().get_mut(); - if let Err(e) = network.close(Shutdown::Read) { - error_!("Failed to close network stream: {:?}", e); - } - } - Ok(n) => debug!("flushed {} unread bytes", n) - } -}*/ +// TODO.async: Either implement this somehow, or remove the +// `Drop` impl and other references to kill_stream +pub fn kill_stream(_stream: &mut dyn AsyncRead) { +// // Only do the expensive reading if we're not sure we're done. +// +// // Take <= 1k from the stream. If there might be more data, force close. +// const FLUSH_LEN: u64 = 1024; +// match io::copy(&mut stream.take(FLUSH_LEN), &mut io::sink()) { +// Ok(FLUSH_LEN) | Err(_) => { +// warn_!("Data left unread. Force closing network stream."); +// let (_, network) = stream.get_mut().get_mut(); +// if let Err(e) = network.close(Shutdown::Read) { +// error_!("Failed to close network stream: {:?}", e); +// } +// } +// Ok(n) => debug!("flushed {} unread bytes", n) +// } +} impl Drop for DataStream { fn drop(&mut self) { - // TODO kill_stream(&mut self.0.get_mut().1); + kill_stream(&mut self.1); } } diff --git a/core/lib/src/data/from_data.rs b/core/lib/src/data/from_data.rs index 03d6357600..75a9489c6a 100644 --- a/core/lib/src/data/from_data.rs +++ b/core/lib/src/data/from_data.rs @@ -1,4 +1,8 @@ use std::borrow::Borrow; +use std::pin::Pin; + +use futures::future::{ready, Future, FutureExt}; +use futures::io::AsyncReadExt; use crate::outcome::{self, IntoOutcome}; use crate::outcome::Outcome::*; @@ -108,6 +112,9 @@ pub type Transformed<'a, T> = Outcome<&'a >::Borrowed, >::Error> >; +pub type TransformFuture<'a, T, E> = Pin>> + Send + 'a>>; +pub type FromDataFuture<'a, T, E> = Pin> + Send + 'a>>; + /// Trait implemented by data guards to derive a value from request body data. /// /// # Data Guards @@ -321,7 +328,7 @@ pub type Transformed<'a, T> = /// [`FromDataSimple`] documentation. pub trait FromData<'a>: Sized { /// The associated error to be returned when the guard fails. - type Error; + type Error: Send; /// The owned type returned from [`FromData::transform()`]. /// @@ -354,7 +361,7 @@ pub trait FromData<'a>: Sized { /// If transformation succeeds, an outcome of `Success` is returned. /// If the data is not appropriate given the type of `Self`, `Forward` is /// returned. On failure, `Failure` is returned. - fn transform(request: &Request<'_>, data: Data) -> Transform>; + fn transform(request: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error>; /// Validates, parses, and converts the incoming request body data into an /// instance of `Self`. @@ -383,23 +390,23 @@ pub trait FromData<'a>: Sized { /// # unimplemented!() /// # } /// ``` - fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> Outcome; + fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error>; } /// The identity implementation of `FromData`. Always returns `Success`. -impl<'f> FromData<'f> for Data { +impl<'a> FromData<'a> for Data { type Error = std::convert::Infallible; type Owned = Data; - type Borrowed = Data; + type Borrowed = (); #[inline(always)] - fn transform(_: &Request<'_>, data: Data) -> Transform> { - Transform::Owned(Success(data)) + fn transform(_: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { + Box::pin(ready(Transform::Owned(Success(data)))) } #[inline(always)] - fn from_data(_: &Request<'_>, outcome: Transformed<'f, Self>) -> Outcome { - Success(outcome.owned()?) + fn from_data(_: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(ready(outcome.owned())) } } @@ -493,8 +500,9 @@ impl<'f> FromData<'f> for Data { /// # fn main() { } /// ``` pub trait FromDataSimple: Sized { + // TODO.async: Can/should we relax this 'static? And how? /// The associated error to be returned when the guard fails. - type Error; + type Error: Send + 'static; /// Validates, parses, and converts an instance of `Self` from the incoming /// request body data. @@ -502,22 +510,25 @@ pub trait FromDataSimple: Sized { /// If validation and parsing succeeds, an outcome of `Success` is returned. /// If the data is not appropriate given the type of `Self`, `Forward` is /// returned. If parsing fails, `Failure` is returned. - fn from_data(request: &Request<'_>, data: Data) -> Outcome; + fn from_data(request: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error>; } -impl<'a, T: FromDataSimple> FromData<'a> for T { +impl<'a, T: FromDataSimple + 'a> FromData<'a> for T { type Error = T::Error; type Owned = Data; - type Borrowed = Data; + type Borrowed = (); #[inline(always)] - fn transform(_: &Request<'_>, d: Data) -> Transform> { - Transform::Owned(Success(d)) + fn transform(_: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { + Box::pin(ready(Transform::Owned(Success(d)))) } #[inline(always)] - fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - T::from_data(req, o.owned()?) + fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + match o.owned() { + Success(data) => T::from_data(req, data), + _ => unreachable!(), + } } } @@ -527,17 +538,17 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Result { type Borrowed = T::Borrowed; #[inline(always)] - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { T::transform(r, d) } #[inline(always)] - fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - match T::from_data(r, o) { + fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(T::from_data(r, o).map(|x| match x { Success(val) => Success(Ok(val)), Forward(data) => Forward(data), Failure((_, e)) => Success(Err(e)), - } + })) } } @@ -547,46 +558,52 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Option { type Borrowed = T::Borrowed; #[inline(always)] - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { T::transform(r, d) } #[inline(always)] - fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - match T::from_data(r, o) { + fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(T::from_data(r, o).map(|x| match x { Success(val) => Success(Some(val)), Failure(_) | Forward(_) => Success(None), - } + })) } } -#[cfg(debug_assertions)] -use std::io::{self, Read}; - #[cfg(debug_assertions)] impl FromDataSimple for String { - type Error = io::Error; + type Error = std::io::Error; #[inline(always)] - fn from_data(_: &Request<'_>, data: Data) -> Outcome { - let mut string = String::new(); - match data.open().read_to_string(&mut string) { - Ok(_) => Success(string), - Err(e) => Failure((Status::BadRequest, e)) - } + fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> { + Box::pin(async { + let mut stream = data.open(); + let mut buf = Vec::new(); + if let Err(e) = stream.read_to_end(&mut buf).await { + return Failure((Status::BadRequest, e)); + } + match String::from_utf8(buf) { + Ok(s) => Success(s), + Err(e) => Failure((Status::BadRequest, std::io::Error::new(std::io::ErrorKind::Other, e))), + } + }) } } #[cfg(debug_assertions)] impl FromDataSimple for Vec { - type Error = io::Error; + type Error = std::io::Error; #[inline(always)] - fn from_data(_: &Request<'_>, data: Data) -> Outcome { - let mut bytes = Vec::new(); - match data.open().read_to_end(&mut bytes) { - Ok(_) => Success(bytes), - Err(e) => Failure((Status::BadRequest, e)) - } + fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> { + Box::pin(async { + let mut stream = data.open(); + let mut buf = Vec::new(); + match stream.read_to_end(&mut buf).await { + Ok(_) => Success(buf), + Err(e) => Failure((Status::BadRequest, e)), + } + }) } } diff --git a/core/lib/src/data/mod.rs b/core/lib/src/data/mod.rs index 20523fac52..350b268574 100644 --- a/core/lib/src/data/mod.rs +++ b/core/lib/src/data/mod.rs @@ -2,9 +2,8 @@ mod data; mod data_stream; -mod net_stream; mod from_data; pub use self::data::Data; pub use self::data_stream::DataStream; -pub use self::from_data::{FromData, FromDataSimple, Outcome, Transform, Transformed}; +pub use self::from_data::{FromData, FromDataFuture, FromDataSimple, Outcome, Transform, Transformed, TransformFuture}; diff --git a/core/lib/src/data/net_stream.rs b/core/lib/src/data/net_stream.rs deleted file mode 100644 index 09e0762bf7..0000000000 --- a/core/lib/src/data/net_stream.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::io; -use std::net::{SocketAddr, Shutdown}; -use std::time::Duration; - -#[cfg(feature = "tls")] use crate::http::tls::{WrappedStream, ServerSession}; -// TODO use http::hyper::net::{HttpStream, NetworkStream}; - -use self::NetStream::*; - -#[cfg(feature = "tls")] pub type HttpsStream = WrappedStream; - -// This is a representation of all of the possible network streams we might get. -// This really shouldn't be necessary, but, you know, Hyper. -#[derive(Clone)] -pub enum NetStream { - Http/* TODO (HttpStream) */, - #[cfg(feature = "tls")] - Https(HttpsStream), - Empty, -} - -impl io::Read for NetStream { - #[inline(always)] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace_!("NetStream::read()"); - let res = match *self { - Http/*(ref mut stream)*/ => Ok(0) /* TODO stream.read(buf)*/, - #[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf), - Empty => Ok(0), - }; - - trace_!("NetStream::read() -- complete"); - res - } -} - -impl io::Write for NetStream { - #[inline(always)] - fn write(&mut self, buf: &[u8]) -> io::Result { - trace_!("NetStream::write()"); - match *self { - Http/* TODO (ref mut stream) => stream.write(buf)*/ => Ok(0), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.write(buf), - Empty => Ok(0), - } - } - - #[inline(always)] - fn flush(&mut self) -> io::Result<()> { - match *self { - Http/* TODO (ref mut stream) => stream.flush()*/ => Ok(()), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.flush(), - Empty => Ok(()), - } - } -} - -//impl NetworkStream for NetStream { -// #[inline(always)] -// fn peer_addr(&mut self) -> io::Result { -// match *self { -// Http/* TODO (ref mut stream) => stream.peer_addr()*/ => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), -// #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(), -// Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), -// } -// } -// -// #[inline(always)] -// fn set_read_timeout(&self, dur: Option) -> io::Result<()> { -// match *self { -// Http/* TODO (ref stream) => stream.set_read_timeout(dur)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur), -// Empty => Ok(()), -// } -// } -// -// #[inline(always)] -// fn set_write_timeout(&self, dur: Option) -> io::Result<()> { -// match *self { -// Http/* TODO (ref stream) => stream.set_write_timeout(dur)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur), -// Empty => Ok(()), -// } -// } -// -// #[inline(always)] -// fn close(&mut self, how: Shutdown) -> io::Result<()> { -// match *self { -// Http/* TODO (ref mut stream) => stream.close(how)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how), -// Empty => Ok(()), -// } -// } -//} diff --git a/core/lib/src/error.rs b/core/lib/src/error.rs index 71cf0ddd30..bcdb84443a 100644 --- a/core/lib/src/error.rs +++ b/core/lib/src/error.rs @@ -19,7 +19,7 @@ use crate::router::Route; #[derive(Debug)] pub enum LaunchErrorKind { /// Binding to the provided address/port failed. - Bind(std::io::Error), + Bind(io::Error), /// An I/O error occurred during launch. Io(io::Error), /// Route collisions were detected. @@ -123,10 +123,9 @@ impl LaunchError { impl From for LaunchError { #[inline] fn from(error: hyper::Error) -> LaunchError { - match error { - // TODO hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)), - e => LaunchError::new(LaunchErrorKind::Unknown(Box::new(e))) - } + // TODO.async: Should "hyper error" be another variant of LaunchErrorKind? + // Or should this use LaunchErrorKind::Io? + LaunchError::new(LaunchErrorKind::Unknown(Box::new(error))) } } diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 8813b74177..6cb1c16bf0 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -1,19 +1,58 @@ use std::io; +use std::pin::Pin; -pub trait ReadExt: io::Read { - fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result { - let start_len = buf.len(); - while !buf.is_empty() { - match self.read(buf) { - Ok(0) => break, - Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } +use futures::io::{AsyncRead, AsyncReadExt as _}; +use futures::future::{Future}; +use futures::task::{Poll, Context}; + +// Based on std::io::Take, but for AsyncRead instead of Read +pub struct Take{ + inner: R, + limit: u64, +} + +// TODO.async: Verify correctness of this implementation. +impl AsyncRead for Take where R: AsyncRead + Unpin { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + if self.limit == 0 { + return Poll::Ready(Ok(0)); + } + + let max = std::cmp::min(buf.len() as u64, self.limit) as usize; + match Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(n)) => { + self.limit -= n as u64; + Poll::Ready(Ok(n)) + }, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), } + } +} + +pub trait AsyncReadExt: AsyncRead { + fn take(self, limit: u64) -> Take where Self: Sized { + Take { inner: self, limit } + } + + // TODO.async: Verify correctness of this implementation. + fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin> + Send + '_>> + where Self: Send + Unpin + { + Box::pin(async move { + let start_len = buf.len(); + while !buf.is_empty() { + match self.read(buf).await { + Ok(0) => break, + Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } - Ok(start_len - buf.len()) + Ok(start_len - buf.len()) + }) } } -impl ReadExt for T { } +impl AsyncReadExt for T { } diff --git a/core/lib/src/handler.rs b/core/lib/src/handler.rs index d9facc6937..0ab1e16b98 100644 --- a/core/lib/src/handler.rs +++ b/core/lib/src/handler.rs @@ -1,5 +1,7 @@ //! Types and traits for request and error handlers and their return values. +use futures::future::Future; + use crate::data::Data; use crate::request::Request; use crate::response::{self, Response, Responder}; @@ -9,6 +11,9 @@ use crate::outcome; /// Type alias for the `Outcome` of a `Handler`. pub type Outcome<'r> = outcome::Outcome, Status, Data>; +/// Type alias for the unwieldy `Handler` return type +pub type HandlerFuture<'r> = std::pin::Pin> + Send + 'r>>; + /// Trait implemented by types that can handle requests. /// /// In general, you will never need to implement `Handler` manually or be @@ -142,7 +147,7 @@ pub trait Handler: Cloneable + Send + Sync + 'static { /// a response. Otherwise, if the return value is `Forward(Data)`, the next /// matching route is attempted. If there are no other matching routes, the /// `404` error catcher is invoked. - fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> Outcome<'r>; + fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> HandlerFuture<'r>; } /// Unfortunate but necessary hack to be able to clone a `Box`. @@ -170,16 +175,18 @@ impl Clone for Box { } impl Handler for F - where for<'r> F: Fn(&'r Request<'_>, Data) -> Outcome<'r> + where for<'r> F: Fn(&'r Request<'_>, Data) -> HandlerFuture<'r> { #[inline(always)] - fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> Outcome<'r> { + fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> HandlerFuture<'r> { self(req, data) } } /// The type of an error handler. -pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> response::Result<'r>; +pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> ErrorHandlerFuture<'r>; + +pub type ErrorHandlerFuture<'r> = std::pin::Pin> + Send + 'r>>; impl<'r> Outcome<'r> { /// Return the `Outcome` of response to `req` from `responder`. diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index 7e1afa7a29..727a9ac218 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -4,6 +4,7 @@ #![feature(proc_macro_hygiene)] #![feature(crate_visibility_modifier)] #![feature(label_break_value)] +#![feature(async_await)] #![recursion_limit="256"] diff --git a/core/lib/src/local/request.rs b/core/lib/src/local/request.rs index 5091ccc476..1138da49aa 100644 --- a/core/lib/src/local/request.rs +++ b/core/lib/src/local/request.rs @@ -107,9 +107,7 @@ impl<'c> LocalRequest<'c> { uri: Cow<'c, str> ) -> LocalRequest<'c> { // We set a dummy string for now and check the user's URI on dispatch. - let config = &client.rocket().config; - let state = &client.rocket().state; - let request = Request::new(config, state, method, Origin::dummy()); + let request = Request::new(client.rocket(), method, Origin::dummy()); // Set up any cookies we know about. if let Some(ref jar) = client.cookies { @@ -399,40 +397,46 @@ impl<'c> LocalRequest<'c> { uri: &str, data: Vec ) -> LocalResponse<'c> { + let maybe_uri = Origin::parse(uri); + // First, validate the URI, returning an error response (generated from // an error catcher) immediately if it's invalid. - if let Ok(uri) = Origin::parse(uri) { + if let Ok(uri) = maybe_uri { request.set_uri(uri.into_owned()); } else { error!("Malformed request URI: {}", uri); - let res = client.rocket().handle_error(Status::BadRequest, request); - return LocalResponse { _request: owned_request, response: res }; + return futures::executor::block_on(async move { + let res = client.rocket().handle_error(Status::BadRequest, request).await; + LocalResponse { _request: owned_request, response: res } + }) } - // Actually dispatch the request. - let response = client.rocket().dispatch(request, Data::new(data)); - - // If the client is tracking cookies, updates the internal cookie jar - // with the changes reflected by `response`. - if let Some(ref jar) = client.cookies { - let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock"); - let current_time = time::now(); - for cookie in response.cookies() { - if let Some(expires) = cookie.expires() { - if expires <= current_time { - jar.force_remove(cookie); - continue; + futures::executor::block_on(async move { + // Actually dispatch the request. + let response = client.rocket().dispatch(request, Data::local(data)).await; + + // If the client is tracking cookies, updates the internal cookie jar + // with the changes reflected by `response`. + if let Some(ref jar) = client.cookies { + let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock"); + let current_time = time::now(); + for cookie in response.cookies() { + if let Some(expires) = cookie.expires() { + if expires <= current_time { + jar.force_remove(cookie); + continue; + } } - } - jar.add(cookie.into_owned()); + jar.add(cookie.into_owned()); + } } - } - LocalResponse { - _request: owned_request, - response: response - } + LocalResponse { + _request: owned_request, + response: response + } + }) } } diff --git a/core/lib/src/logger.rs b/core/lib/src/logger.rs index 3dcf4faff1..81e62750e5 100644 --- a/core/lib/src/logger.rs +++ b/core/lib/src/logger.rs @@ -158,16 +158,16 @@ crate fn try_init(level: LoggingLevel, verbose: bool) -> bool { } push_max_level(level); -/* if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { + if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { if verbose { eprintln!("Logger failed to initialize: {}", e); } pop_max_level(); return false; - }*/ + } - false + true } use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; diff --git a/core/lib/src/request/form/form.rs b/core/lib/src/request/form/form.rs index 6ac95786fd..162bd187ac 100644 --- a/core/lib/src/request/form/form.rs +++ b/core/lib/src/request/form/form.rs @@ -1,9 +1,12 @@ use std::ops::Deref; +use futures::io::AsyncReadExt; + use crate::outcome::Outcome::*; use crate::request::{Request, form::{FromForm, FormItems, FormDataError}}; -use crate::data::{Outcome, Transform, Transformed, Data, FromData}; +use crate::data::{Outcome, Transform, Transformed, Data, FromData, TransformFuture, FromDataFuture}; use crate::http::{Status, uri::{Query, FromUriParam}}; +use crate::ext::AsyncReadExt as _; /// A data guard for parsing [`FromForm`] types strictly. /// @@ -184,7 +187,7 @@ impl<'f, T: FromForm<'f>> Form { /// /// All relevant warnings and errors are written to the console in Rocket /// logging format. -impl<'f, T: FromForm<'f>> FromData<'f> for Form { +impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form { type Error = FormDataError<'f, T::Error>; type Owned = String; type Borrowed = str; @@ -192,30 +195,31 @@ impl<'f, T: FromForm<'f>> FromData<'f> for Form { fn transform( request: &Request<'_>, data: Data - ) -> Transform> { - use std::{cmp::min, io::Read}; - - let outcome = 'o: { - if !request.content_type().map_or(false, |ct| ct.is_form()) { - warn_!("Form data does not have form content type."); - break 'o Forward(data); - } + ) -> TransformFuture<'f, Self::Owned, Self::Error> { + if !request.content_type().map_or(false, |ct| ct.is_form()) { + warn_!("Form data does not have form content type."); + return Box::pin(futures::future::ready(Transform::Borrowed(Forward(data)))); + } - let limit = request.limits().forms; - let mut stream = data.open().take(limit); - let mut form_string = String::with_capacity(min(4096, limit) as usize); - if let Err(e) = stream.read_to_string(&mut form_string) { - break 'o Failure((Status::InternalServerError, FormDataError::Io(e))); + let limit = request.limits().forms; + let mut stream = data.open().take(limit); + Box::pin(async move { + let mut buf = Vec::new(); + if let Err(e) = stream.read_to_end(&mut buf).await { + return Transform::Borrowed(Failure((Status::InternalServerError, FormDataError::Io(e)))); } - break 'o Success(form_string); - }; - - Transform::Borrowed(outcome) + Transform::Borrowed(match String::from_utf8(buf) { + Ok(s) => Success(s), + Err(e) => Failure((Status::BadRequest, FormDataError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))), + }) + }) } - fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome { - >::from_data(o.borrowed()?, true).map(Form) + fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { + Box::pin(futures::future::ready(o.borrowed().and_then(|data| { + >::from_data(data, true).map(Form) + }))) } } diff --git a/core/lib/src/request/form/from_form.rs b/core/lib/src/request/form/from_form.rs index 45b5d788d6..847d46146f 100644 --- a/core/lib/src/request/form/from_form.rs +++ b/core/lib/src/request/form/from_form.rs @@ -93,7 +93,7 @@ use crate::request::FormItems; /// ``` pub trait FromForm<'f>: Sized { /// The associated error to be returned when parsing fails. - type Error; + type Error: Send; /// Parses an instance of `Self` from the iterator of form items `it`. /// diff --git a/core/lib/src/request/form/lenient.rs b/core/lib/src/request/form/lenient.rs index 4cae348f11..d30f3f1b4b 100644 --- a/core/lib/src/request/form/lenient.rs +++ b/core/lib/src/request/form/lenient.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use crate::request::{Request, form::{Form, FormDataError, FromForm}}; -use crate::data::{Data, Transform, Transformed, FromData, Outcome}; +use crate::data::{Data, Transformed, FromData, TransformFuture, FromDataFuture}; use crate::http::uri::{Query, FromUriParam}; /// A data guard for parsing [`FromForm`] types leniently. @@ -95,17 +95,19 @@ impl Deref for LenientForm { } } -impl<'f, T: FromForm<'f>> FromData<'f> for LenientForm { +impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for LenientForm { type Error = FormDataError<'f, T::Error>; type Owned = String; type Borrowed = str; - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'f, Self::Owned, Self::Error> { >::transform(r, d) } - fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome { - >::from_data(o.borrowed()?, false).map(LenientForm) + fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { + Box::pin(futures::future::ready(o.borrowed().and_then(|form| { + >::from_data(form, false).map(LenientForm) + }))) } } diff --git a/core/lib/src/request/request.rs b/core/lib/src/request/request.rs index 94a3cb9286..c1d55e73be 100644 --- a/core/lib/src/request/request.rs +++ b/core/lib/src/request/request.rs @@ -1,5 +1,4 @@ -use std::rc::Rc; -use std::cell::{Cell, RefCell}; +use std::sync::{Arc, RwLock, Mutex}; use std::net::{IpAddr, SocketAddr}; use std::fmt; use std::str; @@ -26,26 +25,26 @@ type Indices = (usize, usize); /// should likely only be used when writing [`FromRequest`] implementations. It /// contains all of the information for a given web request except for the body /// data. This includes the HTTP method, URI, cookies, headers, and more. -#[derive(Clone)] +//#[derive(Clone)] pub struct Request<'r> { - method: Cell, + method: RwLock, uri: Origin<'r>, headers: HeaderMap<'r>, remote: Option, crate state: RequestState<'r>, } -#[derive(Clone)] +//#[derive(Clone)] crate struct RequestState<'r> { crate config: &'r Config, crate managed: &'r Container, crate path_segments: SmallVec<[Indices; 12]>, crate query_items: Option>, - crate route: Cell>, - crate cookies: RefCell, + crate route: RwLock>, + crate cookies: Mutex>, crate accept: Storage>, crate content_type: Storage>, - crate cache: Rc, + crate cache: Arc, } #[derive(Clone)] @@ -59,26 +58,25 @@ impl<'r> Request<'r> { /// Create a new `Request` with the given `method` and `uri`. #[inline(always)] crate fn new<'s: 'r>( - config: &'r Config, - managed: &'r Container, + rocket: &'r Rocket, method: Method, uri: Origin<'s> ) -> Request<'r> { let mut request = Request { - method: Cell::new(method), + method: RwLock::new(method), uri: uri, headers: HeaderMap::new(), remote: None, state: RequestState { path_segments: SmallVec::new(), query_items: None, - config, - managed, - route: Cell::new(None), - cookies: RefCell::new(CookieJar::new()), + config: &rocket.config, + managed: &rocket.state, + route: RwLock::new(None), + cookies: Mutex::new(Some(CookieJar::new())), accept: Storage::new(), content_type: Storage::new(), - cache: Rc::new(Container::new()), + cache: Arc::new(Container::new()), } }; @@ -101,7 +99,7 @@ impl<'r> Request<'r> { /// ``` #[inline(always)] pub fn method(&self) -> Method { - self.method.get() + *self.method.read().unwrap() } /// Set the method of `self`. @@ -290,9 +288,13 @@ impl<'r> Request<'r> { /// ``` pub fn cookies(&self) -> Cookies<'_> { // FIXME: Can we do better? This is disappointing. - match self.state.cookies.try_borrow_mut() { - Ok(jar) => Cookies::new(jar, self.state.config.secret_key()), - Err(_) => { + let mut guard = self.state.cookies.lock().expect("cookies lock"); + match guard.take() { + Some(jar) => { + let mutex = &self.state.cookies; + Cookies::new(jar, self.state.config.secret_key(), move |jar| *mutex.lock().expect("cookies lock") = Some(jar)) + } + None => { error_!("Multiple `Cookies` instances are active at once."); info_!("An instance of `Cookies` must be dropped before another \ can be retrieved."); @@ -497,7 +499,7 @@ impl<'r> Request<'r> { /// # }); /// ``` pub fn route(&self) -> Option<&'r Route> { - self.state.route.get() + *self.state.route.read().unwrap() } /// Invokes the request guard implementation for `T`, returning its outcome. @@ -700,7 +702,7 @@ impl<'r> Request<'r> { pub fn example)>(method: Method, uri: &str, f: F) { let rocket = Rocket::custom(Config::development()); let uri = Origin::parse(uri).expect("invalid URI in example"); - let mut request = Request::new(&rocket.config, &rocket.state, method, uri); + let mut request = Request::new(&rocket, method, uri); f(&mut request); } @@ -771,79 +773,66 @@ impl<'r> Request<'r> { /// was `route`. Use during routing when attempting a given route. #[inline(always)] crate fn set_route(&self, route: &'r Route) { - self.state.route.set(Some(route)); + * self.state.route.write().unwrap() = Some(route); } /// Set the method of `self`, even when `self` is a shared reference. Used /// during routing to override methods for re-routing. #[inline(always)] crate fn _set_method(&self, method: Method) { - self.method.set(method); + *self.method.write().unwrap() = method; } /// Convert from Hyper types into a Rocket Request. crate fn from_hyp( - config: &'r Config, - managed: &'r Container, - request_parts: &hyper::Parts, + rocket: &'r Rocket, + h_method: hyper::Method, + h_headers: hyper::HeaderMap, + h_uri: hyper::Uri, + h_addr: SocketAddr, ) -> Result, String> { - - let h_uri = &request_parts.uri; - let h_headers = &request_parts.headers; - let h_version = &request_parts.version; - let h_method = &request_parts.method;; - -// if !h_uri.is_absolute() { -// return Err(format!("Bad URI: {}", h_uri)); -// }; + // TODO.async: Can we avoid this allocation? + // TODO.async: Assert that uri is "absolute" + // Get a copy of the URI for later use. + let uri = h_uri.to_string(); // Ensure that the method is known. TODO: Allow made-up methods? - let method = match Method::from_hyp(h_method) { + let method = match Method::from_hyp(&h_method) { Some(method) => method, - None => return Err(format!("Unknown method: {}", h_method)) + None => return Err(format!("Unknown or invalid method: {}", h_method)) }; // We need to re-parse the URI since we don't trust Hyper... :( - let uri = Origin::parse_owned(format!("{}", h_uri)).map_err(|e| e.to_string())?; + let uri = Origin::parse_owned(format!("{}", uri)).map_err(|e| e.to_string())?; // Construct the request object. - let mut request = Request::new(config, managed, method, uri); -// request.set_remote(match hyp_req.remote_addr() { -// Some(remote) => remote, -// None => return Err(String::from("Missing remote address")) -// }); + let mut request = Request::new(rocket, method, uri); + request.set_remote(h_addr); // Set the request cookies, if they exist. - let cookie_headers = h_headers.get_all("Cookie").iter(); - // TODO if cookie_headers.peek().is_some() { - let mut cookie_jar = CookieJar::new(); - for header in cookie_headers { - let raw_str = match std::str::from_utf8(header.as_bytes()) { - Ok(string) => string, - Err(_) => continue - }; - - for cookie_str in raw_str.split(';').map(|s| s.trim()) { - if let Some(cookie) = Cookies::parse_cookie(cookie_str) { - cookie_jar.add_original(cookie); - } + let mut cookie_jar = CookieJar::new(); + for header in h_headers.get_all("Cookie") { + // TODO.async: This used to only allow UTF-8 but now only allows ASCII + // (needs verification) + let raw_str = match header.to_str() { + Ok(string) => string, + Err(_) => continue + }; + + for cookie_str in raw_str.split(';').map(|s| s.trim()) { + if let Some(cookie) = Cookies::parse_cookie(cookie_str) { + cookie_jar.add_original(cookie); } } - - request.state.cookies = RefCell::new(cookie_jar); - // TODO } + } + request.state.cookies = Mutex::new(Some(cookie_jar)); // Set the rest of the headers. for (name, value) in h_headers.iter() { - - // TODO if let Some(header_values) = h_headers.get_all(hyp.name()) { - - // This is not totally correct since values needn't be UTF8. - let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned(); - let header = Header::new(name.to_string(), value_str); - request.add_header(header); - - // TODO } + // This is not totally correct since values needn't be UTF8. + let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned(); + let header = Header::new(name.to_string(), value_str); + request.add_header(header); } Ok(request) diff --git a/core/lib/src/response/responder.rs b/core/lib/src/response/responder.rs index f0a83a2fdf..ddae4b8ac2 100644 --- a/core/lib/src/response/responder.rs +++ b/core/lib/src/response/responder.rs @@ -2,6 +2,8 @@ use std::fs::File; use std::io::{Cursor, BufReader}; use std::fmt; +use futures::compat::AsyncRead01CompatExt; + use crate::http::{Status, ContentType, StatusClass}; use crate::response::{self, Response, Body}; use crate::request::Request; @@ -242,10 +244,11 @@ impl Responder<'_> for Vec { /// Returns a response with a sized body for the file. Always returns `Ok`. impl Responder<'_> for File { fn respond_to(self, _: &Request<'_>) -> response::Result<'static> { - let (metadata, file) = (self.metadata(), BufReader::new(self)); + let metadata = self.metadata(); + let stream = BufReader::new(tokio::fs::File::from_std(self)).compat(); match metadata { - Ok(md) => Response::build().raw_body(Body::Sized(file, md.len())).ok(), - Err(_) => Response::build().streamed_body(file).ok() + Ok(md) => Response::build().raw_body(Body::Sized(stream, md.len())).ok(), + Err(_) => Response::build().streamed_body(stream).ok() } } } diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index 64a2afaa2d..3339189f14 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -1,8 +1,13 @@ use std::{io, fmt, str}; use std::borrow::Cow; +use std::pin::Pin; + +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncRead, AsyncReadExt}; use crate::response::Responder; use crate::http::{Header, HeaderMap, Status, ContentType, Cookie}; +use crate::ext::AsyncReadExt as _; /// The default size, in bytes, of a chunk for streamed responses. pub const DEFAULT_CHUNK_SIZE: u64 = 4096; @@ -59,31 +64,34 @@ impl Body { } } -impl Body { +impl Body { /// Attempts to read `self` into a `Vec` and returns it. If reading fails, /// returns `None`. - pub fn into_bytes(self) -> Option> { - let mut vec = Vec::new(); - let mut body = self.into_inner(); - if let Err(e) = body.read_to_end(&mut vec) { - error_!("Error reading body: {:?}", e); - return None; - } + pub fn into_bytes(self) -> impl Future>> { + Box::pin(async move { + let mut vec = Vec::new(); + let mut body = self.into_inner(); + if let Err(e) = body.read_to_end(&mut vec).await { + error_!("Error reading body: {:?}", e); + return None; + } - Some(vec) + Some(vec) + }) } /// Attempts to read `self` into a `String` and returns it. If reading or /// conversion fails, returns `None`. - pub fn into_string(self) -> Option { - self.into_bytes() - .and_then(|bytes| match String::from_utf8(bytes) { + pub fn into_string(self) -> impl Future> { + self.into_bytes().map(|bytes| { + bytes.and_then(|bytes| match String::from_utf8(bytes) { Ok(string) => Some(string), Err(e) => { error_!("Body is invalid UTF-8: {}", e); None } }) + }) } } @@ -350,7 +358,7 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn sized_body(&mut self, body: B) -> &mut ResponseBuilder<'r> - where B: io::Read + io::Seek + 'r + where B: AsyncRead + io::Seek + Send + Unpin + 'r { self.response.set_sized_body(body); self @@ -376,7 +384,7 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn streamed_body(&mut self, body: B) -> &mut ResponseBuilder<'r> - where B: io::Read + 'r + where B: AsyncRead + Send + 'r { self.response.set_streamed_body(body); self @@ -402,7 +410,7 @@ impl<'r> ResponseBuilder<'r> { /// # } /// ``` #[inline(always)] - pub fn chunked_body(&mut self, body: B, chunk_size: u64) + pub fn chunked_body(&mut self, body: B, chunk_size: u64) -> &mut ResponseBuilder<'r> { self.response.set_chunked_body(body, chunk_size); @@ -425,7 +433,7 @@ impl<'r> ResponseBuilder<'r> { /// .finalize(); /// ``` #[inline(always)] - pub fn raw_body(&mut self, body: Body) + pub fn raw_body(&mut self, body: Body) -> &mut ResponseBuilder<'r> { self.response.set_raw_body(body); @@ -560,7 +568,7 @@ impl<'r> ResponseBuilder<'r> { pub struct Response<'r> { status: Option, headers: HeaderMap<'r>, - body: Option>>, + body: Option>>>, } impl<'r> Response<'r> { @@ -889,7 +897,7 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("Hello, world!".to_string())); /// ``` #[inline(always)] - pub fn body(&mut self) -> Option> { + pub fn body(&mut self) -> Option> { // Looks crazy, right? Needed so Rust infers lifetime correctly. Weird. match self.body.as_mut() { Some(body) => Some(match body.as_mut() { @@ -919,8 +927,14 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn body_string(&mut self) -> Option { - self.take_body().and_then(Body::into_string) + pub fn body_string(&mut self) -> impl Future> + 'r { + let body = self.take_body(); + Box::pin(async move { + match body { + Some(body) => body.into_string().await, + None => None, + } + }) } /// Consumes `self's` body and reads it into a `Vec` of `u8` bytes. If @@ -941,8 +955,14 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn body_bytes(&mut self) -> Option> { - self.take_body().and_then(Body::into_bytes) + pub fn body_bytes(&mut self) -> impl Future>> + 'r { + let body = self.take_body(); + Box::pin(async move { + match body { + Some(body) => body.into_bytes().await, + None => None, + } + }) } /// Moves the body of `self` out and returns it, if there is one, leaving no @@ -966,17 +986,17 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn take_body(&mut self) -> Option>> { + pub fn take_body(&mut self) -> Option>>> { self.body.take() } - // Makes the `Read`er in the body empty but leaves the size of the body if + // Makes the `AsyncRead`er in the body empty but leaves the size of the body if // it exists. Only meant to be used to handle HEAD requests automatically. #[inline(always)] crate fn strip_body(&mut self) { if let Some(body) = self.take_body() { self.body = match body { - Body::Sized(_, n) => Some(Body::Sized(Box::new(io::empty()), n)), + Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::empty()), n)), Body::Chunked(..) => None }; } @@ -1004,13 +1024,13 @@ impl<'r> Response<'r> { /// ``` #[inline] pub fn set_sized_body(&mut self, mut body: B) - where B: io::Read + io::Seek + 'r + where B: AsyncRead + io::Seek + Send + Unpin + 'r { let size = body.seek(io::SeekFrom::End(0)) .expect("Attempted to retrieve size by seeking, but failed."); body.seek(io::SeekFrom::Start(0)) .expect("Attempted to reset body by seeking after getting size."); - self.body = Some(Body::Sized(Box::new(body.take(size)), size)); + self.body = Some(Body::Sized(Box::pin(body.take(size)), size)); } /// Sets the body of `self` to be `body`, which will be streamed. The chunk @@ -1021,7 +1041,7 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::{Read, repeat}; + /// use std::io::{AsyncRead, repeat}; /// use rocket::Response; /// /// let mut response = Response::new(); @@ -1029,7 +1049,7 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("aaaaa".to_string())); /// ``` #[inline(always)] - pub fn set_streamed_body(&mut self, body: B) where B: io::Read + 'r { + pub fn set_streamed_body(&mut self, body: B) where B: AsyncRead + Send + 'r { self.set_chunked_body(body, DEFAULT_CHUNK_SIZE); } @@ -1039,7 +1059,7 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::{Read, repeat}; + /// use std::io::{AsyncRead, repeat}; /// use rocket::Response; /// /// let mut response = Response::new(); @@ -1048,8 +1068,8 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn set_chunked_body(&mut self, body: B, chunk_size: u64) - where B: io::Read + 'r { - self.body = Some(Body::Chunked(Box::new(body), chunk_size)); + where B: AsyncRead + Send + 'r { + self.body = Some(Body::Chunked(Box::pin(body), chunk_size)); } /// Sets the body of `self` to be `body`. This method should typically not @@ -1070,10 +1090,11 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("Hello!".to_string())); /// ``` #[inline(always)] - pub fn set_raw_body(&mut self, body: Body) { + pub fn set_raw_body(&mut self, body: Body) + where T: AsyncRead + Send + Unpin + 'r { self.body = Some(match body { - Body::Sized(b, n) => Body::Sized(Box::new(b.take(n)), n), - Body::Chunked(b, n) => Body::Chunked(Box::new(b), n), + Body::Sized(b, n) => Body::Sized(Box::pin(b.take(n)), n), + Body::Chunked(b, n) => Body::Chunked(Box::pin(b), n), }); } diff --git a/core/lib/src/response/status.rs b/core/lib/src/response/status.rs index a8d24403ad..8bce41a570 100644 --- a/core/lib/src/response/status.rs +++ b/core/lib/src/response/status.rs @@ -13,7 +13,7 @@ use std::collections::hash_map::DefaultHasher; use crate::request::Request; use crate::response::{Responder, Response}; use crate::http::hyper::header; -use crate::http::{Header, Status}; +use crate::http::Status; /// Sets the status of the response to 201 (Created). /// @@ -47,10 +47,8 @@ impl<'r, R: Responder<'r>> Responder<'r> for Created { build.merge(responder.respond_to(req)?); } - build.status(Status::Created).header(Header::new( - header::LOCATION.as_str(), - self.0 - )).ok() + // TODO.async: Using a raw header + build.status(Status::Created).raw_header(header::LOCATION.as_str(), self.0).ok() } } @@ -67,16 +65,12 @@ impl<'r, R: Responder<'r> + Hash> Responder<'r> for Created { let hash = hasher.finish().to_string(); build.merge(responder.respond_to(req)?); - build.header(Header::new( - header::ETAG.as_str(), - hash, // TODO header::EntityTag::strong(hash) - )); + // TODO.async: Using a raw header + build.raw_header(header::ETAG.as_str(), format!("\"{}\"", hash)); } - build.status(Status::Created).header(Header::new( - header::LOCATION.as_str(), - self.0 - )).ok() + // TODO.async: Using a raw header + build.status(Status::Created).raw_header(header::LOCATION.as_str(), self.0).ok() } } diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index 84e106cc57..37dcfbdbbb 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -1,19 +1,20 @@ -use std::io::Read; use std::fmt::{self, Debug}; +use futures::io::AsyncRead; + use crate::request::Request; use crate::response::{Response, Responder, DEFAULT_CHUNK_SIZE}; use crate::http::Status; -/// Streams a response to a client from an arbitrary `Read`er type. +/// Streams a response to a client from an arbitrary `AsyncRead`er type. /// /// The client is sent a "chunked" response, where the chunk size is at most /// 4KiB. This means that at most 4KiB are stored in memory while the response /// is being sent. This type should be used when sending responses that are /// arbitrarily large in size, such as when streaming from a local socket. -pub struct Stream(T, u64); +pub struct Stream(T, u64); -impl Stream { +impl Stream { /// Create a new stream from the given `reader` and sets the chunk size for /// each streamed chunk to `chunk_size` bytes. /// @@ -34,7 +35,7 @@ impl Stream { } } -impl Debug for Stream { +impl Debug for Stream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Stream({:?})", self.0) } @@ -54,7 +55,7 @@ impl Debug for Stream { /// # #[allow(unused_variables)] /// let response = Stream::from(io::stdin()); /// ``` -impl From for Stream { +impl From for Stream { fn from(reader: T) -> Self { Stream(reader, DEFAULT_CHUNK_SIZE) } @@ -68,7 +69,7 @@ impl From for Stream { /// If reading from the input stream fails at any point during the response, the /// response is abandoned, and the response ends abruptly. An error is printed /// to the console with an indication of what went wrong. -impl<'r, T: Read + 'r> Responder<'r> for Stream { +impl<'r, T: AsyncRead + Send + 'r> Responder<'r> for Stream { fn respond_to(self, _: &Request<'_>) -> Result, Status> { Response::build().chunked_body(self.0, self.1).ok() } diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 237850868d..4ce89254cc 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -1,18 +1,20 @@ use std::collections::HashMap; use std::convert::From; -use std::str::{from_utf8, FromStr}; use std::cmp::min; +use std::io; use std::mem; use std::net::ToSocketAddrs; use std::sync::Arc; +use std::pin::Pin; -use futures::{Future, Stream}; -use futures::future::{self, FutureResult}; +use futures::compat::Compat; +use futures::future::{Future, FutureExt, TryFutureExt}; +use futures::io::AsyncReadExt; use yansi::Paint; use state::Container; use tokio::net::TcpListener; -use tokio::prelude::{Future as _, Stream as _}; +use tokio::prelude::Stream as _; #[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor; @@ -58,13 +60,13 @@ impl std::ops::Deref for RocketHyperService { impl hyper::MakeService for RocketHyperService { type ReqBody = hyper::Body; type ResBody = hyper::Body; - type Error = hyper::Error; + type Error = io::Error; type Service = RocketHyperService; - type Future = FutureResult; + type Future = Compat>>; type MakeError = Self::Error; fn make_service(&mut self, _: Ctx) -> Self::Future { - future::ok(RocketHyperService { rocket: self.rocket.clone() }) + futures::future::ok(RocketHyperService { rocket: self.rocket.clone() }).compat() } } @@ -72,8 +74,8 @@ impl hyper::MakeService for RocketHyperService { impl hyper::Service for RocketHyperService { type ReqBody = hyper::Body; type ResBody = hyper::Body; - type Error = hyper::Error; - type Future = Box, Error = Self::Error> + Send>; + type Error = io::Error; + type Future = Compat, Self::Error>> + Send>>>; // This function tries to hide all of the Hyper-ness from Rocket. It // essentially converts Hyper types into Rocket types, then calls the @@ -84,44 +86,120 @@ impl hyper::Service for RocketHyperService { &mut self, hyp_req: hyper::Request, ) -> Self::Future { - let (parts, body) = hyp_req.into_parts(); - - // Convert the Hyper request into a Rocket request. - let req_res = Request::from_hyp(&self.config, &self.state, &parts); - let mut req = match req_res { - Ok(req) => req, - Err(e) => { - error!("Bad incoming request: {}", e); - // TODO: We don't have a request to pass in, so we just - // fabricate one. This is weird. We should let the user know - // that we failed to parse a request (by invoking some special - // handler) instead of doing this. - let dummy = Request::new(&self.config, &self.state, Method::Get, Origin::dummy()); - let r = self.handle_error(Status::BadRequest, &dummy); - return Box::new(future::ok(hyper::Response::from(r))); + let rocket = self.rocket.clone(); + async move { + // Get all of the information from Hyper. + let (h_parts, h_body) = hyp_req.into_parts(); + + // TODO.async: Get the client address somehow. + let h_addr = "0.0.0.0:0".parse().expect("socket addr"); + + // Convert the Hyper request into a Rocket request. + let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr); + let mut req = match req_res { + Ok(req) => req, + Err(e) => { + error!("Bad incoming request: {}", e); + // TODO: We don't have a request to pass in, so we just + // fabricate one. This is weird. We should let the user know + // that we failed to parse a request (by invoking some special + // handler) instead of doing this. + let dummy = Request::new(&rocket, Method::Get, Origin::dummy()); + let r = rocket.handle_error(Status::BadRequest, &dummy).await; + return rocket.issue_response(r).await; + } + }; + + // Retrieve the data from the hyper body. + let data = Data::from_hyp(h_body).await; + + // Dispatch the request to get a response, then write that response out. + let r = rocket.dispatch(&mut req, data).await; + rocket.issue_response(r).await + }.boxed().compat() + } +} + +impl Rocket { + // TODO.async: Reconsider io::Result + #[inline] + fn issue_response<'r>(&self, response: Response<'r>) -> impl Future>> + 'r { + let result = self.write_response(response); + Box::pin(async move { + match result.await { + Ok(r) => { + info_!("{}", Paint::green("Response succeeded.")); + Ok(r) + } + Err(e) => { + error_!("Failed to write response: {:?}.", e); + Err(e) + } } - }; + }) + } - let this = self.clone(); + #[inline] + fn write_response<'r>( + &self, + mut response: Response<'r>, + ) -> impl Future>> + 'r { + Box::pin(async move { + let mut hyp_res = hyper::Response::builder(); + hyp_res.status(response.status().code); + + for header in response.headers().iter() { + let name = header.name.as_str(); + let value = header.value.as_bytes(); + hyp_res.header(name, value); + } - let response = body.concat2() - .map(move |chunk| { - let body = chunk.iter().rev().cloned().collect::>(); - let data = Data::new(body); + let body = match response.body() { + None => { + hyp_res.header(header::CONTENT_LENGTH, "0"); + hyper::Body::empty() + } + Some(Body::Sized(body, size)) => { + hyp_res.header(header::CONTENT_LENGTH, size.to_string()); + + // TODO.async: Stream the data instead of buffering. + // TODO.async: Possible truncation (u64 -> usize) + let mut buffer = Vec::with_capacity(size as usize); + body.read_to_end(&mut buffer).await?; + hyper::Body::from(buffer) + } + Some(Body::Chunked(body, _chunk_size)) => { + // // This _might_ happen on a 32-bit machine! + // if chunk_size > (usize::max_value() as u64) { + // let msg = "chunk size exceeds limits of usize type"; + // return Err(io::Error::new(io::ErrorKind::Other, msg)); + // } + // + // // The buffer stores the current chunk being written out. + // let mut buffer = vec![0; chunk_size as usize]; + // let mut stream = hyp_res.start()?; + // loop { + // match body.read_max(&mut buffer)? { + // 0 => break, + // n => stream.write_all(&buffer[..n])?, + // } + // } + // + // stream.end() - // TODO: Due to life time constraints the clone of the service has been made. - // TODO: It should not be necessary but it is required to find a better solution - let mut req = Request::from_hyp(&this.config, &this.state, &parts).unwrap(); - // Dispatch the request to get a response, then write that response out. - let response = this.dispatch(&mut req, data); - hyper::Response::from(response) - }); + // TODO.async: Stream the data instead of buffering. + let mut buffer = Vec::new(); + body.read_to_end(&mut buffer).await?; + hyper::Body::from(buffer) + } + }; - Box::new(response) + Ok(hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?) + }) } } -impl RocketHyperService { +impl Rocket { /// Preprocess the request for Rocket things. Currently, this means: /// /// * Rewriting the method in the request if _method form field exists. @@ -135,7 +213,7 @@ impl RocketHyperService { let is_form = req.content_type().map_or(false, |ct| ct.is_form()); if is_form && req.method() == Method::Post && data_len >= min_len { - if let Ok(form) = from_utf8(&data.peek()[..min(data_len, max_len)]) { + if let Ok(form) = std::str::from_utf8(&data.peek()[..min(data_len, max_len)]) { let method: Option> = FormItems::from(form) .filter(|item| item.key.as_str() == "_method") .map(|item| item.value.parse()) @@ -149,71 +227,76 @@ impl RocketHyperService { } #[inline] - crate fn dispatch<'s, 'r>( + crate fn dispatch<'s, 'r: 's>( &'s self, request: &'r mut Request<'s>, data: Data - ) -> Response<'r> { - info!("{}:", request); + ) -> impl Future> + 's { + async move { + info!("{}:", request); - // Do a bit of preprocessing before routing. - self.preprocess_request(request, &data); + // Do a bit of preprocessing before routing. + self.preprocess_request(request, &data); - // Run the request fairings. - self.fairings.handle_request(request, &data); + // Run the request fairings. + self.fairings.handle_request(request, &data); - // Remember if the request is a `HEAD` request for later body stripping. - let was_head_request = request.method() == Method::Head; + // Remember if the request is a `HEAD` request for later body stripping. + let was_head_request = request.method() == Method::Head; - // Route the request and run the user's handlers. - let mut response = self.route_and_process(request, data); + // Route the request and run the user's handlers. + let mut response = self.route_and_process(request, data).await; - // Add a default 'Server' header if it isn't already there. - // TODO: If removing Hyper, write out `Date` header too. - if !response.headers().contains("Server") { - response.set_header(Header::new("Server", "Rocket")); - } + // Add a default 'Server' header if it isn't already there. + // TODO: If removing Hyper, write out `Date` header too. + if !response.headers().contains("Server") { + response.set_header(Header::new("Server", "Rocket")); + } - // Run the response fairings. - self.fairings.handle_response(request, &mut response); + // Run the response fairings. + self.fairings.handle_response(request, &mut response); - // Strip the body if this is a `HEAD` request. - if was_head_request { - response.strip_body(); - } + // Strip the body if this is a `HEAD` request. + if was_head_request { + response.strip_body(); + } - response + response + } } /// Route the request and process the outcome to eventually get a response. - fn route_and_process<'s, 'r>( + fn route_and_process<'s, 'r: 's>( &'s self, request: &'r Request<'s>, data: Data - ) -> Response<'r> { - match self.route(request, data) { - Outcome::Success(mut response) => { - // A user's route responded! Set the cookies. - for cookie in request.cookies().delta() { - response.adjoin_header(cookie); + ) -> impl Future> + Send + 's { + async move { + match self.route(request, data).await { + Outcome::Success(mut response) => { + // A user's route responded! Set the cookies. + for cookie in request.cookies().delta() { + response.adjoin_header(cookie); + } + + response } - - response - } - Outcome::Forward(data) => { - // There was no matching route. Autohandle `HEAD` requests. - if request.method() == Method::Head { - info_!("Autohandling {} request.", Paint::default("HEAD").bold()); - - // Dispatch the request again with Method `GET`. - request._set_method(Method::Get); - self.route_and_process(request, data) - } else { - // No match was found and it can't be autohandled. 404. - self.handle_error(Status::NotFound, request) + Outcome::Forward(data) => { + // There was no matching route. Autohandle `HEAD` requests. + if request.method() == Method::Head { + info_!("Autohandling {} request.", Paint::default("HEAD").bold()); + + // Dispatch the request again with Method `GET`. + request._set_method(Method::Get); + let try_next: Pin + Send>> = Box::pin(self.route_and_process(request, data)); + try_next.await + } else { + // No match was found and it can't be autohandled. 404. + self.handle_error(Status::NotFound, request).await + } } + Outcome::Failure(status) => self.handle_error(status, request).await } - Outcome::Failure(status) => self.handle_error(status, request) } } @@ -229,32 +312,34 @@ impl RocketHyperService { // (ensuring `handler` takes an immutable borrow), any caller to `route` // should be able to supply an `&mut` and retain an `&` after the call. #[inline] - crate fn route<'s, 'r>( + crate fn route<'s, 'r: 's>( &'s self, request: &'r Request<'s>, mut data: Data, - ) -> handler::Outcome<'r> { - // Go through the list of matching routes until we fail or succeed. - let matches = self.router.route(request); - for route in matches { - // Retrieve and set the requests parameters. - info_!("Matched: {}", route); - request.set_route(route); - - // Dispatch the request to the handler. - let outcome = route.handler.handle(request, data); - - // Check if the request processing completed or if the request needs - // to be forwarded. If it does, continue the loop to try again. - info_!("{} {}", Paint::default("Outcome:").bold(), outcome); - match outcome { - o@Outcome::Success(_) | o@Outcome::Failure(_) => return o, - Outcome::Forward(unused_data) => data = unused_data, - }; - } + ) -> impl Future> + 's { + async move { + // Go through the list of matching routes until we fail or succeed. + let matches = self.router.route(request); + for route in matches { + // Retrieve and set the requests parameters. + info_!("Matched: {}", route); + request.set_route(route); + + // Dispatch the request to the handler. + let outcome = route.handler.handle(request, data).await; + + // Check if the request processing completed (Some) or if the request needs + // to be forwarded. If it does, continue the loop (None) to try again. + info_!("{} {}", Paint::default("Outcome:").bold(), outcome); + match outcome { + o@Outcome::Success(_) | o@Outcome::Failure(_) => return o, + Outcome::Forward(unused_data) => data = unused_data, + } + } - error_!("No matching routes for {}.", request); - Outcome::Forward(data) + error_!("No matching routes for {}.", request); + Outcome::Forward(data) + } } // Finds the error catcher for the status `status` and executes it for the @@ -262,48 +347,35 @@ impl RocketHyperService { // catcher is called. If the catcher fails to return a good response, the // 500 catcher is executed. If there is no registered catcher for `status`, // the default catcher is used. - crate fn handle_error<'r>( - &self, + crate fn handle_error<'s, 'r: 's>( + &'s self, status: Status, - req: &'r Request<'_> - ) -> Response<'r> { - warn_!("Responding with {} catcher.", Paint::red(&status)); - - // Try to get the active catcher but fallback to user's 500 catcher. - let catcher = self.catchers.get(&status.code).unwrap_or_else(|| { - error_!("No catcher found for {}. Using 500 catcher.", status); - self.catchers.get(&500).expect("500 catcher.") - }); - - // Dispatch to the user's catcher. If it fails, use the default 500. - catcher.handle(req).unwrap_or_else(|err_status| { - error_!("Catcher failed with status: {}!", err_status); - warn_!("Using default 500 error catcher."); - let default = self.default_catchers.get(&500).expect("Default 500"); - default.handle(req).expect("Default 500 response.") - }) + req: &'r Request<'s> + ) -> impl Future> + 's { + async move { + warn_!("Responding with {} catcher.", Paint::red(&status)); + + // Try to get the active catcher but fallback to user's 500 catcher. + let catcher = self.catchers.get(&status.code).unwrap_or_else(|| { + error_!("No catcher found for {}. Using 500 catcher.", status); + self.catchers.get(&500).expect("500 catcher.") + }); + + // Dispatch to the user's catcher. If it fails, use the default 500. + match catcher.handle(req).await { + Ok(r) => return r, + Err(err_status) => { + error_!("Catcher failed with status: {}!", err_status); + warn_!("Using default 500 error catcher."); + let default = self.default_catchers.get(&500).expect("Default 500"); + default.handle(req).await.expect("Default 500 response.") + } + } + } } } impl Rocket { - - #[inline] - crate fn dispatch<'s, 'r>( - &'s self, - request: &'r mut Request<'s>, - data: Data - ) -> Response<'r> { - unimplemented!("TODO") - } - - crate fn handle_error<'r>( - &self, - status: Status, - req: &'r Request<'_> - ) -> Response<'r> { - unimplemented!("TODO") - } - /// Create a new `Rocket` application using the configuration information in /// `Rocket.toml`. If the file does not exist or if there is an I/O error /// reading the file, the defaults are used. See the [`config`] @@ -482,7 +554,6 @@ impl Rocket { panic!("Invalid mount point."); } - let mut router = self.router.clone(); for mut route in routes.into() { let path = route.uri.clone(); if let Err(e) = route.set_uri(base_uri.clone(), path) { @@ -491,11 +562,9 @@ impl Rocket { } info_!("{}", route); - router.add(route); + self.router.add(route); } - self.router = router; - self } @@ -656,6 +725,8 @@ impl Rocket { /// # } /// ``` pub fn launch(mut self) -> LaunchError { + #[cfg(feature = "tls")] use crate::http::tls; + self = match self.prelaunch_check() { Ok(rocket) => rocket, Err(launch_error) => return launch_error @@ -670,30 +741,36 @@ impl Rocket { .build() .expect("Cannot build runtime!"); - let threads = self.config.workers as usize; - - let full_addr = format!("{}:{}", self.config.address, self.config.port) - .to_socket_addrs() - .expect("A valid socket address") - .next() - .unwrap(); + let full_addr = format!("{}:{}", self.config.address, self.config.port); + let addrs = match full_addr.to_socket_addrs() { + Ok(a) => a.collect::>(), + // TODO.async: Reconsider this error type + Err(e) => return From::from(io::Error::new(io::ErrorKind::Other, e)), + }; - let listener = match TcpListener::bind(&full_addr) { + let listener = match TcpListener::bind(&addrs[0]) { Ok(listener) => listener, Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)), }; // Determine the address and port we actually binded to. match listener.local_addr() { - Ok(server_addr) => /* TODO self.config.port = */ server_addr.port(), + Ok(server_addr) => self.config.port = server_addr.port(), Err(e) => return LaunchError::from(e), - }; + } + + // TODO.async Move all of this to http crate somewhere + // TODO.async Is boxing everything really the best we can do here? + trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send { } + impl AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send { } let proto; - let incoming; + let incoming: Box, Error=std::io::Error> + Send>; #[cfg(feature = "tls")] { + use tokio::prelude::Future; + // TODO.async: Can/should we make the clone unnecessary (by reference, or by moving out?) if let Some(tls) = self.config.tls.clone() { proto = "https://"; @@ -706,21 +783,26 @@ impl Rocket { incoming = Box::new(listener.incoming().and_then(move |stream| { config.accept(stream) - .map(|stream| Box::new(stream)) + .map(|stream| Box::new(stream) as Box) })); - } - else { + } else { proto = "http://"; - incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); + incoming = Box::new(listener.incoming().map(|stream| Box::new(stream) as Box)); } } + // TODO.async: Duplicated code #[cfg(not(feature = "tls"))] { proto = "http://"; - incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); + incoming = Box::new(listener.incoming().map(|stream| Box::new(stream) as Box)); } + // TODO.async: Set the keep-alive. +// // Set the keep-alive. +// let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64)); +// server.keep_alive(timeout); + // Freeze managed state for synchronization-free accesses later. self.state.freeze(); @@ -746,7 +828,7 @@ impl Rocket { // TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error> runtime.block_on(server).expect("TODO.async handle error"); - unreachable!("the call to `handle_threads` should block on success") + unreachable!("the call to `block_on` should block on success") } /// Returns an iterator over all of the routes mounted on this instance of @@ -831,34 +913,3 @@ impl Rocket { &self.config } } - -// TODO: consider try_from here? -impl<'a> From> for hyper::Response { - fn from(mut response: Response<'_>) -> Self { - - let mut builder = hyper::Response::builder(); - builder.status(hyper::StatusCode::from_u16(response.status().code).expect("")); - - for header in response.headers().iter() { - // FIXME: Using hyper here requires two allocations. - let name = hyper::HeaderName::from_str(&header.name.into_string()).unwrap(); - let value = hyper::HeaderValue::from_bytes(header.value.as_bytes()).unwrap(); - builder.header(name, value); - } - - match response.body() { - None => { - builder.body(hyper::Body::empty()) - }, - Some(Body::Sized(body, size)) => { - let mut buffer = Vec::with_capacity(size as usize); - body.read_to_end(&mut buffer); - builder.header(header::CONTENT_LENGTH, hyper::HeaderValue::from(size)); - builder.body(hyper::Body::from(buffer)) - }, - Some(Body::Chunked(mut body, chunk_size)) => { - unimplemented!() - } - }.unwrap() - } -} diff --git a/core/lib/src/router/mod.rs b/core/lib/src/router/mod.rs index 771f2399d0..d2e8c3b94a 100644 --- a/core/lib/src/router/mod.rs +++ b/core/lib/src/router/mod.rs @@ -3,6 +3,8 @@ mod route; use std::collections::hash_map::HashMap; +use futures::future::{Future, FutureExt}; + pub use self::route::Route; use crate::request::Request; @@ -12,11 +14,11 @@ use crate::http::Method; type Selector = Method; // A handler to use when one is needed temporarily. -crate fn dummy_handler<'r>(r: &'r crate::Request<'_>, _: crate::Data) -> crate::handler::Outcome<'r> { - crate::Outcome::from(r, ()) +crate fn dummy_handler<'r>(r: &'r Request<'_>, _: crate::Data) -> std::pin::Pin> + Send + 'r>> { + futures::future::ready(crate::Outcome::from(r, ())).boxed() } -#[derive(Default, Clone)] +#[derive(Default)] pub struct Router { routes: HashMap>, } diff --git a/examples/cookies/src/main.rs b/examples/cookies/src/main.rs index 3c4871f7fb..c7aca65784 100644 --- a/examples/cookies/src/main.rs +++ b/examples/cookies/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene, decl_macro)] +#![feature(proc_macro_hygiene, decl_macro, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/errors/src/main.rs b/examples/errors/src/main.rs index 1004d39712..016d0bae48 100644 --- a/examples/errors/src/main.rs +++ b/examples/errors/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene, decl_macro)] +#![feature(proc_macro_hygiene, decl_macro, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/form_kitchen_sink/src/main.rs b/examples/form_kitchen_sink/src/main.rs index f3330af728..1780e6330f 100644 --- a/examples/form_kitchen_sink/src/main.rs +++ b/examples/form_kitchen_sink/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene, decl_macro)] +#![feature(proc_macro_hygiene, decl_macro, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/hello_world/src/main.rs b/examples/hello_world/src/main.rs index fb370fe29c..1279aaf06a 100644 --- a/examples/hello_world/src/main.rs +++ b/examples/hello_world/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene, decl_macro)] +#![feature(proc_macro_hygiene, decl_macro, async_await)] #[macro_use] extern crate rocket;