diff --git a/typescript/core/src/ChunkCursor.ts b/typescript/core/src/ChunkCursor.ts index 3a33303010..2113cc2b42 100644 --- a/typescript/core/src/ChunkCursor.ts +++ b/typescript/core/src/ChunkCursor.ts @@ -1,3 +1,4 @@ +import Reader from "./Reader"; import { parseRecord } from "./parse"; import { sortedIndexBy } from "./sortedIndexBy"; import { sortedLastIndexBy } from "./sortedLastIndex"; @@ -136,31 +137,25 @@ export class ChunkCursor { messageIndexes.byteLength, ); - let offset = 0; + const reader = new Reader(messageIndexesView); const arrayOfMessageOffsets: [logTime: bigint, offset: bigint][][] = []; - for ( - let result; - (result = parseRecord({ view: messageIndexesView, startOffset: offset, validateCrcs: true })), - result.record; - offset += result.usedBytes - ) { - if (result.record.type !== "MessageIndex") { + let record; + while ((record = parseRecord(reader, true))) { + if (record.type !== "MessageIndex") { continue; } if ( - result.record.records.length === 0 || - (this.#relevantChannels && !this.#relevantChannels.has(result.record.channelId)) + record.records.length === 0 || + (this.#relevantChannels && !this.#relevantChannels.has(record.channelId)) ) { continue; } - arrayOfMessageOffsets.push(result.record.records); + arrayOfMessageOffsets.push(record.records); } - if (offset !== messageIndexesView.byteLength) { - throw new Error( - `${messageIndexesView.byteLength - offset} bytes remaining in message index section`, - ); + if (reader.bytesRemaining() !== 0) { + throw new Error(`${reader.bytesRemaining()} bytes remaining in message index section`); } this.#orderedMessageOffsets = arrayOfMessageOffsets diff --git a/typescript/core/src/McapIndexedReader.ts b/typescript/core/src/McapIndexedReader.ts index 5955300a40..51ec00044e 100644 --- a/typescript/core/src/McapIndexedReader.ts +++ b/typescript/core/src/McapIndexedReader.ts @@ -2,6 +2,7 @@ import { crc32, crc32Final, crc32Init, crc32Update } from "@foxglove/crc"; import Heap from "heap-js"; import { ChunkCursor } from "./ChunkCursor"; +import Reader from "./Reader"; import { MCAP_MAGIC } from "./constants"; import { parseMagic, parseRecord } from "./parse"; import { DecompressHandlers, IReadable, TypedMcapRecords } from "./types"; @@ -111,7 +112,7 @@ export class McapIndexedReader { headerPrefix.byteOffset, headerPrefix.byteLength, ); - void parseMagic(headerPrefixView, 0); + void parseMagic(new Reader(headerPrefixView)); const headerContentLength = headerPrefixView.getBigUint64( MCAP_MAGIC.length + /* Opcode.HEADER */ 1, true, @@ -121,26 +122,19 @@ export class McapIndexedReader { const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength); headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength; - const headerResult = parseRecord({ - view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength), - startOffset: 0, - validateCrcs: true, - }); - if (headerResult.record?.type !== "Header") { + const headerReader = new Reader( + new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength), + ); + const headerResult = parseRecord(headerReader, true); + if (headerResult?.type !== "Header") { throw new Error( - `Unable to read header at beginning of file; found ${ - headerResult.record?.type ?? "nothing" - }`, + `Unable to read header at beginning of file; found ${headerResult?.type ?? 
"nothing"}`, ); } - if (headerResult.usedBytes !== headerRecord.byteLength) { - throw new Error( - `${ - headerRecord.byteLength - headerResult.usedBytes - } bytes remaining after parsing header`, - ); + if (headerReader.bytesRemaining() !== 0) { + throw new Error(`${headerReader.bytesRemaining()} bytes remaining after parsing header`); } - header = headerResult.record; + header = headerResult; } function errorWithLibrary(message: string): Error { @@ -179,33 +173,32 @@ export class McapIndexedReader { } try { - void parseMagic(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length); + void parseMagic( + new Reader(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length), + ); } catch (error) { throw errorWithLibrary((error as Error).message); } let footer: TypedMcapRecords["Footer"]; { - const footerResult = parseRecord({ - view: footerAndMagicView, - startOffset: 0, - validateCrcs: true, - }); - if (footerResult.record?.type !== "Footer") { + const footerReader = new Reader(footerAndMagicView); + const footerRecord = parseRecord(footerReader, true); + if (footerRecord?.type !== "Footer") { throw errorWithLibrary( `Unable to read footer from end of file (offset ${footerOffset}); found ${ - footerResult.record?.type ?? "nothing" + footerRecord?.type ?? "nothing" }`, ); } - if (footerResult.usedBytes !== footerAndMagicView.byteLength - MCAP_MAGIC.length) { + if (footerReader.bytesRemaining() !== MCAP_MAGIC.length) { throw errorWithLibrary( `${ - footerAndMagicView.byteLength - MCAP_MAGIC.length - footerResult.usedBytes + footerReader.bytesRemaining() - MCAP_MAGIC.length } bytes remaining after parsing footer`, ); } - footer = footerResult.record; + footer = footerRecord; } if (footer.summaryStart === 0n) { throw errorWithLibrary("File is not indexed"); @@ -261,6 +254,7 @@ export class McapIndexedReader { dataEndAndSummarySection.byteOffset, dataEndAndSummarySection.byteLength, ); + const indexReader = new Reader(indexView); const channelsById = new Map(); const schemasById = new Map(); @@ -271,46 +265,42 @@ export class McapIndexedReader { let statistics: TypedMcapRecords["Statistics"] | undefined; let dataSectionCrc: number | undefined; - let offset = 0; - for ( - let result; - (result = parseRecord({ view: indexView, startOffset: offset, validateCrcs: true })), - result.record; - offset += result.usedBytes - ) { - if (offset === 0 && result.record.type !== "DataEnd") { + let first = true; + let result; + while ((result = parseRecord(indexReader, true))) { + if (first && result.type !== "DataEnd") { throw errorWithLibrary( - `Expected DataEnd record to precede summary section, but found ${result.record.type}`, + `Expected DataEnd record to precede summary section, but found ${result.type}`, ); } - switch (result.record.type) { + first = false; + switch (result.type) { case "Schema": - schemasById.set(result.record.id, result.record); + schemasById.set(result.id, result); break; case "Channel": - channelsById.set(result.record.id, result.record); + channelsById.set(result.id, result); break; case "ChunkIndex": - chunkIndexes.push(result.record); + chunkIndexes.push(result); break; case "AttachmentIndex": - attachmentIndexes.push(result.record); + attachmentIndexes.push(result); break; case "MetadataIndex": - metadataIndexes.push(result.record); + metadataIndexes.push(result); break; case "Statistics": if (statistics) { throw errorWithLibrary("Duplicate Statistics record"); } - statistics = result.record; + statistics = result; break; case "SummaryOffset": - 
summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record); + summaryOffsetsByOpcode.set(result.groupOpcode, result); break; case "DataEnd": - dataSectionCrc = - result.record.dataSectionCrc === 0 ? undefined : result.record.dataSectionCrc; + dataSectionCrc = result.dataSectionCrc === 0 ? undefined : result.dataSectionCrc; break; case "Header": case "Footer": @@ -319,13 +309,13 @@ export class McapIndexedReader { case "MessageIndex": case "Attachment": case "Metadata": - throw errorWithLibrary(`${result.record.type} record not allowed in index section`); + throw errorWithLibrary(`${result.type} record not allowed in index section`); case "Unknown": break; } } - if (offset !== indexView.byteLength) { - throw errorWithLibrary(`${indexView.byteLength - offset} bytes remaining in index section`); + if (indexReader.bytesRemaining() !== 0) { + throw errorWithLibrary(`${indexReader.bytesRemaining()} bytes remaining in index section`); } return new McapIndexedReader({ @@ -395,6 +385,7 @@ export class McapIndexedReader { // cursor becomes active (i.e. when we first need to access messages from the chunk) and removed // when the cursor is removed from the heap. const chunkViewCache = new Map(); + const chunkReader = new Reader(new DataView(new ArrayBuffer(0))); for (let cursor; (cursor = chunkCursors.peek()); ) { if (!cursor.hasMessageIndexes()) { // If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap. @@ -421,27 +412,24 @@ export class McapIndexedReader { `Message offset beyond chunk bounds (log time ${logTime}, offset ${offset}, chunk data length ${chunkView.byteLength}) in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`, ); } - const result = parseRecord({ - view: chunkView, - startOffset: Number(offset), - validateCrcs: validateCrcs ?? true, - }); - if (!result.record) { + chunkReader.reset(chunkView, Number(offset)); + const record = parseRecord(chunkReader, validateCrcs ?? true); + if (!record) { throw this.#errorWithLibrary( `Unable to parse record at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`, ); } - if (result.record.type !== "Message") { + if (record.type !== "Message") { throw this.#errorWithLibrary( - `Unexpected record type ${result.record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, + `Unexpected record type ${record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, ); } - if (result.record.logTime !== logTime) { + if (record.logTime !== logTime) { throw this.#errorWithLibrary( - `Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, + `Message log time ${record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, ); } - yield result.record; + yield record; if (cursor.hasMoreMessages()) { // There is no need to reorganize the heap when chunks are ordered and not overlapping. 
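The calling convention shown above is the heart of this change: parseRecord now takes a Reader plus a validateCrcs flag and returns the parsed record directly, or undefined when the remaining bytes do not yet contain a complete record; the Reader tracks its own offset, replacing the { record, usedBytes } bookkeeping at every call site. A minimal consumer sketch using only the Reader and parseRecord exports from this diff (readAll itself is illustrative, not part of the PR):

import Reader from "./Reader";
import { parseRecord } from "./parse";
import { TypedMcapRecord } from "./types";

function readAll(view: DataView): TypedMcapRecord[] {
  const records: TypedMcapRecord[] = [];
  const reader = new Reader(view);
  let record;
  while ((record = parseRecord(reader, true))) {
    records.push(record); // reader.offset has already advanced past this record
  }
  // parseRecord returns undefined for a partial trailing record, so any
  // leftover bytes indicate truncated or corrupt input
  if (reader.bytesRemaining() !== 0) {
    throw new Error(`${reader.bytesRemaining()} bytes remaining`);
  }
  return records;
}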
@@ -468,19 +456,18 @@ export class McapIndexedReader { continue; } const metadataData = await this.#readable.read(metadataIndex.offset, metadataIndex.length); - const metadataResult = parseRecord({ - view: new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength), - startOffset: 0, - validateCrcs: false, - }); - if (metadataResult.record?.type !== "Metadata") { + const metadataReader = new Reader( + new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength), + ); + const metadataRecord = parseRecord(metadataReader, false); + if (metadataRecord?.type !== "Metadata") { throw this.#errorWithLibrary( `Metadata data at offset ${ metadataIndex.offset - } does not point to metadata record (found ${String(metadataResult.record?.type)})`, + } does not point to metadata record (found ${String(metadataRecord?.type)})`, ); } - yield metadataResult.record; + yield metadataRecord; } } @@ -519,23 +506,18 @@ export class McapIndexedReader { attachmentIndex.offset, attachmentIndex.length, ); - const attachmentResult = parseRecord({ - view: new DataView( - attachmentData.buffer, - attachmentData.byteOffset, - attachmentData.byteLength, - ), - startOffset: 0, - validateCrcs: validateCrcs ?? true, - }); - if (attachmentResult.record?.type !== "Attachment") { + const attachmentReader = new Reader( + new DataView(attachmentData.buffer, attachmentData.byteOffset, attachmentData.byteLength), + ); + const attachmentRecord = parseRecord(attachmentReader, validateCrcs ?? true); + if (attachmentRecord?.type !== "Attachment") { throw this.#errorWithLibrary( `Attachment data at offset ${ attachmentIndex.offset - } does not point to attachment record (found ${String(attachmentResult.record?.type)})`, + } does not point to attachment record (found ${String(attachmentRecord?.type)})`, ); } - yield attachmentResult.record; + yield attachmentRecord; } } @@ -547,20 +529,19 @@ export class McapIndexedReader { chunkIndex.chunkStartOffset, chunkIndex.chunkLength, ); - const chunkResult = parseRecord({ - view: new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength), - startOffset: 0, - validateCrcs: options?.validateCrcs ?? true, - }); - if (chunkResult.record?.type !== "Chunk") { + const chunkReader = new Reader( + new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength), + ); + const chunkRecord = parseRecord(chunkReader, options?.validateCrcs ?? 
true); + if (chunkRecord?.type !== "Chunk") { throw this.#errorWithLibrary( `Chunk start offset ${ chunkIndex.chunkStartOffset - } does not point to chunk record (found ${String(chunkResult.record?.type)})`, + } does not point to chunk record (found ${String(chunkRecord?.type)})`, ); } - const chunk = chunkResult.record; + const chunk = chunkRecord; let buffer = chunk.records; if (chunk.compression !== "" && buffer.byteLength > 0) { const decompress = this.#decompressHandlers?.[chunk.compression];
diff --git a/typescript/core/src/McapStreamReader.ts b/typescript/core/src/McapStreamReader.ts index ddd412ef38..6ed8292ce9 100644 --- a/typescript/core/src/McapStreamReader.ts +++ b/typescript/core/src/McapStreamReader.ts @@ -1,6 +1,6 @@ import { crc32 } from "@foxglove/crc"; -import StreamBuffer from "./StreamBuffer"; +import Reader from "./Reader"; import { MCAP_MAGIC } from "./constants"; import { parseMagic, parseRecord } from "./parse"; import { Channel, DecompressHandlers, McapMagic, TypedMcapRecord, TypedMcapRecords } from "./types"; @@ -50,7 +50,9 @@ type McapReaderOptions = { * ``` */ export default class McapStreamReader { - #buffer = new StreamBuffer(MCAP_MAGIC.length * 2); + #buffer = new ArrayBuffer(MCAP_MAGIC.length * 2); + #view = new DataView(this.#buffer); + #reader = new Reader(this.#view, MCAP_MAGIC.length * 2); // Cursor starts at end of the initial buffer, i.e. zero bytes remaining #decompressHandlers; #includeChunks; #validateCrcs; @@ -78,7 +80,7 @@ export default class McapStreamReader { /** @returns The number of bytes that have been received by `append()` but not yet parsed. */ bytesRemaining(): number { - return this.#buffer.bytesRemaining(); + return this.#reader.bytesRemaining(); } /** @@ -89,7 +91,41 @@ if (this.#doneReading) { throw new Error("Already done reading"); } - this.#buffer.append(data); + this.#appendOrShift(data); + this.#reader.reset(this.#view); + } + /** Add data to the buffer, shifting existing unconsumed data or reallocating if necessary. */ + #appendOrShift(data: Uint8Array): void { + const remainingBytes = this.#reader.bytesRemaining(); + const totalNeededBytes = remainingBytes + data.byteLength; + + if (totalNeededBytes <= this.#buffer.byteLength) { + // Data fits in the current buffer + if (this.#view.byteOffset + this.#reader.offset + totalNeededBytes <= this.#buffer.byteLength) { + // Data fits by appending after the unconsumed bytes + const array = new Uint8Array(this.#buffer, this.#view.byteOffset + this.#reader.offset); + array.set(data, remainingBytes); + this.#view = new DataView(this.#buffer, this.#view.byteOffset + this.#reader.offset, totalNeededBytes); + } else { + // Data fits but requires moving existing unconsumed data to start of buffer + const existingData = new Uint8Array(this.#buffer, this.#view.byteOffset + this.#reader.offset, remainingBytes); + const array = new Uint8Array(this.#buffer); + array.set(existingData, 0); + array.set(data, existingData.byteLength); + this.#view = new DataView(this.#buffer, 0, totalNeededBytes); + } + } else { + // New data doesn't fit, copy to a new buffer + + // Currently, the new buffer size may be smaller than the old size. For future optimizations, + // we could consider making the buffer size increase monotonically. + this.#buffer = new ArrayBuffer(totalNeededBytes * 2); + const array = new Uint8Array(this.#buffer); + const existingData = new Uint8Array(this.#view.buffer, this.#view.byteOffset + this.#reader.offset, remainingBytes); + array.set(existingData, 0); + array.set(data, existingData.byteLength); + this.#view = new DataView(this.#buffer, 0, totalNeededBytes); + } } /** @@ -129,11 +165,10 @@ *#read(): Generator { if (!this.#noMagicPrefix) { - let magic: McapMagic | undefined, usedBytes: number | undefined; - while ((({ magic, usedBytes } = parseMagic(this.#buffer.view, 0)), !magic)) { + let magic: McapMagic | undefined; + while (((magic = parseMagic(this.#reader)), !magic)) { yield; } - this.#buffer.consume(usedBytes); } let header: TypedMcapRecords["Header"] | undefined; @@ -144,20 +179,10 @@ for (;;) { let record; - { - let usedBytes; - while ( - (({ record, usedBytes } = parseRecord({ - view: this.#buffer.view, - startOffset: 0, - validateCrcs: this.#validateCrcs, - })), - !record) - ) { - yield; - } - this.#buffer.consume(usedBytes); + while (((record = parseRecord(this.#reader, this.#validateCrcs)), !record)) { + yield; } + switch (record.type) { case "Unknown": break; @@ -206,18 +231,10 @@ } } const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength); - let chunkOffset = 0; - for ( - let chunkResult; - (chunkResult = parseRecord({ - view, - startOffset: chunkOffset, - validateCrcs: this.#validateCrcs, - })), - chunkResult.record; - chunkOffset += chunkResult.usedBytes - ) { - switch (chunkResult.record.type) { + const chunkReader = new Reader(view); + let chunkRecord; + while ((chunkRecord = parseRecord(chunkReader, this.#validateCrcs))) { + switch (chunkRecord.type) { case "Unknown": break; case "Header": @@ -232,34 +249,31 @@ case "MetadataIndex": case "SummaryOffset": case "DataEnd": - throw errorWithLibrary( - `${chunkResult.record.type} record not allowed inside a chunk`, - ); + throw errorWithLibrary(`${chunkRecord.type} record not allowed inside a chunk`); case "Schema": case "Channel": case "Message": - yield chunkResult.record; + yield chunkRecord; break; } } - if (chunkOffset !== buffer.byteLength) { - throw errorWithLibrary(`${buffer.byteLength - chunkOffset} bytes remaining in chunk`); + if (chunkReader.bytesRemaining() !== 0) { + throw errorWithLibrary(`${chunkReader.bytesRemaining()} bytes remaining in chunk`); } break; } case "Footer": try { - let magic, usedBytes; - while ((({ magic, usedBytes } = parseMagic(this.#buffer.view, 0)), !magic)) { + let magic; + while (((magic = parseMagic(this.#reader)), !magic)) { yield; } - this.#buffer.consume(usedBytes); } catch (error) { throw errorWithLibrary((error as Error).message); } - if (this.#buffer.bytesRemaining() !== 0) { + if (this.#reader.bytesRemaining() !== 0) { throw errorWithLibrary( - `${this.#buffer.bytesRemaining()} bytes remaining after MCAP footer and trailing magic`, + `${this.#reader.bytesRemaining()} bytes remaining after MCAP footer and trailing magic`, ); } return record;
diff --git a/typescript/core/src/McapWriter.test.ts b/typescript/core/src/McapWriter.test.ts index 57bdcd82fc..e16bc515d5 100644 --- a/typescript/core/src/McapWriter.test.ts +++ b/typescript/core/src/McapWriter.test.ts @@ -3,6 +3,7 @@ import { crc32 } from "@foxglove/crc"; import { McapIndexedReader } from "./McapIndexedReader"; import McapStreamReader from "./McapStreamReader"; import { McapWriter } from "./McapWriter"; +import Reader from "./Reader"; import { TempBuffer } from "./TempBuffer"; import { MCAP_MAGIC, Opcode } from "./constants"; import { parseMagic, parseRecord } from "./parse"; @@ -278,13 +279,12 @@ describe("McapWriter", () => { const array = tempBuffer.get(); const view = new DataView(array.buffer, array.byteOffset, array.byteLength); + const reader = new Reader(view); const records: TypedMcapRecord[] = []; - for ( - let offset = parseMagic(view, 0).usedBytes, result; - (result = parseRecord({ view, startOffset: offset, validateCrcs: true })), result.record; - offset += result.usedBytes - ) { - records.push(result.record); + parseMagic(reader); + let result; + while ((result = parseRecord(reader, true))) { + records.push(result); } const expectedChunkData = new Uint8Array([
diff --git a/typescript/core/src/Reader.ts b/typescript/core/src/Reader.ts index fcc2887237..d0136c648b 100644 --- a/typescript/core/src/Reader.ts +++ b/typescript/core/src/Reader.ts @@ -7,13 +7,27 @@ const textDecoder = new TextDecoder(); export default class Reader { #view: DataView; + #viewU8: Uint8Array; offset: number; constructor(view: DataView, offset = 0) { this.#view = view; + this.#viewU8 = new Uint8Array(view.buffer, view.byteOffset, view.byteLength); this.offset = offset; } + // Should be ~identical to the constructor; allows us to reinitialize the reader when the + // view changes, without creating a new instance, avoiding allocation / GC overhead + reset(view: DataView, offset = 0): void { + this.#view = view; + this.#viewU8 = new Uint8Array(view.buffer, view.byteOffset, view.byteLength); + this.offset = offset; + } + + bytesRemaining(): number { + return this.#viewU8.length - this.offset; + } + uint8(): number { const value = this.#view.getUint8(this.offset); this.offset += 1; @@ -40,14 +54,12 @@ export default class Reader { string(): string { const length = this.uint32(); - if (this.offset + length > this.#view.byteLength) { + if (length === 0) { + return ""; + } else if (length > this.bytesRemaining()) { throw new Error(`String length ${length} exceeds bounds of buffer`); } - const value = textDecoder.decode( - new Uint8Array(this.#view.buffer, this.#view.byteOffset + this.offset, length), - ); - this.offset += length; - return value; + return textDecoder.decode(this.u8ArrayBorrow(length)); } keyValuePairs<K, V>(readKey: (reader: Reader) => K, readValue: (reader: Reader) => V): [K, V][] { @@ -103,4 +115,18 @@ export default class Reader { } return result; } + + // Read a borrowed Uint8Array; useful for temporary references with borrow semantics + u8ArrayBorrow(length: number): Uint8Array { + const result = this.#viewU8.subarray(this.offset, this.offset + length); + this.offset += length; + return result; + } + + // Read a copied Uint8Array from the underlying buffer; use when you need to keep the data around + u8ArrayCopy(length: number): Uint8Array { + const result = this.#viewU8.slice(this.offset, this.offset + length); + this.offset += length; + return result; + } }
diff --git a/typescript/core/src/StreamBuffer.test.ts b/typescript/core/src/StreamBuffer.test.ts deleted file mode 100644 index a45175b3e3..0000000000 --- a/typescript/core/src/StreamBuffer.test.ts +++ /dev/null @@ -1,47 +0,0 @@ -import StreamBuffer from "./StreamBuffer"; - -function toArray(view: DataView) { - return new Uint8Array(view.buffer, view.byteOffset, view.byteLength); -} - -describe("ByteStorage", () => { - it("handles basic append and consume", () => { - const buffer = new StreamBuffer(); -
expect(buffer.bytesRemaining()).toBe(0); - - buffer.append(new Uint8Array([1, 2, 3])); - expect(buffer.bytesRemaining()).toBe(3); - expect(() => { - buffer.consume(4); - }).toThrow(); - - expect(toArray(buffer.view)).toEqual(new Uint8Array([1, 2, 3])); - buffer.consume(3); - expect(buffer.bytesRemaining()).toBe(0); - }); - - it("handles partial consume", () => { - const buffer = new StreamBuffer(); - - buffer.append(new Uint8Array([1, 2, 3, 4, 5])); - expect(buffer.bytesRemaining()).toBe(5); - buffer.consume(2); - expect(buffer.bytesRemaining()).toBe(3); - - expect(toArray(buffer.view)).toEqual(new Uint8Array([3, 4, 5])); - buffer.consume(3); - expect(buffer.bytesRemaining()).toBe(0); - }); - - it("reuses buffer within allocated capacity", () => { - const buffer = new StreamBuffer(5); - const rawBuffer = buffer.view.buffer; - buffer.append(new Uint8Array([1, 2])); - expect(buffer.view.buffer).toBe(rawBuffer); - buffer.append(new Uint8Array([3, 4, 5])); - expect(buffer.view.buffer).toBe(rawBuffer); - buffer.append(new Uint8Array([6, 7])); - expect(buffer.view.buffer).not.toBe(rawBuffer); - expect(toArray(buffer.view)).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7])); - }); -}); diff --git a/typescript/core/src/StreamBuffer.ts b/typescript/core/src/StreamBuffer.ts deleted file mode 100644 index 98eaa785d5..0000000000 --- a/typescript/core/src/StreamBuffer.ts +++ /dev/null @@ -1,58 +0,0 @@ -/** - * A growable buffer for use when processing a stream of data. - */ -export default class StreamBuffer { - #buffer: ArrayBuffer; - public view: DataView; - - constructor(initialCapacity = 0) { - this.#buffer = new ArrayBuffer(initialCapacity); - this.view = new DataView(this.#buffer, 0, 0); - } - - bytesRemaining(): number { - return this.view.byteLength; - } - - /** Mark some data as consumed, so the memory can be reused when new data is appended. */ - consume(count: number): void { - this.view = new DataView( - this.#buffer, - this.view.byteOffset + count, - this.view.byteLength - count, - ); - } - - /** Add data to the buffer, shifting existing data or reallocating if necessary. */ - append(data: Uint8Array): void { - if (this.view.byteOffset + this.view.byteLength + data.byteLength <= this.#buffer.byteLength) { - // Data fits by appending only - const array = new Uint8Array(this.view.buffer, this.view.byteOffset); - array.set(data, this.view.byteLength); - this.view = new DataView( - this.#buffer, - this.view.byteOffset, - this.view.byteLength + data.byteLength, - ); - } else if (this.view.byteLength + data.byteLength <= this.#buffer.byteLength) { - // Data fits in allocated buffer but requires moving existing data to start of buffer - const oldData = new Uint8Array(this.#buffer, this.view.byteOffset, this.view.byteLength); - const array = new Uint8Array(this.#buffer); - array.set(oldData, 0); - array.set(data, oldData.byteLength); - this.view = new DataView(this.#buffer, 0, this.view.byteLength + data.byteLength); - } else { - // New data doesn't fit, copy to a new buffer - - // Currently, the new buffer size may be smaller than the old size. For future optimizations, - // we could consider making the buffer size increase monotonically. 
- - const oldData = new Uint8Array(this.#buffer, this.view.byteOffset, this.view.byteLength); - this.#buffer = new ArrayBuffer((this.view.byteLength + data.byteLength) * 2); - const array = new Uint8Array(this.#buffer); - array.set(oldData, 0); - array.set(data, oldData.byteLength); - this.view = new DataView(this.#buffer, 0, this.view.byteLength + data.byteLength); - } - } -}
diff --git a/typescript/core/src/parse.ts b/typescript/core/src/parse.ts index 7f2fe80285..95d0105750 100644 --- a/typescript/core/src/parse.ts +++ b/typescript/core/src/parse.ts @@ -1,374 +1,419 @@ import { crc32 } from "@foxglove/crc"; import Reader from "./Reader"; -import { isKnownOpcode, MCAP_MAGIC, Opcode } from "./constants"; +import { MCAP_MAGIC, Opcode } from "./constants"; import { McapMagic, TypedMcapRecord } from "./types"; /** - * Parse a MCAP magic string at `startOffset` in `view`. + * Parse a MCAP magic string from the given reader. */ -export function parseMagic( - view: DataView, - startOffset: number, -): { magic: McapMagic; usedBytes: number } | { magic?: undefined; usedBytes: 0 } { - if (startOffset + MCAP_MAGIC.length > view.byteLength) { - return { usedBytes: 0 }; +export function parseMagic(reader: Reader): McapMagic | undefined { + if (reader.bytesRemaining() < MCAP_MAGIC.length) { + return undefined; } - if (!MCAP_MAGIC.every((val, i) => val === view.getUint8(startOffset + i))) { + const magic = reader.u8ArrayBorrow(MCAP_MAGIC.length); + if (!MCAP_MAGIC.every((val, i) => val === magic[i])) { throw new Error( `Expected MCAP magic '${MCAP_MAGIC.map((val) => val.toString(16).padStart(2, "0")).join( " ", - )}', found '${Array.from(MCAP_MAGIC, (_, i) => - view - .getUint8(startOffset + i) - .toString(16) - .padStart(2, "0"), - ).join(" ")}'`, + )}', found '${Array.from(magic, (val) => val.toString(16).padStart(2, "0")).join( + " ", + )}'`, ); } - return { - magic: { specVersion: "0" }, - usedBytes: MCAP_MAGIC.length, - }; + return { specVersion: "0" }; } /** - * Parse a MCAP record beginning at `startOffset` in `view`. 
+ * Parse a MCAP record from the given reader */ -export function parseRecord({ - view, - startOffset, - validateCrcs, -}: { - view: DataView; - startOffset: number; - validateCrcs: boolean; -}): { record: TypedMcapRecord; usedBytes: number } | { record?: undefined; usedBytes: 0 } { - if (startOffset + /*opcode*/ 1 + /*record content length*/ 8 >= view.byteLength) { - return { usedBytes: 0 }; +// NOTE: internal function in the hot path, (de)structuring args would be wasteful, acceptable perf/clarity tradeoff +// eslint-disable-next-line @foxglove/no-boolean-parameters +export function parseRecord(reader: Reader, validateCrcs = false): TypedMcapRecord | undefined { + const RECORD_HEADER_SIZE = 1 /*opcode*/ + 8; /*record content length*/ + if (reader.bytesRemaining() < RECORD_HEADER_SIZE) { + return undefined; } - const headerReader = new Reader(view, startOffset); + const start = reader.offset; + const opcode = reader.uint8(); + const recordLength = reader.uint64(); - const opcode = headerReader.uint8(); - - const recordLength = headerReader.uint64(); if (recordLength > Number.MAX_SAFE_INTEGER) { throw new Error(`Record content length ${recordLength} is too large`); } + const recordLengthNum = Number(recordLength); - const recordEndOffset = headerReader.offset + recordLengthNum; - if (recordEndOffset > view.byteLength) { - return { usedBytes: 0 }; + + if (reader.bytesRemaining() < recordLengthNum) { + reader.offset = start; // Rewind to the start of the record + return undefined; } - if (!isKnownOpcode(opcode)) { - const data = new Uint8Array( - view.buffer, - view.byteOffset + headerReader.offset, - recordLengthNum, - ); - const record: TypedMcapRecord = { - type: "Unknown", - opcode, - data, - }; - return { record, usedBytes: recordEndOffset - startOffset }; + let result: TypedMcapRecord; + switch (opcode as Opcode) { + case Opcode.HEADER: + result = parseHeader(reader, recordLengthNum); + break; + case Opcode.FOOTER: + result = parseFooter(reader, recordLengthNum); + break; + case Opcode.SCHEMA: + result = parseSchema(reader, recordLengthNum); + break; + case Opcode.CHANNEL: + result = parseChannel(reader, recordLengthNum); + break; + case Opcode.MESSAGE: + result = parseMessage(reader, recordLengthNum); + break; + case Opcode.CHUNK: + result = parseChunk(reader, recordLengthNum); + break; + case Opcode.MESSAGE_INDEX: + result = parseMessageIndex(reader, recordLengthNum); + break; + case Opcode.CHUNK_INDEX: + result = parseChunkIndex(reader, recordLengthNum); + break; + case Opcode.ATTACHMENT: + result = parseAttachment(reader, recordLengthNum, validateCrcs); + break; + case Opcode.ATTACHMENT_INDEX: + result = parseAttachmentIndex(reader, recordLengthNum); + break; + case Opcode.STATISTICS: + result = parseStatistics(reader, recordLengthNum); + break; + case Opcode.METADATA: + result = parseMetadata(reader, recordLengthNum); + break; + case Opcode.METADATA_INDEX: + result = parseMetadataIndex(reader, recordLengthNum); + break; + case Opcode.SUMMARY_OFFSET: + result = parseSummaryOffset(reader, recordLengthNum); + break; + case Opcode.DATA_END: + result = parseDataEnd(reader, recordLengthNum); + break; + default: + result = parseUnknown(reader, recordLengthNum, opcode); + break; } - const recordView = new DataView( - view.buffer, - view.byteOffset + headerReader.offset, - recordLengthNum, + // NOTE: a bit redundant, but ensures we've advanced by the full record length + // TODO: simplify this when we explore monomorphic paths + reader.offset = start + RECORD_HEADER_SIZE + recordLengthNum; 
+ + return result; +} + +function parseUnknown(reader: Reader, recordLength: number, opcode: number): TypedMcapRecord { + const data = reader.u8ArrayBorrow(recordLength); + return { + type: "Unknown", + opcode, + data, + }; +} + +function parseHeader(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const profile = reader.string(); + const library = reader.string(); + reader.offset = startOffset + recordLength; + return { type: "Header", profile, library }; +} + +function parseFooter(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const summaryStart = reader.uint64(); + const summaryOffsetStart = reader.uint64(); + const summaryCrc = reader.uint32(); + reader.offset = startOffset + recordLength; + return { + type: "Footer", + summaryStart, + summaryOffsetStart, + summaryCrc, + }; +} + +function parseSchema(reader: Reader, recordLength: number): TypedMcapRecord { + const start = reader.offset; + const id = reader.uint16(); + const name = reader.string(); + const encoding = reader.string(); + const dataLen = reader.uint32(); + const end = reader.offset; + if (recordLength - (end - start) < dataLen) { + throw new Error(`Schema data length ${dataLen} exceeds bounds of record`); + } + const data = reader.u8ArrayCopy(dataLen); + reader.offset = start + recordLength; + + return { + type: "Schema", + id, + encoding, + name, + data, + }; +} + +function parseChannel(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const channelId = reader.uint16(); + const schemaId = reader.uint16(); + const topicName = reader.string(); + const messageEncoding = reader.string(); + const metadata = reader.map( + (r) => r.string(), + (r) => r.string(), ); - const reader = new Reader(recordView); - - switch (opcode) { - case Opcode.HEADER: { - const profile = reader.string(); - const library = reader.string(); - const record: TypedMcapRecord = { type: "Header", profile, library }; - return { record, usedBytes: recordEndOffset - startOffset }; - } + reader.offset = startOffset + recordLength; - case Opcode.FOOTER: { - const summaryStart = reader.uint64(); - const summaryOffsetStart = reader.uint64(); - const summaryCrc = reader.uint32(); - const record: TypedMcapRecord = { - type: "Footer", - summaryStart, - summaryOffsetStart, - summaryCrc, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } + return { + type: "Channel", + id: channelId, + schemaId, + topic: topicName, + messageEncoding, + metadata, + }; +} - case Opcode.SCHEMA: { - const id = reader.uint16(); - const name = reader.string(); - const encoding = reader.string(); - const dataLen = reader.uint32(); - if (reader.offset + dataLen > recordView.byteLength) { - throw new Error(`Schema data length ${dataLen} exceeds bounds of record`); - } - const data = new Uint8Array( - recordView.buffer, - recordView.byteOffset + reader.offset, - dataLen, - ).slice(); - reader.offset += dataLen; - - const record: TypedMcapRecord = { - type: "Schema", - id, - encoding, - name, - data, - }; - - return { record, usedBytes: recordEndOffset - startOffset }; - } +function parseMessage(reader: Reader, recordLength: number): TypedMcapRecord { + const MESSAGE_PREFIX_SIZE = 2 + 4 + 8 + 8; // channelId, sequence, logTime, publishTime + const channelId = reader.uint16(); + const sequence = reader.uint32(); + const logTime = reader.uint64(); + const publishTime = reader.uint64(); + const data = reader.u8ArrayCopy(recordLength - 
MESSAGE_PREFIX_SIZE); + return { + type: "Message", + channelId, + sequence, + logTime, + publishTime, + data, + }; +} - case Opcode.CHANNEL: { - const channelId = reader.uint16(); - const schemaId = reader.uint16(); - const topicName = reader.string(); - const messageEncoding = reader.string(); - const metadata = reader.map( - (r) => r.string(), - (r) => r.string(), - ); - - const record: TypedMcapRecord = { - type: "Channel", - id: channelId, - schemaId, - topic: topicName, - messageEncoding, - metadata, - }; - - return { record, usedBytes: recordEndOffset - startOffset }; - } +function parseChunk(reader: Reader, recordLength: number): TypedMcapRecord { + const start = reader.offset; + const startTime = reader.uint64(); + const endTime = reader.uint64(); + const uncompressedSize = reader.uint64(); + const uncompressedCrc = reader.uint32(); + const compression = reader.string(); + const recordsByteLength = Number(reader.uint64()); + const end = reader.offset; + const prefixSize = end - start; + if (recordsByteLength + prefixSize > recordLength) { + throw new Error("Chunk records length exceeds remaining record size"); + } + const records = reader.u8ArrayCopy(recordsByteLength); + reader.offset = start + recordLength; + return { + type: "Chunk", + messageStartTime: startTime, + messageEndTime: endTime, + compression, + uncompressedSize, + uncompressedCrc, + records, + }; +} - case Opcode.MESSAGE: { - const channelId = reader.uint16(); - const sequence = reader.uint32(); - const logTime = reader.uint64(); - const publishTime = reader.uint64(); - const data = new Uint8Array( - recordView.buffer, - recordView.byteOffset + reader.offset, - recordView.byteLength - reader.offset, - ).slice(); - const record: TypedMcapRecord = { - type: "Message", - channelId, - sequence, - logTime, - publishTime, - data, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } +function parseMessageIndex(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const channelId = reader.uint16(); + const records = reader.keyValuePairs( + (r) => r.uint64(), + (r) => r.uint64(), + ); + reader.offset = startOffset + recordLength; + return { + type: "MessageIndex", + channelId, + records, + }; +} - case Opcode.CHUNK: { - const startTime = reader.uint64(); - const endTime = reader.uint64(); - const uncompressedSize = reader.uint64(); - const uncompressedCrc = reader.uint32(); - const compression = reader.string(); - const recordByteLength = Number(reader.uint64()); - if (recordByteLength + reader.offset > recordView.byteLength) { - throw new Error("Chunk records length exceeds remaining record size"); - } - const records = new Uint8Array( - recordView.buffer, - recordView.byteOffset + reader.offset, - recordByteLength, - ).slice(); - const record: TypedMcapRecord = { - type: "Chunk", - messageStartTime: startTime, - messageEndTime: endTime, - compression, - uncompressedSize, - uncompressedCrc, - records, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } +function parseChunkIndex(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const messageStartTime = reader.uint64(); + const messageEndTime = reader.uint64(); + const chunkStartOffset = reader.uint64(); + const chunkLength = reader.uint64(); + const messageIndexOffsets = reader.map( + (r) => r.uint16(), + (r) => r.uint64(), + ); + const messageIndexLength = reader.uint64(); + const compression = reader.string(); + const compressedSize = 
reader.uint64(); + const uncompressedSize = reader.uint64(); + reader.offset = startOffset + recordLength; + return { + type: "ChunkIndex", + messageStartTime, + messageEndTime, + chunkStartOffset, + chunkLength, + messageIndexOffsets, + messageIndexLength, + compression, + compressedSize, + uncompressedSize, + }; +} - case Opcode.MESSAGE_INDEX: { - const channelId = reader.uint16(); - const records = reader.keyValuePairs( - (r) => r.uint64(), - (r) => r.uint64(), - ); - const record: TypedMcapRecord = { - type: "MessageIndex", - channelId, - records, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.CHUNK_INDEX: { - const messageStartTime = reader.uint64(); - const messageEndTime = reader.uint64(); - const chunkStartOffset = reader.uint64(); - const chunkLength = reader.uint64(); - const messageIndexOffsets = reader.map( - (r) => r.uint16(), - (r) => r.uint64(), - ); - const messageIndexLength = reader.uint64(); - const compression = reader.string(); - const compressedSize = reader.uint64(); - const uncompressedSize = reader.uint64(); - const record: TypedMcapRecord = { - type: "ChunkIndex", - messageStartTime, - messageEndTime, - chunkStartOffset, - chunkLength, - messageIndexOffsets, - messageIndexLength, - compression, - compressedSize, - uncompressedSize, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.ATTACHMENT: { - const logTime = reader.uint64(); - const createTime = reader.uint64(); - const name = reader.string(); - const mediaType = reader.string(); - const dataLen = reader.uint64(); - if (BigInt(recordView.byteOffset + reader.offset) + dataLen > Number.MAX_SAFE_INTEGER) { - throw new Error(`Attachment too large: ${dataLen}`); - } - if (reader.offset + Number(dataLen) + 4 /*crc*/ > recordView.byteLength) { - throw new Error(`Attachment data length ${dataLen} exceeds bounds of record`); - } - const data = new Uint8Array( - recordView.buffer, - recordView.byteOffset + reader.offset, - Number(dataLen), - ).slice(); - reader.offset += Number(dataLen); - const crcLength = reader.offset; - const expectedCrc = reader.uint32(); - if (validateCrcs && expectedCrc !== 0) { - const actualCrc = crc32(new DataView(recordView.buffer, recordView.byteOffset, crcLength)); - if (actualCrc !== expectedCrc) { - throw new Error( - `Attachment CRC32 mismatch: expected ${expectedCrc}, actual ${actualCrc}`, - ); - } - } - - const record: TypedMcapRecord = { - type: "Attachment", - logTime, - createTime, - name, - mediaType, - data, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.ATTACHMENT_INDEX: { - const offset = reader.uint64(); - const length = reader.uint64(); - const logTime = reader.uint64(); - const createTime = reader.uint64(); - const dataSize = reader.uint64(); - const name = reader.string(); - const mediaType = reader.string(); - - const record: TypedMcapRecord = { - type: "AttachmentIndex", - offset, - length, - logTime, - createTime, - dataSize, - name, - mediaType, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.STATISTICS: { - const messageCount = reader.uint64(); - const schemaCount = reader.uint16(); - const channelCount = reader.uint32(); - const attachmentCount = reader.uint32(); - const metadataCount = reader.uint32(); - const chunkCount = reader.uint32(); - const messageStartTime = reader.uint64(); - const messageEndTime = reader.uint64(); - const channelMessageCounts = reader.map( - (r) => r.uint16(), - (r) => r.uint64(), - ); - - 
const record: TypedMcapRecord = { - type: "Statistics", - messageCount, - schemaCount, - channelCount, - attachmentCount, - metadataCount, - chunkCount, - messageStartTime, - messageEndTime, - channelMessageCounts, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.METADATA: { - const name = reader.string(); - const metadata = reader.map( - (r) => r.string(), - (r) => r.string(), - ); - const record: TypedMcapRecord = { type: "Metadata", metadata, name }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.METADATA_INDEX: { - const offset = reader.uint64(); - const length = reader.uint64(); - const name = reader.string(); - - const record: TypedMcapRecord = { - type: "MetadataIndex", - offset, - length, - name, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.SUMMARY_OFFSET: { - const groupOpcode = reader.uint8(); - const groupStart = reader.uint64(); - const groupLength = reader.uint64(); - - const record: TypedMcapRecord = { - type: "SummaryOffset", - groupOpcode, - groupStart, - groupLength, - }; - return { record, usedBytes: recordEndOffset - startOffset }; - } - case Opcode.DATA_END: { - const dataSectionCrc = reader.uint32(); - const record: TypedMcapRecord = { - type: "DataEnd", - dataSectionCrc, - }; - return { record, usedBytes: recordEndOffset - startOffset }; +function parseAttachment( + reader: Reader, + recordLength: number, + // NOTE: internal function in the hot path, (de)structuring args would be wasteful, acceptable perf/clarity tradeoff + // eslint-disable-next-line @foxglove/no-boolean-parameters + validateCrcs: boolean, +): TypedMcapRecord { + const startOffset = reader.offset; + const logTime = reader.uint64(); + const createTime = reader.uint64(); + const name = reader.string(); + const mediaType = reader.string(); + const dataLen = reader.uint64(); + // NOTE: probably not necessary, but just in case + if (BigInt(reader.offset) + dataLen > Number.MAX_SAFE_INTEGER) { + throw new Error(`Attachment too large: ${dataLen}`); + } + if (reader.offset + Number(dataLen) + 4 /*crc*/ > startOffset + recordLength) { + throw new Error(`Attachment data length ${dataLen} exceeds bounds of record`); + } + const data = reader.u8ArrayCopy(Number(dataLen)); + const crcLength = reader.offset - startOffset; + const expectedCrc = reader.uint32(); + if (validateCrcs && expectedCrc !== 0) { + reader.offset = startOffset; + const fullData = reader.u8ArrayBorrow(crcLength); + const actualCrc = crc32(fullData); + reader.offset = startOffset + crcLength + 4; + if (actualCrc !== expectedCrc) { + throw new Error(`Attachment CRC32 mismatch: expected ${expectedCrc}, actual ${actualCrc}`); } } + reader.offset = startOffset + recordLength; + + return { + type: "Attachment", + logTime, + createTime, + name, + mediaType, + data, + }; +} + +function parseAttachmentIndex(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const offset = reader.uint64(); + const length = reader.uint64(); + const logTime = reader.uint64(); + const createTime = reader.uint64(); + const dataSize = reader.uint64(); + const name = reader.string(); + const mediaType = reader.string(); + reader.offset = startOffset + recordLength; + + return { + type: "AttachmentIndex", + offset, + length, + logTime, + createTime, + dataSize, + name, + mediaType, + }; +} + +function parseStatistics(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const 
messageCount = reader.uint64(); + const schemaCount = reader.uint16(); + const channelCount = reader.uint32(); + const attachmentCount = reader.uint32(); + const metadataCount = reader.uint32(); + const chunkCount = reader.uint32(); + const messageStartTime = reader.uint64(); + const messageEndTime = reader.uint64(); + const channelMessageCounts = reader.map( + (r) => r.uint16(), + (r) => r.uint64(), + ); + reader.offset = startOffset + recordLength; + + return { + type: "Statistics", + messageCount, + schemaCount, + channelCount, + attachmentCount, + metadataCount, + chunkCount, + messageStartTime, + messageEndTime, + channelMessageCounts, + }; +} + +function parseMetadata(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const name = reader.string(); + const metadata = reader.map( + (r) => r.string(), + (r) => r.string(), + ); + reader.offset = startOffset + recordLength; + return { type: "Metadata", metadata, name }; +} + +function parseMetadataIndex(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const offset = reader.uint64(); + const length = reader.uint64(); + const name = reader.string(); + reader.offset = startOffset + recordLength; + + return { + type: "MetadataIndex", + offset, + length, + name, + }; +} + +function parseSummaryOffset(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const groupOpcode = reader.uint8(); + const groupStart = reader.uint64(); + const groupLength = reader.uint64(); + reader.offset = startOffset + recordLength; + + return { + type: "SummaryOffset", + groupOpcode, + groupStart, + groupLength, + }; +} + +function parseDataEnd(reader: Reader, recordLength: number): TypedMcapRecord { + const startOffset = reader.offset; + const dataSectionCrc = reader.uint32(); + reader.offset = startOffset + recordLength; + return { + type: "DataEnd", + dataSectionCrc, + }; }
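Every parser above follows one convention: remember the record's start offset, read fields with the Reader primitives, then jump reader.offset to startOffset + recordLength. That final assignment is what keeps the framing robust — a record whose declared length exceeds what the parser consumed (say, trailing fields added by a newer writer) still leaves the cursor on the next record boundary. A sketch of that shape for a hypothetical record type (parseMyIndex and its fields are invented for illustration):

import Reader from "./Reader";

type MyIndex = { name: string; entries: [bigint, bigint][] };

function parseMyIndex(reader: Reader, recordLength: number): MyIndex {
  const startOffset = reader.offset;
  const name = reader.string(); // uint32 length-prefixed UTF-8
  const entries = reader.keyValuePairs(
    (r) => r.uint64(),
    (r) => r.uint64(),
  );
  // Skip any bytes the record declares beyond what we understand
  reader.offset = startOffset + recordLength;
  return { name, entries };
}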
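The two Uint8Array accessors on Reader differ only in lifetime: u8ArrayBorrow returns a subarray that aliases the underlying buffer (no copy, but the contents go stale once that buffer is reused, e.g. after reset() or when McapStreamReader shifts its buffer), while u8ArrayCopy slices out an independent copy. A small demonstration of the difference (the byte values are arbitrary):

import Reader from "./Reader";

const buf = new Uint8Array([1, 2, 3, 4]);
const reader = new Reader(new DataView(buf.buffer));

const borrowed = reader.u8ArrayBorrow(2); // aliases buf: [1, 2]
const copied = reader.u8ArrayCopy(2);     // independent copy: [3, 4]

buf.fill(0);
console.log(borrowed); // Uint8Array [0, 0] — sees the overwrite
console.log(copied);   // Uint8Array [3, 4] — unaffected

This is why parseSchema, parseMessage, and parseChunk use u8ArrayCopy for payloads that outlive parsing, while parseMagic and the attachment CRC check use u8ArrayBorrow for transient reads.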
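reset() exists so hot paths can re-point a single Reader at successive buffers instead of allocating one per buffer: McapIndexedReader.readMessages keeps one chunkReader (seeded with an empty DataView) and resets it for every message offset, and McapStreamReader.append resets its reader after each #appendOrShift. The same pattern in isolation, assuming some source of per-chunk DataViews (views is illustrative):

import Reader from "./Reader";
import { parseRecord } from "./parse";

const views: DataView[] = []; // e.g. one DataView per decompressed chunk
const scratch = new Reader(new DataView(new ArrayBuffer(0)));
for (const view of views) {
  scratch.reset(view); // re-point at the new bytes; no Reader allocation, no GC pressure
  let record;
  while ((record = parseRecord(scratch, false))) {
    // handle record
  }
}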