Skip to content

Commit

Permalink
proxy: Clarify Outbound::recognize (linkerd#1144)
Browse files Browse the repository at this point in the history
The comments in Outbound::recognize had become somewhat stale as the
logic changed. Furthermore, this implementation may be easier to
understand if broken into smaller pieces.

This change reorganizes the Outbound:recognize method into helper
methods--`destination`, `host_port`, and `normalize`--each with
accompanying docstrings that more accurately reflect the current
implementation.

This also has the side-effect benefit of eliminating a string clone on
every request.
  • Loading branch information
olix0r committed Jun 19, 2018
1 parent 10f327e commit 902b7ef
Showing 1 changed file with 81 additions and 55 deletions.
136 changes: 81 additions & 55 deletions proxy/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ const MAX_IN_FLIGHT: usize = 10_000;
/// This default is used by Finagle.
const DEFAULT_DECAY: Duration = Duration::from_secs(10);

/// Describes a destination for HTTP requests.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Destination {
Hostname(DnsNameAndPort),
ImplicitOriginalDst(SocketAddr),
/// A logical, lazily-bound endpoint.
Name(DnsNameAndPort),

/// A single, bound endpoint.
Addr(SocketAddr),
}

// ===== impl Outbound =====
Expand All @@ -54,6 +58,65 @@ impl<B> Outbound<B> {
bind_timeout,
}
}


/// TODO: Return error when `HostAndPort::normalize()` fails.
/// TODO: Use scheme-appropriate default port.
fn normalize(authority: &http::uri::Authority) -> Option<HostAndPort> {
const DEFAULT_PORT: Option<u16> = Some(80);
HostAndPort::normalize(authority, DEFAULT_PORT).ok()
}

/// Determines the logical host:port of the request.
///
/// If the parsed URI includes an authority, use that. Otherwise, try to load the
/// authority from the `Host` header.
///
/// The port is either parsed from the authority or a default of 80 is used.
fn host_port(req: &http::Request<B>) -> Option<HostAndPort> {
// Note: Calls to `normalize` cannot be deduped without cloning `authority`.
req.uri()
.authority_part()
.and_then(Self::normalize)
.or_else(|| {
h1::authority_from_host(req)
.and_then(|h| Self::normalize(&h))
})
}

/// Determines the destination for a request.
///
/// Typically, a request's authority is used to produce a `Destination`. If the
/// authority addresses a DNS name, a `Destination::Name` is returned; and, otherwise,
/// it addresses a fixed IP address and a `Destination::Addr` is returned. The port is
/// inferred if not specified in the authority.
///
/// If no authority is available, the `SO_ORIGINAL_DST` socket option is checked. If
/// it's available, it is used to return a `Destination::Addr`. This socket option is
/// typically set by `iptables(8)` in containerized environments like Kubernetes (as
/// configured by the `proxy-init` program).
///
/// If none of this information is available, no `Destination` is returned.
fn destination(req: &http::Request<B>) -> Option<Destination> {
match Self::host_port(req) {
Some(HostAndPort { host: Host::DnsName(host), port }) => {
let dst = DnsNameAndPort { host, port };
Some(Destination::Name(dst))
}

Some(HostAndPort { host: Host::Ip(ip), port }) => {
let dst = SocketAddr::from((ip, port));
Some(Destination::Addr(dst))
}

None => {
req.extensions()
.get::<Arc<ctx::transport::Server>>()
.and_then(|ctx| ctx.orig_dst_if_not_local())
.map(Destination::Addr)
}
}
}
}

impl<B> Clone for Outbound<B>
Expand Down Expand Up @@ -88,45 +151,11 @@ where
choose::PowerOfTwoChoices,
>>>>;

// Route the request by its destination AND PROTOCOL. This prevents HTTP/1
// requests from being routed to HTTP/2 servers, and vice versa.
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let dest = Self::destination(req)?;
let proto = bind::Protocol::detect(req);

// The request URI and Host: header have not yet been normalized
// by `NormalizeUri`, as we need to know whether the request will
// be routed by Host/authority or by SO_ORIGINAL_DST, in order to
// determine whether the service is reusable.
let authority = req.uri().authority_part()
.cloned()
// Therefore, we need to check the host header as well as the URI
// for a valid authority, before we fall back to SO_ORIGINAL_DST.
.or_else(|| h1::authority_from_host(req));

// TODO: Return error when `HostAndPort::normalize()` fails.
let mut dest = match authority.as_ref()
.and_then(|auth| HostAndPort::normalize(auth, Some(80)).ok()) {
Some(HostAndPort { host: Host::DnsName(dns_name), port }) =>
Some(Destination::Hostname(DnsNameAndPort { host: dns_name, port })),
Some(HostAndPort { host: Host::Ip(_), .. }) |
None => None,
};

if dest.is_none() {
dest = req.extensions()
.get::<Arc<ctx::transport::Server>>()
.and_then(|ctx| {
ctx.orig_dst_if_not_local()
})
.map(Destination::ImplicitOriginalDst)
};

// If there is no authority in the request URI or in the Host header,
// and there is no original dst, then we have nothing! In that case,
// return `None`, which results an "unrecognized" error. In practice,
// this shouldn't ever happen, since we expect the proxy to be run on
// Linux servers, with iptables setup, so there should always be an
// original destination.
let dest = dest?;

Some((dest, proto))
}

Expand All @@ -141,16 +170,12 @@ where
let &(ref dest, ref protocol) = key;
debug!("building outbound {:?} client to {:?}", protocol, dest);

let resolve = match *dest {
Destination::Hostname(ref authority) => {
Discovery::NamedSvc(self.discovery.resolve(
authority,
self.bind.clone().with_protocol(protocol.clone()),
))
},
Destination::ImplicitOriginalDst(addr) => {
Discovery::ImplicitOriginalDst(Some((addr, self.bind.clone()
.with_protocol(protocol.clone()))))
let resolve = {
let proto = self.bind.clone().with_protocol(protocol.clone());
match *dest {
Destination::Name(ref authority) =>
Discovery::Name(self.discovery.resolve(authority, proto)),
Destination::Addr(addr) => Discovery::Addr(Some((addr, proto))),
}
};

Expand All @@ -172,8 +197,8 @@ where
}

pub enum Discovery<B> {
NamedSvc(Resolution<BindProtocol<B>>),
ImplicitOriginalDst(Option<(SocketAddr, BindProtocol<B>)>),
Name(Resolution<BindProtocol<B>>),
Addr(Option<(SocketAddr, BindProtocol<B>)>),
}

impl<B> Discover for Discovery<B>
Expand All @@ -190,9 +215,9 @@ where

fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
match *self {
Discovery::NamedSvc(ref mut w) => w.poll()
Discovery::Name(ref mut w) => w.poll()
.map_err(|_| BindError::Internal),
Discovery::ImplicitOriginalDst(ref mut opt) => {
Discovery::Addr(ref mut opt) => {
// This "discovers" a single address for an external service
// that never has another change. This can mean it floats
// in the Balancer forever. However, when we finally add
Expand All @@ -209,6 +234,7 @@ where
}
}
}

#[derive(Copy, Clone, Debug)]
pub enum BindError {
External { addr: SocketAddr },
Expand Down Expand Up @@ -243,10 +269,10 @@ struct Dst(Destination);
impl fmt::Display for Dst {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
Destination::Hostname(ref name) => {
Destination::Name(ref name) => {
write!(f, "{}:{}", name.host, name.port)
}
Destination::ImplicitOriginalDst(ref addr) => addr.fmt(f),
Destination::Addr(ref addr) => addr.fmt(f),
}
}
}

0 comments on commit 902b7ef

Please sign in to comment.