From 0892cb27777858737449a012bc6ea08ee080e5b7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 28 Dec 2017 17:18:42 -0800 Subject: [PATCH] feat(client): replace default dispatcher --- examples/client.rs | 4 +- src/client/mod.rs | 232 +++++++++++---------------------------------- tests/client.rs | 23 +---- 3 files changed, 58 insertions(+), 201 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index a5014e5a84..e17a86ea01 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -32,9 +32,7 @@ fn main() { let mut core = tokio_core::reactor::Core::new().unwrap(); let handle = core.handle(); - let client = Client::configure() - .no_proto() - .build(&handle); + let client = Client::new(&handle); let work = client.get(url).and_then(|res| { println!("Response: {}", res.status()); diff --git a/src/client/mod.rs b/src/client/mod.rs index 1dc3ddfea5..6b9c8bacb6 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,24 +7,17 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; -use futures::{future, Poll, Async, Future, Stream}; -use futures::unsync::oneshot; +use futures::{future, Poll, Future, Stream}; #[cfg(feature = "compat")] use http; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; -use tokio_proto::BindClient; -use tokio_proto::streaming::Message; -use tokio_proto::streaming::pipeline::ClientProto; -use tokio_proto::util::client_proxy::ClientProxy; pub use tokio_service::Service; use header::{Headers, Host}; -use proto::{self, RequestHead, TokioBody}; -use proto::response; +use proto; use proto::request; use method::Method; -use self::pool::{Pool, Pooled}; +use self::pool::Pool; use uri::{self, Uri}; use version::HttpVersion; @@ -45,7 +38,7 @@ pub mod compat; pub struct Client { connector: C, handle: Handle, - pool: Dispatch, + pool: Pool>, } impl Client { @@ -93,11 +86,7 @@ impl Client { Client { connector: config.connector, handle: handle.clone(), - pool: if config.no_proto { - Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout)) - } else { - Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout)) - } + pool: Pool::new(config.keep_alive, config.keep_alive_timeout) } } } @@ -191,105 +180,54 @@ where C: Connect, headers.extend(head.headers.iter()); head.headers = headers; - match self.pool { - Dispatch::Proto(ref pool) => { - trace!("proto_dispatch"); - let checkout = pool.checkout(domain.as_ref()); - let connect = { - let handle = self.handle.clone(); - let pool = pool.clone(); - let pool_key = Rc::new(domain.to_string()); - self.connector.connect(url) - .map(move |io| { - let (tx, rx) = oneshot::channel(); - let client = HttpClient { - client_rx: RefCell::new(Some(rx)), - }.bind_client(&handle, io); - let pooled = pool.pooled(pool_key, client); - drop(tx.send(pooled.clone())); - pooled - }) - }; - - let race = checkout.select(connect) - .map(|(client, _work)| client) - .map_err(|(e, _work)| { - // the Pool Checkout cannot error, so the only error - // is from the Connector - // XXX: should wait on the Checkout? Problem is - // that if the connector is failing, it may be that we - // never had a pooled stream at all - e.into() - }); - let resp = race.and_then(move |client| { - let msg = match body { - Some(body) => { - Message::WithBody(head, body.into()) - }, - None => Message::WithoutBody(head), + use futures::Sink; + use futures::sync::{mpsc, oneshot}; + + let checkout = self.pool.checkout(domain.as_ref()); + let connect = { + let handle = self.handle.clone(); + let pool = self.pool.clone(); + let pool_key = Rc::new(domain.to_string()); + self.connector.connect(url) + .map(move |io| { + let (tx, rx) = mpsc::channel(0); + let tx = HyperClient { + tx: RefCell::new(tx), + should_close: true, }; - client.call(msg) - }); - FutureResponse(Box::new(resp.map(|msg| { - match msg { - Message::WithoutBody(head) => response::from_wire(head, None), - Message::WithBody(head, body) => response::from_wire(head, Some(body.into())), - } - }))) - }, - Dispatch::Hyper(ref pool) => { - trace!("no_proto dispatch"); - use futures::Sink; - use futures::sync::{mpsc, oneshot}; - - let checkout = pool.checkout(domain.as_ref()); - let connect = { - let handle = self.handle.clone(); - let pool = pool.clone(); - let pool_key = Rc::new(domain.to_string()); - self.connector.connect(url) - .map(move |io| { - let (tx, rx) = mpsc::channel(0); - let tx = HyperClient { - tx: RefCell::new(tx), - should_close: true, - }; - let pooled = pool.pooled(pool_key, tx); - let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); - let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); - handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err))); - pooled - }) - }; - - let race = checkout.select(connect) - .map(|(client, _work)| client) - .map_err(|(e, _work)| { - // the Pool Checkout cannot error, so the only error - // is from the Connector - // XXX: should wait on the Checkout? Problem is - // that if the connector is failing, it may be that we - // never had a pooled stream at all - e.into() - }); - - let resp = race.and_then(move |mut client| { - let (callback, rx) = oneshot::channel(); - client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap(); - client.should_close = false; - rx.then(|res| { - match res { - Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), - Err(_) => panic!("dispatch dropped without returning error"), - } - }) - }); - - FutureResponse(Box::new(resp)) + let pooled = pool.pooled(pool_key, tx); + let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); + let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); + handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err))); + pooled + }) + }; - } - } + let race = checkout.select(connect) + .map(|(client, _work)| client) + .map_err(|(e, _work)| { + // the Pool Checkout cannot error, so the only error + // is from the Connector + // XXX: should wait on the Checkout? Problem is + // that if the connector is failing, it may be that we + // never had a pooled stream at all + e.into() + }); + + let resp = race.and_then(move |mut client| { + let (callback, rx) = oneshot::channel(); + client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap(); + client.should_close = false; + rx.then(|res| { + match res { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err), + Err(_) => panic!("dispatch dropped without returning error"), + } + }) + }); + + FutureResponse(Box::new(resp)) } } @@ -299,10 +237,7 @@ impl Clone for Client { Client { connector: self.connector.clone(), handle: self.handle.clone(), - pool: match self.pool { - Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()), - Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()), - } + pool: self.pool.clone(), } } } @@ -313,8 +248,6 @@ impl fmt::Debug for Client { } } -type ProtoClient = ClientProxy, Message, ::Error>; - struct HyperClient { tx: RefCell<::futures::sync::mpsc::Sender>>, should_close: bool, @@ -338,60 +271,6 @@ impl Drop for HyperClient { } } -enum Dispatch { - Proto(Pool>), - Hyper(Pool>), -} - -struct HttpClient { - client_rx: RefCell>>>>, -} - -impl ClientProto for HttpClient -where T: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, -{ - type Request = proto::RequestHead; - type RequestBody = B::Item; - type Response = proto::ResponseHead; - type ResponseBody = proto::Chunk; - type Error = ::Error; - type Transport = proto::Conn>>; - type BindTransport = BindingClient; - - fn bind_transport(&self, io: T) -> Self::BindTransport { - BindingClient { - rx: self.client_rx.borrow_mut().take().expect("client_rx was lost"), - io: Some(io), - } - } -} - -struct BindingClient { - rx: oneshot::Receiver>>, - io: Option, -} - -impl Future for BindingClient -where T: AsyncRead + AsyncWrite + 'static, - B: Stream, - B::Item: AsRef<[u8]>, -{ - type Item = proto::Conn>>; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(client)) => Ok(Async::Ready( - proto::Conn::new(self.io.take().expect("binding client io lost"), client) - )), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_canceled) => unreachable!(), - } - } -} - /// Configuration for a Client pub struct Config { _body_type: PhantomData, @@ -490,10 +369,9 @@ impl Config { } */ - /// Disable tokio-proto internal usage. - #[inline] - pub fn no_proto(mut self) -> Config { - self.no_proto = true; + #[doc(hidden)] + #[deprecated(since="0.11.11", note="no_proto is always enabled")] + pub fn no_proto(self) -> Config { self } } diff --git a/tests/client.rs b/tests/client.rs index e600757903..57994946a4 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -19,24 +19,13 @@ use futures::sync::oneshot; use tokio_core::reactor::{Core, Handle}; fn client(handle: &Handle) -> Client { - let mut config = Client::configure(); - if env("HYPER_NO_PROTO", "1") { - config = config.no_proto(); - } - config.build(handle) + Client::new(handle) } fn s(buf: &[u8]) -> &str { ::std::str::from_utf8(buf).unwrap() } -fn env(name: &str, val: &str) -> bool { - match ::std::env::var(name) { - Ok(var) => var == val, - Err(_) => false, - } -} - macro_rules! test { ( name: $name:ident, @@ -463,8 +452,7 @@ test! { body: None, proxy: false, error: |err| match err { - &hyper::Error::Version if env("HYPER_NO_PROTO", "1") => true, - &hyper::Error::Io(_) if !env("HYPER_NO_PROTO", "1") => true, + &hyper::Error::Version => true, _ => false, }, @@ -606,7 +594,6 @@ mod dispatch_impl { let closes = Arc::new(AtomicUsize::new(0)); let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone())) - .no_proto() .build(&handle); let (tx1, rx1) = oneshot::channel(); @@ -666,7 +653,6 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .build(&handle); client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -717,7 +703,6 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .build(&handle); let res = client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -767,7 +752,6 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .build(&handle); client.get(uri) }; @@ -812,7 +796,6 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .build(&handle); // notably, havent read body yet client.get(uri) @@ -852,7 +835,6 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .keep_alive(false) .build(&handle); let res = client.get(uri).and_then(move |res| { @@ -892,7 +874,6 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) - .no_proto() .build(&handle); let res = client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok);