diff --git a/src/http/conn.rs b/src/http/conn.rs index 261b133ba8..4c8d63b967 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -47,6 +47,35 @@ where I: AsyncRead + AsyncWrite, } } + + fn poll2(&mut self) -> Poll, http::Chunk, ::Error>>, io::Error> { + trace!("Conn::poll()"); + + loop { + if self.is_read_closed() { + trace!("Conn::poll when closed"); + return Ok(Async::Ready(None)); + } else if self.can_read_head() { + return self.read_head(); + } else if self.can_write_continue() { + try_nb!(self.flush()); + } else if self.can_read_body() { + return self.read_body() + .map(|async| async.map(|chunk| Some(Frame::Body { + chunk: chunk + }))) + .or_else(|err| { + self.state.close_read(); + Ok(Async::Ready(Some(Frame::Error { error: err.into() }))) + }); + } else { + trace!("poll when on keep-alive"); + self.maybe_park_read(); + return Ok(Async::NotReady); + } + } + } + fn is_read_closed(&self) -> bool { self.state.is_read_closed() } @@ -89,12 +118,9 @@ where I: AsyncRead + AsyncWrite, self.state.close_read(); self.io.consume_leading_lines(); let was_mid_parse = !self.io.read_buf().is_empty(); - return if was_mid_parse { + return if was_mid_parse || must_respond_with_error { debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len()); Ok(Async::Ready(Some(Frame::Error { error: e }))) - } else if must_respond_with_error { - trace!("parse error with 0 input, err = {:?}", e); - Ok(Async::Ready(Some(Frame::Error { error: e }))) } else { debug!("read eof"); Ok(Async::Ready(None)) @@ -379,32 +405,12 @@ where I: AsyncRead + AsyncWrite, type Item = Frame, http::Chunk, ::Error>; type Error = io::Error; + #[inline] fn poll(&mut self) -> Poll, Self::Error> { - trace!("Conn::poll()"); - - loop { - if self.is_read_closed() { - trace!("Conn::poll when closed"); - return Ok(Async::Ready(None)); - } else if self.can_read_head() { - return self.read_head(); - } else if self.can_write_continue() { - try_nb!(self.flush()); - } else if self.can_read_body() { - return self.read_body() - .map(|async| async.map(|chunk| Some(Frame::Body { - chunk: chunk - }))) - .or_else(|err| { - self.state.close_read(); - Ok(Async::Ready(Some(Frame::Error { error: err.into() }))) - }); - } else { - trace!("poll when on keep-alive"); - self.maybe_park_read(); - return Ok(Async::NotReady); - } - } + self.poll2().map_err(|err| { + debug!("poll error: {}", err); + err + }) } } @@ -460,16 +466,22 @@ where I: AsyncRead + AsyncWrite, } + #[inline] fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { trace!("Conn::poll_complete()"); - let ret = self.flush(); - trace!("Conn::flush = {:?}", ret); - ret + self.flush().map_err(|err| { + debug!("error writing: {}", err); + err + }) } + #[inline] fn close(&mut self) -> Poll<(), Self::SinkError> { try_ready!(self.poll_complete()); - self.io.io_mut().shutdown() + self.io.io_mut().shutdown().map_err(|err| { + debug!("error closing: {}", err); + err + }) } }