Skip to content

Commit

Permalink
fix(lib): properly handle body streaming errors
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jan 11, 2018
1 parent 7888451 commit 7a48d0e
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 77 deletions.
35 changes: 24 additions & 11 deletions src/proto/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,31 @@ where I: AsyncRead + AsyncWrite,

let (reading, ret) = match self.state.reading {
Reading::Body(ref mut decoder) => {
let slice = try_ready!(decoder.decode(&mut self.io));
if !slice.is_empty() {
return Ok(Async::Ready(Some(super::Chunk::from(slice))));
} else if decoder.is_eof() {
debug!("incoming body completed");
(Reading::KeepAlive, Ok(Async::Ready(None)))
} else {
trace!("decode stream unexpectedly ended");
//TODO: Should this return an UnexpectedEof?
(Reading::Closed, Ok(Async::Ready(None)))
match decoder.decode(&mut self.io) {
Ok(Async::Ready(slice)) => {
let chunk = if !slice.is_empty() {
Some(super::Chunk::from(slice))
} else {
None
};
let reading = if decoder.is_eof() {
debug!("incoming body completed");
Reading::KeepAlive
} else if chunk.is_some() {
Reading::Body(decoder.clone())
} else {
trace!("decode stream unexpectedly ended");
//TODO: Should this return an UnexpectedEof?

This comment has been minimized.

Copy link
@sfackler

sfackler Jan 11, 2018

Contributor

This would trigger if the server cuts off the response midway through? It should trigger an error then I think.

This comment has been minimized.

Copy link
@seanmonstar

seanmonstar Jan 11, 2018

Author Member

I actually believe this can be replaced with an unreachable, because the Decoder already has checks for zero reads and returns errors if required...

This comment has been minimized.

Copy link
@sfackler

sfackler Jan 11, 2018

Contributor

Ah cool

Reading::Closed
};
(reading, Ok(Async::Ready(chunk)))
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("decode stream error: {}", e);
(Reading::Closed, Err(e))
},
}

},
_ => unreachable!("read_body invalid state: {:?}", self.state.reading),
};
Expand Down
104 changes: 52 additions & 52 deletions src/proto/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,66 +85,25 @@ where
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.conn.can_read_head() {
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
return Ok(Async::Ready(()));
}
}
// dispatch is ready for a message, try to read one
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (mut tx, rx) = super::body::channel();
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
Some(rx)
} else {
None
};
self.dispatch.recv_msg(Ok((head, body)))?;
},
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("read_head error: {}", err);
self.dispatch.recv_msg(Err(err))?;
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
return Ok(Async::Ready(()));
}
}
try_ready!(self.poll_read_head());
} else if self.conn.can_write_continue() {
try_nb!(self.conn.flush());
} else if let Some(mut body) = self.body_tx.take() {
let can_read_body = self.conn.can_read_body();
match body.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
},
Err(_canceled) => {
// user doesn't care about the body
// so we should stop reading
if can_read_body {
if self.conn.can_read_body() {
match body.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
},
Err(_canceled) => {
// user doesn't care about the body
// so we should stop reading
trace!("body receiver dropped before eof, closing");
self.conn.close_read();
return Ok(Async::Ready(()));
}
// else the conn body is done, and user dropped,
// so everything is fine!
}
}
if can_read_body {
match self.conn.read_body() {
Ok(Async::Ready(Some(chunk))) => {
match body.start_send(Ok(chunk)) {
Expand Down Expand Up @@ -183,6 +142,47 @@ where
}
}

fn poll_read_head(&mut self) -> Poll<(), ::Error> {
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
return Ok(Async::Ready(()));
}
}
// dispatch is ready for a message, try to read one
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (mut tx, rx) = super::body::channel();
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
Some(rx)
} else {
None
};
self.dispatch.recv_msg(Ok((head, body)))?;
Ok(Async::Ready(()))
},
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
debug!("read_head error: {}", err);
self.dispatch.recv_msg(Err(err))?;
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
Ok(Async::Ready(()))
}
}
}

fn poll_write(&mut self) -> Poll<(), ::Error> {
loop {
if self.is_closing {
Expand Down
25 changes: 19 additions & 6 deletions src/proto/h1/decode.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::error::Error as StdError;
use std::fmt;
use std::usize;
use std::io;
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Decoder {
if num > *remaining {
*remaining = 0;
} else if num == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody));
} else {
*remaining -= num;
}
Expand Down Expand Up @@ -262,7 +263,7 @@ impl ChunkedState {

if count == 0 {
*rem = 0;
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody));
}
*buf = Some(slice);
*rem -= count as u64;
Expand Down Expand Up @@ -300,9 +301,23 @@ impl ChunkedState {
}
}

#[derive(Debug)]
struct IncompleteBody;

impl fmt::Display for IncompleteBody {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.description())
}
}

impl StdError for IncompleteBody {
fn description(&self) -> &str {
"end of file before message length reached"
}
}

#[cfg(test)]
mod tests {
use std::error::Error;
use std::io;
use std::io::Write;
use super::Decoder;
Expand Down Expand Up @@ -422,8 +437,7 @@ mod tests {
let mut decoder = Decoder::length(10);
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::Other);
assert_eq!(e.description(), "early eof");
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
}

#[test]
Expand All @@ -436,7 +450,6 @@ mod tests {
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
assert_eq!(e.description(), "early eof");
}

#[test]
Expand Down
56 changes: 48 additions & 8 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_core::net::TcpListener;
use tokio_core::reactor::{Core, Timeout};
use tokio_io::{AsyncRead, AsyncWrite};

use std::net::{TcpStream, SocketAddr};
use std::net::{TcpStream, Shutdown, SocketAddr};
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
Expand Down Expand Up @@ -223,6 +223,26 @@ fn post_with_chunked_body() {
assert_eq!(server.body(), b"qwert");
}

#[test]
fn post_with_incomplete_body() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let server = serve();
let mut req = connect(server.addr());
req.write_all(b"\
POST / HTTP/1.1\r\n\
Host: example.domain\r\n\
Content-Length: 10\r\n\
\r\n\
12345\
").expect("write");
req.shutdown(Shutdown::Write).expect("shutdown write");

server.body_err();

req.read(&mut [0; 256]).expect("read");
}

#[test]
fn empty_response_chunked() {
let server = serve();
Expand Down Expand Up @@ -746,24 +766,34 @@ impl Serve {
}

pub fn remote_addr(&self) -> SocketAddr {
match self.msg_rx.try_recv() {
match self.msg_rx.recv() {
Ok(Msg::Addr(addr)) => addr,
other => panic!("expected remote addr, found: {:?}", other),
}
}

fn body(&self) -> Vec<u8> {
self.try_body().expect("body")
}

fn body_err(&self) -> hyper::Error {
self.try_body().expect_err("body_err")
}

fn try_body(&self) -> Result<Vec<u8>, hyper::Error> {
let mut buf = vec![];
loop {
match self.msg_rx.try_recv() {
match self.msg_rx.recv() {
Ok(Msg::Chunk(msg)) => {
buf.extend(&msg);
},
Ok(Msg::Addr(_)) => {},
Err(_) => break,
Ok(Msg::Error(e)) => return Err(e),
Ok(Msg::End) => break,
Err(e) => panic!("expected body, found: {:?}", e),
}
}
buf
Ok(buf)
}

fn reply(&self) -> ReplyBuilder {
Expand Down Expand Up @@ -821,6 +851,8 @@ enum Msg {
//Head(Request),
Addr(SocketAddr),
Chunk(Vec<u8>),
Error(hyper::Error),
End,
}

impl NewService for TestService {
Expand All @@ -841,15 +873,23 @@ impl Service for TestService {
type Error = hyper::Error;
type Future = Box<Future<Item=Response, Error=hyper::Error>>;
fn call(&self, req: Request) -> Self::Future {
let tx = self.tx.clone();
let tx1 = self.tx.clone();
let tx2 = self.tx.clone();

#[allow(deprecated)]
let remote_addr = req.remote_addr().expect("remote_addr");
tx.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap();
tx1.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap();

let replies = self.reply.clone();
Box::new(req.body().for_each(move |chunk| {
tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
Ok(())
}).then(move |result| {
let msg = match result {
Ok(()) => Msg::End,
Err(e) => Msg::Error(e),
};
tx2.lock().unwrap().send(msg).unwrap();
Ok(())
}).map(move |_| {
let mut res = Response::new();
Expand Down

0 comments on commit 7a48d0e

Please sign in to comment.