Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make read_event_into_async an async-fn #569

Merged
merged 6 commits into from
Mar 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions src/reader/async_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
//! as underlying byte stream. This reader fully implements async/await so reading
//! can use non-blocking I/O.

use std::future::Future;
use std::pin::Pin;
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};

use crate::events::Event;
Expand Down Expand Up @@ -81,19 +79,17 @@ impl<R: AsyncBufRead + Unpin> Reader<R> {
/// ```
///
/// [`read_event_into()`]: Reader::read_event_into
pub fn read_event_into_async<'reader, 'b: 'reader>(
pub async fn read_event_into_async<'reader, 'b: 'reader>(
vilunov marked this conversation as resolved.
Show resolved Hide resolved
&'reader mut self,
buf: &'b mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<Event<'b>>> + 'reader>> {
Box::pin(async move {
read_event_impl!(
self, buf,
TokioAdapter(&mut self.reader),
read_until_open_async,
read_until_close_async,
await
)
})
mut buf: &'b mut Vec<u8>,
) -> Result<Event<'b>> {
read_event_impl!(
self, buf,
TokioAdapter(&mut self.reader),
read_until_open_async,
read_until_close_async,
await
)
}

/// An asynchronous version of [`read_to_end_into()`].
Expand Down Expand Up @@ -155,7 +151,13 @@ impl<R: AsyncBufRead + Unpin> Reader<R> {
}

/// Read until '<' is found, moves reader to an `OpenedTag` state and returns a `Text` event.
async fn read_until_open_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
///
/// Returns inner Ok if the loop should be broken and an event returned.
/// Returns inner Err with the same [buf] because Rust borrowck stumbles upon this case in particular.
vilunov marked this conversation as resolved.
Show resolved Hide resolved
async fn read_until_open_async<'b>(
&mut self,
buf: &'b mut Vec<u8>,
) -> Result<std::result::Result<Event<'b>, &'b mut Vec<u8>>> {
read_until_open!(self, buf, TokioAdapter(&mut self.reader), read_event_into_async, await)
}

Expand Down
67 changes: 41 additions & 26 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,34 +166,46 @@ macro_rules! read_event_impl {
$read_until_close:ident
$(, $await:ident)?
) => {{
let event = match $self.parser.state {
ParseState::Init => {
// If encoding set explicitly, we not need to detect it. For example,
// explicit UTF-8 set automatically if Reader was created using `from_str`.
// But we still need to remove BOM for consistency with no encoding
// feature enabled path
#[cfg(feature = "encoding")]
if let Some(encoding) = $reader.detect_encoding() $(.$await)? ? {
if $self.parser.encoding.can_be_refined() {
$self.parser.encoding = crate::reader::EncodingRef::BomDetected(encoding);
let event = loop {
match $self.parser.state {
ParseState::Init => {
// If encoding set explicitly, we not need to detect it. For example,
// explicit UTF-8 set automatically if Reader was created using `from_str`.
// But we still need to remove BOM for consistency with no encoding
// feature enabled path
#[cfg(feature = "encoding")]
if let Some(encoding) = $reader.detect_encoding() $(.$await)? ? {
if $self.parser.encoding.can_be_refined() {
$self.parser.encoding = crate::reader::EncodingRef::BomDetected(encoding);
}
}
}

// Removes UTF-8 BOM if it is present
#[cfg(not(feature = "encoding"))]
$reader.remove_utf8_bom() $(.$await)? ?;
// Removes UTF-8 BOM if it is present
#[cfg(not(feature = "encoding"))]
$reader.remove_utf8_bom() $(.$await)? ?;

$self.$read_until_open($buf) $(.$await)?
},
ParseState::ClosedTag => $self.$read_until_open($buf) $(.$await)?,
ParseState::OpenedTag => $self.$read_until_close($buf) $(.$await)?,
ParseState::Empty => $self.parser.close_expanded_empty(),
ParseState::Exit => return Ok(Event::Eof),
match $self.$read_until_open($buf) $(.$await)? {
Ok(Ok(ev)) => break Ok(ev),
Ok(Err(b)) => $buf = b,
Err(err) => break Err(err),
}
},
ParseState::ClosedTag => {
match $self.$read_until_open($buf) $(.$await)? {
Ok(Ok(ev)) => break Ok(ev),
Ok(Err(b)) => $buf = b,
Err(err) => break Err(err),
}
},
ParseState::OpenedTag => break $self.$read_until_close($buf) $(.$await)?,
ParseState::Empty => break $self.parser.close_expanded_empty(),
ParseState::Exit => break Ok(Event::Eof),
};
};
match event {
Err(_) | Ok(Event::Eof) => $self.parser.state = ParseState::Exit,
_ => {}
}
};
vilunov marked this conversation as resolved.
Show resolved Hide resolved
event
}};
}
Expand All @@ -213,15 +225,15 @@ macro_rules! read_until_open {

// If we already at the `<` symbol, do not try to return an empty Text event
if $reader.skip_one(b'<', &mut $self.parser.offset) $(.$await)? ? {
return $self.$read_event($buf) $(.$await)?;
return Ok(Err($buf));
}

match $reader
.read_bytes_until(b'<', $buf, &mut $self.parser.offset)
$(.$await)?
{
Ok(Some(bytes)) => $self.parser.read_text(bytes),
Ok(None) => Ok(Event::Eof),
Ok(Some(bytes)) => $self.parser.read_text(bytes).map(Ok),
Ok(None) => Ok(Ok(Event::Eof)),
Err(e) => Err(e),
}
}};
Expand Down Expand Up @@ -593,15 +605,18 @@ impl<R> Reader<R> {
/// Read text into the given buffer, and return an event that borrows from
/// either that buffer or from the input itself, based on the type of the
/// reader.
fn read_event_impl<'i, B>(&mut self, buf: B) -> Result<Event<'i>>
fn read_event_impl<'i, B>(&mut self, mut buf: B) -> Result<Event<'i>>
where
R: XmlSource<'i, B>,
{
read_event_impl!(self, buf, self.reader, read_until_open, read_until_close)
}

/// Read until '<' is found, moves reader to an `OpenedTag` state and returns a `Text` event.
fn read_until_open<'i, B>(&mut self, buf: B) -> Result<Event<'i>>
///
/// Returns inner Ok if the loop should be broken and an event returned.
/// Returns inner Err with the same [buf] because Rust borrowck stumbles upon this case in particular.
vilunov marked this conversation as resolved.
Show resolved Hide resolved
fn read_until_open<'i, B>(&mut self, buf: B) -> Result<std::result::Result<Event<'i>, B>>
where
R: XmlSource<'i, B>,
{
Expand Down