Skip to content

Commit

Permalink
feat(transport): Expose tcp keepalive to clients & servers (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
zenria authored and LucioFranco committed Dec 6, 2019
1 parent 3387ef9 commit caccfad
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 9 deletions.
17 changes: 17 additions & 0 deletions tonic/src/transport/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Endpoint {
Option<Arc<dyn Fn(&mut http::HeaderMap) + Send + Sync + 'static>>,
pub(super) init_stream_window_size: Option<u32>,
pub(super) init_connection_window_size: Option<u32>,
pub(super) tcp_keepalive: Option<Duration>,
}

impl Endpoint {
Expand Down Expand Up @@ -83,6 +84,21 @@ impl Endpoint {
}
}

/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
///
/// Default is no keepalive (`None`)
///
pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
Endpoint {
tcp_keepalive,
..self
}
}

/// Apply a concurrency limit to each request.
///
/// ```
Expand Down Expand Up @@ -174,6 +190,7 @@ impl From<Uri> for Endpoint {
interceptor_headers: None,
init_stream_window_size: None,
init_connection_window_size: None,
tcp_keepalive: None,
}
}
}
Expand Down
23 changes: 21 additions & 2 deletions tonic/src/transport/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use hyper::{
server::{accept::Accept, conn},
Body,
};
use std::time::Duration;
use std::{
fmt,
future::Future,
Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct Server {
init_stream_window_size: Option<u32>,
init_connection_window_size: Option<u32>,
max_concurrent_streams: Option<u32>,
tcp_keepalive: Option<Duration>,
}

/// A stack based `Service` router.
Expand Down Expand Up @@ -147,6 +149,21 @@ impl Server {
}
}

/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
///
/// Default is no keepalive (`None`)
///
pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
Server {
tcp_keepalive,
..self
}
}

/// Intercept the execution of gRPC methods.
///
/// ```
Expand Down Expand Up @@ -204,11 +221,12 @@ impl Server {
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
let max_concurrent_streams = self.max_concurrent_streams;
let tcp_keepalive = self.tcp_keepalive;
// let timeout = self.timeout.clone();

let incoming = hyper::server::accept::from_stream::<_, _, crate::Error>(
async_stream::try_stream! {
let mut tcp = TcpIncoming::bind(addr)?;
let mut tcp = TcpIncoming::bind(addr, tcp_keepalive)?;

while let Some(stream) = tcp.try_next().await? {
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -400,9 +418,10 @@ struct TcpIncoming {
}

impl TcpIncoming {
fn bind(addr: SocketAddr) -> Result<Self, crate::Error> {
fn bind(addr: SocketAddr, tcp_keepalive: Option<Duration>) -> Result<Self, crate::Error> {
let mut inner = conn::AddrIncoming::bind(&addr).map_err(Box::new)?;
inner.set_nodelay(true);
inner.set_keepalive(tcp_keepalive);

Ok(Self { inner })
}
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ pub(crate) struct Connection {
impl Connection {
pub(crate) async fn new(endpoint: Endpoint) -> Result<Self, crate::Error> {
#[cfg(feature = "tls")]
let connector = connector(endpoint.tls.clone());
let connector = connector(endpoint.tls.clone(), endpoint.tcp_keepalive);

#[cfg(not(feature = "tls"))]
let connector = connector();
let connector = connector(endpoint.tcp_keepalive);

let settings = Builder::new()
.http2_initial_stream_window_size(endpoint.init_stream_window_size)
Expand Down
12 changes: 7 additions & 5 deletions tonic/src/transport/service/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ use hyper::client::connect::HttpConnector;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tower_make::MakeConnection;
use tower_service::Service;

#[cfg(not(feature = "tls"))]
pub(crate) fn connector() -> HttpConnector {
pub(crate) fn connector(tcp_keepalive: Option<Duration>) -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);
http.set_keepalive(tcp_keepalive);
http
}

#[cfg(feature = "tls")]
pub(crate) fn connector(tls: Option<TlsConnector>) -> Connector {
Connector::new(tls)
pub(crate) fn connector(tls: Option<TlsConnector>, tcp_keepalive: Option<Duration>) -> Connector {
Connector::new(tls, tcp_keepalive)
}

pub(crate) struct Connector {
Expand All @@ -30,11 +32,11 @@ pub(crate) struct Connector {

impl Connector {
#[cfg(feature = "tls")]
pub(crate) fn new(tls: Option<TlsConnector>) -> Self {
pub(crate) fn new(tls: Option<TlsConnector>, tcp_keepalive: Option<Duration>) -> Self {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);

http.set_keepalive(tcp_keepalive);
Self { http, tls }
}
}
Expand Down

0 comments on commit caccfad

Please sign in to comment.