Skip to content

Commit

Permalink
strip out linear reader
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Sep 28, 2024
1 parent 2606f80 commit 6298078
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 155 deletions.
199 changes: 45 additions & 154 deletions rust/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{
collections::{BTreeMap, HashMap},
fmt,
io::{self, prelude::*, Cursor},
mem::size_of,
sync::Arc,
};

Expand All @@ -25,7 +24,7 @@ use log::*;
use crate::{
io_utils::CountingCrcReader,
records::{self, op, Record},
sans_io::read::{ReadAction, RecordReaderOptions},
sans_io::read::{ReadAction, RecordReader, RecordReaderOptions},
Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC,
};

Expand All @@ -46,7 +45,7 @@ pub enum Options {
/// and is mostly meant as a building block for higher-level readers.
pub struct LinearReader<'a> {
buf: &'a [u8],
malformed: bool,
reader: crate::sans_io::read::RecordReader,
}

impl<'a> LinearReader<'a> {
Expand All @@ -58,18 +57,16 @@ impl<'a> LinearReader<'a> {

/// Create a reader for the given file with special options.
pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
if !buf.starts_with(MAGIC)
|| (!options.contains(Options::IgnoreEndMagic)
&& (!buf.ends_with(MAGIC) || buf.len() < 2 * MAGIC.len()))
{
return Err(McapError::BadMagic);
}
let buf = &buf[MAGIC.len()..];
if buf.ends_with(MAGIC) {
Ok(Self::sans_magic(&buf[0..buf.len() - MAGIC.len()]))
} else {
Ok(Self::sans_magic(buf))
}
Ok(Self {
buf,
reader: RecordReader::new_with_options(RecordReaderOptions {
skip_start_magic: false,
skip_end_magic: options.contains(Options::IgnoreEndMagic),
emit_chunks: true,
validate_chunk_crcs: false,
validate_data_section_crc: true,
}),
})
}

/// Like [`new()`](Self::new), but assumes `buf` has the magic bytes sliced off.
Expand All @@ -78,7 +75,13 @@ impl<'a> LinearReader<'a> {
pub fn sans_magic(buf: &'a [u8]) -> Self {
Self {
buf,
malformed: false,
reader: RecordReader::new_with_options(RecordReaderOptions {
skip_start_magic: true,
skip_end_magic: true,
emit_chunks: true,
validate_chunk_crcs: false,
validate_data_section_crc: true,
}),
}
}

Expand All @@ -95,53 +98,25 @@ impl<'a> Iterator for LinearReader<'a> {
type Item = McapResult<records::Record<'a>>;

fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
return None;
}

// After an unrecoverable error (due to something wonky in the file),
// don't keep trying to walk it.
if self.malformed {
return None;
}

let record = match read_record_from_slice(&mut self.buf) {
Ok(k) => k,
Err(e) => {
self.malformed = true;
return Some(Err(e));
loop {
match self.reader.next_action() {
Ok(ReadAction::Fill(mut into_buf)) => {
let len = std::cmp::min(self.buf.len(), into_buf.buf.len());
let src = &self.buf[..len];
let dst = &mut into_buf.buf[..len];
dst.copy_from_slice(src);
into_buf.set_filled(len);
self.buf = &self.buf[len..];
}
Ok(ReadAction::Finished) => return None,
Ok(ReadAction::GetRecord { data, opcode }) => match parse_record(opcode, data) {
Ok(record) => return Some(Ok(record.into_owned())),
Err(err) => return Some(Err(err)),
},
Err(err) => return Some(Err(err)),
}
};

Some(Ok(record))
}
}

/// Read a record and advance the slice
fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult<records::Record<'a>> {
if buf.len() < (size_of::<u64>() + size_of::<u8>()) {
warn!("Malformed MCAP - not enough space for record + length!");
return Err(McapError::UnexpectedEof);
}

let op = read_u8(buf);
let len = read_u64(buf);

if buf.len() < len as usize {
warn!(
"Malformed MCAP - record with length {len}, but only {} bytes remain",
buf.len()
);
return Err(McapError::UnexpectedEof);
}
}

let body = &buf[..len as usize];
debug!("slice: opcode {op:02X}, length {len}");
let record = parse_record(op, body)?;
trace!(" {:?}", record);

*buf = &buf[len as usize..];
Ok(record)
}

/// Given a records' opcode and data, parse into a Record. The resulting Record will contain
Expand Down Expand Up @@ -834,8 +809,10 @@ impl<'a> Summary<'a> {
}

// Get the chunk (as a header and its data) out of the file at the given offset.
let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
let chunk_record_buf = &mcap[(index.chunk_start_offset as usize) + 9..end];
let chunk = parse_record(op::CHUNK, chunk_record_buf);

let (h, d) = match chunk {
Ok(records::Record::Chunk { header, data }) => (header, data),
Ok(_other_record) => return Err(McapError::BadIndex),
Err(e) => return Err(e),
Expand All @@ -848,11 +825,6 @@ impl<'a> Summary<'a> {
Cow::Owned(_) => unreachable!(),
};

if reader.next().is_some() {
// Wut - multiple records in the given slice?
return Err(McapError::BadIndex);
}

// Now let's stream messages out of the chunk.
let messages = ChunkReader::new(h, d)?.filter_map(|record| match record {
Ok(records::Record::Message { header, data }) => {
Expand Down Expand Up @@ -962,9 +934,11 @@ impl<'a> Summary<'a> {
if mcap.len() < end {
return Err(McapError::BadIndex);
}

let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
let chunk = parse_record(
op::CHUNK,
&mcap[(index.chunk_start_offset + 9) as usize..end],
);
let (h, d) = match chunk {
Ok(records::Record::Chunk { header, data }) => (header, data),
Ok(_other_record) => return Err(McapError::BadIndex),
Err(e) => return Err(e),
Expand All @@ -977,11 +951,6 @@ impl<'a> Summary<'a> {
Cow::Owned(_) => unreachable!(),
};

if reader.next().is_some() {
// Wut - multiple records in the given slice?
return Err(McapError::BadIndex);
}

let mut chunk_reader = ChunkReader::new(h, d)?;

// Do unspeakable things to seek to the message.
Expand Down Expand Up @@ -1091,81 +1060,3 @@ pub fn metadata(mcap: &[u8], index: &records::MetadataIndex) -> McapResult<recor

Ok(m)
}

// All of the following panic if they walk off the back of the data block;
// callers are assumed to have made sure they got enoug bytes back with
// `validate_response()`

/// Builds a `read_<type>(&mut buf)` function that reads a given type
/// off the buffer and advances it the appropriate number of bytes.
macro_rules! reader {
($type:ty) => {
paste::paste! {
#[inline]
fn [<read_ $type>](block: &mut &[u8]) -> $type {
const SIZE: usize = size_of::<$type>();
let res = $type::from_le_bytes(
block[0..SIZE].try_into().unwrap()
);
*block = &block[SIZE..];
res
}
}
};
}

reader!(u8);
reader!(u64);

#[cfg(test)]
mod test {
use super::*;

// Can we read a file that's only magic?
// (Probably considered malformed by the spec, but let's not panic on user input)

#[test]
fn only_two_magics() {
let two_magics = MAGIC.repeat(2);
let mut reader = LinearReader::new(&two_magics).unwrap();
assert!(reader.next().is_none());
}

#[test]
fn only_one_magic() {
assert!(matches!(LinearReader::new(MAGIC), Err(McapError::BadMagic)));
}

#[test]
fn only_two_magic_with_ignore_end_magic() {
let two_magics = MAGIC.repeat(2);
let mut reader =
LinearReader::new_with_options(&two_magics, enum_set!(Options::IgnoreEndMagic))
.unwrap();
assert!(reader.next().is_none());
}

#[test]
fn only_one_magic_with_ignore_end_magic() {
let mut reader =
LinearReader::new_with_options(MAGIC, enum_set!(Options::IgnoreEndMagic)).unwrap();
assert!(reader.next().is_none());
}

#[test]
fn test_read_record_from_slice_fails_on_too_short_chunks() {
let res = read_record_from_slice(&mut [0_u8; 4].as_slice());
assert!(matches!(res, Err(McapError::UnexpectedEof)));

let res = read_record_from_slice(&mut [0_u8; 8].as_slice());
assert!(matches!(res, Err(McapError::UnexpectedEof)));
}

#[test]
fn test_read_record_from_slice_parses_for_big_enough_records() {
let res = read_record_from_slice(&mut [0_u8; 9].as_slice());
assert!(res.is_ok());
// Not a very strong test, but we are only testing that it checks the buffer size correctly
assert!(matches!(res, Ok(Record::Unknown { opcode: _, data: _ })));
}
}
2 changes: 1 addition & 1 deletion rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, hash::Hasher};
use std::collections::HashMap;

use super::{
decompressor::{Decompressor, NoneDecompressor},
Expand Down

0 comments on commit 6298078

Please sign in to comment.