diff --git a/Cargo.toml b/Cargo.toml index 86fed2d..9cde8f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,9 @@ description = "The bare essentials of std::io for use in no_std. No alloc!" license = "Apache-2.0 OR MIT" edition = "2018" +[dependencies] +memchr = { version = "2", default-features = false, optional = true } + [features] -# default = ["std"] +default = ["memchr"] std = [] \ No newline at end of file diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 0000000..bf867e0 --- /dev/null +++ b/rust-toolchain @@ -0,0 +1 @@ +nightly diff --git a/src/buffered.rs b/src/buffered.rs new file mode 100644 index 0000000..abd2aba --- /dev/null +++ b/src/buffered.rs @@ -0,0 +1,1111 @@ +//! Buffering wrappers for I/O traits + +use crate::{BufRead, Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}; +use core::{cmp, fmt}; + +/// The `BufReader` struct adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`Read`] instance. +/// For example, every call to [`read`][`TcpStream::read`] on [`TcpStream`] +/// results in a system call. A `BufReader` performs large, infrequent reads on +/// the underlying [`Read`] and maintains an in-memory buffer of the results. +/// +/// `BufReader` can improve the speed of programs that make *small* and +/// *repeated* read calls to the same file or network socket. It does not +/// help when reading very large amounts at once, or reading just one or a few +/// times. It also provides no advantage when reading from a source that is +/// already in memory, like a [`Vec`]``. +/// +/// When the `BufReader` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufReader` on the same +/// stream can cause data loss. Reading from the underlying reader after +/// unwrapping the `BufReader` with [`BufReader::into_inner`] can also cause +/// data loss. +/// +/// [`TcpStream::read`]: Read::read +/// [`TcpStream`]: crate::net::TcpStream +/// +/// # Examples +/// +/// ```no_run +/// use std::prelude::*; +/// use std::BufReader; +/// use std::fs::File; +/// +/// fn main() -> std::Result<()> { +/// let f = File::open("log.txt")?; +/// let mut reader = BufReader::new(f); +/// +/// let mut line = String::new(); +/// let len = reader.read_line(&mut line)?; +/// println!("First line is {} bytes long", len); +/// Ok(()) +/// } +/// ``` +pub struct BufReader { + inner: R, + buf: [u8; S], + pos: usize, + cap: usize, +} + +impl BufReader { + /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f = File::open("log.txt")?; + /// let reader = BufReader::new(f); + /// Ok(()) + /// } + /// ``` + pub fn new(inner: R) -> BufReader { + BufReader { + inner, + buf: [0; S], + pos: 0, + cap: 0, + } + } +} + +impl BufReader { + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let reader = BufReader::new(f1); + /// + /// let f2 = reader.get_ref(); + /// Ok(()) + /// } + /// ``` + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let mut reader = BufReader::new(f1); + /// + /// let f2 = reader.get_mut(); + /// Ok(()) + /// } + /// ``` + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// Unlike [`fill_buf`], this will not attempt to fill the buffer if it is empty. + /// + /// [`fill_buf`]: BufRead::fill_buf + /// + /// # Examples + /// + /// ```no_run + /// use std::{BufReader, BufRead}; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f = File::open("log.txt")?; + /// let mut reader = BufReader::new(f); + /// assert!(reader.buffer().is_empty()); + /// + /// if reader.fill_buf()?.len() > 0 { + /// assert!(!reader.buffer().is_empty()); + /// } + /// Ok(()) + /// } + /// ``` + pub fn buffer(&self) -> &[u8] { + &self.buf[self.pos..self.cap] + } + + /// Returns the number of bytes the internal buffer can hold at once. + /// + /// # Examples + /// + /// ```no_run + /// use std::{BufReader, BufRead}; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f = File::open("log.txt")?; + /// let mut reader = BufReader::new(f); + /// + /// let capacity = reader.capacity(); + /// let buffer = reader.fill_buf()?; + /// assert!(buffer.len() <= capacity); + /// Ok(()) + /// } + /// ``` + pub fn capacity(&self) -> usize { + S + } + + /// Unwraps this `BufReader`, returning the underlying reader. + /// + /// Note that any leftover data in the internal buffer is lost. Therefore, + /// a following read from the underlying reader may lead to data loss. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let reader = BufReader::new(f1); + /// + /// let f2 = reader.into_inner(); + /// Ok(()) + /// } + /// ``` + pub fn into_inner(self) -> R { + self.inner + } + + /// Invalidates all data in the internal buffer. + #[inline] + fn discard_buffer(&mut self) { + self.pos = 0; + self.cap = 0; + } +} + +impl Read for BufReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.pos == self.cap && buf.len() >= S { + self.discard_buffer(); + return self.inner.read(buf); + } + let nread = { + let mut rem = self.fill_buf()?; + rem.read(buf)? + }; + self.consume(nread); + Ok(nread) + } +} + +impl BufRead for BufReader { + fn fill_buf(&mut self) -> Result<&[u8]> { + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. + if self.pos >= self.cap { + debug_assert!(self.pos == self.cap); + self.cap = self.inner.read(&mut self.buf)?; + self.pos = 0; + } + Ok(&self.buf[self.pos..self.cap]) + } + + fn consume(&mut self, amt: usize) { + self.pos = cmp::min(self.pos + amt, self.cap); + } +} + +impl fmt::Debug for BufReader +where + R: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BufReader") + .field("reader", &self.inner) + .field("buffer", &format_args!("{}/{}", self.cap - self.pos, S)) + .finish() + } +} + +impl Seek for BufReader { + /// Seek to an offset, in bytes, in the underlying reader. + /// + /// The position used for seeking with [`SeekFrom::Current`]`(_)` is the + /// position the underlying reader would be at if the `BufReader` had no + /// internal buffer. + /// + /// Seeking always discards the internal buffer, even if the seek position + /// would otherwise fall within it. This guarantees that calling + /// [`BufReader::into_inner()`] immediately after a seek yields the underlying reader + /// at the same position. + /// + /// To seek without discarding the internal buffer, use [`BufReader::seek_relative`]. + /// + /// See [`std::Seek`] for more details. + /// + /// Note: In the edge case where you're seeking with [`SeekFrom::Current`]`(n)` + /// where `n` minus the internal buffer length overflows an `i64`, two + /// seeks will be performed instead of one. If the second seek returns + /// [`Err`], the underlying reader will be left at the same position it would + /// have if you called `seek` with [`SeekFrom::Current`]`(0)`. + /// + /// [`std::Seek`]: Seek + fn seek(&mut self, pos: SeekFrom) -> Result { + let result: u64; + if let SeekFrom::Current(n) = pos { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::MIN so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + result = self.inner.seek(SeekFrom::Current(offset))?; + } else { + // seek backwards by our remainder, and then by the offset + self.inner.seek(SeekFrom::Current(-remainder))?; + self.discard_buffer(); + result = self.inner.seek(SeekFrom::Current(n))?; + } + } else { + // Seeking with Start/End doesn't care about our buffer length. + result = self.inner.seek(pos)?; + } + self.discard_buffer(); + Ok(result) + } +} + +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`Write`]. For example, every call to +/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A +/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying +/// writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a [`Vec`]`. +/// +/// It is critical to call [`flush`] before `BufWriter` is dropped. Though +/// dropping will attempt to flush the contents of the buffer, any errors +/// that happen in the process of dropping will be ignored. Calling [`flush`] +/// ensures that the buffer is empty and thus dropping will not even attempt +/// file operations. +/// +/// # Examples +/// +/// Let's write the numbers one through ten to a [`TcpStream`]: +/// +/// ```no_run +/// use std::prelude::*; +/// use std::net::TcpStream; +/// +/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); +/// +/// for i in 0..10 { +/// stream.write(&[i+1]).unwrap(); +/// } +/// ``` +/// +/// Because we're not buffering, we write each one in turn, incurring the +/// overhead of a system call per byte written. We can fix this with a +/// `BufWriter`: +/// +/// ```no_run +/// use std::prelude::*; +/// use std::BufWriter; +/// use std::net::TcpStream; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/// +/// for i in 0..10 { +/// stream.write(&[i+1]).unwrap(); +/// } +/// stream.flush().unwrap(); +/// ``` +/// +/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped +/// together by the buffer and will all be written out in one system call when +/// the `stream` is flushed. +/// +/// [`TcpStream::write`]: Write::write +/// [`TcpStream`]: crate::net::TcpStream +/// [`flush`]: Write::flush +pub struct BufWriter { + inner: Option, + buf: [u8; S], + len: usize, + // #30888: If the inner writer panics in a call to write, we don't want to + // write the buffered data a second time in BufWriter's destructor. This + // flag tells the Drop impl if it should skip the flush. + panicked: bool, +} + +/// An error returned by [`BufWriter::into_inner`] which combines an error that +/// happened while writing out the buffer, and the buffered writer object +/// which may be used to recover from the condition. +/// +/// # Examples +/// +/// ```no_run +/// use std::BufWriter; +/// use std::net::TcpStream; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/// +/// // do stuff with the stream +/// +/// // we want to get our `TcpStream` back, so let's try: +/// +/// let stream = match stream.into_inner() { +/// Ok(s) => s, +/// Err(e) => { +/// // Here, e is an IntoInnerError +/// panic!("An error occurred"); +/// } +/// }; +/// ``` +#[derive(Debug)] +pub struct IntoInnerError(W, Error); + +impl BufWriter +where + W: Write, +{ + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// ``` + pub fn new(inner: W) -> BufWriter { + BufWriter { + inner: Some(inner), + buf: [0; S], + len: 0, + panicked: false, + } + } + + /// Send data in our local buffer into the inner writer, looping as + /// necessary until either it's all been sent or an error occurs. + /// + /// Because all the data in the buffer has been reported to our owner as + /// "successfully written" (by returning nonzero success values from + /// `write`), any 0-length writes from `inner` must be reported as i/o + /// errors from this method. + fn flush_buf(&mut self) -> Result<()> { + /// Helper struct to ensure the buffer is updated after all the writes + /// are complete. It tracks the number of written bytes and drains them + /// all from the front of the buffer when dropped. + struct BufGuard<'a, const S: usize> { + buffer: &'a mut [u8; S], + written: usize, + } + + impl<'a, const S: usize> BufGuard<'a, S> { + fn new(buffer: &'a mut [u8; S]) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] + } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + } + + impl Drop for BufGuard<'_, S> { + fn drop(&mut self) { + if self.written > 0 { + let mut new_buf = [0; S]; + new_buf.copy_from_slice(&self.buffer[self.written..]); + *self.buffer = new_buf; + } + } + } + + let mut guard = BufGuard::new(&mut self.buf); + let inner = self.inner.as_mut().unwrap(); + while !guard.done() { + self.panicked = true; + let r = inner.write(guard.remaining()); + self.panicked = false; + + match r { + Ok(0) => { + return Err(Error::new( + ErrorKind::WriteZero, + "failed to write the buffered data", + )); + } + Ok(n) => guard.consume(n), + Err(ref e) if e.kind() == ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + /// Buffer some data without flushing it, regardless of the size of the + /// data. Writes as much as possible without exceeding capacity. Returns + /// the number of bytes written. + fn write_to_buf(&mut self, buf: &[u8]) -> usize { + let available = S - self.len; + let amt_to_buffer = available.min(buf.len()); + (&mut self.buf[available..]).copy_from_slice(&buf[..amt_to_buffer]); + self.len += amt_to_buffer; + amt_to_buffer + } + + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // we can use reference just like buffer + /// let reference = buffer.get_ref(); + /// ``` + pub fn get_ref(&self) -> &W { + self.inner.as_ref().unwrap() + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // we can use reference just like buffer + /// let reference = buffer.get_mut(); + /// ``` + pub fn get_mut(&mut self) -> &mut W { + self.inner.as_mut().unwrap() + } + + /// Returns a reference to the internally buffered data. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // See how many bytes are currently buffered + /// let bytes_buffered = buf_writer.buffer().len(); + /// ``` + pub fn buffer(&self) -> &[u8] { + &self.buf + } + + /// Returns the number of bytes the internal buffer can hold without flushing. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // Check the capacity of the inner buffer + /// let capacity = buf_writer.capacity(); + /// // Calculate how many bytes can be written without flushing + /// let without_flush = capacity - buf_writer.buffer().len(); + /// ``` + pub fn capacity(&self) -> usize { + S + } + + /// Unwraps this `BufWriter`, returning the underlying writer. + /// + /// The buffer is written out before returning the writer. + /// + /// # Errors + /// + /// An [`Err`] will be returned if an error occurs while flushing the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // unwrap the TcpStream and flush the buffer + /// let stream = buffer.into_inner().unwrap(); + /// ``` + pub fn into_inner(mut self) -> core::result::Result>> { + match self.flush_buf() { + Err(e) => Err(IntoInnerError(self, e)), + Ok(()) => Ok(self.inner.take().unwrap()), + } + } +} + +impl Write for BufWriter { + fn write(&mut self, buf: &[u8]) -> Result { + if self.len + buf.len() > S { + self.flush_buf()?; + } + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 + if buf.len() >= S { + self.panicked = true; + let r = self.get_mut().write(buf); + self.panicked = false; + r + } else { + self.buf.copy_from_slice(buf); + Ok(buf.len()) + } + } + + fn write_all(&mut self, buf: &[u8]) -> Result<()> { + // Normally, `write_all` just calls `write` in a loop. We can do better + // by calling `self.get_mut().write_all()` directly, which avoids + // round trips through the buffer in the event of a series of partial + // writes in some circumstances. + if self.len + buf.len() > S { + self.flush_buf()?; + } + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 + if buf.len() >= S { + self.panicked = true; + let r = self.get_mut().write_all(buf); + self.panicked = false; + r + } else { + self.buf.copy_from_slice(buf); + Ok(()) + } + } + + fn flush(&mut self) -> Result<()> { + self.flush_buf().and_then(|()| self.get_mut().flush()) + } +} + +impl fmt::Debug for BufWriter +where + W: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BufWriter") + .field("writer", &self.inner.as_ref().unwrap()) + .field("buffer", &format_args!("{}/{}", self.buf.len(), S)) + .finish() + } +} + +impl Seek for BufWriter { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. + fn seek(&mut self, pos: SeekFrom) -> Result { + self.flush_buf()?; + self.get_mut().seek(pos) + } +} + +impl Drop for BufWriter { + fn drop(&mut self) { + if self.inner.is_some() && !self.panicked { + // dtors should not panic, so we ignore a failed flush + let _r = self.flush_buf(); + } + } +} + +impl IntoInnerError { + /// Returns the error which caused the call to [`BufWriter::into_inner()`] + /// to fail. + /// + /// This error was returned when attempting to write the internal buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // do stuff with the stream + /// + /// // we want to get our `TcpStream` back, so let's try: + /// + /// let stream = match stream.into_inner() { + /// Ok(s) => s, + /// Err(e) => { + /// // Here, e is an IntoInnerError, let's log the inner error. + /// // + /// // We'll just 'log' to stdout for this example. + /// println!("{}", e.error()); + /// + /// panic!("An unexpected error occurred."); + /// } + /// }; + /// ``` + pub fn error(&self) -> &Error { + &self.1 + } + + /// Returns the buffered writer instance which generated the error. + /// + /// The returned object can be used for error recovery, such as + /// re-inspecting the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // do stuff with the stream + /// + /// // we want to get our `TcpStream` back, so let's try: + /// + /// let stream = match stream.into_inner() { + /// Ok(s) => s, + /// Err(e) => { + /// // Here, e is an IntoInnerError, let's re-examine the buffer: + /// let buffer = e.into_inner(); + /// + /// // do stuff to try to recover + /// + /// // afterwards, let's just return the stream + /// buffer.into_inner().unwrap() + /// } + /// }; + /// ``` + pub fn into_inner(self) -> W { + self.0 + } +} + +impl From> for Error { + fn from(iie: IntoInnerError) -> Error { + iie.1 + } +} + +impl fmt::Display for IntoInnerError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.error().fmt(f) + } +} + +/// Private helper struct for implementing the line-buffered writing logic. +/// This shim temporarily wraps a BufWriter, and uses its internals to +/// implement a line-buffered writer (specifically by using the internal +/// methods like write_to_buf and flush_buf). In this way, a more +/// efficient abstraction can be created than one that only had access to +/// `write` and `flush`, without needlessly duplicating a lot of the +/// implementation details of BufWriter. This also allows existing +/// `BufWriters` to be temporarily given line-buffering logic; this is what +/// enables Stdout to be alternately in line-buffered or block-buffered mode. +#[derive(Debug)] +pub(super) struct LineWriterShim<'a, W: Write, const S: usize> { + buffer: &'a mut BufWriter, +} + +impl<'a, W: Write, const S: usize> LineWriterShim<'a, W, S> { + pub fn new(buffer: &'a mut BufWriter) -> Self { + Self { buffer } + } + + /// Get a mutable reference to the inner writer (that is, the writer + /// wrapped by the BufWriter). Be careful with this writer, as writes to + /// it will bypass the buffer. + fn inner_mut(&mut self) -> &mut W { + self.buffer.get_mut() + } + + /// Get the content currently buffered in self.buffer + fn buffered(&self) -> &[u8] { + self.buffer.buffer() + } + + /// Flush the buffer iff the last byte is a newline (indicating that an + /// earlier write only succeeded partially, and we want to retry flushing + /// the buffered line before continuing with a subsequent write) + fn flush_if_completed_line(&mut self) -> Result<()> { + match self.buffered().last().copied() { + Some(b'\n') => self.buffer.flush_buf(), + _ => Ok(()), + } + } +} + +impl<'a, W: Write, const S: usize> Write for LineWriterShim<'a, W, S> { + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. Returns the number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. If that write only reports a partial + /// success, the remaining data will be buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it ends with a + /// newline, even if the incoming data does not contain any newlines. + fn write(&mut self, buf: &[u8]) -> Result { + let newline_idx = match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write (which may flush if + // we exceed the inner buffer's size) + None => { + self.flush_if_completed_line()?; + return self.buffer.write(buf); + } + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => newline_idx + 1, + }; + + // Flush existing content to prepare for our write. We have to do this + // before attempting to write `buf` in order to maintain consistency; + // if we add `buf` to the buffer then try to flush it all at once, + // we're obligated to return Ok(), which would mean suppressing any + // errors that occur during flush. + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. + let flushed = self.inner_mut().write(lines)?; + + // If buffer returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline that fits in the + // buffer; this helps prevent flushing partial lines on subsequent + // calls to LineWriterShim::write. + + // Handle the cases in order of most-common to least-common, under + // the presumption that most writes succeed in totality, and that most + // writes are smaller than the buffer. + // - Is this a partial line (ie, no newlines left in the unwritten tail) + // - If not, does the data out to the last unwritten newline fit in + // the buffer? + // - If not, scan for the last newline that *does* fit in the buffer + let tail = if flushed >= newline_idx { + &buf[flushed..] + } else if newline_idx - flushed <= self.buffer.capacity() { + &buf[flushed..newline_idx] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..self.buffer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_idx) => &scan_area[..newline_idx + 1], + None => scan_area, + } + }; + + let buffered = self.buffer.write_to_buf(tail); + Ok(flushed + buffered) + } + + fn flush(&mut self) -> Result<()> { + self.buffer.flush() + } + + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write_all(&mut self, buf: &[u8]) -> Result<()> { + match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write (which may flush if + // we exceed the inner buffer's size) + None => { + self.flush_if_completed_line()?; + self.buffer.write_all(buf) + } + Some(newline_idx) => { + let (lines, tail) = buf.split_at(newline_idx + 1); + + if self.buffered().is_empty() { + self.inner_mut().write_all(lines)?; + } else { + // If there is any buffered data, we add the incoming lines + // to that buffer before flushing, which saves us at least + // one write call. We can't really do this with `write`, + // since we can't do this *and* not suppress errors *and* + // report a consistent state to the caller in a return + // value, but here in write_all it's fine. + self.buffer.write_all(lines)?; + self.buffer.flush_buf()?; + } + + self.buffer.write_all(tail) + } + } + } +} + +/// Wraps a writer and buffers output to it, flushing whenever a newline +/// (`0x0a`, `'\n'`) is detected. +/// +/// The [`BufWriter`] struct wraps a writer and buffers its output. +/// But it only does this batched write when it goes out of scope, or when the +/// internal buffer is full. Sometimes, you'd prefer to write each line as it's +/// completed, rather than the entire buffer at once. Enter `LineWriter`. It +/// does exactly that. +/// +/// Like [`BufWriter`], a `LineWriter`’s buffer will also be flushed when the +/// `LineWriter` goes out of scope or when its internal buffer is full. +/// +/// If there's still a partial line in the buffer when the `LineWriter` is +/// dropped, it will flush those contents. +/// +/// # Examples +/// +/// We can use `LineWriter` to write one line at a time, significantly +/// reducing the number of actual writes to the file. +/// +/// ```no_run +/// use std::fs::{self, File}; +/// use std::prelude::*; +/// use std::LineWriter; +/// +/// fn main() -> std::Result<()> { +/// let road_not_taken = b"I shall be telling this with a sigh +/// Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference."; +/// +/// let file = File::create("poem.txt")?; +/// let mut file = LineWriter::new(file); +/// +/// file.write_all(b"I shall be telling this with a sigh")?; +/// +/// // No bytes are written until a newline is encountered (or +/// // the internal buffer is filled). +/// assert_eq!(fs::read_to_string("poem.txt")?, ""); +/// file.write_all(b"\n")?; +/// assert_eq!( +/// fs::read_to_string("poem.txt")?, +/// "I shall be telling this with a sigh\n", +/// ); +/// +/// // Write the rest of the poem. +/// file.write_all(b"Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference.")?; +/// +/// // The last line of the poem doesn't end in a newline, so +/// // we have to flush or drop the `LineWriter` to finish +/// // writing. +/// file.flush()?; +/// +/// // Confirm the whole poem was written. +/// assert_eq!(fs::read("poem.txt")?, &road_not_taken[..]); +/// Ok(()) +/// } +/// ``` +pub struct LineWriter { + inner: BufWriter, +} + +impl LineWriter { + /// Creates a new `LineWriter`. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::LineWriter; + /// + /// fn main() -> std::Result<()> { + /// let file = File::create("poem.txt")?; + /// let file = LineWriter::new(file); + /// Ok(()) + /// } + /// ``` + pub fn new(inner: W) -> LineWriter { + LineWriter { + inner: BufWriter::new(inner), + } + } + + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::LineWriter; + /// + /// fn main() -> std::Result<()> { + /// let file = File::create("poem.txt")?; + /// let file = LineWriter::new(file); + /// + /// let reference = file.get_ref(); + /// Ok(()) + /// } + /// ``` + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Gets a mutable reference to the underlying writer. + /// + /// Caution must be taken when calling methods on the mutable reference + /// returned as extra writes could corrupt the output stream. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::LineWriter; + /// + /// fn main() -> std::Result<()> { + /// let file = File::create("poem.txt")?; + /// let mut file = LineWriter::new(file); + /// + /// // we can use reference just like file + /// let reference = file.get_mut(); + /// Ok(()) + /// } + /// ``` + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + + /// Unwraps this `LineWriter`, returning the underlying writer. + /// + /// The internal buffer is written out before returning the writer. + /// + /// # Errors + /// + /// An [`Err`] will be returned if an error occurs while flushing the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::LineWriter; + /// + /// fn main() -> std::Result<()> { + /// let file = File::create("poem.txt")?; + /// + /// let writer: LineWriter = LineWriter::new(file); + /// + /// let file: File = writer.into_inner()?; + /// Ok(()) + /// } + /// ``` + pub fn into_inner(self) -> core::result::Result>> { + self.inner + .into_inner() + .map_err(|IntoInnerError(buf, e)| IntoInnerError(LineWriter { inner: buf }, e)) + } +} + +impl Write for LineWriter { + fn write(&mut self, buf: &[u8]) -> Result { + LineWriterShim::new(&mut self.inner).write(buf) + } + + fn flush(&mut self) -> Result<()> { + self.inner.flush() + } + + fn write_all(&mut self, buf: &[u8]) -> Result<()> { + LineWriterShim::new(&mut self.inner).write_all(buf) + } + + fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> Result<()> { + LineWriterShim::new(&mut self.inner).write_fmt(fmt) + } +} + +impl fmt::Debug for LineWriter +where + W: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LineWriter") + .field("writer", &self.inner.inner) + .field("buffer", &format_args!("{}/{}", self.inner.len, S)) + .finish() + } +} diff --git a/src/impl.rs b/src/impl.rs index 00a9020..49c349c 100644 --- a/src/impl.rs +++ b/src/impl.rs @@ -71,7 +71,6 @@ use core::{cmp, fmt, slice}; /// [`std::io`]: self /// [`File`]: crate::fs::File /// [slice]: ../../std/primitive.slice.html -#[doc(spotlight)] pub trait Read { /// Pull some bytes from this source into the specified buffer, returning /// how many bytes were read. @@ -417,7 +416,6 @@ pub trait Read { /// `write` in a loop until its entire input has been written. /// /// [`write_all`]: Write::write_all -#[doc(spotlight)] pub trait Write { /// Write a buffer into this writer, returning how many bytes were written. /// diff --git a/src/lib.rs b/src/lib.rs index a21de18..7f6812d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,8 @@ +#![feature(min_const_generics)] #![cfg_attr(not(feature = "std"), no_std)] +#[cfg(not(feature = "std"))] +mod buffered; #[cfg(not(feature = "std"))] mod cursor; #[cfg(not(feature = "std"))] @@ -20,3 +23,6 @@ pub use r#impl::{BufRead, Bytes, Chain, Read, Seek, SeekFrom, Take, Write}; pub use std::io::{ BufRead, Bytes, Chain, Cursor, Error, ErrorKind, Read, Result, Seek, SeekFrom, Take, Write, }; + +// Use this crate's implementation on both std and no_std +pub use buffered::{BufReader, BufWriter, LineWriter};