From 7e31fd88a86ac032d05670ba4e293e3e5fcccbaf Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 18 Oct 2019 13:08:06 -0700 Subject: [PATCH] feat(server): change `http1_half_close` option default to disabled Detecting a read hangup is a useful way to determine that a connection has closed. It's also possible that a client shuts down its read half without closing the connection, but this is rarer. Thus, by default, hyper will now assume a read EOF means the connection has closed. BREAKING CHANGE: The server's behavior will now by default close connections when receiving a read EOF. To allow for clients to close the read half, call `http1_half_close(true)` when configuring a server. --- src/proto/h1/conn.rs | 23 ++++++++++++------ src/proto/h1/dispatch.rs | 14 +++++++++-- src/server/conn.rs | 15 ++++++------ src/server/mod.rs | 8 +++---- tests/server.rs | 52 +++++++++++++++++++++------------------- 5 files changed, 67 insertions(+), 45 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index aa4a42f647..edb55ab753 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -38,7 +38,7 @@ where I: AsyncRead + AsyncWrite + Unpin, Conn { io: Buffered::new(io), state: State { - allow_half_close: true, + allow_half_close: false, cached_headers: None, error: None, keep_alive: KA::Busy, @@ -76,8 +76,8 @@ where I: AsyncRead + AsyncWrite + Unpin, self.state.title_case_headers = true; } - pub(crate) fn set_disable_half_close(&mut self) { - self.state.allow_half_close = false; + pub(crate) fn set_allow_half_close(&mut self) { + self.state.allow_half_close = true; } pub fn into_inner(self) -> (I, Bytes) { @@ -172,7 +172,7 @@ where I: AsyncRead + AsyncWrite + Unpin, // message should be reported as an error. If not, it is just // the connection closing gracefully. let must_error = self.should_error_on_eof(); - self.state.close_read(); + self.close_read(); self.io.consume_leading_lines(); let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); if was_mid_parse || must_error { @@ -185,6 +185,7 @@ where I: AsyncRead + AsyncWrite + Unpin, } } else { debug!("read eof"); + self.close_write(); Poll::Ready(None) } } @@ -204,7 +205,7 @@ where I: AsyncRead + AsyncWrite + Unpin, None }) } else if slice.is_empty() { - error!("decode stream unexpectedly ended"); + error!("incoming body unexpectedly ended"); // This should be unreachable, since all 3 decoders // either set eof=true or return an Err when reading // an empty slice... @@ -216,7 +217,7 @@ where I: AsyncRead + AsyncWrite + Unpin, }, Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => { - debug!("decode stream error: {}", e); + debug!("incoming body decode error: {}", e); (Reading::Closed, Poll::Ready(Some(Err(e)))) }, } @@ -294,6 +295,10 @@ where I: AsyncRead + AsyncWrite + Unpin, return Poll::Pending; } + if self.state.is_read_closed() { + return Poll::Ready(Err(crate::Error::new_incomplete())); + } + let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; if num_read == 0 { @@ -306,6 +311,8 @@ where I: AsyncRead + AsyncWrite + Unpin, } fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + debug_assert!(!self.state.is_read_closed()); + let result = ready!(self.io.poll_read_from_io(cx)); Poll::Ready(result.map_err(|e| { trace!("force_io_read; io error = {:?}", e); @@ -619,8 +626,10 @@ where I: AsyncRead + AsyncWrite + Unpin, pub fn disable_keep_alive(&mut self) { if self.state.is_idle() { - self.state.close_read(); + trace!("disable_keep_alive; closing idle connection"); + self.state.close(); } else { + trace!("disable_keep_alive; in-progress connection"); self.state.disable_keep_alive(); } } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 0518c63298..4329208b10 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -60,7 +60,10 @@ where } pub fn disable_keep_alive(&mut self) { - self.conn.disable_keep_alive() + self.conn.disable_keep_alive(); + if self.conn.is_write_closed() { + self.close(); + } } pub fn into_inner(self) -> (I, Bytes, D) { @@ -233,10 +236,17 @@ where // if here, the dispatcher gave the user the error // somewhere else. we still need to shutdown, but // not as a second error. + self.close(); Poll::Ready(Ok(())) }, None => { - // read eof, conn will start to shutdown automatically + // read eof, the write side will have been closed too unless + // allow_read_close was set to true, in which case just do + // nothing... + debug_assert!(self.conn.is_read_closed()); + if self.conn.is_write_closed() { + self.close(); + } Poll::Ready(Ok(())) } } diff --git a/src/server/conn.rs b/src/server/conn.rs index 3940bfbb3d..01ec692c9e 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -194,7 +194,7 @@ impl Http { Http { exec: Exec::Default, - h1_half_close: true, + h1_half_close: false, h1_writev: true, h2_builder, mode: ConnectionMode::Fallback, @@ -221,12 +221,11 @@ impl Http { /// Set whether HTTP/1 connections should support half-closures. /// /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `false` will - /// automatically close any connection immediately if `read` - /// detects an EOF. + /// for the server to respond. Setting this to `true` will + /// prevent closing the connection immediately if `read` + /// detects an EOF in the middle of a request. /// - /// Default is `true`. - #[inline] + /// Default is `false`. pub fn http1_half_close(&mut self, val: bool) -> &mut Self { self.h1_half_close = val; self @@ -390,8 +389,8 @@ impl Http { if !self.keep_alive { conn.disable_keep_alive(); } - if !self.h1_half_close { - conn.set_disable_half_close(); + if self.h1_half_close { + conn.set_allow_half_close(); } if !self.h1_writev { conn.set_write_strategy_flatten(); diff --git a/src/server/mod.rs b/src/server/mod.rs index 31f508ca82..5d7ac6886a 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -252,11 +252,11 @@ impl Builder { /// Set whether HTTP/1 connections should support half-closures. /// /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `false` will - /// automatically close any connection immediately if `read` - /// detects an EOF. + /// for the server to respond. Setting this to `true` will + /// prevent closing the connection immediately if `read` + /// detects an EOF in the middle of a request. /// - /// Default is `true`. + /// Default is `false`. pub fn http1_half_close(mut self, val: bool) -> Self { self.protocol.http1_half_close(val); self diff --git a/tests/server.rs b/tests/server.rs index 0103f8734b..cf3bd2007c 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -945,6 +945,7 @@ fn disable_keep_alive_post_request() { #[test] fn empty_parse_eof_does_not_return_error() { + let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -983,13 +984,13 @@ fn nonempty_parse_eof_returns_error() { } #[test] -fn socket_half_closed() { +fn http1_allow_half_close() { let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); - thread::spawn(move || { + let t1 = thread::spawn(move || { let mut tcp = connect(&addr); tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR"); @@ -1005,13 +1006,16 @@ fn socket_half_closed() { .map(Option::unwrap) .map_err(|_| unreachable!()) .and_then(|socket| { - Http::new().serve_connection(socket, service_fn(|_| { + Http::new() + .http1_half_close(true) + .serve_connection(socket, service_fn(|_| { tokio_timer::delay_for(Duration::from_millis(500)) .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) })) }); rt.block_on(fut).unwrap(); + t1.join().expect("client thread"); } #[test] @@ -1852,28 +1856,28 @@ impl tower_service::Service> for TestService { Ok(()).into() } - fn call(&mut self, req: Request) -> Self::Future { - let tx1 = self.tx.clone(); - let tx2 = self.tx.clone(); + fn call(&mut self, mut req: Request) -> Self::Future { + let tx = self.tx.clone(); let replies = self.reply.clone(); - req - .into_body() - .try_concat() - .map_ok(move |chunk| { - tx1.send(Msg::Chunk(chunk.to_vec())).unwrap(); - () - }) - .map(move |result| { - let msg = match result { - Ok(()) => Msg::End, - Err(e) => Msg::Error(e), - }; - tx2.send(msg).unwrap(); - }) - .map(move |_| { - TestService::build_reply(replies) - }) - .boxed() + hyper::rt::spawn(async move { + while let Some(chunk) = req.body_mut().next().await { + match chunk { + Ok(chunk) => { + tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); + }, + Err(err) => { + tx.send(Msg::Error(err)).unwrap(); + return; + }, + } + } + + tx.send(Msg::End).unwrap(); + }); + + Box::pin(async move { + TestService::build_reply(replies) + }) } }