Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Typescript: provide message iterator which yields synchronously for messages #1188

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 104 additions & 30 deletions typescript/benchmarks/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
import { McapIndexedReader, McapStreamReader, McapWriter, TempBuffer } from "@mcap/core";
import { McapIndexedReader, McapStreamReader, McapWriter, McapTypes, IWritable } from "@mcap/core";
import assert from "assert";
import { program } from "commander";
import fs from "fs/promises";
import os from "os";
import path from "path";

import { runBenchmark } from "./bench";

/**
 * An `IReadable` backed by a Node.js file handle, so the benchmarks can read MCAP data from a
 * file on disk.
 */
class ReadableFile implements McapTypes.IReadable {
  #fd: fs.FileHandle;

  /** @param fd an open file handle; the caller remains responsible for closing it. */
  constructor(fd: fs.FileHandle) {
    this.#fd = fd;
  }

  /**
   * Reads exactly `size` bytes starting at byte `offset` of the file.
   *
   * @param offset absolute byte position in the file to start reading from.
   * @param size number of bytes to read.
   * @returns a freshly allocated array holding exactly `size` bytes.
   * @throws RangeError if the requested range is negative or cannot be represented as a safe
   * JavaScript integer.
   * @throws Error if the file ends before `size` bytes could be read.
   */
  async read(offset: bigint, size: bigint): Promise<Uint8Array> {
    // The file handle API takes plain numbers; refuse ranges that would silently lose precision.
    if (
      offset < BigInt(0) ||
      size < BigInt(0) ||
      offset + size > BigInt(Number.MAX_SAFE_INTEGER)
    ) {
      throw new RangeError(`Read out of range: offset ${offset}, size ${size}`);
    }
    const start = Number(offset);
    const length = Number(size);
    const res = new Uint8Array(length);

    // A single read() may return fewer bytes than requested, so keep reading until the buffer
    // is full instead of handing back zero-padded data.
    let filled = 0;
    while (filled < length) {
      const { bytesRead } = await this.#fd.read(res, filled, length - filled, start + filled);
      if (bytesRead === 0) {
        throw new Error(
          `Unexpected end of file: read only ${filled} of ${length} bytes at offset ${offset}`,
        );
      }
      filled += bytesRead;
    }
    return res;
  }

  /** @returns the current size of the file in bytes, as reported by `stat()`. */
  async size(): Promise<bigint> {
    const stat = await this.#fd.stat();
    return BigInt(stat.size);
  }
}

/**
 * An `IWritable` that appends to a Node.js file handle and tracks how many bytes have been
 * written through it.
 */
class WritableFile implements IWritable {
  #fd: fs.FileHandle;
  #pos: bigint;

  /** @param fd an open, writable file handle; the caller remains responsible for closing it. */
  constructor(fd: fs.FileHandle) {
    this.#fd = fd;
    this.#pos = BigInt(0);
  }

  /**
   * Writes the whole of `buffer` at the file handle's current position.
   *
   * @param buffer the bytes to write.
   * @throws Error if the file handle reports that no progress was made on a non-empty write.
   */
  async write(buffer: Uint8Array): Promise<void> {
    // A single write() may accept fewer bytes than requested; keep writing the remainder so
    // no data is dropped and position() stays in step with what is actually in the file.
    let written = 0;
    while (written < buffer.length) {
      const { bytesWritten } = await this.#fd.write(buffer, written, buffer.length - written);
      if (bytesWritten === 0) {
        throw new Error(
          `Write made no progress after ${written} of ${buffer.length} bytes`,
        );
      }
      written += bytesWritten;
      this.#pos = this.#pos + BigInt(bytesWritten);
    }
  }

  /** @returns the total number of bytes written through this object so far. */
  position(): bigint {
    return this.#pos;
  }
}

/**
* An IWritable that copies data to memory, but overwrites previous data. This allows benchmarking
* the copies without actually allocating the full initial capacity.
Expand Down Expand Up @@ -38,56 +73,95 @@ async function benchmarkReaders() {
const chunkSize = 1024 * 1024 * 4;
const numMessages = 1_000_000;
const messageData = new Uint8Array(messageSize).fill(42);
const buf = new TempBuffer();
const writer = new McapWriter({ writable: buf, chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
const filepath = path.join(os.tmpdir(), "sample.mcap");
{
const fd = await fs.open(filepath, "w");

const writer = new McapWriter({ writable: new WritableFile(fd), chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
});
}
await writer.end();
await fd.close();
}
await writer.end();
await runBenchmark(McapStreamReader.name, async () => {
const fd = await fs.open(filepath);
const stream = fd.createReadStream();
const reader = new McapStreamReader();
reader.append(buf.get());
let messageCount = 0;
for (;;) {
const rec = reader.nextRecord();
if (rec != undefined) {
if (rec.type === "Message") {
stream.on("data", (chunk) => {
reader.append(Buffer.from(chunk));
for (let record; (record = reader.nextRecord()); ) {
if (record.type === "Message") {
messageCount++;
}
} else {
break;
}
}
});
await new Promise((resolve) => stream.on("end", resolve));
stream.close();
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark(McapIndexedReader.name, async () => {
const reader = await McapIndexedReader.Initialize({ readable: buf });
await runBenchmark("readMessages_async", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for await (const _ of reader.readMessages()) {
messageCount++;
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark(McapIndexedReader.name + "_reverse", async () => {
const reader = await McapIndexedReader.Initialize({ readable: buf });
await runBenchmark("readMessages_async_reverse", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for await (const _ of reader.readMessages({ reverse: true })) {
messageCount++;
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark("readMessages_sync", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for (const { promise } of reader.readMessagesSync()) {
if (promise != undefined) {
await promise;
} else {
messageCount++;
}
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark("readMessages_sync_reverse", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for (const { promise } of reader.readMessagesSync({ reverse: true })) {
if (promise != undefined) {
await promise;
} else {
messageCount++;
}
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
}

Expand Down
66 changes: 59 additions & 7 deletions typescript/core/src/McapIndexedReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ type McapIndexedReaderArgs = {
dataSectionCrc?: number;
};

/**
 * A single item yielded by `readMessagesSync()`. Exactly one of the two members is defined.
 */
type ReadMessagesSyncResult =
| {
// Yielded when the reader has to load message indexes or chunk data before it can continue.
// The caller must wait for this promise to resolve before asking the iterator for the next item.
promise: Promise<void>;
message: undefined;
}
| {
// Yielded when a message is available without further loading.
promise: undefined;
message: TypedMcapRecords["Message"];
};

export class McapIndexedReader {
readonly chunkIndexes: readonly TypedMcapRecords["ChunkIndex"][];
readonly attachmentIndexes: readonly TypedMcapRecords["AttachmentIndex"][];
Expand Down Expand Up @@ -345,6 +355,13 @@ export class McapIndexedReader {
});
}

/** Returns an async iterator that iterates over messages in the MCAP file in order of log time.
* @param args.topics: if defined, only messages from channels matching the topics will be yielded.
* @param args.startTime: if defined, only messages with log times on or after this time will be yielded.
* @param args.endTime: if defined, only messages with log times on or before this time will be yielded.
* @param args.reverse: if true, messages will be yielded in reverse log-time order.
* @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP.
*/
async *readMessages(
args: {
topics?: readonly string[];
Expand All @@ -354,6 +371,33 @@ export class McapIndexedReader {
validateCrcs?: boolean;
} = {},
): AsyncGenerator<TypedMcapRecords["Message"], void, void> {
for (const { promise, message } of this.readMessagesSync(args)) {
if (promise != undefined) {
await promise;
} else {
yield message;
}
}
}

/** Returns an iterator that iterates over messages in the MCAP file in order of log time.
* The returned object will have either the `message` or `promise` member defined. If `promise`
* is defined, the caller must wait for it to resolve before calling next().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* is not undefined, the caller must wait for it to resolve before calling next().
* is defined, the caller must wait for it to resolve before calling next().

* @param args.topics: if defined, only messages from channels matching the topics will be yielded.
* @param args.startTime: if defined, only messages with log times on or after this time will be yielded.
* @param args.endTime: if defined, only messages with log times on or before this time will be yielded.
* @param args.reverse: if true, messages will be yielded in reverse log-time order.
* @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP.
*/
*readMessagesSync(
args: {
topics?: readonly string[];
startTime?: bigint;
endTime?: bigint;
reverse?: boolean;
validateCrcs?: boolean;
} = {},
): Generator<ReadMessagesSyncResult, void, void> {
const {
topics,
startTime = this.#messageStartTime,
Expand Down Expand Up @@ -398,7 +442,10 @@ export class McapIndexedReader {
for (let cursor; (cursor = chunkCursors.peek()); ) {
if (!cursor.hasMessageIndexes()) {
// If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap.
await cursor.loadMessageIndexes(this.#readable);
yield {
promise: cursor.loadMessageIndexes(this.#readable),
message: undefined,
};
if (cursor.hasMoreMessages()) {
chunkCursors.replace(cursor);
} else {
Expand All @@ -409,12 +456,17 @@ export class McapIndexedReader {

let chunkView = chunkViewCache.get(cursor.chunkIndex.chunkStartOffset);
if (!chunkView) {
chunkView = await this.#loadChunkData(cursor.chunkIndex, {
validateCrcs: validateCrcs ?? true,
});
chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView);
const promise = (async () => {
chunkView = await this.#loadChunkData(cursor.chunkIndex, {
validateCrcs: validateCrcs ?? true,
});
chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView);
})();
yield { promise, message: undefined };
}
if (chunkView == undefined) {
throw new Error("must wait on yielded promise before continuing");
}

const [logTime, offset] = cursor.popMessage();
if (offset >= BigInt(chunkView.byteLength)) {
throw this.#errorWithLibrary(
Expand All @@ -441,7 +493,7 @@ export class McapIndexedReader {
`Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
);
}
yield result.record;
yield { promise: undefined, message: result.record };

if (cursor.hasMoreMessages()) {
// There is no need to reorganize the heap when chunks are ordered and not overlapping.
Expand Down
Loading