From 629807801ae40816b1de9a7ca3f579ff235e7664 Mon Sep 17 00:00:00 2001 From: James Smith Date: Sat, 28 Sep 2024 21:35:04 +1000 Subject: [PATCH] strip out linear reader --- rust/src/read.rs | 199 +++++++++------------------------------ rust/src/sans_io/read.rs | 2 +- 2 files changed, 46 insertions(+), 155 deletions(-) diff --git a/rust/src/read.rs b/rust/src/read.rs index 3c473804a..cc7e58bdb 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -12,7 +12,6 @@ use std::{ collections::{BTreeMap, HashMap}, fmt, io::{self, prelude::*, Cursor}, - mem::size_of, sync::Arc, }; @@ -25,7 +24,7 @@ use log::*; use crate::{ io_utils::CountingCrcReader, records::{self, op, Record}, - sans_io::read::{ReadAction, RecordReaderOptions}, + sans_io::read::{ReadAction, RecordReader, RecordReaderOptions}, Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC, }; @@ -46,7 +45,7 @@ pub enum Options { /// and is mostly meant as a building block for higher-level readers. pub struct LinearReader<'a> { buf: &'a [u8], - malformed: bool, + reader: crate::sans_io::read::RecordReader, } impl<'a> LinearReader<'a> { @@ -58,18 +57,16 @@ impl<'a> LinearReader<'a> { /// Create a reader for the given file with special options. pub fn new_with_options(buf: &'a [u8], options: EnumSet) -> McapResult { - if !buf.starts_with(MAGIC) - || (!options.contains(Options::IgnoreEndMagic) - && (!buf.ends_with(MAGIC) || buf.len() < 2 * MAGIC.len())) - { - return Err(McapError::BadMagic); - } - let buf = &buf[MAGIC.len()..]; - if buf.ends_with(MAGIC) { - Ok(Self::sans_magic(&buf[0..buf.len() - MAGIC.len()])) - } else { - Ok(Self::sans_magic(buf)) - } + Ok(Self { + buf, + reader: RecordReader::new_with_options(RecordReaderOptions { + skip_start_magic: false, + skip_end_magic: options.contains(Options::IgnoreEndMagic), + emit_chunks: true, + validate_chunk_crcs: false, + validate_data_section_crc: true, + }), + }) } /// Like [`new()`](Self::new), but assumes `buf` has the magic bytes sliced off. @@ -78,7 +75,13 @@ impl<'a> LinearReader<'a> { pub fn sans_magic(buf: &'a [u8]) -> Self { Self { buf, - malformed: false, + reader: RecordReader::new_with_options(RecordReaderOptions { + skip_start_magic: true, + skip_end_magic: true, + emit_chunks: true, + validate_chunk_crcs: false, + validate_data_section_crc: true, + }), } } @@ -95,53 +98,25 @@ impl<'a> Iterator for LinearReader<'a> { type Item = McapResult>; fn next(&mut self) -> Option { - if self.buf.is_empty() { - return None; - } - - // After an unrecoverable error (due to something wonky in the file), - // don't keep trying to walk it. - if self.malformed { - return None; - } - - let record = match read_record_from_slice(&mut self.buf) { - Ok(k) => k, - Err(e) => { - self.malformed = true; - return Some(Err(e)); + loop { + match self.reader.next_action() { + Ok(ReadAction::Fill(mut into_buf)) => { + let len = std::cmp::min(self.buf.len(), into_buf.buf.len()); + let src = &self.buf[..len]; + let dst = &mut into_buf.buf[..len]; + dst.copy_from_slice(src); + into_buf.set_filled(len); + self.buf = &self.buf[len..]; + } + Ok(ReadAction::Finished) => return None, + Ok(ReadAction::GetRecord { data, opcode }) => match parse_record(opcode, data) { + Ok(record) => return Some(Ok(record.into_owned())), + Err(err) => return Some(Err(err)), + }, + Err(err) => return Some(Err(err)), } - }; - - Some(Ok(record)) - } -} - -/// Read a record and advance the slice -fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult> { - if buf.len() < (size_of::() + size_of::()) { - warn!("Malformed MCAP - not enough space for record + length!"); - return Err(McapError::UnexpectedEof); - } - - let op = read_u8(buf); - let len = read_u64(buf); - - if buf.len() < len as usize { - warn!( - "Malformed MCAP - record with length {len}, but only {} bytes remain", - buf.len() - ); - return Err(McapError::UnexpectedEof); + } } - - let body = &buf[..len as usize]; - debug!("slice: opcode {op:02X}, length {len}"); - let record = parse_record(op, body)?; - trace!(" {:?}", record); - - *buf = &buf[len as usize..]; - Ok(record) } /// Given a records' opcode and data, parse into a Record. The resulting Record will contain @@ -834,8 +809,10 @@ impl<'a> Summary<'a> { } // Get the chunk (as a header and its data) out of the file at the given offset. - let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]); - let (h, d) = match reader.next().ok_or(McapError::BadIndex)? { + let chunk_record_buf = &mcap[(index.chunk_start_offset as usize) + 9..end]; + let chunk = parse_record(op::CHUNK, chunk_record_buf); + + let (h, d) = match chunk { Ok(records::Record::Chunk { header, data }) => (header, data), Ok(_other_record) => return Err(McapError::BadIndex), Err(e) => return Err(e), @@ -848,11 +825,6 @@ impl<'a> Summary<'a> { Cow::Owned(_) => unreachable!(), }; - if reader.next().is_some() { - // Wut - multiple records in the given slice? - return Err(McapError::BadIndex); - } - // Now let's stream messages out of the chunk. let messages = ChunkReader::new(h, d)?.filter_map(|record| match record { Ok(records::Record::Message { header, data }) => { @@ -962,9 +934,11 @@ impl<'a> Summary<'a> { if mcap.len() < end { return Err(McapError::BadIndex); } - - let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]); - let (h, d) = match reader.next().ok_or(McapError::BadIndex)? { + let chunk = parse_record( + op::CHUNK, + &mcap[(index.chunk_start_offset + 9) as usize..end], + ); + let (h, d) = match chunk { Ok(records::Record::Chunk { header, data }) => (header, data), Ok(_other_record) => return Err(McapError::BadIndex), Err(e) => return Err(e), @@ -977,11 +951,6 @@ impl<'a> Summary<'a> { Cow::Owned(_) => unreachable!(), }; - if reader.next().is_some() { - // Wut - multiple records in the given slice? - return Err(McapError::BadIndex); - } - let mut chunk_reader = ChunkReader::new(h, d)?; // Do unspeakable things to seek to the message. @@ -1091,81 +1060,3 @@ pub fn metadata(mcap: &[u8], index: &records::MetadataIndex) -> McapResult(&mut buf)` function that reads a given type -/// off the buffer and advances it the appropriate number of bytes. -macro_rules! reader { - ($type:ty) => { - paste::paste! { - #[inline] - fn [](block: &mut &[u8]) -> $type { - const SIZE: usize = size_of::<$type>(); - let res = $type::from_le_bytes( - block[0..SIZE].try_into().unwrap() - ); - *block = &block[SIZE..]; - res - } - } - }; -} - -reader!(u8); -reader!(u64); - -#[cfg(test)] -mod test { - use super::*; - - // Can we read a file that's only magic? - // (Probably considered malformed by the spec, but let's not panic on user input) - - #[test] - fn only_two_magics() { - let two_magics = MAGIC.repeat(2); - let mut reader = LinearReader::new(&two_magics).unwrap(); - assert!(reader.next().is_none()); - } - - #[test] - fn only_one_magic() { - assert!(matches!(LinearReader::new(MAGIC), Err(McapError::BadMagic))); - } - - #[test] - fn only_two_magic_with_ignore_end_magic() { - let two_magics = MAGIC.repeat(2); - let mut reader = - LinearReader::new_with_options(&two_magics, enum_set!(Options::IgnoreEndMagic)) - .unwrap(); - assert!(reader.next().is_none()); - } - - #[test] - fn only_one_magic_with_ignore_end_magic() { - let mut reader = - LinearReader::new_with_options(MAGIC, enum_set!(Options::IgnoreEndMagic)).unwrap(); - assert!(reader.next().is_none()); - } - - #[test] - fn test_read_record_from_slice_fails_on_too_short_chunks() { - let res = read_record_from_slice(&mut [0_u8; 4].as_slice()); - assert!(matches!(res, Err(McapError::UnexpectedEof))); - - let res = read_record_from_slice(&mut [0_u8; 8].as_slice()); - assert!(matches!(res, Err(McapError::UnexpectedEof))); - } - - #[test] - fn test_read_record_from_slice_parses_for_big_enough_records() { - let res = read_record_from_slice(&mut [0_u8; 9].as_slice()); - assert!(res.is_ok()); - // Not a very strong test, but we are only testing that it checks the buffer size correctly - assert!(matches!(res, Ok(Record::Unknown { opcode: _, data: _ }))); - } -} diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index d3e999791..ff26ee86b 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, hash::Hasher}; +use std::collections::HashMap; use super::{ decompressor::{Decompressor, NoneDecompressor},