Skip to content

Commit

Permalink
perf(rust): add MessageStream benches (#1213)
Browse files Browse the repository at this point in the history
Benches MessageStream on 1M messages uncompressed/zstd/lz4.

### Baseline (m1 pro)

```
mcap_read/MessageStream_1M_uncompressed
                        time:   [67.954 ms 68.277 ms 68.946 ms]
                        thrpt:  [14.504 Melem/s 14.646 Melem/s 14.716 Melem/s]
mcap_read/MessageStream_1M_lz4
                        time:   [337.77 ms 344.16 ms 350.76 ms]
                        thrpt:  [2.8510 Melem/s 2.9056 Melem/s 2.9606 Melem/s]
mcap_read/MessageStream_1M_zstd
                        time:   [381.62 ms 384.54 ms 388.35 ms]
                        thrpt:  [2.5750 Melem/s 2.6005 Melem/s 2.6204 Melem/s]
```
  • Loading branch information
AaronO committed Aug 14, 2024
1 parent c95275f commit 7f82b06
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
10 changes: 10 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,20 @@ anyhow = "1"
atty = "0.2"
camino = "1.0"
clap = { version = "3.2", features = ["derive"]}
criterion = "0.5.1"
itertools = "0.10"
memmap = "0.7"
rayon = "1.5"
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1"
simplelog = "0.12"
tempfile = "3.3"

[[bench]]
name = "reader"
harness = false

[profile.bench]
opt-level = 3
debug = true
lto = true
91 changes: 91 additions & 0 deletions rust/benches/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use criterion::{criterion_group, criterion_main, Criterion};
use mcap::{Channel, Message, MessageStream, Schema};
use std::borrow::Cow;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;

fn create_test_mcap(n: usize, compression: Option<mcap::Compression>) -> Vec<u8> {
let mut buffer = Vec::new();
{
let mut writer = mcap::WriteOptions::new()
.compression(compression)
.profile("fooey")
.create(Cursor::new(&mut buffer))
.unwrap();
// Mock message data to align with reader benchmarks in ts
const MESSAGE_DATA: &[u8] = &[42; 10];

let schema = Arc::new(Schema {
name: "TestSchema".to_string(),
encoding: "raw".to_string(),
data: Cow::Borrowed(b"{}"),
});

let channel = Arc::new(Channel {
topic: "test_topic".to_string(),
message_encoding: "raw".to_string(),
metadata: Default::default(),
schema: Some(schema),
});

for i in 0..n {
let message = Message {
channel: channel.clone(),
sequence: i as u32,
log_time: i as u64,
publish_time: i as u64,
data: Cow::Borrowed(&MESSAGE_DATA),
};
writer.write(&message).unwrap();
}

writer.finish().unwrap();
}
buffer
}

fn bench_read_messages(c: &mut Criterion) {
const N: usize = 1_000_000;
let mcap_data_uncompressed = create_test_mcap(N, None);
let mcap_data_lz4 = create_test_mcap(N, Some(mcap::Compression::Lz4));
let mcap_data_zstd = create_test_mcap(N, Some(mcap::Compression::Zstd));
let mut group = c.benchmark_group("mcap_read");
group.throughput(criterion::Throughput::Elements(N as u64));

group.bench_function("MessageStream_1M_uncompressed", |b| {
b.iter(|| {
let stream = MessageStream::new(&mcap_data_uncompressed).unwrap();
for message in stream {
std::hint::black_box(message.unwrap());
}
});
});

group.bench_function("MessageStream_1M_lz4", |b| {
b.iter(|| {
let stream = MessageStream::new(&mcap_data_lz4).unwrap();
for message in stream {
std::hint::black_box(message.unwrap());
}
});
});

group.bench_function("MessageStream_1M_zstd", |b| {
b.iter(|| {
let stream = MessageStream::new(&mcap_data_zstd).unwrap();
for message in stream {
std::hint::black_box(message.unwrap());
}
});
});

group.finish();
}

criterion_group! {
name = benches;
config = Criterion::default().warm_up_time(Duration::from_secs(1)).sample_size(10);
targets = bench_read_messages
}
criterion_main!(benches);

0 comments on commit 7f82b06

Please sign in to comment.