diff --git a/Cargo.lock b/Cargo.lock index 95e56507ee..6b925bcb1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,7 +134,7 @@ dependencies = [ "h2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "hyper 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)", "ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -165,6 +165,7 @@ dependencies = [ "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-util 0.1.0 (git+https://github.com/tower-rs/tower)", "trust-dns-resolver 0.9.0 (git+https://github.com/bluejekyll/trust-dns)", + "try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "webpki 0.18.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -416,7 +417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "hyper" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -435,7 +436,7 @@ dependencies = [ "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "want 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1328,7 +1329,7 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.1.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1399,12 +1400,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "want" -version = "0.0.4" +version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1534,7 +1535,7 @@ dependencies = [ "checksum hostname 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "58fab6e177434b0bb4cd344a4dabaa5bd6d7a8d792b1885aebcae7af1091d1cb" "checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab" "checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37" -"checksum hyper 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6416251e6672bff06fe96a3337570772845a44500fba2d178e2e55e0fab58a86" +"checksum hyper 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ad39a4f15051ccd4ea6adf44df851e00fd9062c71734391d806246b94e69dc1f" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b9378f1f3923647a9aea6af4c6b5de68cc8a71415459ad25ef191191c48f5b7" "checksum inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)" = "" @@ -1631,7 +1632,7 @@ dependencies = [ "checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum trust-dns-proto 0.4.0 (git+https://github.com/bluejekyll/trust-dns)" = "" "checksum trust-dns-resolver 0.9.0 (git+https://github.com/bluejekyll/trust-dns)" = "" -"checksum try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee2aa4715743892880f70885373966c83d73ef1b0838a664ef0c76fffd35e7c2" +"checksum try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "119b532a17fbe772d360be65617310164549a07c25a1deab04c84168ce0d4545" "checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" "checksum unicode-normalization 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "51ccda9ef9efa3f7ef5d91e8f9b83bbe6955f9bf86aec89d5cce2c874625920f" @@ -1643,7 +1644,7 @@ dependencies = [ "checksum url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f808aadd8cfec6ef90e4a14eb46f24511824d1ac596b9682703c87056c8678b7" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" -"checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1" +"checksum want 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2fffe09593e18ed34950d66dbf44c27deb2e03f3905c493f0641f9f99a3f2349" "checksum webpki 0.18.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)" = "724897af4bb44f3e0142b9cca300eb15f61b9b34fa559440bed8c43f2ff7afc0" "checksum which 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49c4f580e93079b70ac522e7bdebbe1568c8afa7d8d05ee534ee737ca37d2f51" "checksum widestring 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7157704c2e12e3d2189c507b7482c52820a16dfa4465ba91add92f266667cadb" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index b7a2b1eca4..16138c270c 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -22,13 +22,14 @@ futures-watch = { git = "https://github.com/carllerche/better-future" } h2 = "0.1.10" http = "0.1" httparse = "1.2" -hyper = "0.12" +hyper = "0.12.2" ipnet = "1.0" log = "0.4.1" indexmap = "1.0.0" prost = "0.4.0" prost-types = "0.4.0" rand = "0.5.1" +try-lock = "0.2" # for config parsing regex = "1.0.0" diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index d5cbb9ac9c..348790252a 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -47,6 +47,7 @@ extern crate conduit_proxy_router; extern crate tower_util; extern crate tower_in_flight_limit; extern crate trust_dns_resolver; +extern crate try_lock; use futures::*; diff --git a/proxy/src/task.rs b/proxy/src/task.rs index 8f744675c1..9d43589e53 100644 --- a/proxy/src/task.rs +++ b/proxy/src/task.rs @@ -3,8 +3,9 @@ use futures::future::{ Future, ExecuteError, ExecuteErrorKind, - Executor, }; +pub use futures::future::Executor; + use tokio::{ executor::{ DefaultExecutor, @@ -20,6 +21,8 @@ use std::{ io, }; +pub type BoxSendFuture = Box + Send>; + /// An empty type which implements `Executor` by lazily calling /// `tokio::executor::DefaultExecutor::current().execute(...)`. /// @@ -36,6 +39,12 @@ pub struct LazyExecutor; #[derive(Copy, Clone, Debug, Default)] pub struct BoxExecutor(E); +/// A `futures::executor::Executor` with any generics erased. +/// +/// This is useful when some code cannot be generic over any executor, +/// and instead needs a trait object. An example is `Http11Upgrade`. +pub struct ErasedExecutor(Box + Send + Sync>); + /// Indicates which Tokio `Runtime` should be used for the main proxy. /// /// This is either a `tokio::runtime::current_thread::Runtime`, or a @@ -71,7 +80,7 @@ pub enum Error { impl TokioExecutor for LazyExecutor { fn spawn( &mut self, - future: Box + 'static + Send> + future: BoxSendFuture, ) -> Result<(), SpawnError> { DefaultExecutor::current().spawn(future) @@ -118,7 +127,7 @@ impl BoxExecutor { impl TokioExecutor for BoxExecutor { fn spawn( &mut self, - future: Box + 'static + Send> + future: BoxSendFuture, ) -> Result<(), SpawnError> { self.0.spawn(future) } @@ -132,7 +141,7 @@ impl Executor for BoxExecutor where F: Future + 'static + Send, E: TokioExecutor, - E: Executor + Send + 'static>>, + E: Executor, { fn execute(&self, future: F) -> Result<(), ExecuteError> { // Check the status of the executor first, so that we can return the @@ -154,6 +163,30 @@ where } } +// ===== impl ErasedExecutor =====; + +impl ErasedExecutor { + pub fn erase + Send + Sync + 'static>(exe: E) -> ErasedExecutor { + ErasedExecutor(Box::new(exe)) + } +} + +impl Executor for ErasedExecutor +where + F: Future + 'static + Send, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError> { + self.0.execute(Box::new(future)) + .map_err(|_| panic!("erased executor error")) + } +} + +impl fmt::Debug for ErasedExecutor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("ErasedExecutor") + } +} + // ===== impl MainRuntime ===== impl MainRuntime { diff --git a/proxy/src/transparency/client.rs b/proxy/src/transparency/client.rs index 8e72facfa2..1aa6326294 100644 --- a/proxy/src/transparency/client.rs +++ b/proxy/src/transparency/client.rs @@ -11,10 +11,11 @@ use tower_h2; use bind; use task::BoxExecutor; use telemetry::sensor::http::RequestBody; -use super::glue::{BodyStream, HttpBody, HyperConnect}; +use super::glue::{BodyPayload, HttpBody, HyperConnect}; +use super::upgrade::Http11Upgrade; type HyperClient = - hyper::Client, BodyStream>>; + hyper::Client, BodyPayload>>; /// A `NewService` that can speak either HTTP/1 or HTTP/2. pub struct Client @@ -209,12 +210,16 @@ where } fn call(&mut self, req: Self::Request) -> Self::Future { - debug!("ClientService::call method={} uri={} headers={:?} ext={:?}", - req.method(), req.uri(), req.headers(), req.extensions()); + debug!("client request: method={} uri={} headers={:?}", + req.method(), req.uri(), req.headers()); match self.inner { ClientServiceInner::Http1(ref h1) => { - let mut req = hyper::Request::from(req.map(BodyStream::new)); - ClientServiceFuture::Http1(h1.request(req)) + let mut req = req.map(BodyPayload::new); + let upgrade = req.extensions_mut().remove::(); + ClientServiceFuture::Http1 { + future: h1.request(req), + upgrade, + } }, ClientServiceInner::Http2(ref mut h2) => { ClientServiceFuture::Http2(h2.call(req)) @@ -224,7 +229,10 @@ where } pub enum ClientServiceFuture { - Http1(hyper::client::ResponseFuture), + Http1 { + future: hyper::client::ResponseFuture, + upgrade: Option, + }, Http2(tower_h2::client::ResponseFuture), } @@ -233,12 +241,14 @@ impl Future for ClientServiceFuture { type Error = tower_h2::client::Error; fn poll(&mut self) -> Poll { - match *self { - ClientServiceFuture::Http1(ref mut f) => { - match f.poll() { + match self { + ClientServiceFuture::Http1 { future, upgrade } => { + match future.poll() { Ok(Async::Ready(res)) => { - let res = http::Response::from(res); - let res = res.map(HttpBody::Http1); + let res = res.map(move |b| HttpBody::Http1 { + body: Some(b), + upgrade: upgrade.take(), + }); Ok(Async::Ready(res)) }, Ok(Async::NotReady) => Ok(Async::NotReady), @@ -248,7 +258,7 @@ impl Future for ClientServiceFuture { } } }, - ClientServiceFuture::Http2(ref mut f) => { + ClientServiceFuture::Http2(f) => { let res = try_ready!(f.poll()); let res = res.map(HttpBody::Http2); Ok(Async::Ready(res)) diff --git a/proxy/src/transparency/glue.rs b/proxy/src/transparency/glue.rs index 5e3fea048e..1ee08e02d4 100644 --- a/proxy/src/transparency/glue.rs +++ b/proxy/src/transparency/glue.rs @@ -1,10 +1,9 @@ -use std::cell::RefCell; use std::fmt; use std::io; use std::sync::Arc; use bytes::{Bytes, IntoBuf}; -use futures::{future, Async, Future, Poll, Stream}; +use futures::{future, Async, Future, Poll}; use futures::future::Either; use h2; use http; @@ -15,26 +14,39 @@ use tower_service::{Service, NewService}; use tower_h2; use ctx::transport::{Server as ServerCtx}; +use drain; use super::h1; +use super::upgrade::Http11Upgrade; +use task::{BoxSendFuture, ErasedExecutor, Executor}; /// Glue between `hyper::Body` and `tower_h2::RecvBody`. #[derive(Debug)] pub enum HttpBody { - Http1(hyper::Body), + Http1 { + /// In HttpBody::drop, if this was an HTTP upgrade, the body is taken + /// to be inserted into the Http11Upgrade half. + body: Option, + upgrade: Option + }, Http2(tower_h2::RecvBody), } /// Glue for `tower_h2::Body`s to be used in hyper. #[derive(Debug)] -pub(super) struct BodyStream { +pub(super) struct BodyPayload { body: B, } /// Glue for a `tower::Service` to used as a `hyper::server::Service`. #[derive(Debug)] -pub(super) struct HyperServerSvc { - service: RefCell, +pub(super) struct HyperServerSvc { + service: S, srv_ctx: Arc, + /// Watch any spawned HTTP/1.1 upgrade tasks. + upgrade_drain_signal: drain::Watch, + /// Executor used to spawn HTTP/1.1 upgrade tasks, and TCP proxies + /// after they succeed. + upgrade_executor: E, } /// Future returned by `HyperServerSvc`. @@ -78,16 +90,21 @@ impl tower_h2::Body for HttpBody { type Data = Bytes; fn is_end_stream(&self) -> bool { - match *self { - HttpBody::Http1(ref b) => b.is_end_stream(), - HttpBody::Http2(ref b) => b.is_end_stream(), + match self { + HttpBody::Http1 { body, .. } => { + body + .as_ref() + .expect("only taken in drop") + .is_end_stream() + }, + HttpBody::Http2(b) => b.is_end_stream(), } } fn poll_data(&mut self) -> Poll, h2::Error> { - match *self { - HttpBody::Http1(ref mut b) => { - match b.poll() { + match self { + HttpBody::Http1 { body, .. } => { + match body.as_mut().expect("only taken in drop").poll_data() { Ok(Async::Ready(Some(chunk))) => Ok(Async::Ready(Some(chunk.into()))), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), @@ -97,14 +114,19 @@ impl tower_h2::Body for HttpBody { } } }, - HttpBody::Http2(ref mut b) => b.poll_data().map(|async| async.map(|opt| opt.map(|data| data.into()))), + HttpBody::Http2(b) => b.poll_data() + .map(|async| { + async.map(|opt| { + opt.map(Bytes::from) + }) + }) } } fn poll_trailers(&mut self) -> Poll, h2::Error> { - match *self { - HttpBody::Http1(_) => Ok(Async::Ready(None)), - HttpBody::Http2(ref mut b) => b.poll_trailers(), + match self { + HttpBody::Http1 { .. } => Ok(Async::Ready(None)), + HttpBody::Http2(b) => b.poll_trailers(), } } } @@ -137,18 +159,36 @@ impl Default for HttpBody { } } -// ===== impl BodyStream ===== +impl Drop for HttpBody { + fn drop(&mut self) { + // If HTTP/1, and an upgrade was wanted, send the upgrade future. + match self { + HttpBody::Http1 { body, upgrade } => { + if let Some(upgrade) = upgrade.take() { + let on_upgrade = body + .take() + .expect("take only on drop") + .on_upgrade(); + upgrade.insert_half(on_upgrade); + } + }, + HttpBody::Http2(_) => (), + } + } +} + +// ===== impl BodyPayload ===== -impl BodyStream { +impl BodyPayload { /// Wrap a `tower_h2::Body` into a `Stream` hyper can understand. pub fn new(body: B) -> Self { - BodyStream { + BodyPayload { body, } } } -impl hyper::body::Payload for BodyStream +impl hyper::body::Payload for BodyPayload where B: tower_h2::Body + Send + 'static, ::Buf: Send, @@ -179,16 +219,23 @@ where // ===== impl HyperServerSvc ===== -impl HyperServerSvc { - pub fn new(svc: S, ctx: Arc) -> Self { +impl HyperServerSvc { + pub fn new( + service: S, + srv_ctx: Arc, + upgrade_drain_signal: drain::Watch, + upgrade_executor: E, + ) -> Self { HyperServerSvc { - service: RefCell::new(svc), - srv_ctx: ctx, + service, + srv_ctx, + upgrade_drain_signal, + upgrade_executor, } } } -impl hyper::service::Service for HyperServerSvc +impl hyper::service::Service for HyperServerSvc where S: Service< Request=http::Request, @@ -197,32 +244,52 @@ where S::Error: fmt::Debug, B: tower_h2::Body + Default + Send + 'static, ::Buf: Send, + E: Executor + Clone + Send + Sync + 'static, { type ReqBody = hyper::Body; - type ResBody = BodyStream; + type ResBody = BodyPayload; type Error = h2::Error; type Future = Either< HyperServerSvcFuture, future::FutureResult, Self::Error>, >; - fn call(&mut self, req: http::Request) -> Self::Future { - if let &hyper::Method::CONNECT = req.method() { + fn call(&mut self, mut req: http::Request) -> Self::Future { + if let &http::Method::CONNECT = req.method() { debug!("HTTP/1.1 CONNECT not supported"); - let res = hyper::Response::builder() - .status(hyper::StatusCode::BAD_GATEWAY) - .body(BodyStream::new(Default::default())) + let res = http::Response::builder() + .status(http::StatusCode::BAD_GATEWAY) + .body(BodyPayload::new(Default::default())) .expect("building response with empty body should not error!"); return Either::B(future::ok(res)); } - let mut req = req; req.extensions_mut().insert(self.srv_ctx.clone()); - h1::strip_connection_headers(req.headers_mut()); + let upgrade = if h1::wants_upgrade(&req) { + trace!("server request wants HTTP/1.1 upgrade"); + // Upgrade requests include several "connection" headers that + // cannot be removed. + + // Setup HTTP Upgrade machinery. + let halves = Http11Upgrade::new( + self.upgrade_drain_signal.clone(), + ErasedExecutor::erase(self.upgrade_executor.clone()), + ); + req.extensions_mut().insert(halves.client); + + Some(halves.server) + } else { + h1::strip_connection_headers(req.headers_mut()); + None + }; - let req = req.map(|b| HttpBody::Http1(b)); + + let req = req.map(move |b| HttpBody::Http1 { + body: Some(b), + upgrade, + }); let f = HyperServerSvcFuture { - inner: self.service.borrow_mut().call(req), + inner: self.service.call(req), }; Either::A(f) } @@ -233,7 +300,7 @@ where F: Future>, F::Error: fmt::Debug, { - type Item = hyper::Response>; + type Item = http::Response>; type Error = h2::Error; fn poll(&mut self) -> Poll { @@ -242,8 +309,12 @@ where h2::Error::from(io::Error::from(io::ErrorKind::Other)) })); - h1::strip_connection_headers(res.headers_mut()); - Ok(Async::Ready(res.map(BodyStream::new).into())) + if h1::is_upgrade(&res) { + trace!("client response is HTTP/1.1 upgrade"); + } else { + h1::strip_connection_headers(res.headers_mut()); + } + Ok(Async::Ready(res.map(BodyPayload::new))) } } diff --git a/proxy/src/transparency/h1.rs b/proxy/src/transparency/h1.rs index c77cc6f426..46ea09830a 100644 --- a/proxy/src/transparency/h1.rs +++ b/proxy/src/transparency/h1.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use bytes::BytesMut; use http; -use http::header::HOST; +use http::header::{HOST, UPGRADE}; use http::uri::{Authority, Parts, Scheme, Uri}; + use ctx::transport::{Server as ServerCtx}; /// Tries to make sure the `Uri` of the request is in a form needed by @@ -81,4 +82,35 @@ pub fn strip_connection_headers(headers: &mut http::HeaderMap) { } } +/// Checks requests to determine if they want to perform an HTTP upgrade. +pub fn wants_upgrade(req: &http::Request) -> bool { + // HTTP upgrades were added in 1.1, not 1.0. + if req.version() != http::Version::HTTP_11 { + return false; + } + + if let Some(upgrade) = req.headers().get(UPGRADE) { + // If an `h2` upgrade over HTTP/1.1 were to go by the proxy, + // and it succeeded, there would an h2 connection, but it would + // be opaque-to-the-proxy, acting as just a TCP proxy. + // + // A user wouldn't be able to see any usual HTTP telemetry about + // requests going over that connection. Instead of that confusion, + // the proxy strips h2 upgrade headers. + // + // Eventually, the proxy will support h2 upgrades directly. + upgrade != "h2c" + } else { + // No Upgrade header means no upgrade wanted! + false + } + +} + +/// Checks responses to determine if they are successful HTTP upgrades. +pub fn is_upgrade(res: &http::Response) -> bool { + // 101 Switching Protocols + res.status() == http::StatusCode::SWITCHING_PROTOCOLS + && res.version() == http::Version::HTTP_11 +} diff --git a/proxy/src/transparency/mod.rs b/proxy/src/transparency/mod.rs index 1318172649..942cd9d6ff 100644 --- a/proxy/src/transparency/mod.rs +++ b/proxy/src/transparency/mod.rs @@ -1,6 +1,7 @@ mod client; mod glue; pub mod h1; +mod upgrade; mod protocol; mod server; mod tcp; diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index 80d1963d8b..7de705833a 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -161,43 +161,52 @@ where let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); + let log_clone = log.clone(); let fut = Either::A(io.peek() .map_err(|e| debug!("peek error: {}", e)) - .and_then(move |io| { - if let Some(proto) = Protocol::detect(io.peeked()) { - Either::A(match proto { - Protocol::Http1 => { - trace!("transparency detected HTTP/1"); - - let fut = new_service.new_service() - .map_err(|_| ()) - .and_then(move |s| { - let svc = HyperServerSvc::new(s, srv_ctx); - drain_signal - .watch(h1.serve_connection(io, svc), |conn| { - conn.graceful_shutdown(); - }) - .map(|_| ()) - .map_err(|e| trace!("http1 server error: {:?}", e)) - }); - Either::A(fut) - }, - Protocol::Http2 => { - trace!("transparency detected HTTP/2"); - let set_ctx = move |request: &mut http::Request<()>| { - request.extensions_mut().insert(srv_ctx.clone()); - }; - - let fut = drain_signal - .watch(h2.serve_modified(io, set_ctx), |conn| { + .and_then(move |io| match Protocol::detect(io.peeked()) { + Some(Protocol::Http1) => Either::A({ + trace!("transparency detected HTTP/1"); + + let fut = new_service.new_service() + .map_err(|e| trace!("h1 new_service error: {:?}", e)) + .and_then(move |s| { + let svc = HyperServerSvc::new( + s, + srv_ctx, + drain_signal.clone(), + log_clone.executor(), + ); + let conn = h1 + .serve_connection(io, svc) + // Since using `Connection`s, enable + // support for HTTP upgrades (CONNECT + // and websockets). + .with_upgrades(); + drain_signal + .watch(conn, |conn| { conn.graceful_shutdown(); }) - .map_err(|e| trace!("h2 server error: {:?}", e)); - - Either::B(fut) - } - }) - } else { + .map(|_| ()) + .map_err(|e| trace!("http1 server error: {:?}", e)) + }); + Either::A(fut) + }), + Some(Protocol::Http2) => Either::A({ + trace!("transparency detected HTTP/2"); + let set_ctx = move |request: &mut http::Request<()>| { + request.extensions_mut().insert(srv_ctx.clone()); + }; + + let fut = drain_signal + .watch(h2.serve_modified(io, set_ctx), |conn| { + conn.graceful_shutdown(); + }) + .map_err(|e| trace!("h2 server error: {:?}", e)); + + Either::B(fut) + }), + None => { trace!("transparency did not detect protocol, treating as TCP"); Either::B(tcp_serve( &tcp, diff --git a/proxy/src/transparency/tcp.rs b/proxy/src/transparency/tcp.rs index a28bd0ceb7..2a8f520c38 100644 --- a/proxy/src/transparency/tcp.rs +++ b/proxy/src/transparency/tcp.rs @@ -77,12 +77,21 @@ impl Proxy { future::Either::A(connect.connect() .map_err(move |e| error!("tcp connect error to {}: {:?}", orig_dst, e)) .and_then(move |tcp_out| { - Duplex::new(tcp_in, tcp_out) - .map_err(|e| error!("tcp duplex error: {}", e)) + duplex(tcp_in, tcp_out) })) } } +pub(super) fn duplex(half_in: In, half_out: Out) + -> impl Future + Send +where + In: AsyncRead + AsyncWrite + Send + 'static, + Out: AsyncRead + AsyncWrite + Send + 'static, +{ + Duplex::new(half_in, half_out) + .map_err(|e| error!("tcp duplex error: {}", e)) +} + /// A future piping data bi-directionally to In and Out. struct Duplex { half_in: HalfDuplex, diff --git a/proxy/src/transparency/upgrade.rs b/proxy/src/transparency/upgrade.rs new file mode 100644 index 0000000000..4afd4bc1a3 --- /dev/null +++ b/proxy/src/transparency/upgrade.rs @@ -0,0 +1,162 @@ +//! HTTP/1.1 Upgrades +use std::fmt; +use std::mem; +use std::sync::Arc; + +use futures::Future; +use hyper::upgrade::OnUpgrade; +use try_lock::TryLock; + +use drain; +use super::tcp; +use task::{ErasedExecutor, Executor}; + +/// A type inserted into `http::Extensions` to bridge together HTTP Upgrades. +/// +/// If the HTTP1 server service detects an upgrade request, this will be +/// inserted into the `Request::extensions()`. If the HTTP1 client service +/// also detects an upgrade, the two `OnUpgrade` futures will be joined +/// together with the glue in this type. +// Note: this relies on their only having been 2 Inner clones, so don't +// implement `Clone` for this type. +pub struct Http11Upgrade { + half: Half, + inner: Arc, +} + +/// A named "tuple" returned by `Http11Upgade::new()` of the two halves of +/// an upgrade. +#[derive(Debug)] +pub struct Http11UpgradeHalves { + /// The "server" half. + pub server: Http11Upgrade, + /// The "client" half. + pub client: Http11Upgrade, + _inner: (), +} + +struct Inner { + server: TryLock>, + client: TryLock>, + upgrade_drain_signal: Option, + /// An ErasedExecutor is used because the containing type, Http11Upgrade, + /// is inserted into `http::Extensions`, which is a type map. + /// + /// If this were instead a generic `E: Executor`, it'd be very easy + /// to specify the wrong when trying to remove the `Http11Upgrade` from + /// the type map, since with different generics, they'd generate + /// different `TypeId`s. + upgrade_executor: ErasedExecutor, +} + +#[derive(Debug)] +enum Half { + Server, + Client, +} + + +// ===== impl Http11Upgrade ===== + +impl Http11Upgrade { + /// Returns a pair of upgrade handles. + /// + /// Each handle is used to insert 1 half of the upgrade. When both handles + /// have inserted, the upgrade future will be spawned onto the executor. + pub fn new( + upgrade_drain_signal: drain::Watch, + upgrade_executor: ErasedExecutor, + ) -> Http11UpgradeHalves { + let inner = Arc::new(Inner { + server: TryLock::new(None), + client: TryLock::new(None), + upgrade_drain_signal: Some(upgrade_drain_signal), + upgrade_executor, + }); + + Http11UpgradeHalves { + server: Http11Upgrade { + half: Half::Server, + inner: inner.clone(), + }, + client: Http11Upgrade { + half: Half::Client, + inner: inner, + }, + _inner: (), + } + } + + pub fn insert_half(self, upgrade: OnUpgrade) { + match self.half { + Half::Server => { + let mut lock = self + .inner + .server + .try_lock() + .expect("only Half::Server touches server TryLock"); + debug_assert!(lock.is_none()); + *lock = Some(upgrade); + }, + Half::Client => { + let mut lock = self + .inner + .client + .try_lock() + .expect("only Half::Client touches client TryLock"); + debug_assert!(lock.is_none()); + *lock = Some(upgrade); + } + } + } +} + +impl fmt::Debug for Http11Upgrade { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Http11Upgrade") + .field("half", &self.half) + .finish() + } +} + +/// When both halves have dropped, check if both sides are inserted, +/// and if so, spawn the upgrade task. +impl Drop for Inner { + fn drop(&mut self) { + // Since this is Inner::drop, no more synchronization is required. + // We can safely take the futures out of their locks. + let server = mem::replace(&mut self.server, TryLock::new(None)).into_inner(); + let client = mem::replace(&mut self.client, TryLock::new(None)).into_inner(); + if let (Some(server), Some(client)) = (server, client) { + trace!("HTTP/1.1 upgrade has both halves"); + + let server_upgrade = server.map_err(|e| { + debug!("server HTTP upgrade error: {}", e) + }); + + let client_upgrade = client.map_err(|e| { + debug!("client HTTP upgrade error: {}", e) + }); + + let both_upgrades = server_upgrade + .join(client_upgrade) + .and_then(|(server_conn, client_conn)| { + trace!("HTTP upgrade successful"); + tcp::duplex(server_conn, client_conn) + }); + + // There's nothing to do when drain is signaled, we just have to hope + // the sockets finish soon. However, the drain signal still needs to + // 'watch' the TCP future so that the process doesn't close early. + let fut = self + .upgrade_drain_signal + .take() + .expect("only taken in drop") + .watch(both_upgrades, |_| ()); + + if let Err(_) = self.upgrade_executor.execute(fut) { + trace!("error spawning HTTP upgrade task"); + } + } + } +} diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 89d9705a91..42d1c24e99 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -111,6 +111,20 @@ macro_rules! assert_eventually { }; } +#[macro_export] +macro_rules! assert_contains { + ($haystack:expr, $needle:expr) => { + assert!($haystack.contains($needle), "haystack:\n{:8?}\ndid not contain:\n{:8?}", $haystack, $needle) + } +} + +#[macro_export] +macro_rules! assert_eventually_contains { + ($scrape:expr, $contains:expr) => { + assert_eventually!($scrape.contains($contains), "metrics scrape:\n{:8?}\ndid not contain:\n{:8?}", $scrape, $contains) + } +} + pub mod client; pub mod controller; pub mod proxy; diff --git a/proxy/tests/transparency.rs b/proxy/tests/transparency.rs index 3a57fb1e77..164f2a539c 100644 --- a/proxy/tests/transparency.rs +++ b/proxy/tests/transparency.rs @@ -3,12 +3,6 @@ mod support; use self::support::*; -macro_rules! assert_contains { - ($scrape:expr, $contains:expr) => { - assert_eventually!($scrape.contains($contains), "metrics scrape:\n{:8}\ndid not contain:\n{:8}", $scrape, $contains) - } -} - #[test] fn outbound_http1() { let _ = env_logger::try_init(); @@ -329,33 +323,56 @@ fn tcp_connections_close_if_client_closes() { } #[test] -fn http11_upgrade_not_supported() { +fn http11_upgrades() { let _ = env_logger::try_init(); - // our h1 proxy will strip the Connection header - // and headers it mentions - let msg1 = "\ + // To simplify things for this test, we just use the test TCP + // client and server to do an HTTP upgrade. + // + // This is upgrading to 'chatproto', a made up plaintext protocol + // to simplify testing. + + let upgrade_req = "\ GET /chat HTTP/1.1\r\n\ Host: foo.bar\r\n\ - Connection: Upgrade\r\n\ - Upgrade: websocket\r\n\ + Connection: upgrade\r\n\ + Upgrade: chatproto\r\n\ \r\n\ "; - - // but let's pretend the server tries to upgrade - // anyways - let msg2 = "\ + let upgrade_res = "\ HTTP/1.1 101 Switching Protocols\r\n\ - Upgrade: websocket\r\n\ - Connection: Upgrade\r\n\ + Upgrade: chatproto\r\n\ + Connection: upgrade\r\n\ \r\n\ "; + let upgrade_needle = "\r\nupgrade: chatproto\r\n"; + let chatproto_req = "[chatproto-c]{send}: hi all\n"; + let chatproto_res = "[chatproto-s]{recv}: welcome!\n"; + let srv = server::tcp() - .accept(move |read| { - let head = s(&read); - assert!(!head.contains("Upgrade: websocket")); - msg2 + .accept_fut(move |sock| { + // Read upgrade_req... + tokio_io::io::read(sock, vec![0; 512]) + .and_then(move |(sock, vec, n)| { + let head = s(&vec[..n]); + assert_contains!(head, upgrade_needle); + + // Write upgrade_res back... + tokio_io::io::write_all(sock, upgrade_res) + }) + .and_then(move |(sock, _)| { + // Read the message in 'chatproto' format + tokio_io::io::read(sock, vec![0; 512]) + }) + .and_then(move |(sock, vec, n)| { + assert_eq!(s(&vec[..n]), chatproto_req); + + // Some processing... and then write back in chatproto... + tokio_io::io::write_all(sock, chatproto_res) + }) + .map(|_| ()) + .map_err(|e| panic!("tcp server error: {}", e)) }) .run(); let proxy = proxy::new() @@ -366,10 +383,60 @@ fn http11_upgrade_not_supported() { let tcp_client = client.connect(); - tcp_client.write(msg1); + tcp_client.write(upgrade_req); - let expected = "HTTP/1.1 500 "; - assert_eq!(s(&tcp_client.read()[..expected.len()]), expected); + let resp = tcp_client.read(); + let resp_str = s(&resp); + assert!( + resp_str.starts_with("HTTP/1.1 101 Switching Protocols\r\n"), + "response not an upgrade: {:?}", + resp_str + ); + assert_contains!(resp_str, upgrade_needle); + + // We've upgraded from HTTP to chatproto! Say hi! + tcp_client.write(chatproto_req); + // Did anyone respond? + let chat_resp = tcp_client.read(); + assert_eq!(s(&chat_resp), chatproto_res); +} + +#[test] +fn http11_upgrade_h2_stripped() { + let _ = env_logger::try_init(); + + // If an `h2` upgrade over HTTP/1.1 were to go by the proxy, + // and it succeeded, there would an h2 connection, but it would + // be opaque-to-the-proxy, acting as just a TCP proxy. + // + // A user wouldn't be able to see any usual HTTP telemetry about + // requests going over that connection. Instead of that confusion, + // the proxy strips h2 upgrade headers. + // + // Eventually, the proxy will support h2 upgrades directly. + + let srv = server::http1() + .route_fn("/", |req| { + assert!(!req.headers().contains_key("connection")); + assert!(!req.headers().contains_key("upgrade")); + assert!(!req.headers().contains_key("http2-settings")); + Response::default() + }) + .run(); + let proxy = proxy::new() + .inbound(srv) + .run(); + + let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local"); + + let res = client.request(client.request_builder("/") + .header("upgrade", "h2c") + .header("http2-settings", "") + .header("connection", "upgrade, http2-settings")); + + // If the assertion is trigger in the above test route, the proxy will + // just send back a 500. + assert_eq!(res.status(), http::StatusCode::OK); } #[test] @@ -755,7 +822,7 @@ fn retry_reconnect_errors() { // wait until metrics has seen our connection, this can be flaky depending on // all the other threads currently running... - assert_contains!( + assert_eventually_contains!( metrics.get("/metrics"), "tcp_open_total{direction=\"inbound\",peer=\"src\"} 1" );