diff --git a/src/body/body.rs b/src/body/body.rs index e6e9f12c6d..4d20e4d1da 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -10,8 +10,7 @@ use http_body::{SizeHint, Body as HttpBody}; use http::HeaderMap; use crate::common::{Future, Never, Pin, Poll, task}; -use super::internal::{FullDataArg, FullDataRet}; -use super::{Chunk, Payload}; +use super::Chunk; use crate::upgrade::OnUpgrade; type BodySender = mpsc::Sender>; @@ -467,14 +466,25 @@ impl Sender { self.tx.poll_ready(cx).map_err(|_| crate::Error::new_closed()) } - /// Sends data on this channel. + /// Send data on this channel when it is ready. + pub async fn send_data(&mut self, chunk: Chunk) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await?; + self.tx.try_send(Ok(chunk)).map_err(|_| crate::Error::new_closed()) + } + + /// Try to send data on this channel. /// - /// This should be called after `poll_ready` indicated the channel - /// could accept another `Chunk`. + /// # Errors /// /// Returns `Err(Chunk)` if the channel could not (currently) accept /// another `Chunk`. - pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { + /// + /// # Note + /// + /// This is mostly useful for when trying to send from some other thread + /// that doesn't have an async context. If in an async context, prefer + /// [`send_data`][] instead. + pub fn try_send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { self.tx .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 3e56c68b95..fc68e4ac4b 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -5,7 +5,6 @@ use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; use crate::body::{Body, Payload}; -use crate::body::internal::FullDataArg; use crate::common::{Future, Never, Poll, Pin, Unpin, task}; use crate::proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; use super::Http1Transaction; @@ -169,7 +168,7 @@ where } match self.conn.poll_read_body(cx) { Poll::Ready(Some(Ok(chunk))) => { - match body.send_data(chunk) { + match body.try_send_data(chunk) { Ok(()) => { self.body_tx = Some(body); }, @@ -249,7 +248,7 @@ where return Poll::Ready(Ok(())); } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) { - let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; + let (head, body) = msg.map_err(crate::Error::new_user_service)?; let body_type = if body.is_end_stream() { self.body_rx.set(None); diff --git a/tests/client.rs b/tests/client.rs index 7eed383bcb..756d5bf385 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1748,7 +1748,7 @@ mod conn { let (mut sender, body) = Body::channel(); let sender = thread::spawn(move || { - sender.send_data("hello".into()).ok().unwrap(); + sender.try_send_data("hello".into()).expect("try_send_data"); Runtime::new().unwrap().block_on(rx).unwrap(); sender.abort(); }); diff --git a/tests/server.rs b/tests/server.rs index d267cecf8d..d1d23efc34 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -101,7 +101,7 @@ mod response_body_lengths { }, Bd::Unknown(b) => { let (mut tx, body) = hyper::Body::channel(); - tx.send_data(b.into()).expect("send_data"); + tx.try_send_data(b.into()).expect("try_send_data"); reply.body_stream(body); b },