diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 075f98d6e0..581d34b731 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -30,7 +30,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) { b.iter(move || { client.get(url.clone()) .and_then(|res| { - res.into_body().into_stream().for_each(|_chunk| { + res.into_body().for_each(|_chunk| { Ok(()) }) }) @@ -55,7 +55,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) { *req.method_mut() = Method::POST; *req.uri_mut() = url.clone(); client.request(req).and_then(|res| { - res.into_body().into_stream().for_each(|_chunk| { + res.into_body().for_each(|_chunk| { Ok(()) }) }).wait().expect("client wait"); @@ -75,7 +75,6 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr { let service = const_service(service_fn(|req: Request| { req.into_body() - .into_stream() .concat2() .map(|_| { Response::new(Body::from(PHRASE)) diff --git a/examples/client.rs b/examples/client.rs index 1032167825..be943676bd 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -40,7 +40,7 @@ fn main() { println!("Response: {}", res.status()); println!("Headers: {:#?}", res.headers()); - res.into_body().into_stream().for_each(|chunk| { + res.into_body().for_each(|chunk| { io::stdout().write_all(&chunk) .map_err(|e| panic!("example expects stdout is open, error={}", e)) }) diff --git a/examples/params.rs b/examples/params.rs index d362840f52..0ba6f43805 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -32,7 +32,7 @@ impl Service for ParamExample { Box::new(futures::future::ok(Response::new(INDEX.into()))) }, (&Method::POST, "/post") => { - Box::new(req.into_parts().1.into_stream().concat2().map(|b| { + Box::new(req.into_body().concat2().map(|b| { // Parse the request body. form_urlencoded::parse // always succeeds, but in general parsing may // fail (for example, an invalid post of json), so diff --git a/examples/web_api.rs b/examples/web_api.rs index 7923d4afd9..16aedccc50 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -44,7 +44,7 @@ impl Service for ResponseExamples { let web_res_future = client.request(req); Box::new(web_res_future.map(|web_res| { - let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| { + let body = Body::wrap_stream(web_res.into_body().map(|b| { Chunk::from(format!("before: '{:?}'
after: '{:?}'", std::str::from_utf8(LOWERCASE).unwrap(), std::str::from_utf8(&b).unwrap())) @@ -54,7 +54,7 @@ impl Service for ResponseExamples { }, (&Method::POST, "/web_api") => { // A web api to run against. Simple upcasing of the body. - let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| { + let body = Body::wrap_stream(req.into_body().map(|chunk| { let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase()) .collect::>(); Chunk::from(upper) diff --git a/src/proto/body.rs b/src/body.rs similarity index 86% rename from src/proto/body.rs rename to src/body.rs index 48f988e499..6e93b32fc6 100644 --- a/src/proto/body.rs +++ b/src/body.rs @@ -13,7 +13,7 @@ use super::Chunk; type BodySender = mpsc::Sender>; /// This trait represents a streaming body of a `Request` or `Response`. -pub trait Entity { +pub trait Payload { /// A buffer of bytes representing a single chunk of a body. type Data: AsRef<[u8]>; @@ -63,7 +63,7 @@ pub trait Entity { } } -impl Entity for Box { +impl Payload for Box { type Data = E::Data; type Error = E::Error; @@ -84,43 +84,10 @@ impl Entity for Box { } } -/// A wrapper to consume an `Entity` as a futures `Stream`. -#[must_use = "streams do nothing unless polled"] -#[derive(Debug)] -pub struct EntityStream { - is_data_eof: bool, - entity: E, -} - -impl Stream for EntityStream { - type Item = E::Data; - type Error = E::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - if self.is_data_eof { - return self.entity.poll_trailers() - .map(|async| { - async.map(|_opt| { - // drop the trailers and return that Stream is done - None - }) - }); - } - - let opt = try_ready!(self.entity.poll_data()); - if let Some(data) = opt { - return Ok(Async::Ready(Some(data))); - } else { - self.is_data_eof = true; - } - } - } -} -/// An `Entity` of `Chunk`s, used when receiving bodies. +/// A `Payload` of `Chunk`s, used when receiving bodies. /// -/// Also a good default `Entity` to use in many applications. +/// Also a good default `Payload` to use in many applications. #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, @@ -229,35 +196,6 @@ impl Body { Body::new(Kind::Wrapped(Box::new(mapped))) } - /// Convert this `Body` into a `Stream`. - /// - /// # Example - /// - /// ``` - /// # extern crate futures; - /// # extern crate hyper; - /// # use futures::{Future, Stream}; - /// # use hyper::{Body, Request}; - /// # fn request_concat(some_req: Request) { - /// let req: Request = some_req; - /// let body = req.into_body(); - /// - /// let stream = body.into_stream(); - /// stream.concat2() - /// .map(|buf| { - /// println!("body length: {}", buf.len()); - /// }); - /// # } - /// # fn main() {} - /// ``` - #[inline] - pub fn into_stream(self) -> EntityStream { - EntityStream { - is_data_eof: false, - entity: self, - } - } - /// Returns if this body was constructed via `Body::empty()`. /// /// # Note @@ -345,7 +283,7 @@ impl Default for Body { } } -impl Entity for Body { +impl Payload for Body { type Data = Chunk; type Error = ::Error; @@ -373,6 +311,15 @@ impl Entity for Body { } } +impl Stream for Body { + type Item = Chunk; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.poll_data() + } +} + impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Body") @@ -489,10 +436,10 @@ fn test_body_stream_concat() { let body = Body::from("hello world"); - let total = body.into_stream() + let total = body .concat2() .wait() .unwrap(); assert_eq!(total.as_ref(), b"hello world"); - } + diff --git a/src/proto/chunk.rs b/src/chunk.rs similarity index 100% rename from src/proto/chunk.rs rename to src/chunk.rs diff --git a/src/client/conn.rs b/src/client/conn.rs index f2862298b1..feea7bf785 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -15,8 +15,8 @@ use futures::{Async, Future, Poll}; use futures::future::{self, Either}; use tokio_io::{AsyncRead, AsyncWrite}; +use body::Payload; use proto; -use proto::body::Entity; use super::dispatch; use {Body, Request, Response, StatusCode}; @@ -45,7 +45,7 @@ pub struct SendRequest { pub struct Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { inner: proto::dispatch::Dispatcher< proto::dispatch::Client, @@ -138,7 +138,7 @@ impl SendRequest impl SendRequest where - B: Entity + 'static, + B: Payload + 'static, { /// Sends a `Request` on the associated connection. /// @@ -262,7 +262,7 @@ impl fmt::Debug for SendRequest { impl Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { /// Return the inner IO object, and additional information. pub fn into_parts(self) -> Parts { @@ -289,7 +289,7 @@ where impl Future for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { type Item = (); type Error = ::Error; @@ -302,7 +302,7 @@ where impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug, - B: Entity + 'static, + B: Payload + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -331,7 +331,7 @@ impl Builder { pub fn handshake(&self, io: T) -> Handshake where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { Handshake { inner: HandshakeInner { @@ -345,7 +345,7 @@ impl Builder { pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { HandshakeNoUpgrades { inner: HandshakeInner { @@ -362,7 +362,7 @@ impl Builder { impl Future for Handshake where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { type Item = (SendRequest, Connection); type Error = ::Error; @@ -387,7 +387,7 @@ impl fmt::Debug for Handshake { impl Future for HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, { type Item = (SendRequest, proto::dispatch::Dispatcher< proto::dispatch::Client, @@ -406,7 +406,7 @@ where impl Future for HandshakeInner where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, R: proto::Http1Transaction< Incoming=StatusCode, Outgoing=proto::RequestLine, @@ -470,7 +470,7 @@ impl AssertSendSync for SendRequest {} impl AssertSend for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, B::Data: Send + 'static, {} @@ -478,7 +478,7 @@ where impl AssertSendSync for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Payload + 'static, B::Data: Send + Sync + 'static, {} diff --git a/src/client/mod.rs b/src/client/mod.rs index 909445eae5..5f8ef96c1a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -16,8 +16,7 @@ use tokio::reactor::Handle; use tokio_executor::spawn; pub use tokio_service::Service; -use proto::body::{Body, Entity}; -use proto; +use body::{Body, Payload}; use self::pool::Pool; pub use self::connect::{Connect, HttpConnector}; @@ -34,7 +33,7 @@ mod pool; mod tests; /// A Client to make outgoing HTTP requests. -pub struct Client { +pub struct Client { connector: Arc, executor: Exec, h1_writev: bool, @@ -43,21 +42,21 @@ pub struct Client { set_host: bool, } -impl Client { +impl Client { /// Create a new Client with the default config. #[inline] - pub fn new(handle: &Handle) -> Client { + pub fn new(handle: &Handle) -> Client { Config::default().build(handle) } } -impl Default for Client { - fn default() -> Client { +impl Default for Client { + fn default() -> Client { Client::new(&Handle::current()) } } -impl Client { +impl Client { /// Configure a Client. /// /// # Example @@ -76,7 +75,7 @@ impl Client { /// # } /// ``` #[inline] - pub fn configure() -> Config { + pub fn configure() -> Config { Config::default() } } @@ -99,7 +98,7 @@ impl Client where C: Connect + Sync + 'static, C::Transport: 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Payload + Send + 'static, B::Data: Send, { @@ -107,16 +106,16 @@ where C: Connect + Sync + 'static, /// /// # Note /// - /// This requires that the `Entity` type have a `Default` implementation. + /// This requires that the `Payload` type have a `Default` implementation. /// It *should* return an "empty" version of itself, such that - /// `Entity::is_end_stream` is `true`. + /// `Payload::is_end_stream` is `true`. pub fn get(&self, uri: Uri) -> FutureResponse where B: Default, { let body = B::default(); if !body.is_end_stream() { - warn!("default Entity used for get() does not return true for is_end_stream"); + warn!("default Payload used for get() does not return true for is_end_stream"); } let mut req = Request::new(body); @@ -291,7 +290,7 @@ where C: Connect + Sync + 'static, impl Service for Client where C: Connect + 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Payload + Send + 'static, B::Data: Send, { type Request = Request; @@ -354,7 +353,7 @@ impl Future for RetryableSendRequest where C: Connect + 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Payload + Send + 'static, B::Data: Send, { type Item = Response; @@ -444,10 +443,10 @@ pub struct Config { #[derive(Debug, Clone, Copy)] pub struct UseDefaultConnector(()); -impl Default for Config { - fn default() -> Config { +impl Default for Config { + fn default() -> Config { Config { - _body_type: PhantomData::, + _body_type: PhantomData::, connector: UseDefaultConnector(()), keep_alive: true, keep_alive_timeout: Some(Duration::from_secs(90)), @@ -566,7 +565,7 @@ impl Config where C: Connect, C::Transport: 'static, C::Future: 'static, - B: Entity + Send, + B: Payload + Send, B::Data: Send, { /// Construct the Client with this configuration. @@ -589,7 +588,7 @@ where C: Connect, impl Config where - B: Entity + Send, + B: Payload + Send, B::Data: Send, { /// Construct the Client with this configuration. diff --git a/src/error.rs b/src/error.rs index 8ef77fd52b..4d2f41a7b7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,7 +50,7 @@ pub(crate) enum Kind { Body, /// Error while writing a body to connection. BodyWrite, - /// Error calling user's Entity::poll_data(). + /// Error calling user's Payload::poll_data(). BodyUser, /// Error calling AsyncWrite::shutdown() Shutdown, @@ -257,7 +257,7 @@ impl StdError for Error { Kind::Service => "error from user's server service", Kind::Body => "error reading a body from connection", Kind::BodyWrite => "error write a body to connection", - Kind::BodyUser => "error from user's Entity stream", + Kind::BodyUser => "error from user's Payload stream", Kind::Shutdown => "error shutting down connection", Kind::UnsupportedVersion => "request has unsupported HTTP version", Kind::UnsupportedRequestMethod => "request has unsupported HTTP method", diff --git a/src/lib.rs b/src/lib.rs index 9ef8176ee0..860a528a12 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,12 +46,15 @@ pub use http::{ pub use client::Client; pub use error::{Result, Error}; -pub use proto::{body, Body, Chunk}; +pub use body::{Body}; +pub use chunk::Chunk; pub use server::Server; mod common; #[cfg(test)] mod mock; +pub mod body; +mod chunk; pub mod client; pub mod error; mod headers; diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index d2bafec8e2..3ec87968e1 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -8,7 +8,8 @@ use futures::task::Task; use http::{Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; -use proto::{BodyLength, Chunk, Decode, Http1Transaction, MessageHead}; +use ::Chunk; +use proto::{BodyLength, Decode, Http1Transaction, MessageHead}; use super::io::{Cursor, Buffered}; use super::{EncodedBuf, Encoder, Decoder}; diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 40bece5be7..01926742ea 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -4,13 +4,13 @@ use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; -use proto::body::Entity; -use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; +use body::{Body, Payload}; +use proto::{BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; pub(crate) struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option<::proto::body::Sender>, + body_tx: Option<::body::Sender>, body_rx: Option, is_closing: bool, } @@ -45,7 +45,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Entity, + Bs: Payload, { pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { @@ -292,7 +292,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Entity, + Bs: Payload, { type Item = (); type Error = ::Error; @@ -318,7 +318,7 @@ impl Dispatch for Server where S: Service, Response=Response>, S::Error: Into>, - Bs: Entity, + Bs: Payload, { type PollItem = MessageHead; type PollBody = Bs; @@ -388,7 +388,7 @@ impl Client { impl Dispatch for Client where - B: Entity, + B: Payload, { type PollItem = RequestHead; type PollBody = B; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 035e6d90a6..ced7613ec6 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -428,7 +428,7 @@ impl Client<()> { fn set_length(headers: &mut HeaderMap, body: BodyLength, can_chunked: bool) -> Encoder { // If the user already set specific headers, we should respect them, regardless - // of what the Entity knows about itself. They set them for a reason. + // of what the Payload knows about itself. They set them for a reason. // Because of the borrow checker, we can't check the for an existing // Content-Length header while holding an `Entry` for the Transfer-Encoding diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 71356ef62b..e03ccef14e 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -4,12 +4,8 @@ use http::{HeaderMap, Method, StatusCode, Uri, Version}; use headers; -pub use self::body::Body; -pub use self::chunk::Chunk; pub(crate) use self::h1::{dispatch, Conn}; -pub mod body; -mod chunk; mod h1; //mod h2; diff --git a/src/server/conn.rs b/src/server/conn.rs index 5b8e780992..0a987e757c 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -15,7 +15,7 @@ use futures::{Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use proto; -use proto::body::{Body, Entity}; +use body::{Body, Payload}; use super::{HyperService, Request, Response, Service}; /// A future binding a connection with a Service. @@ -25,13 +25,13 @@ use super::{HyperService, Request, Response, Service}; pub struct Connection where S: HyperService, - S::ResponseBody: Entity, + S::ResponseBody: Payload, { pub(super) conn: proto::dispatch::Dispatcher< proto::dispatch::Server, S::ResponseBody, I, - ::Data, + ::Data, proto::ServerTransaction, >, } @@ -63,7 +63,7 @@ where S: Service, Response=Response> + 'static, S::Error: Into>, I: AsyncRead + AsyncWrite + 'static, - B: Entity + 'static, + B: Payload + 'static, { /// Disables keep-alive for this connection. pub fn disable_keep_alive(&mut self) { @@ -102,7 +102,7 @@ where S: Service, Response=Response> + 'static, S::Error: Into>, I: AsyncRead + AsyncWrite + 'static, - B: Entity + 'static, + B: Payload + 'static, { type Item = (); type Error = ::Error; @@ -115,7 +115,7 @@ where impl fmt::Debug for Connection where S: HyperService, - S::ResponseBody: Entity, + S::ResponseBody: Payload, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") diff --git a/src/server/mod.rs b/src/server/mod.rs index 7088bc428b..9070ea9d87 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -24,7 +24,7 @@ use tokio::reactor::Handle; use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; -use proto::body::{Body, Entity}; +use body::{Body, Payload}; use proto; use self::addr_stream::AddrStream; use self::hyper_service::HyperService; @@ -51,7 +51,7 @@ pub struct Http { /// address and then serving TCP connections accepted with the service provided. pub struct Server where - B: Entity, + B: Payload, { protocol: Http, new_service: S, @@ -168,7 +168,7 @@ impl + 'static> Http { where S: NewService, Response=Response> + 'static, S::Error: Into>, - Bd: Entity, + Bd: Payload, { let handle = Handle::current(); let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; @@ -193,7 +193,7 @@ impl + 'static> Http { where S: NewService, Response=Response>, S::Error: Into>, - Bd: Entity, + Bd: Payload, { let handle = Handle::current(); let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; @@ -217,7 +217,7 @@ impl + 'static> Http { where S: NewService, Response = Response>, S::Error: Into>, - Bd: Entity, + Bd: Payload, { let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; @@ -238,7 +238,7 @@ impl + 'static> Http { I::Item: AsyncRead + AsyncWrite, S: NewService, Response = Response>, S::Error: Into>, - Bd: Entity, + Bd: Payload, { Serve { incoming: incoming, @@ -291,7 +291,7 @@ impl + 'static> Http { where S: Service, Response = Response>, S::Error: Into>, - Bd: Entity, + Bd: Payload, I: AsyncRead + AsyncWrite, { let mut conn = proto::Conn::new(io); @@ -357,7 +357,7 @@ where S::Error: Into>, ::Instance: Send, <::Instance as Service>::Future: Send, - B: Entity + Send + 'static, + B: Payload + Send + 'static, B::Data: Send, { /// Returns the local address that this server is bound to. @@ -479,7 +479,7 @@ where } } -impl fmt::Debug for Server +impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") @@ -516,7 +516,7 @@ where I::Item: AsyncRead + AsyncWrite, S: NewService, Response=Response>, S::Error: Into>, - B: Entity, + B: Payload, { type Item = Connection; type Error = ::Error; @@ -795,7 +795,7 @@ impl Future for WaitUntilZero { } mod hyper_service { - use super::{Body, Entity, Request, Response, Service}; + use super::{Body, Payload, Request, Response, Service}; /// A "trait alias" for any type that implements `Service` with hyper's /// Request, Response, and Error types, and a streaming body. /// @@ -826,7 +826,7 @@ mod hyper_service { Response=Response, >, S::Error: Into>, - B: Entity, + B: Payload, {} impl HyperService for S @@ -837,7 +837,7 @@ mod hyper_service { >, S::Error: Into>, S: Sealed, - B: Entity, + B: Payload, { type ResponseBody = B; type Sealed = Opaque; diff --git a/tests/client.rs b/tests/client.rs index 8c20302b27..c8de6dcf4c 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -113,7 +113,7 @@ macro_rules! test { let body = res .into_body() - .into_stream() + .concat2() .wait() .expect("body concat wait"); @@ -726,7 +726,7 @@ mod dispatch_impl { .unwrap(); client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }).and_then(|_| { Delay::new(Duration::from_secs(1)) .expect("timeout") @@ -779,7 +779,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); res.join(rx).map(|r| r.0).wait().unwrap(); @@ -946,7 +946,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); res.join(rx).map(|r| r.0).wait().unwrap(); @@ -994,7 +994,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); res.join(rx).map(|r| r.0).wait().unwrap(); @@ -1039,7 +1039,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); @@ -1384,7 +1384,7 @@ mod conn { .unwrap(); let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); @@ -1430,7 +1430,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); @@ -1470,7 +1470,7 @@ mod conn { .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); // pipelined request will hit NotReady, and thus should return an Error::Cancel @@ -1543,7 +1543,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.headers()["Upgrade"], "foobar"); - res.into_body().into_stream().concat2() + res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); @@ -1623,7 +1623,7 @@ mod conn { let res = client.send_request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().into_stream().concat2() + res.into_body().concat2() }) .map(|body| { assert_eq!(body.as_ref(), b""); diff --git a/tests/server.rs b/tests/server.rs index 8623ebe0c0..d5209d5a59 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -110,7 +110,7 @@ fn get_implicitly_empty() { fn call(&self, req: Request) -> Self::Future { Box::new(req.into_body() - .into_stream() + .concat2() .map(|buf| { assert!(buf.is_empty()); @@ -256,7 +256,7 @@ mod response_body_lengths { fn get_auto_response_with_entity_unknown_length() { run_test(TestCase { version: 1, - // no headers means trying to guess from Entity + // no headers means trying to guess from Payload headers: &[], body: Bd::Unknown("foo bar baz"), expects_chunked: true, @@ -268,7 +268,7 @@ mod response_body_lengths { fn get_auto_response_with_entity_known_length() { run_test(TestCase { version: 1, - // no headers means trying to guess from Entity + // no headers means trying to guess from Payload headers: &[], body: Bd::Known("foo bar baz"), expects_chunked: false, @@ -281,7 +281,7 @@ mod response_body_lengths { fn http_10_get_auto_response_with_entity_unknown_length() { run_test(TestCase { version: 0, - // no headers means trying to guess from Entity + // no headers means trying to guess from Payload headers: &[], body: Bd::Unknown("foo bar baz"), expects_chunked: false, @@ -1287,7 +1287,7 @@ impl Service for TestService { let tx2 = self.tx.clone(); let replies = self.reply.clone(); - Box::new(req.into_body().into_stream().for_each(move |chunk| { + Box::new(req.into_body().for_each(move |chunk| { tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).then(move |result| {