perf(ts): reuse Reader whilst parsing records (#1212)
Reduces `McapStreamReader` heap usage by ~25% and boosts throughput by
~30%.

This is both a refactor and a perf boost, with more room for improvement. Key changes:

- Removes `StreamBuffer`, hoisting its logic into `McapStreamReader`
- Reuses the `Reader` and `DataView` instances across parse calls, only resetting them when necessary (e.g. on append in `McapStreamReader`); the sketch after this list contrasts the old and new calling conventions
- Splits `parseRecord` into small, scoped parsing functions; this is perf-neutral on its own (slightly positive) but facilitates future monomorphic fast paths
- Moves offset tracking into `Reader`, which is cleaner and faster
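
To make the API change concrete, here is a minimal before/after sketch of the calling convention, using the names that appear in the diffs below. `view` is a stand-in `DataView`; the two halves target the old and new versions of `./parse` respectively, so they are shown together for contrast rather than as one compilable module:

```ts
import Reader from "./Reader";
import { parseRecord } from "./parse";

const view = new DataView(new ArrayBuffer(0)); // stand-in for real MCAP bytes

// Before: parseRecord took { view, startOffset, validateCrcs }, returned
// { record, usedBytes }, and every caller advanced the offset by hand.
let offset = 0;
for (
  let result;
  (result = parseRecord({ view, startOffset: offset, validateCrcs: true })), result.record;
  offset += result.usedBytes
) {
  // ... use result.record
}

// After: a reusable Reader owns the offset; parseRecord(reader, validateCrcs)
// returns the record directly, or undefined once the view is exhausted.
const reader = new Reader(view);
let record;
while ((record = parseRecord(reader, true))) {
  // ... use record
}
```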

### Before

```
McapStreamReader
        3.48±0.03 op/s  Heap Used: 49.56±12.75 MB/op    Heap Total: 41.47±11.83 MB/op   ArrayBuffers: 112.95±6.87 MB/op
McapIndexedReader
        2.15±0.02 op/s  Heap Used: 70.02±2.84 MB/op     Heap Total: 58.34±3.36 MB/op    ArrayBuffers: 17.86±0.76 MB/op
McapIndexedReader_reverse
        2.18±0.01 op/s  Heap Used: 59.92±2.86 MB/op     Heap Total: 39.81±1.00 MB/op    ArrayBuffers: 14.58±1.42 MB/op
```

### After

```
McapStreamReader
        4.47±0.08 op/s  Heap Used: 42.35±2.23 MB/op     Heap Total: 32.93±3.76 MB/op    ArrayBuffers: 105.93±12.19 MB/op
McapIndexedReader
        2.38±0.02 op/s  Heap Used: 72.00±1.70 MB/op     Heap Total: 55.12±2.51 MB/op    ArrayBuffers: 17.86±1.85 MB/op
McapIndexedReader_reverse
        2.38±0.02 op/s  Heap Used: 63.41±1.55 MB/op     Heap Total: 39.33±0.53 MB/op    ArrayBuffers: 18.40±1.60 MB/op
```

### Followups

- Simplify parsing code further after exploring monomorphic call paths; a hypothetical illustration of that direction follows this list
- Further reduce DataView/Reader churn in the indexed readers
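
On the first follow-up: per-record parse functions mean each function handles exactly one record shape, so its property accesses and return sites stay monomorphic, which JS engines optimize well. Purely as a hypothetical illustration of that direction (stand-in names, trimmed fields and framing, not the library's actual code):

```ts
// Hypothetical sketch, not the actual mcap source: dispatch once on the
// opcode byte, then let a small dedicated function build each record type.
type Schema = { type: "Schema"; id: number };
type Channel = { type: "Channel"; id: number };
type McapRecord = Schema | Channel;

// Opcode values follow the MCAP spec (Schema = 0x03, Channel = 0x04);
// record fields and length framing are trimmed for illustration.
function parseSchema(view: DataView, offset: number): Schema {
  return { type: "Schema", id: view.getUint16(offset, true) };
}

function parseChannel(view: DataView, offset: number): Channel {
  return { type: "Channel", id: view.getUint16(offset, true) };
}

function parseOne(view: DataView, offset: number): McapRecord | undefined {
  switch (view.getUint8(offset)) {
    case 0x03:
      return parseSchema(view, offset + 1);
    case 0x04:
      return parseChannel(view, offset + 1);
    default:
      return undefined; // other record types elided
  }
}
```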
AaronO committed Aug 15, 2024
1 parent 7fb9dfa commit 4fcdd33
Showing 8 changed files with 556 additions and 600 deletions.
25 changes: 10 additions & 15 deletions typescript/core/src/ChunkCursor.ts

```diff
@@ -1,3 +1,4 @@
+import Reader from "./Reader";
 import { parseRecord } from "./parse";
 import { sortedIndexBy } from "./sortedIndexBy";
 import { sortedLastIndexBy } from "./sortedLastIndex";
@@ -136,31 +137,25 @@ export class ChunkCursor {
       messageIndexes.byteLength,
     );
 
-    let offset = 0;
+    const reader = new Reader(messageIndexesView);
     const arrayOfMessageOffsets: [logTime: bigint, offset: bigint][][] = [];
-    for (
-      let result;
-      (result = parseRecord({ view: messageIndexesView, startOffset: offset, validateCrcs: true })),
-        result.record;
-      offset += result.usedBytes
-    ) {
-      if (result.record.type !== "MessageIndex") {
+    let record;
+    while ((record = parseRecord(reader, true))) {
+      if (record.type !== "MessageIndex") {
         continue;
       }
       if (
-        result.record.records.length === 0 ||
-        (this.#relevantChannels && !this.#relevantChannels.has(result.record.channelId))
+        record.records.length === 0 ||
+        (this.#relevantChannels && !this.#relevantChannels.has(record.channelId))
       ) {
         continue;
       }
 
-      arrayOfMessageOffsets.push(result.record.records);
+      arrayOfMessageOffsets.push(record.records);
     }
 
-    if (offset !== messageIndexesView.byteLength) {
-      throw new Error(
-        `${messageIndexesView.byteLength - offset} bytes remaining in message index section`,
-      );
+    if (reader.bytesRemaining() !== 0) {
+      throw new Error(`${reader.bytesRemaining()} bytes remaining in message index section`);
     }
 
     this.#orderedMessageOffsets = arrayOfMessageOffsets
```
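
`Reader.ts` is one of the eight changed files but is not loaded in this view. From its call sites in these diffs (`new Reader(view)`, `new Reader(view, offset)`, `reader.reset(view, offset)`, `reader.bytesRemaining()`), its surface appears to be roughly the following; this is a sketch inferred from usage, not the actual source:

```ts
// Sketch of Reader's apparent surface, inferred from the call sites in this
// commit. Field names and internals here are guesses, not the real file.
class Reader {
  #view: DataView;
  offset: number;

  constructor(view: DataView, offset = 0) {
    this.#view = view;
    this.offset = offset;
  }

  // Repoint the same Reader at a new view/offset so hot loops can avoid
  // allocating a fresh Reader (and DataView) per record.
  reset(view: DataView, offset = 0): void {
    this.#view = view;
    this.offset = offset;
  }

  // Bytes left between the current offset and the end of the view.
  bytesRemaining(): number {
    return this.#view.byteLength - this.offset;
  }
}
```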
159 changes: 70 additions & 89 deletions typescript/core/src/McapIndexedReader.ts

```diff
@@ -2,6 +2,7 @@ import { crc32, crc32Final, crc32Init, crc32Update } from "@foxglove/crc";
 import Heap from "heap-js";
 
 import { ChunkCursor } from "./ChunkCursor";
+import Reader from "./Reader";
 import { MCAP_MAGIC } from "./constants";
 import { parseMagic, parseRecord } from "./parse";
 import { DecompressHandlers, IReadable, TypedMcapRecords } from "./types";
@@ -111,7 +112,7 @@ export class McapIndexedReader {
       headerPrefix.byteOffset,
       headerPrefix.byteLength,
     );
-    void parseMagic(headerPrefixView, 0);
+    void parseMagic(new Reader(headerPrefixView));
     const headerContentLength = headerPrefixView.getBigUint64(
       MCAP_MAGIC.length + /* Opcode.HEADER */ 1,
       true,
@@ -121,26 +122,19 @@ export class McapIndexedReader {
 
       const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength);
       headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength;
-      const headerResult = parseRecord({
-        view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
-        startOffset: 0,
-        validateCrcs: true,
-      });
-      if (headerResult.record?.type !== "Header") {
+      const headerReader = new Reader(
+        new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
+      );
+      const headerResult = parseRecord(headerReader, true);
+      if (headerResult?.type !== "Header") {
         throw new Error(
-          `Unable to read header at beginning of file; found ${
-            headerResult.record?.type ?? "nothing"
-          }`,
+          `Unable to read header at beginning of file; found ${headerResult?.type ?? "nothing"}`,
         );
       }
-      if (headerResult.usedBytes !== headerRecord.byteLength) {
-        throw new Error(
-          `${
-            headerRecord.byteLength - headerResult.usedBytes
-          } bytes remaining after parsing header`,
-        );
+      if (headerReader.bytesRemaining() !== 0) {
+        throw new Error(`${headerReader.bytesRemaining()} bytes remaining after parsing header`);
       }
-      header = headerResult.record;
+      header = headerResult;
     }
 
     function errorWithLibrary(message: string): Error {
@@ -179,33 +173,32 @@ export class McapIndexedReader {
     }
 
     try {
-      void parseMagic(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length);
+      void parseMagic(
+        new Reader(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length),
+      );
     } catch (error) {
       throw errorWithLibrary((error as Error).message);
     }
 
     let footer: TypedMcapRecords["Footer"];
     {
-      const footerResult = parseRecord({
-        view: footerAndMagicView,
-        startOffset: 0,
-        validateCrcs: true,
-      });
-      if (footerResult.record?.type !== "Footer") {
+      const footerReader = new Reader(footerAndMagicView);
+      const footerRecord = parseRecord(footerReader, true);
+      if (footerRecord?.type !== "Footer") {
         throw errorWithLibrary(
           `Unable to read footer from end of file (offset ${footerOffset}); found ${
-            footerResult.record?.type ?? "nothing"
+            footerRecord?.type ?? "nothing"
           }`,
         );
       }
-      if (footerResult.usedBytes !== footerAndMagicView.byteLength - MCAP_MAGIC.length) {
+      if (footerReader.bytesRemaining() !== MCAP_MAGIC.length) {
         throw errorWithLibrary(
           `${
-            footerAndMagicView.byteLength - MCAP_MAGIC.length - footerResult.usedBytes
+            footerReader.bytesRemaining() - MCAP_MAGIC.length
           } bytes remaining after parsing footer`,
         );
       }
-      footer = footerResult.record;
+      footer = footerRecord;
     }
     if (footer.summaryStart === 0n) {
       throw errorWithLibrary("File is not indexed");
@@ -261,6 +254,7 @@ export class McapIndexedReader {
       dataEndAndSummarySection.byteOffset,
       dataEndAndSummarySection.byteLength,
     );
+    const indexReader = new Reader(indexView);
 
     const channelsById = new Map<number, TypedMcapRecords["Channel"]>();
     const schemasById = new Map<number, TypedMcapRecords["Schema"]>();
@@ -271,46 +265,42 @@ export class McapIndexedReader {
     let statistics: TypedMcapRecords["Statistics"] | undefined;
     let dataSectionCrc: number | undefined;
 
-    let offset = 0;
-    for (
-      let result;
-      (result = parseRecord({ view: indexView, startOffset: offset, validateCrcs: true })),
-        result.record;
-      offset += result.usedBytes
-    ) {
-      if (offset === 0 && result.record.type !== "DataEnd") {
+    let first = true;
+    let result;
+    while ((result = parseRecord(indexReader, true))) {
+      if (first && result.type !== "DataEnd") {
         throw errorWithLibrary(
-          `Expected DataEnd record to precede summary section, but found ${result.record.type}`,
+          `Expected DataEnd record to precede summary section, but found ${result.type}`,
        );
       }
-      switch (result.record.type) {
+      first = false;
+      switch (result.type) {
         case "Schema":
-          schemasById.set(result.record.id, result.record);
+          schemasById.set(result.id, result);
           break;
         case "Channel":
-          channelsById.set(result.record.id, result.record);
+          channelsById.set(result.id, result);
           break;
         case "ChunkIndex":
-          chunkIndexes.push(result.record);
+          chunkIndexes.push(result);
           break;
         case "AttachmentIndex":
-          attachmentIndexes.push(result.record);
+          attachmentIndexes.push(result);
           break;
         case "MetadataIndex":
-          metadataIndexes.push(result.record);
+          metadataIndexes.push(result);
           break;
         case "Statistics":
           if (statistics) {
             throw errorWithLibrary("Duplicate Statistics record");
           }
-          statistics = result.record;
+          statistics = result;
           break;
         case "SummaryOffset":
-          summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record);
+          summaryOffsetsByOpcode.set(result.groupOpcode, result);
           break;
         case "DataEnd":
-          dataSectionCrc =
-            result.record.dataSectionCrc === 0 ? undefined : result.record.dataSectionCrc;
+          dataSectionCrc = result.dataSectionCrc === 0 ? undefined : result.dataSectionCrc;
           break;
         case "Header":
         case "Footer":
@@ -319,13 +309,13 @@ export class McapIndexedReader {
         case "MessageIndex":
         case "Attachment":
         case "Metadata":
-          throw errorWithLibrary(`${result.record.type} record not allowed in index section`);
+          throw errorWithLibrary(`${result.type} record not allowed in index section`);
         case "Unknown":
           break;
       }
     }
-    if (offset !== indexView.byteLength) {
-      throw errorWithLibrary(`${indexView.byteLength - offset} bytes remaining in index section`);
+    if (indexReader.bytesRemaining() !== 0) {
+      throw errorWithLibrary(`${indexReader.bytesRemaining()} bytes remaining in index section`);
     }
 
     return new McapIndexedReader({
@@ -395,6 +385,7 @@ export class McapIndexedReader {
     // cursor becomes active (i.e. when we first need to access messages from the chunk) and removed
    // when the cursor is removed from the heap.
     const chunkViewCache = new Map<bigint, DataView>();
+    const chunkReader = new Reader(new DataView(new ArrayBuffer(0)));
     for (let cursor; (cursor = chunkCursors.peek()); ) {
       if (!cursor.hasMessageIndexes()) {
         // If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap.
@@ -421,27 +412,24 @@ export class McapIndexedReader {
           `Message offset beyond chunk bounds (log time ${logTime}, offset ${offset}, chunk data length ${chunkView.byteLength}) in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`,
         );
       }
-      const result = parseRecord({
-        view: chunkView,
-        startOffset: Number(offset),
-        validateCrcs: validateCrcs ?? true,
-      });
-      if (!result.record) {
+      chunkReader.reset(chunkView, Number(offset));
+      const record = parseRecord(chunkReader, validateCrcs ?? true);
+      if (!record) {
         throw this.#errorWithLibrary(
           `Unable to parse record at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`,
         );
       }
-      if (result.record.type !== "Message") {
+      if (record.type !== "Message") {
         throw this.#errorWithLibrary(
-          `Unexpected record type ${result.record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
+          `Unexpected record type ${record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
         );
       }
-      if (result.record.logTime !== logTime) {
+      if (record.logTime !== logTime) {
         throw this.#errorWithLibrary(
-          `Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
+          `Message log time ${record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
         );
       }
-      yield result.record;
+      yield record;
 
       if (cursor.hasMoreMessages()) {
         // There is no need to reorganize the heap when chunks are ordered and not overlapping.
@@ -468,19 +456,18 @@ export class McapIndexedReader {
         continue;
       }
       const metadataData = await this.#readable.read(metadataIndex.offset, metadataIndex.length);
-      const metadataResult = parseRecord({
-        view: new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength),
-        startOffset: 0,
-        validateCrcs: false,
-      });
-      if (metadataResult.record?.type !== "Metadata") {
+      const metadataReader = new Reader(
+        new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength),
+      );
+      const metadataRecord = parseRecord(metadataReader, false);
+      if (metadataRecord?.type !== "Metadata") {
         throw this.#errorWithLibrary(
           `Metadata data at offset ${
             metadataIndex.offset
-          } does not point to metadata record (found ${String(metadataResult.record?.type)})`,
+          } does not point to metadata record (found ${String(metadataRecord?.type)})`,
         );
       }
-      yield metadataResult.record;
+      yield metadataRecord;
     }
   }
 
@@ -519,23 +506,18 @@ export class McapIndexedReader {
         attachmentIndex.offset,
         attachmentIndex.length,
       );
-      const attachmentResult = parseRecord({
-        view: new DataView(
-          attachmentData.buffer,
-          attachmentData.byteOffset,
-          attachmentData.byteLength,
-        ),
-        startOffset: 0,
-        validateCrcs: validateCrcs ?? true,
-      });
-      if (attachmentResult.record?.type !== "Attachment") {
+      const attachmentReader = new Reader(
+        new DataView(attachmentData.buffer, attachmentData.byteOffset, attachmentData.byteLength),
+      );
+      const attachmentRecord = parseRecord(attachmentReader, validateCrcs ?? true);
+      if (attachmentRecord?.type !== "Attachment") {
         throw this.#errorWithLibrary(
           `Attachment data at offset ${
             attachmentIndex.offset
-          } does not point to attachment record (found ${String(attachmentResult.record?.type)})`,
+          } does not point to attachment record (found ${String(attachmentRecord?.type)})`,
         );
       }
-      yield attachmentResult.record;
+      yield attachmentRecord;
     }
   }
 
@@ -547,20 +529,19 @@ export class McapIndexedReader {
       chunkIndex.chunkStartOffset,
       chunkIndex.chunkLength,
     );
-    const chunkResult = parseRecord({
-      view: new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength),
-      startOffset: 0,
-      validateCrcs: options?.validateCrcs ?? true,
-    });
-    if (chunkResult.record?.type !== "Chunk") {
+    const chunkReader = new Reader(
+      new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength),
+    );
+    const chunkRecord = parseRecord(chunkReader, options?.validateCrcs ?? true);
+    if (chunkRecord?.type !== "Chunk") {
       throw this.#errorWithLibrary(
        `Chunk start offset ${
          chunkIndex.chunkStartOffset
-        } does not point to chunk record (found ${String(chunkResult.record?.type)})`,
+        } does not point to chunk record (found ${String(chunkRecord?.type)})`,
       );
     }
 
-    const chunk = chunkResult.record;
+    const chunk = chunkRecord;
     let buffer = chunk.records;
     if (chunk.compression !== "" && buffer.byteLength > 0) {
       const decompress = this.#decompressHandlers?.[chunk.compression];
```
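
The hoisted `chunkReader` with `reset()` in the hunks above is the commit's central pattern: allocate one `Reader` outside the hot loop and repoint it at each message offset, rather than building a fresh parse context per record. A minimal consumer-side sketch of the same pattern, assuming the `Reader` surface inferred earlier:

```ts
import Reader from "./Reader";
import { parseRecord } from "./parse";

// One Reader allocated up front; reset() repoints it at each message
// offset, so the hot loop allocates no parser context per record.
const reader = new Reader(new DataView(new ArrayBuffer(0)));

function* messagesInChunk(chunkView: DataView, offsets: bigint[]) {
  for (const offset of offsets) {
    reader.reset(chunkView, Number(offset));
    const record = parseRecord(reader, true);
    if (record?.type !== "Message") {
      throw new Error(`expected a Message record at offset ${offset}`);
    }
    yield record;
  }
}
```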