Skip to content

Commit

Permalink
proxy: add HTTP/1.1 Upgrade support automatically (linkerd#1126)
Browse files Browse the repository at this point in the history
Any HTTP/1.1 requests seen by the proxy will automatically set up
to prepare such that if the proxied responses agree to an upgrade,
the two connections will converted into a standard TCP proxy duplex.

Implementation
-----------------

This adds a new type, `transparency::Http11Upgrade`, which is a sort of rendezvous type for triggering HTTP/1.1 upgrades. In the h1 server service, if a request looks like an upgrade (`h1::wants_upgrade`), the request body is decorated with this new `Http11Upgrade` type. It is actually a pair, and so the second half is put into the request extensions, so that the h1 client service may look for it right before serialization. If it finds the half in the extensions, it decorates the *response* body with that half (if it looks like a response upgrade (`h1::is_upgrade`)).

The `HttpBody` type now has a `Drop` impl, which will look to see if its been decorated with an `Http11Upgrade` half. If so, it will check for hyper's new `Body::on_upgrade()` future, and insert that into the half. 

When both `Http11Upgrade` halves are dropped, its internal `Drop` will look to if both halves have supplied an upgrade. If so, the two `OnUpgrade` futures from hyper are joined on, and when they succeed, a `transparency::tcp::duplex()` future is created. This chain is spawned into the default executor.

The `drain::Watch` signal is carried along, to ensure upgraded connections still count towards active connections when the proxy wants to shutdown.

Closes linkerd#195
  • Loading branch information
seanmonstar committed Jun 20, 2018
1 parent 8dcb95d commit c52385d
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 127 deletions.
19 changes: 10 additions & 9 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ dependencies = [
"h2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)",
"ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
Expand Down Expand Up @@ -165,6 +165,7 @@ dependencies = [
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-util 0.1.0 (git+https://github.com/tower-rs/tower)",
"trust-dns-resolver 0.9.0 (git+https://github.com/bluejekyll/trust-dns)",
"try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"webpki 0.18.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)",
]
Expand Down Expand Up @@ -416,7 +417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "hyper"
version = "0.12.1"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
Expand All @@ -435,7 +436,7 @@ dependencies = [
"tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"want 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
Expand Down Expand Up @@ -1328,7 +1329,7 @@ dependencies = [

[[package]]
name = "try-lock"
version = "0.1.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
Expand Down Expand Up @@ -1399,12 +1400,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "want"
version = "0.0.4"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
Expand Down Expand Up @@ -1534,7 +1535,7 @@ dependencies = [
"checksum hostname 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "58fab6e177434b0bb4cd344a4dabaa5bd6d7a8d792b1885aebcae7af1091d1cb"
"checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab"
"checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37"
"checksum hyper 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6416251e6672bff06fe96a3337570772845a44500fba2d178e2e55e0fab58a86"
"checksum hyper 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ad39a4f15051ccd4ea6adf44df851e00fd9062c71734391d806246b94e69dc1f"
"checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d"
"checksum indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b9378f1f3923647a9aea6af4c6b5de68cc8a71415459ad25ef191191c48f5b7"
"checksum inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)" = "<none>"
Expand Down Expand Up @@ -1631,7 +1632,7 @@ dependencies = [
"checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum trust-dns-proto 0.4.0 (git+https://github.com/bluejekyll/trust-dns)" = "<none>"
"checksum trust-dns-resolver 0.9.0 (git+https://github.com/bluejekyll/trust-dns)" = "<none>"
"checksum try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee2aa4715743892880f70885373966c83d73ef1b0838a664ef0c76fffd35e7c2"
"checksum try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "119b532a17fbe772d360be65617310164549a07c25a1deab04c84168ce0d4545"
"checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d"
"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
"checksum unicode-normalization 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "51ccda9ef9efa3f7ef5d91e8f9b83bbe6955f9bf86aec89d5cce2c874625920f"
Expand All @@ -1643,7 +1644,7 @@ dependencies = [
"checksum url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f808aadd8cfec6ef90e4a14eb46f24511824d1ac596b9682703c87056c8678b7"
"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1"
"checksum want 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2fffe09593e18ed34950d66dbf44c27deb2e03f3905c493f0641f9f99a3f2349"
"checksum webpki 0.18.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)" = "724897af4bb44f3e0142b9cca300eb15f61b9b34fa559440bed8c43f2ff7afc0"
"checksum which 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49c4f580e93079b70ac522e7bdebbe1568c8afa7d8d05ee534ee737ca37d2f51"
"checksum widestring 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7157704c2e12e3d2189c507b7482c52820a16dfa4465ba91add92f266667cadb"
Expand Down
3 changes: 2 additions & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ futures-watch = { git = "https://github.com/carllerche/better-future" }
h2 = "0.1.10"
http = "0.1"
httparse = "1.2"
hyper = "0.12"
hyper = "0.12.2"
ipnet = "1.0"
log = "0.4.1"
indexmap = "1.0.0"
prost = "0.4.0"
prost-types = "0.4.0"
rand = "0.5.1"
try-lock = "0.2"

# for config parsing
regex = "1.0.0"
Expand Down
1 change: 1 addition & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ extern crate conduit_proxy_router;
extern crate tower_util;
extern crate tower_in_flight_limit;
extern crate trust_dns_resolver;
extern crate try_lock;

use futures::*;

Expand Down
41 changes: 37 additions & 4 deletions proxy/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use futures::future::{
Future,
ExecuteError,
ExecuteErrorKind,
Executor,
};
pub use futures::future::Executor;

use tokio::{
executor::{
DefaultExecutor,
Expand All @@ -20,6 +21,8 @@ use std::{
io,
};

pub type BoxSendFuture = Box<Future<Item = (), Error = ()> + Send>;

/// An empty type which implements `Executor` by lazily calling
/// `tokio::executor::DefaultExecutor::current().execute(...)`.
///
Expand All @@ -36,6 +39,12 @@ pub struct LazyExecutor;
#[derive(Copy, Clone, Debug, Default)]
pub struct BoxExecutor<E: TokioExecutor>(E);

/// A `futures::executor::Executor` with any generics erased.
///
/// This is useful when some code cannot be generic over any executor,
/// and instead needs a trait object. An example is `Http11Upgrade`.
pub struct ErasedExecutor(Box<Executor<BoxSendFuture> + Send + Sync>);

/// Indicates which Tokio `Runtime` should be used for the main proxy.
///
/// This is either a `tokio::runtime::current_thread::Runtime`, or a
Expand Down Expand Up @@ -71,7 +80,7 @@ pub enum Error {
impl TokioExecutor for LazyExecutor {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
future: BoxSendFuture,
) -> Result<(), SpawnError>
{
DefaultExecutor::current().spawn(future)
Expand Down Expand Up @@ -118,7 +127,7 @@ impl<E: TokioExecutor> BoxExecutor<E> {
impl<E: TokioExecutor> TokioExecutor for BoxExecutor<E> {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
future: BoxSendFuture,
) -> Result<(), SpawnError> {
self.0.spawn(future)
}
Expand All @@ -132,7 +141,7 @@ impl<F, E> Executor<F> for BoxExecutor<E>
where
F: Future<Item = (), Error = ()> + 'static + Send,
E: TokioExecutor,
E: Executor<Box<Future<Item = (), Error = ()> + Send + 'static>>,
E: Executor<BoxSendFuture>,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
// Check the status of the executor first, so that we can return the
Expand All @@ -154,6 +163,30 @@ where
}
}

// ===== impl ErasedExecutor =====;

impl ErasedExecutor {
pub fn erase<E: Executor<BoxSendFuture> + Send + Sync + 'static>(exe: E) -> ErasedExecutor {
ErasedExecutor(Box::new(exe))
}
}

impl<F> Executor<F> for ErasedExecutor
where
F: Future<Item = (), Error = ()> + 'static + Send,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.0.execute(Box::new(future))
.map_err(|_| panic!("erased executor error"))
}
}

impl fmt::Debug for ErasedExecutor {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("ErasedExecutor")
}
}

// ===== impl MainRuntime =====

impl MainRuntime {
Expand Down
36 changes: 23 additions & 13 deletions proxy/src/transparency/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use tower_h2;
use bind;
use task::BoxExecutor;
use telemetry::sensor::http::RequestBody;
use super::glue::{BodyStream, HttpBody, HyperConnect};
use super::glue::{BodyPayload, HttpBody, HyperConnect};
use super::upgrade::Http11Upgrade;

type HyperClient<C, B> =
hyper::Client<HyperConnect<C>, BodyStream<RequestBody<B>>>;
hyper::Client<HyperConnect<C>, BodyPayload<RequestBody<B>>>;

/// A `NewService` that can speak either HTTP/1 or HTTP/2.
pub struct Client<C, E, B>
Expand Down Expand Up @@ -209,12 +210,16 @@ where
}

fn call(&mut self, req: Self::Request) -> Self::Future {
debug!("ClientService::call method={} uri={} headers={:?} ext={:?}",
req.method(), req.uri(), req.headers(), req.extensions());
debug!("client request: method={} uri={} headers={:?}",
req.method(), req.uri(), req.headers());
match self.inner {
ClientServiceInner::Http1(ref h1) => {
let mut req = hyper::Request::from(req.map(BodyStream::new));
ClientServiceFuture::Http1(h1.request(req))
let mut req = req.map(BodyPayload::new);
let upgrade = req.extensions_mut().remove::<Http11Upgrade>();
ClientServiceFuture::Http1 {
future: h1.request(req),
upgrade,
}
},
ClientServiceInner::Http2(ref mut h2) => {
ClientServiceFuture::Http2(h2.call(req))
Expand All @@ -224,7 +229,10 @@ where
}

pub enum ClientServiceFuture {
Http1(hyper::client::ResponseFuture),
Http1 {
future: hyper::client::ResponseFuture,
upgrade: Option<Http11Upgrade>,
},
Http2(tower_h2::client::ResponseFuture),
}

Expand All @@ -233,12 +241,14 @@ impl Future for ClientServiceFuture {
type Error = tower_h2::client::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
ClientServiceFuture::Http1(ref mut f) => {
match f.poll() {
match self {
ClientServiceFuture::Http1 { future, upgrade } => {
match future.poll() {
Ok(Async::Ready(res)) => {
let res = http::Response::from(res);
let res = res.map(HttpBody::Http1);
let res = res.map(move |b| HttpBody::Http1 {
body: Some(b),
upgrade: upgrade.take(),
});
Ok(Async::Ready(res))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Expand All @@ -248,7 +258,7 @@ impl Future for ClientServiceFuture {
}
}
},
ClientServiceFuture::Http2(ref mut f) => {
ClientServiceFuture::Http2(f) => {
let res = try_ready!(f.poll());
let res = res.map(HttpBody::Http2);
Ok(Async::Ready(res))
Expand Down
Loading

0 comments on commit c52385d

Please sign in to comment.