Skip to content

Commit

Permalink
convert some of the synchronous stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Sep 27, 2024
1 parent fd06645 commit 4c33499
Showing 1 changed file with 32 additions and 45 deletions.
77 changes: 32 additions & 45 deletions rust/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use log::*;
use crate::{
io_utils::CountingCrcReader,
records::{self, op, Record},
sans_io::read::ReadAction,
Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC,
};

Expand Down Expand Up @@ -448,9 +449,9 @@ fn read_record_from_chunk_stream<'a, R: Read>(r: &mut R) -> McapResult<records::

/// Like [`LinearReader`], but unpacks chunks' records into its stream
pub struct ChunkFlattener<'a> {
top_level: LinearReader<'a>,
dechunk: Option<ChunkReader<'a>>,
malformed: bool,
buf: &'a [u8],
pos: usize,
reader: crate::sans_io::read::RecordReader,
}

impl<'a> ChunkFlattener<'a> {
Expand All @@ -459,64 +460,50 @@ impl<'a> ChunkFlattener<'a> {
}

pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
let top_level = LinearReader::new_with_options(buf, options)?;
Ok(Self {
top_level,
dechunk: None,
malformed: false,
buf,
pos: 0,
reader: crate::sans_io::read::RecordReader::new(),
})
}

fn bytes_remaining(&self) -> usize {
self.top_level.bytes_remaining()
self.buf.len() - self.pos
}
}

impl<'a> Iterator for ChunkFlattener<'a> {
type Item = McapResult<records::Record<'a>>;

fn next(&mut self) -> Option<Self::Item> {
if self.malformed {
return None;
}

let n: Option<Self::Item> = loop {
// If we're reading from a chunk, do that until it returns None.
if let Some(d) = &mut self.dechunk {
match d.next() {
Some(d) => break Some(d),
None => self.dechunk = None,
}
}
// Fall through - if we didn't extract a record from a chunk
// (or that chunk ended), move on to the next top-level record.
match self.top_level.next() {
// If it's a chunk, get a new chunk reader going...
Some(Ok(Record::Chunk { header, data })) => {
// Chunks from the LinearReader will always borrow from the file.
// (Getting a normal reference to the underlying data back
// frees us from returning things that reference this local Cow.)
let data: &'a [u8] = match data {
Cow::Borrowed(b) => b,
Cow::Owned(_) => unreachable!(),
};

self.dechunk = match ChunkReader::new(header, data) {
Ok(d) => Some(d),
Err(e) => break Some(Err(e)),
loop {
match self.reader.next_action() {
Ok(ReadAction::GetRecord { data, opcode }) => {
if opcode == crate::records::op::DATA_END {
return None;
}
match parse_record(opcode, data) {
Ok(rec) => return Some(Ok(rec.into_owned())),
Err(err) => return Some(Err(err)),
};
// ...then continue the loop to get the first item from the chunk.
}
// If it's not a chunk, just yield it.
not_a_chunk => break not_a_chunk,
Ok(ReadAction::Fill(mut into_buf)) => {
let end = std::cmp::min(self.pos + into_buf.buf.len(), self.buf.len());
let src = &self.buf[self.pos..std::cmp::min(end, self.buf.len())];
let dst = &mut into_buf.buf[..src.len()];
let n = src.len();
if n > 0 {
dst.copy_from_slice(src);
}
into_buf.set_filled(n);
self.pos += n;
}
Ok(ReadAction::Finished) => {
return None;
}
Err(err) => return Some(Err(err)),
}
};

// Give up on errors
if matches!(n, Some(Err(_))) {
self.malformed = true;
}
n
}
}

Expand Down

0 comments on commit 4c33499

Please sign in to comment.