Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ability to read data directly from the underlying reader #783

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

### New Features

- [#623]: Added `Reader::stream()` that can be used to read arbitrary data
from the inner reader while track position for XML reader.

### Bug Fixes

### Misc Changes

[#623]: https://github.com/tafia/quick-xml/issues/623


## 0.36.0 -- 2024-07-08

Expand Down
48 changes: 46 additions & 2 deletions src/reader/async_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
//! as underlying byte stream. This reader fully implements async/await so reading
//! can use non-blocking I/O.

use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};

use crate::errors::{Error, Result, SyntaxError};
use crate::events::Event;
use crate::name::{QName, ResolveResult};
use crate::parser::{ElementParser, Parser, PiParser};
use crate::reader::buffered_reader::impl_buffered_source;
use crate::reader::{BangType, NsReader, ParseState, ReadTextResult, Reader, Span};
use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
use crate::utils::is_whitespace;

/// A struct for read XML asynchronously from an [`AsyncBufRead`].
Expand All @@ -24,6 +27,47 @@ impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {

////////////////////////////////////////////////////////////////////////////////////////////////////

impl<'r, R> AsyncRead for BinaryStream<'r, R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let start = buf.remaining();
let this = self.get_mut();
let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);

// If something was read, update offset
if let Poll::Ready(Ok(_)) = poll {
let amt = start - buf.remaining();
*this.offset += amt as u64;
}
poll
}
}

impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
where
R: AsyncBufRead + Unpin,
{
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
}

#[inline]
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.inner.consume(amt);
*this.offset += amt as u64;
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////

impl<R: AsyncBufRead + Unpin> Reader<R> {
/// An asynchronous version of [`read_event_into()`]. Reads the next event into
/// given buffer.
Expand Down
128 changes: 128 additions & 0 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,68 @@ impl EncodingRef {

////////////////////////////////////////////////////////////////////////////////////////////////////

/// A direct stream to the underlying [`Reader`]s reader which updates
/// [`Reader::buffer_position()`] when read from it.
#[derive(Debug)]
#[must_use = "streams do nothing unless read or polled"]
pub struct BinaryStream<'r, R> {
inner: &'r mut R,
offset: &'r mut u64,
}

impl<'r, R> BinaryStream<'r, R> {
/// Returns current position in bytes in the original source.
#[inline]
pub const fn offset(&self) -> u64 {
*self.offset
}

/// Gets a reference to the underlying reader.
#[inline]
pub const fn get_ref(&self) -> &R {
self.inner
}

/// Gets a mutable reference to the underlying reader.
///
/// Avoid read from this reader because this will not update reader's position
/// and will lead to incorrect positions of errors. Read from this stream instead.
#[inline]
pub fn get_mut(&mut self) -> &mut R {
self.inner
}
}

impl<'r, R> io::Read for BinaryStream<'r, R>
where
R: io::Read,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let amt = self.inner.read(buf)?;
*self.offset += amt as u64;
Ok(amt)
}
}

impl<'r, R> io::BufRead for BinaryStream<'r, R>
where
R: io::BufRead,
{
#[inline]
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.inner.fill_buf()
}

#[inline]
fn consume(&mut self, amt: usize) {
self.inner.consume(amt);
*self.offset += amt as u64;
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////

/// A low level encoding-agnostic XML event reader.
///
/// Consumes bytes and streams XML [`Event`]s.
Expand Down Expand Up @@ -716,6 +778,12 @@ impl<R> Reader<R> {
}

/// Gets a mutable reference to the underlying reader.
///
/// Avoid read from this reader because this will not update reader's position
/// and will lead to incorrect positions of errors. If you want to read, use
/// [`stream()`] instead.
///
/// [`stream()`]: Self::stream
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
Expand Down Expand Up @@ -759,6 +827,66 @@ impl<R> Reader<R> {
pub const fn decoder(&self) -> Decoder {
self.state.decoder()
}

/// Get the direct access to the underlying reader, but tracks the amount of
/// read data and update [`Reader::buffer_position()`] accordingly.
///
/// Note, that this method gives you access to the internal reader and read
/// data will not be returned in any subsequent events read by `read_event`
/// family of methods.
///
/// # Example
///
/// This example demonstrates how to read stream raw bytes from an XML document.
/// This could be used to implement streaming read of text, or to read raw binary
/// bytes embedded in an XML document. (Documents with embedded raw bytes are not
/// valid XML, but XML-derived file formats exist where such documents are valid).
///
/// ```
/// # use pretty_assertions::assert_eq;
/// use std::io::{BufRead, Read};
/// use quick_xml::events::{BytesEnd, BytesStart, Event};
/// use quick_xml::reader::Reader;
///
/// let mut reader = Reader::from_str("<tag>binary << data&></tag>");
/// // ^ ^ ^ ^
/// // 0 5 21 27
///
/// assert_eq!(
/// (reader.read_event().unwrap(), reader.buffer_position()),
/// // 5 - end of the `<tag>`
/// (Event::Start(BytesStart::new("tag")), 5)
/// );
///
/// // Reading directly from underlying reader will not update position
/// // let mut inner = reader.get_mut();
///
/// // Reading from the stream() advances position
/// let mut inner = reader.stream();
///
/// // Read binary data. We must know its size
/// let mut binary = [0u8; 16];
/// inner.read_exact(&mut binary).unwrap();
/// assert_eq!(&binary, b"binary << data&>");
/// // 21 - end of the `binary << data&>`
/// assert_eq!(inner.offset(), 21);
/// assert_eq!(reader.buffer_position(), 21);
///
/// assert_eq!(
/// (reader.read_event().unwrap(), reader.buffer_position()),
/// // 27 - end of the `</tag>`
/// (Event::End(BytesEnd::new("tag")), 27)
/// );
///
/// assert_eq!(reader.read_event().unwrap(), Event::Eof);
/// ```
#[inline]
pub fn stream(&mut self) -> BinaryStream<R> {
BinaryStream {
inner: &mut self.reader,
offset: &mut self.state.offset,
}
}
}

/// Private sync reading methods
Expand Down
46 changes: 45 additions & 1 deletion tests/async-tokio.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::io::Cursor;
use std::iter;

use pretty_assertions::assert_eq;
use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event::*};
use quick_xml::name::QName;
use quick_xml::reader::Reader;
use tokio::io::BufReader;
use quick_xml::utils::Bytes;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};

// Import `small_buffers_tests!`
#[macro_use]
Expand Down Expand Up @@ -88,6 +90,48 @@ mod read_to_end {
}
}

#[tokio::test]
async fn issue623() {
let mut buf = Vec::new();
let mut reader = Reader::from_reader(Cursor::new(
b"
<AppendedData>
_binary << data&>
</AppendedData>
",
));
reader.config_mut().trim_text(true);

assert_eq!(
(
reader.read_event_into_async(&mut buf).await.unwrap(),
reader.buffer_position()
),
(Start(BytesStart::new("AppendedData")), 23)
);

let mut inner = reader.stream();
// Read to start of data marker
inner.read_until(b'_', &mut buf).await.unwrap();

// Read binary data. We must know its size
let mut binary = [0u8; 16];
inner.read_exact(&mut binary).await.unwrap();
assert_eq!(Bytes(&binary), Bytes(b"binary << data&>"));
assert_eq!(inner.offset(), 53);
assert_eq!(reader.buffer_position(), 53);

assert_eq!(
(
reader.read_event_into_async(&mut buf).await.unwrap(),
reader.buffer_position()
),
(End(BytesEnd::new("AppendedData")), 77)
);

assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Eof);
}

/// Regression test for https://github.com/tafia/quick-xml/issues/751
///
/// Actually, that error was not found in async reader, but we would to test it as well.
Expand Down
Loading