Skip to content

Commit

Permalink
Don't use bool in reserve and use stack allocated buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
tzx committed Apr 16, 2023
1 parent 06c5025 commit be27479
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 30 deletions.
55 changes: 38 additions & 17 deletions tokio/src/io/util/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::mem;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -67,41 +67,62 @@ fn poll_read_to_end<V: VecU8, R: AsyncRead + ?Sized>(
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
let try_small_read = buf.reserve(32);
// amount of data to return. When the vector is full with its starting
// capacity, we first try to read into a small buffer to see if we reached
// an EOF. This only happens when the starting capacity is >= NUM_BYTES, since
// we allocate at least NUM_BYTES each time. This avoids the unnecessary
// allocation that we attempt before reading into the vector.

// Get a ReadBuf into the vector.
let mut read_buf = buf.get_read_buf();
const NUM_BYTES: usize = 32;
let try_small_read = buf.try_small_read_first(NUM_BYTES);

// Get a ReadBuf into the vector.
let mut read_buf;
let poll_result;
let filled_before = read_buf.filled().len();
let filled_after;
if try_small_read {
let mut small_buf = Vec::with_capacity(32);
let mut small_read_buf = ReadBuf::new(&mut small_buf);

let n = if try_small_read {
// Read some bytes using a small read.
let mut small_buf: [MaybeUninit<u8>; NUM_BYTES] = [MaybeUninit::uninit(); NUM_BYTES];
let mut small_read_buf = ReadBuf::uninit(&mut small_buf);
poll_result = read.poll_read(cx, &mut small_read_buf);
let filled = small_read_buf.filled().len();
read_buf.put_slice(&small_buf[..filled]);
filled_after = filled_before + filled;
let to_write = small_read_buf.filled();

// Ensure we have enough space to fill our vector with what we read.
read_buf = buf.get_read_buf();
if to_write.len() > read_buf.remaining() {
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();
}
read_buf.put_slice(to_write);

to_write.len()
} else {
// Ensure we have enough space for reading.
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();

// Read data directly into vector.
let filled_before = read_buf.filled().len();
poll_result = read.poll_read(cx, &mut read_buf);
filled_after = read_buf.filled().len();

// Compute the number of bytes read.
read_buf.filled().len() - filled_before
};

// Update the length of the vector using the result of poll_read.
let read_buf_parts = into_read_buf_parts(read_buf);
buf.apply_read_buf(read_buf_parts);
let n = filled_after - filled_before;

match poll_result {
Poll::Pending => {
// In this case, nothing should have been read. However we still
// update the vector in case the poll_read call initialized parts of
// the vector's unused capacity.
debug_assert_eq!(filled_before, filled_after);
debug_assert_eq!(n, 0);
Poll::Pending
}
Poll::Ready(Err(err)) => {
debug_assert_eq!(filled_before, filled_after);
debug_assert_eq!(n, 0);
Poll::Ready(Err(err))
}
Poll::Ready(Ok(())) => Poll::Ready(Ok(n)),
Expand Down
21 changes: 11 additions & 10 deletions tokio/src/io/util/vec_with_initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,15 @@ where
}
}

// Returns a boolean telling the caller to try reading into a small local buffer first if true.
// Doing so would avoid overallocating when vec is filled to capacity and we reached EOF.
pub(crate) fn reserve(&mut self, num_bytes: usize) -> bool {
pub(crate) fn reserve(&mut self, num_bytes: usize) {
let vec = self.vec.as_mut();
if vec.capacity() - vec.len() >= num_bytes {
return false;
return;
}

if self.starting_capacity == vec.capacity() && self.starting_capacity >= num_bytes {
return true;
}

// SAFETY: Setting num_initialized to `vec.len()` is correct as
// `reserve` does not change the length of the vector.
self.num_initialized = vec.len();
vec.reserve(num_bytes);
false
}

#[cfg(feature = "io-util")]
Expand Down Expand Up @@ -121,6 +113,15 @@ where
vec.set_len(parts.len);
}
}

// Returns a boolean telling the caller to try reading into a small local buffer first if true.
// Doing so would avoid overallocating when vec is filled to capacity and we reached EOF.
pub(crate) fn try_small_read_first(&self, num_bytes: usize) -> bool {
let vec = self.vec.as_ref();
vec.capacity() - vec.len() < num_bytes
&& self.starting_capacity == vec.capacity()
&& self.starting_capacity >= num_bytes
}
}

pub(crate) struct ReadBufParts {
Expand Down
40 changes: 37 additions & 3 deletions tokio/tests/io_read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,45 @@ async fn read_to_end_uninit() {

#[tokio::test]
async fn read_to_end_doesnt_grow_with_capacity() {
let bytes = b"imlargerthan32bytessoIcanhelpwiththetest";
let arr: Vec<u8> = (0..100).collect();

// We only test from 32 since we allocate at least 32 bytes each time
for len in 32..100 {
let bytes = &arr[..len];
for split in 0..len {
for cap in 0..101 {
let mut mock = if split == 0 {
Builder::new().read(bytes).build()
} else {
Builder::new()
.read(&bytes[..split])
.read(&bytes[split..])
.build()
};
let mut buf = Vec::with_capacity(cap);
AsyncReadExt::read_to_end(&mut mock, &mut buf)
.await
.unwrap();
// It has the right data.
assert_eq!(buf.as_slice(), bytes);
// Unless cap was smaller than length, then we did not reallocate.
if cap >= len {
assert_eq!(buf.capacity(), cap);
}
}
}
}
}

#[tokio::test]
async fn read_to_end_grows_capacity_if_unfit() {
let bytes = b"the_vector_startingcap_will_be_smaller";
let mut mock = Builder::new().read(bytes).build();
let mut buf = Vec::with_capacity(bytes.len());
let initial_capacity = bytes.len() - 4;
let mut buf = Vec::with_capacity(initial_capacity);
AsyncReadExt::read_to_end(&mut mock, &mut buf)
.await
.unwrap();
assert_eq!(bytes.len(), buf.capacity());
// *4 since it doubles when it doesn't fit and again when reaching EOF
assert_eq!(buf.capacity(), initial_capacity * 4);
}

0 comments on commit be27479

Please sign in to comment.