From 4c334999573586a82dbac4356b238598eea8924d Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 27 Sep 2024 20:30:00 +1000 Subject: [PATCH] convert some of the synchronous stuff --- rust/src/read.rs | 77 ++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/rust/src/read.rs b/rust/src/read.rs index 94d8d5360..b925caf8a 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -25,6 +25,7 @@ use log::*; use crate::{ io_utils::CountingCrcReader, records::{self, op, Record}, + sans_io::read::ReadAction, Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC, }; @@ -448,9 +449,9 @@ fn read_record_from_chunk_stream<'a, R: Read>(r: &mut R) -> McapResult { - top_level: LinearReader<'a>, - dechunk: Option>, - malformed: bool, + buf: &'a [u8], + pos: usize, + reader: crate::sans_io::read::RecordReader, } impl<'a> ChunkFlattener<'a> { @@ -459,16 +460,15 @@ impl<'a> ChunkFlattener<'a> { } pub fn new_with_options(buf: &'a [u8], options: EnumSet) -> McapResult { - let top_level = LinearReader::new_with_options(buf, options)?; Ok(Self { - top_level, - dechunk: None, - malformed: false, + buf, + pos: 0, + reader: crate::sans_io::read::RecordReader::new(), }) } fn bytes_remaining(&self) -> usize { - self.top_level.bytes_remaining() + self.buf.len() - self.pos } } @@ -476,47 +476,34 @@ impl<'a> Iterator for ChunkFlattener<'a> { type Item = McapResult>; fn next(&mut self) -> Option { - if self.malformed { - return None; - } - - let n: Option = loop { - // If we're reading from a chunk, do that until it returns None. - if let Some(d) = &mut self.dechunk { - match d.next() { - Some(d) => break Some(d), - None => self.dechunk = None, - } - } - // Fall through - if we didn't extract a record from a chunk - // (or that chunk ended), move on to the next top-level record. - match self.top_level.next() { - // If it's a chunk, get a new chunk reader going... - Some(Ok(Record::Chunk { header, data })) => { - // Chunks from the LinearReader will always borrow from the file. - // (Getting a normal reference to the underlying data back - // frees us from returning things that reference this local Cow.) - let data: &'a [u8] = match data { - Cow::Borrowed(b) => b, - Cow::Owned(_) => unreachable!(), - }; - - self.dechunk = match ChunkReader::new(header, data) { - Ok(d) => Some(d), - Err(e) => break Some(Err(e)), + loop { + match self.reader.next_action() { + Ok(ReadAction::GetRecord { data, opcode }) => { + if opcode == crate::records::op::DATA_END { + return None; + } + match parse_record(opcode, data) { + Ok(rec) => return Some(Ok(rec.into_owned())), + Err(err) => return Some(Err(err)), }; - // ...then continue the loop to get the first item from the chunk. } - // If it's not a chunk, just yield it. - not_a_chunk => break not_a_chunk, + Ok(ReadAction::Fill(mut into_buf)) => { + let end = std::cmp::min(self.pos + into_buf.buf.len(), self.buf.len()); + let src = &self.buf[self.pos..std::cmp::min(end, self.buf.len())]; + let dst = &mut into_buf.buf[..src.len()]; + let n = src.len(); + if n > 0 { + dst.copy_from_slice(src); + } + into_buf.set_filled(n); + self.pos += n; + } + Ok(ReadAction::Finished) => { + return None; + } + Err(err) => return Some(Err(err)), } - }; - - // Give up on errors - if matches!(n, Some(Err(_))) { - self.malformed = true; } - n } }