feat(body): add body::aggregate and body::to_bytes functions
Adds utility functions to `hyper::body` to help asynchronously
collect all the buffers of some `HttpBody` into one.

- `aggregate` will collect all the buffers into an `impl Buf` without
  copying the contents. This is ideal if you don't need a contiguous buffer.
- `to_bytes` will copy all the data into a single contiguous `Bytes`
  buffer.
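
For context, a minimal usage sketch of the two new functions (not part of the commit; the `fetch_and_parse` helper name, the serde_json parsing step, and the boxed error type are illustrative assumptions, mirroring the updated `client_json.rs` example below):

```rust
use bytes::buf::BufExt as _; // bytes 0.5: provides `.reader()` on the aggregated `impl Buf`
use hyper::Client;

async fn fetch_and_parse(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new();

    // `aggregate`: group the body's buffers without copying, then read them
    // through the `Buf` interface (e.g. hand a `Reader` to serde_json).
    let res = client.get(url.clone()).await?;
    let buf = hyper::body::aggregate(res.into_body()).await?;
    let _value: serde_json::Value = serde_json::from_reader(buf.reader())?;

    // `to_bytes`: copy everything into one contiguous `Bytes` buffer.
    let res = client.get(url).await?;
    let bytes = hyper::body::to_bytes(res.into_body()).await?;
    println!("received {} bytes", bytes.len());

    Ok(())
}
```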
seanmonstar committed Dec 6, 2019
1 parent 5a59875 commit 8ba9a8d
Showing 15 changed files with 282 additions and 128 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
@@ -98,7 +98,7 @@ required-features = ["runtime"]
[[example]]
name = "client_json"
path = "examples/client_json.rs"
required-features = ["runtime", "stream"]
required-features = ["runtime"]

[[example]]
name = "echo"
@@ -162,6 +162,11 @@ path = "examples/web_api.rs"
required-features = ["runtime", "stream"]


[[bench]]
name = "body"
path = "benches/body.rs"
required-features = ["runtime", "stream"]

[[bench]]
name = "connect"
path = "benches/connect.rs"
89 changes: 89 additions & 0 deletions benches/body.rs
@@ -0,0 +1,89 @@
#![feature(test)]
#![deny(warnings)]

extern crate test;

use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use hyper::body::Body;

macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.expect("rt build");

let $total_ident: usize = $bytes * $count;
$bencher.bytes = $total_ident as u64;
let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _;

$bencher.iter(|| {
rt.block_on(async {
let $body_pat = Body::wrap_stream(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);
$block;
});
});
}};
}

macro_rules! benches {
($($name:ident, $bytes:expr, $count:expr;)+) => (
mod aggregate {
use super::*;

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let buf = hyper::body::aggregate(body).await.unwrap();
assert_eq!(buf.remaining(), total);
});
}
)+
}

mod manual_into_vec {
use super::*;

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, mut body, {
let mut vec = Vec::new();
while let Some(chunk) = body.next().await {
vec.extend_from_slice(&chunk.unwrap());
}
assert_eq!(vec.len(), total);
});
}
)+
}

mod to_bytes {
use super::*;

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let bytes = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(bytes.len(), total);
});
}
)+
}
)
}

// ===== Actual Benchmarks =====

benches! {
bytes_1_000_count_2, 1_000, 2;
bytes_1_000_count_10, 1_000, 10;
bytes_10_000_count_1, 10_000, 1;
bytes_10_000_count_10, 10_000, 10;
}
2 changes: 2 additions & 0 deletions examples/client.rs
@@ -40,6 +40,8 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
println!("Response: {}", res.status());
println!("Headers: {:#?}\n", res.headers());

// Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end).
while let Some(next) = res.body_mut().data().await {
let chunk = next?;
io::stdout().write_all(&chunk).await?;
15 changes: 7 additions & 8 deletions examples/client_json.rs
@@ -4,7 +4,7 @@
#[macro_use]
extern crate serde_derive;

use futures_util::StreamExt;
use bytes::buf::BufExt as _;
use hyper::Client;

// A simple type alias so as to DRY.
@@ -27,14 +27,13 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let client = Client::new();

// Fetch the url...
let mut res = client.get(url).await?;
// asynchronously concatenate chunks of the body
let mut body = Vec::new();
while let Some(chunk) = res.body_mut().next().await {
body.extend_from_slice(&chunk?);
}
let res = client.get(url).await?;

// asynchronously aggregate the chunks of the body
let body = hyper::body::aggregate(res.into_body()).await?;

// try to parse as json with serde_json
let users = serde_json::from_slice(&body)?;
let users = serde_json::from_reader(body.reader())?;

Ok(users)
}
11 changes: 4 additions & 7 deletions examples/echo.rs
@@ -1,12 +1,12 @@
//#![deny(warnings)]
#![deny(warnings)]

use futures_util::{StreamExt, TryStreamExt};
use futures_util::TryStreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(Body::from(
@@ -34,10 +34,7 @@ async fn echo(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// So here we do `.await` on the future, waiting on concatenating the full body,
// then afterwards the content can be reversed. Only then can we return a `Response`.
(&Method::POST, "/echo/reversed") => {
let mut whole_body = Vec::new();
while let Some(chunk) = req.body_mut().next().await {
whole_body.extend_from_slice(&chunk?);
}
let whole_body = hyper::body::to_bytes(req.into_body()).await?;

let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(Body::from(reversed_body)))
8 changes: 2 additions & 6 deletions examples/params.rs
@@ -4,7 +4,6 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};

use futures_util::StreamExt;
use std::collections::HashMap;
use url::form_urlencoded;

@@ -13,15 +12,12 @@ static MISSING: &[u8] = b"Missing field";
static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())),
(&Method::POST, "/post") => {
// Concatenate the body...
let mut b = Vec::new();
while let Some(chunk) = req.body_mut().next().await {
b.extend_from_slice(&chunk?);
}
let b = hyper::body::to_bytes(req.into_body()).await?;
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
26 changes: 13 additions & 13 deletions examples/web_api.rs
@@ -1,6 +1,7 @@
#![deny(warnings)]

use futures_util::{StreamExt, TryStreamExt};
use bytes::buf::BufExt;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
@@ -24,25 +25,24 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let body = Body::wrap_stream(web_res.into_body().map_ok(|b| {
format!(
"<b>POST request body</b>: {}<br><b>Response</b>: {}",
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
std::str::from_utf8(&b).unwrap()
)
}));
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
}

async fn api_post_response(mut req: Request<Body>) -> Result<Response<Body>> {
// Concatenate the body...
let mut whole_body = Vec::new();
while let Some(chunk) = req.body_mut().next().await {
whole_body.extend_from_slice(&chunk?);
}
async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
// Aggregate the body...
let whole_body = hyper::body::aggregate(req.into_body()).await?;
// Decode as JSON...
let mut data: serde_json::Value = serde_json::from_slice(&whole_body)?;
let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?;
// Change the JSON...
data["test"] = serde_json::Value::from("test_value");
// And respond with the new JSON.
25 changes: 25 additions & 0 deletions src/body/aggregate.rs
@@ -0,0 +1,25 @@
use bytes::Buf;

use super::HttpBody;
use crate::common::buf::BufList;

/// Aggregate the data buffers from a body asynchronously.
///
/// The returned `impl Buf` groups the `Buf`s from the `HttpBody` without
/// copying them. This is ideal if you don't require a contiguous buffer.
pub async fn aggregate<T>(body: T) -> Result<impl Buf, T::Error>
where
T: HttpBody,
{
let mut bufs = BufList::new();

futures_util::pin_mut!(body);
while let Some(buf) = body.data().await {
let buf = buf?;
if buf.has_remaining() {
bufs.push(buf);
}
}

Ok(bufs)
}
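
As a rough illustration of the no-copy property described in the doc comment (not part of the commit; `count_chunk_lengths` is a hypothetical helper, assuming the bytes 0.5 `Buf` API used here), the returned `impl Buf` can be walked chunk by chunk without ever flattening it into one allocation:

```rust
use bytes::Buf;

// Hypothetical helper: measure an aggregated body by visiting each chunk in
// place, never copying the data into a contiguous buffer.
fn count_chunk_lengths(mut buf: impl Buf) -> usize {
    let mut total = 0;
    while buf.has_remaining() {
        let n = buf.bytes().len(); // bytes 0.5: `Buf::bytes()` is the current chunk
        total += n;
        buf.advance(n);
    }
    total
}
```

With the result of `hyper::body::aggregate`, this total would match the `buf.remaining()` assertion made in the new `benches/body.rs` above.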
5 changes: 5 additions & 0 deletions src/body/mod.rs
@@ -18,11 +18,16 @@
pub use bytes::{Buf, Bytes};
pub use http_body::Body as HttpBody;

pub use self::aggregate::aggregate;
pub use self::body::{Body, Sender};
pub use self::to_bytes::to_bytes;

pub(crate) use self::payload::Payload;

mod aggregate;
mod body;
mod payload;
mod to_bytes;

/// An optimization to try to take a full body if immediately available.
///
36 changes: 36 additions & 0 deletions src/body/to_bytes.rs
@@ -0,0 +1,36 @@
use bytes::{Buf, BufMut, Bytes};

use super::HttpBody;

/// Concatenate the buffers from a body into a single `Bytes` asynchronously.
pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error>
where
T: HttpBody,
{
futures_util::pin_mut!(body);

// If there's only 1 chunk, we can just return Buf::to_bytes()
let mut first = if let Some(buf) = body.data().await {
buf?
} else {
return Ok(Bytes::new());
};

let second = if let Some(buf) = body.data().await {
buf?
} else {
return Ok(first.to_bytes());
};

// With more than 1 buf, we gotta flatten into a Vec first.
let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize;
let mut vec = Vec::with_capacity(cap);
vec.put(first);
vec.put(second);

while let Some(buf) = body.data().await {
vec.put(buf?);
}

Ok(vec.into())
}
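
A small sanity-check sketch of the flattening behavior (not from the commit; the test name and the `#[tokio::test]` harness are assumptions), built with the same `Body::wrap_stream` pattern as the new benchmark, which likewise requires hyper's `stream` feature:

```rust
use hyper::body::Body;

#[tokio::test]
async fn to_bytes_flattens_chunks() {
    // Two chunks fed through a stream, as in benches/body.rs.
    let chunks: Vec<Result<&'static [u8], std::convert::Infallible>> =
        vec![Ok(b"hello " as &[u8]), Ok(b"world" as &[u8])];
    let body = Body::wrap_stream(futures_util::stream::iter(chunks));

    // The non-contiguous chunks are copied into one contiguous `Bytes`.
    let bytes = hyper::body::to_bytes(body).await.unwrap();
    assert_eq!(&bytes[..], &b"hello world"[..]);
}
```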