Skip to content

Commit

Permalink
feat(client): change GaiResolver to use a global blocking threadpool
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Calls to `GaiResolver::new` and `HttpConnector::new` no
  longer should pass an integer argument for the number of threads.
  • Loading branch information
seanmonstar committed Aug 29, 2019
1 parent 2664cf5 commit 049b513
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pin-utils = "=0.1.0-alpha.4"
time = "0.1"
tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] }
tower-service = "=0.3.0-alpha.1"
tokio-executor = "=0.2.0-alpha.4"
tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] }

This comment has been minimized.

Copy link
@DoumanAsh

DoumanAsh Aug 30, 2019

Contributor

this feature probably should be enabled with runtime, because resolver requires runtime?

tokio-io = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
tokio-net = { version = "=0.2.0-alpha.4", optional = true, features = ["tcp"] }
Expand Down
2 changes: 1 addition & 1 deletion benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Opts {

let addr = spawn_server(&mut rt, &self);

let connector = HttpConnector::new(1);
let connector = HttpConnector::new();
let client = hyper::Client::builder()
.http2_only(self.http2)
.http2_initial_stream_window_size(self.http2_stream_window)
Expand Down
65 changes: 14 additions & 51 deletions src/client/connect/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::net::{
};
use std::str::FromStr;

use futures_util::{FutureExt, StreamExt};
use tokio_sync::{mpsc, oneshot};

use crate::common::{Future, Never, Pin, Poll, Unpin, task};
Expand All @@ -39,10 +38,7 @@ pub struct Name {
/// A resolver using blocking `getaddrinfo` calls in a threadpool.
#[derive(Clone)]
pub struct GaiResolver {
tx: tokio_executor::threadpool::Sender,
/// A handle to keep the threadpool alive until all `GaiResolver` clones
/// have been dropped.
_threadpool_keep_alive: ThreadPoolKeepAlive,
_priv: (),
}

#[derive(Clone)]
Expand All @@ -55,8 +51,7 @@ pub struct GaiAddrs {

/// A future to resole a name returned by `GaiResolver`.
pub struct GaiFuture {
rx: oneshot::Receiver<Result<IpAddrs, io::Error>>,
_threadpool_keep_alive: ThreadPoolKeepAlive,
inner: tokio_executor::blocking::Blocking<Result<IpAddrs, io::Error>>,
}

impl Name {
Expand Down Expand Up @@ -108,40 +103,9 @@ impl Error for InvalidNameError {}

impl GaiResolver {
/// Construct a new `GaiResolver`.
///
/// Takes number of DNS worker threads.
pub fn new(threads: usize) -> Self {
let pool = tokio_executor::threadpool::Builder::new()
.name_prefix("hyper-dns-gai-resolver")
// not for CPU tasks, so only spawn workers
// in blocking mode
.pool_size(1)
.max_blocking(threads)
.build();

let tx = pool.sender().clone();

// The pool will start to shutdown once `pool` is dropped,
// so to keep it alive, we spawn a future onto the pool itself
// that will only resolve once all `GaiResolver` requests
// are finished.
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

let on_shutdown = shutdown_rx
.into_future()
.map(move |(next, _rx)| {
match next {
Some(never) => match never {},
None => (),
}

drop(pool)
});
tx.spawn(on_shutdown).expect("can spawn on self");

pub fn new() -> Self {
GaiResolver {
tx,
_threadpool_keep_alive: ThreadPoolKeepAlive(shutdown_tx),
_priv: (),
}
}
}
Expand All @@ -151,14 +115,14 @@ impl Resolve for GaiResolver {
type Future = GaiFuture;

fn resolve(&self, name: Name) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx.spawn(GaiBlocking {
host: name.host,
tx: Some(tx),
}).expect("spawn GaiBlocking");
let blocking = tokio_executor::blocking::run(move || {
debug!("resolving host={:?}", name.host);
(&*name.host, 0).to_socket_addrs()
.map(|i| IpAddrs { iter: i })
});

GaiFuture {
rx,
_threadpool_keep_alive: self._threadpool_keep_alive.clone(),
inner: blocking,
}
}
}
Expand All @@ -173,10 +137,9 @@ impl Future for GaiFuture {
type Output = Result<GaiAddrs, io::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(cx).map(|res| match res {
Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }),
Ok(Err(err)) => Err(err),
Err(_canceled) => unreachable!("GaiResolver threadpool shutdown"),
Pin::new(&mut self.inner).poll(cx).map(|res| match res {
Ok(addrs) => Ok(GaiAddrs { inner: addrs }),
Err(err) => Err(err),
})
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ impl HttpConnector {
/// Construct a new HttpConnector.
///
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize) -> HttpConnector {
HttpConnector::new_with_resolver(GaiResolver::new(threads))
pub fn new() -> HttpConnector {
HttpConnector::new_with_resolver(GaiResolver::new())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ impl Builder {
B: Payload + Send,
B::Data: Send,
{
let mut connector = HttpConnector::new(4);
let mut connector = HttpConnector::new();
if self.pool_config.enabled {
connector.set_keepalive(self.pool_config.keep_alive_timeout);
}
Expand Down
18 changes: 9 additions & 9 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ macro_rules! test {
let addr = server.local_addr().expect("local_addr");
let rt = $runtime;

let connector = ::hyper::client::HttpConnector::new(1);
let connector = ::hyper::client::HttpConnector::new();
let client = Client::builder()
.set_host($set_host)
.http1_title_case_headers($title_case_headers)
Expand Down Expand Up @@ -781,7 +781,7 @@ mod dispatch_impl {
let mut rt = Runtime::new().unwrap();
let (closes_tx, closes) = mpsc::channel(10);
let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let (tx1, rx1) = oneshot::channel();

Expand Down Expand Up @@ -837,7 +837,7 @@ mod dispatch_impl {

let res = {
let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -889,7 +889,7 @@ mod dispatch_impl {
});

let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -948,7 +948,7 @@ mod dispatch_impl {

let res = {
let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -996,7 +996,7 @@ mod dispatch_impl {

let res = {
let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1046,7 +1046,7 @@ mod dispatch_impl {

let client = Client::builder()
.keep_alive(false)
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1090,7 +1090,7 @@ mod dispatch_impl {
});

let client = Client::builder()
.build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1527,7 +1527,7 @@ mod dispatch_impl {

impl DebugConnector {
fn new() -> DebugConnector {
let http = HttpConnector::new(1);
let http = HttpConnector::new();
let (tx, _) = mpsc::channel(10);
DebugConnector::with_http_and_closes(http, tx)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pub fn __run_test(cfg: __TestConfig) {
Version::HTTP_11
};

let connector = HttpConnector::new(1);
let connector = HttpConnector::new();
let client = Client::builder()
.keep_alive_timeout(Duration::from_secs(10))
.http2_only(cfg.client_version == 2)
Expand Down

0 comments on commit 049b513

Please sign in to comment.