Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rust: Support writing without chunks #1201

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,8 @@
"C_Cpp.default.cppStandard": "c++17",
"[go]": {
"editor.defaultFormatter": "golang.go"
},
"[rust]": {
"editor.defaultFormatter": "rust-lang.rust-analyzer"
}
}
92 changes: 76 additions & 16 deletions rust/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct WriteOptions {
compression: Option<Compression>,
profile: String,
chunk_size: Option<u64>,
use_chunks: bool,
}

impl Default for WriteOptions {
Expand All @@ -119,6 +120,7 @@ impl Default for WriteOptions {
compression: None,
profile: String::new(),
chunk_size: Some(1024 * 768),
use_chunks: true,
}
}
}
Expand Down Expand Up @@ -157,6 +159,18 @@ impl WriteOptions {
}
}

/// Selects whether messages are stored inside chunks.
///
/// When set to `false`, messages are written directly to the data section of the file
/// instead of being grouped into chunks. Compression and indexing are unavailable in that
/// mode, but it may suit small embedded systems that cannot afford the memory overhead of
/// keeping chunk metadata for the entire recording.
///
/// A non-chunked file can often be usefully post-processed with `mcap recover`, which adds
/// indexes for efficient processing.
pub fn use_chunks(self, use_chunks: bool) -> Self {
    // Take ownership of the options, overwrite the single flag, and hand them back
    // so calls can be chained builder-style.
    let mut options = self;
    options.use_chunks = use_chunks;
    options
}

/// Creates a [`Writer`] which writes to `w` using the given options
pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult<Writer<'a, W>> {
Writer::with_options(w, self)
Expand Down Expand Up @@ -227,8 +241,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
.channels
.insert(chan.clone(), next_channel_id)
.is_none());
self.chunkin_time()?
.write_channel(next_channel_id, schema_id, chan)?;
if self.options.use_chunks {
self.chunkin_time()?
.write_channel(next_channel_id, schema_id, chan)?;
} else {
write_record(
self.finish_chunk()?,
&Record::Channel(records::Channel {
id: next_channel_id,
schema_id,
topic: chan.topic.clone(),
message_encoding: chan.message_encoding.clone(),
metadata: chan.metadata.clone(),
}),
)?;
}
Ok(next_channel_id)
}

Expand All @@ -244,7 +271,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
.schemas
.insert(schema.clone(), next_schema_id)
.is_none());
self.chunkin_time()?.write_schema(next_schema_id, schema)?;
if self.options.use_chunks {
self.chunkin_time()?.write_schema(next_schema_id, schema)?;
} else {
write_record(
self.finish_chunk()?,
&Record::Schema {
header: records::SchemaHeader {
id: next_schema_id,
name: schema.name.clone(),
encoding: schema.encoding.clone(),
},
data: Cow::Borrowed(&schema.data),
},
)?;
}
Ok(next_schema_id)
}

Expand Down Expand Up @@ -301,7 +342,17 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
}
}

self.chunkin_time()?.write_message(header, data)?;
if self.options.use_chunks {
self.chunkin_time()?.write_message(header, data)?;
} else {
write_record(
self.finish_chunk()?,
&Record::Message {
header: *header,
data: Cow::Borrowed(data),
},
)?;
}
Ok(())
}

Expand Down Expand Up @@ -389,6 +440,10 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
// (That would leave it in an unspecified state if we bailed here!)
// Instead briefly swap it out for a null writer while we set up the chunker
// The writer will only be None if finish() was called.
if !self.options.use_chunks {
unreachable!("Trying to write to a chunk when chunking is disabled")
eloff marked this conversation as resolved.
Show resolved Hide resolved
}

let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

self.writer = Some(match prev_writer {
Expand Down Expand Up @@ -566,18 +621,23 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
});
}

// Write all chunk indexes.
let chunk_indexes_start = channels_end;
for index in chunk_indexes {
write_record(&mut ccw, &Record::ChunkIndex(index))?;
}
let chunk_indexes_end = posit(&mut ccw)?;
if chunk_indexes_end - chunk_indexes_start > 0 {
offsets.push(records::SummaryOffset {
group_opcode: op::CHUNK_INDEX,
group_start: chunk_indexes_start,
group_length: chunk_indexes_end - chunk_indexes_start,
});
let chunk_indexes_end;
if self.options.use_chunks {
// Write all chunk indexes.
let chunk_indexes_start = channels_end;
for index in chunk_indexes {
write_record(&mut ccw, &Record::ChunkIndex(index))?;
}
chunk_indexes_end = posit(&mut ccw)?;
if chunk_indexes_end - chunk_indexes_start > 0 {
offsets.push(records::SummaryOffset {
group_opcode: op::CHUNK_INDEX,
group_start: chunk_indexes_start,
group_length: chunk_indexes_end - chunk_indexes_start,
});
}
} else {
chunk_indexes_end = channels_end;
}

// ...and attachment indexes
Expand Down
Loading