Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy: Add TLS client infrastructure. #1158

Merged
merged 9 commits into from
Jun 20, 2018
19 changes: 9 additions & 10 deletions proxy/benches/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate test;

use conduit_proxy::{
ctx,
conditional::Conditional,
control::destination,
telemetry::{
event,
Expand All @@ -18,28 +19,26 @@ use std::{
fmt,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};
use test::Bencher;
use conduit_proxy::tls;

const REQUESTS: usize = 100;

const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> =
Conditional::None(tls::ReasonForNoTls::Disabled);

fn addr() -> SocketAddr {
([1, 2, 3, 4], 5678).into()
}

fn process() -> Arc<ctx::Process> {
Arc::new(ctx::Process {
scheduled_namespace: "test".into(),
start_time: SystemTime::now(),
})
ctx::Process::test("test")
}

fn server(proxy: &Arc<ctx::Proxy>) -> Arc<ctx::transport::Server> {
ctx::transport::Server::new(
&proxy, &addr(), &addr(), &Some(addr()),
ctx::transport::TlsStatus::Disabled,
)
ctx::transport::Server::new(&proxy, &addr(), &addr(), &Some(addr()), TLS_DISABLED)
}

fn client<L, S>(proxy: &Arc<ctx::Proxy>, labels: L) -> Arc<ctx::transport::Client>
Expand All @@ -51,7 +50,7 @@ where
&proxy,
&addr(),
destination::Metadata::new(metrics::DstLabels::new(labels), None),
ctx::transport::TlsStatus::Disabled,
TLS_DISABLED,
)
}

Expand Down
16 changes: 11 additions & 5 deletions proxy/src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use ctx;
use telemetry::{self, sensor};
use transparency::{self, HttpBody, h1};
use transport;
use tls;
use ctx::transport::TlsStatus;

/// Binds a `Service` from a `SocketAddr`.
///
Expand Down Expand Up @@ -205,18 +207,20 @@ where
fn bind_stack(&self, ep: &Endpoint, protocol: &Protocol) -> Stack<B> {
debug!("bind_stack endpoint={:?}, protocol={:?}", ep, protocol);
let addr = ep.address();

let tls = tls::current_connection_config(ep.tls_identity(),
&self.ctx.tls_client_config_watch());

let client_ctx = ctx::transport::Client::new(
&self.ctx,
&addr,
ep.metadata().clone(),
// TODO: when we can use TLS for client connections, indicate
// whether or not the connection was TLS here.
ctx::transport::TlsStatus::Disabled,
TlsStatus::from(&tls),
);

// Map a socket address to a connection.
let connect = self.sensors.connect(
transport::Connect::new(addr, ep.tls_identity().cloned()),
transport::Connect::new(addr, tls),
&client_ctx,
);

Expand Down Expand Up @@ -266,7 +270,9 @@ where


impl<C, B> Bind<C, B> {
pub fn with_protocol(self, protocol: Protocol) -> BindProtocol<C, B> {
pub fn with_protocol(self, protocol: Protocol)
-> BindProtocol<C, B>
{
BindProtocol {
bind: self,
protocol,
Expand Down
55 changes: 55 additions & 0 deletions proxy/src/conditional.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std;

/// Like `std::option::Option<C>` but `None` carries a reason why the value
/// isn't available.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From that description, this basically sounds like a Result.

I understand why you didn't want to use a type which implies one case is an "error" here, though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't like the introduction of Conditional much but this shouldn't be Result since Result should be used strictly for errors. Either makes more sense but I don't want to import another library. Note that this is basically your Either type that you implemented Stream on. We could move Either to a more generic place and then use that instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't be Result since Result should be used strictly for errors.

I think "a reason we were not able to do TLS" falls into a (sufficiently broad) definition of "error"; Err is used elsewhere for signalling that something is not available. However, I'm not going to press that because I think the discussion is just semantics at this point. :)

We could move Either to a more generic place and then use that instead?

I would be fine with that as well, but (coming at it from another direction) I think Conditional does better encode the meaning of each branch, so, having thought about it a bit, I'm ready to put a green check on this as it is.

#[derive(Clone, Debug)]
pub enum Conditional<C, R>
where
C: Clone + std::fmt::Debug,
R: Clone + std::fmt::Debug,
{
Some(C),
None(R),
}

impl<C, R> Copy for Conditional<C, R>
where
C: Copy + Clone + std::fmt::Debug,
R: Copy + Clone + std::fmt::Debug,
{
}

impl<C, R> Eq for Conditional<C, R>
where
C: Eq + Clone + std::fmt::Debug,
R: Eq + Clone + std::fmt::Debug,
{
}

impl<C, R> PartialEq for Conditional<C, R>
where
C: PartialEq + Clone + std::fmt::Debug,
R: PartialEq + Clone + std::fmt::Debug,
{
fn eq(&self, other: &Conditional<C, R>) -> bool {
use self::Conditional::*;
match (self, other) {
(Some(a), Some(b)) => a.eq(b),
(None(a), None(b)) => a.eq(b),
_ => false,
}
}
}

impl<C, R> std::hash::Hash for Conditional<C, R>
where
C: std::hash::Hash + Clone + std::fmt::Debug,
R: std::hash::Hash + Clone + std::fmt::Debug,
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Conditional::Some(c) => c.hash(state),
Conditional::None(r) => r.hash(state),
}
}
}
71 changes: 46 additions & 25 deletions proxy/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,34 @@ use tokio::{
net::{TcpListener, TcpStream, ConnectFuture},
reactor::Handle,
};

use conditional::Conditional;
use ctx::transport::TlsStatus;
use config::Addr;
use transport::{GetOriginalDst, Io, tls};

pub type PlaintextSocket = TcpStream;

pub struct BoundPort {
inner: std::net::TcpListener,
local_addr: SocketAddr,
}

/// Initiates a client connection to the given address.
pub fn connect(addr: &SocketAddr) -> Connecting {
Connecting {
inner: PlaintextSocket::connect(addr),
// TODO: when we can open TLS client connections, this is where we will
// indicate that for telemetry.
tls_status: TlsStatus::Disabled,
pub fn connect(addr: &SocketAddr,
tls: tls::ConditionalConnectionConfig<tls::ClientConfig>)
-> Connecting
{
Connecting::Plaintext {
connect: TcpStream::connect(addr),
tls: Some(tls),
}
}

/// A socket that is in the process of connecting.
pub struct Connecting {
inner: ConnectFuture,
tls_status: TlsStatus,
pub enum Connecting {
Plaintext {
connect: ConnectFuture,
tls: Option<tls::ConditionalConnectionConfig<tls::ClientConfig>>,
},
UpgradeToTls(tls::UpgradeClientToTls),
}

/// Abstracts a plaintext socket vs. a TLS decorated one.
Expand Down Expand Up @@ -138,7 +140,7 @@ impl BoundPort {
// libraries don't have the necessary API for that, so just
// do it here.
set_nodelay_or_warn(&socket);
let tls_status = if let Some((_identity, config_watch)) = &tls {
let why_no_tls = if let Some((_identity, config_watch)) = &tls {
// TODO: use `identity` to differentiate between TLS
// that the proxy should terminate vs. TLS that should
// be passed through.
Expand All @@ -150,12 +152,12 @@ impl BoundPort {
return Either::A(f);
} else {
// No valid TLS configuration.
TlsStatus::NoConfig
tls::ReasonForNoTls::NoConfig
}
} else {
TlsStatus::Disabled
tls::ReasonForNoTls::Disabled
};
let conn = Connection::new(socket, tls_status);
let conn = Connection::plain(socket, why_no_tls);
Either::B(future::ok((conn, remote_addr)))
})
.then(|r| {
Expand All @@ -181,28 +183,47 @@ impl Future for Connecting {
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let socket = try_ready!(self.inner.poll());
set_nodelay_or_warn(&socket);
Ok(Async::Ready(Connection::new(socket, self.tls_status)))
loop {
*self = match self {
Connecting::Plaintext { connect, tls } => {
let plaintext_stream = try_ready!(connect.poll());
set_nodelay_or_warn(&plaintext_stream);
match tls.take().expect("Polled after ready") {
Conditional::Some(config) => {
let upgrade_to_tls = tls::Connection::connect(
plaintext_stream, &config.identity, config.config);
Connecting::UpgradeToTls(upgrade_to_tls)
},
Conditional::None(why) => {
return Ok(Async::Ready(Connection::plain(plaintext_stream, why)));
},
}
},
Connecting::UpgradeToTls(upgrading) => {
let tls_stream = try_ready!(upgrading.poll());
return Ok(Async::Ready(Connection::tls(tls_stream)));
},
};
}
}
}

// ===== impl Connection =====

impl Connection {
fn new<I: Io + 'static>(io: I, tls_status: TlsStatus) -> Self {
fn plain(io: TcpStream, why_no_tls: tls::ReasonForNoTls) -> Self {
Connection {
io: Box::new(io),
peek_buf: BytesMut::new(),
tls_status,
tls_status: Conditional::None(why_no_tls),
}
}

fn tls(tls: tls::Connection) -> Self {
Connection {
fn tls<S: tls::Session + std::fmt::Debug + 'static>(tls: tls::Connection<S>) -> Self {
Connection {
io: Box::new(tls),
peek_buf: BytesMut::new(),
tls_status: TlsStatus::Success,
tls_status: Conditional::Some(()),
}
}

Expand Down Expand Up @@ -312,7 +333,7 @@ impl<T: Peek> Future for PeekFuture<T> {

// Misc.

fn set_nodelay_or_warn(socket: &PlaintextSocket) {
fn set_nodelay_or_warn(socket: &TcpStream) {
if let Err(e) = socket.set_nodelay(true) {
warn!(
"could not set TCP_NODELAY on {:?}/{:?}: {}",
Expand Down
27 changes: 18 additions & 9 deletions proxy/src/ctx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@
use config;
use std::time::SystemTime;
use std::sync::Arc;
use transport::tls;

pub mod http;
pub mod transport;

/// Describes a single running proxy instance.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
pub struct Process {
/// Identifies the Kubernetes namespace in which this proxy is process.
pub scheduled_namespace: String,

pub start_time: SystemTime,

tls_client_config: tls::ClientConfigWatch,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the process context is the right place to put configurations. As per @olix0r in #1050 (comment), the ctx::* types were intended to be for describing a context, not configuring that context.

On the other hand, I'm not sure where else we'd put this, besides passing it around everywhere...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it's true that this was initially intended to be "just the metadata" -- every ctx object throughout the proxy ends up with a reference back to this thing -- i don't yet have an opinion on whether there are better approaches for this.

Copy link
Contributor Author

@briansmith briansmith Jun 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, I'm not sure where else we'd put this, besides passing it around everywhere...

Right. If we don't put it in ctx::Process then we end up Arcing more and IIRC we have to thread the watch through lots of things. ctx/mod.rs says it's also for "policy" so I think it's not the end of the world.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. at the very least, let's not let this block things

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that's fair, I just wanted to bring it up since Oliver mentioned something similar while reviewing my PR.

}

/// Indicates the orientation of traffic, relative to a sidecar proxy.
Expand All @@ -29,27 +33,30 @@ pub struct Process {
/// local instance.
/// - The _outbound_ proxy receives traffic from the local instance and forwards it to a
/// remove service.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
pub enum Proxy {
Inbound(Arc<Process>),
Outbound(Arc<Process>),
}

impl Process {
#[cfg(test)]
// Test-only, but we can't use `#[cfg(test)]` because it is used by the
// benchmarks
pub fn test(ns: &str) -> Arc<Self> {
Arc::new(Self {
scheduled_namespace: ns.into(),
start_time: SystemTime::now(),
tls_client_config: tls::ClientConfig::no_tls(),
})
}

/// Construct a new `Process` from environment variables.
pub fn new(config: &config::Config) -> Arc<Self> {
pub fn new(config: &config::Config, tls_client_config: tls::ClientConfigWatch) -> Arc<Self> {
let start_time = SystemTime::now();
Arc::new(Self {
scheduled_namespace: config.namespaces.pod.clone(),
start_time,
tls_client_config,
})
}
}
Expand All @@ -73,6 +80,12 @@ impl Proxy {
pub fn is_outbound(&self) -> bool {
!self.is_inbound()
}

pub fn tls_client_config_watch(&self) -> &tls::ClientConfigWatch {
match self {
Proxy::Inbound(process) | Proxy::Outbound(process) => &process.tls_client_config
}
}
}

#[cfg(test)]
Expand All @@ -82,7 +95,6 @@ pub mod test_util {
fmt,
net::SocketAddr,
sync::Arc,
time::SystemTime,
};

use ctx;
Expand All @@ -94,10 +106,7 @@ pub mod test_util {
}

pub fn process() -> Arc<ctx::Process> {
Arc::new(ctx::Process {
scheduled_namespace: "test".into(),
start_time: SystemTime::now(),
})
ctx::Process::test("test")
}

pub fn server(
Expand Down
Loading