From d2ecb6d2a6a460e309e4985dc001a6b749b3aac7 Mon Sep 17 00:00:00 2001
From: Romain Marcadier
Date: Thu, 30 Mar 2023 23:30:31 +0200
Subject: [PATCH] feat(rosetta): handle tablets with a streaming JSON codec (#4034)

Tablet files can grow pretty large, and cache files even larger. In order to
avoid running into the maximum string length Node.js supports when encoding or
decoding them, use a streaming JSON encoder / decoder.

Backports aws/jsii-rosetta#43

---

By submitting this pull request, I confirm that my contribution is made under
the terms of the [Apache 2.0 license].

[Apache 2.0 license]: https://www.apache.org/licenses/LICENSE-2.0
---
 packages/jsii-rosetta/lib/json.ts            |  47 +++++++
 packages/jsii-rosetta/lib/tablets/tablets.ts |  38 ++++--
 packages/jsii-rosetta/package.json           |   2 +
 packages/jsii-rosetta/test/json.test.ts      | 121 +++++++++++++++++++
 yarn.lock                                    |  27 +++++
 5 files changed, 223 insertions(+), 12 deletions(-)
 create mode 100644 packages/jsii-rosetta/lib/json.ts
 create mode 100644 packages/jsii-rosetta/test/json.test.ts

diff --git a/packages/jsii-rosetta/lib/json.ts b/packages/jsii-rosetta/lib/json.ts
new file mode 100644
index 0000000000..3c18e8933c
--- /dev/null
+++ b/packages/jsii-rosetta/lib/json.ts
@@ -0,0 +1,47 @@
+import { Readable, Writable, pipeline } from 'node:stream';
+import { promisify } from 'node:util';
+import { parser } from 'stream-json';
+import * as Assembler from 'stream-json/Assembler';
+import { disassembler } from 'stream-json/Disassembler';
+import { stringer } from 'stream-json/Stringer';
+
+// NB: In node 15+, there is a node:stream.promises object that has this built-in.
+const asyncPipeline = promisify(pipeline);
+
+/**
+ * Asynchronously parses a single JSON value from the provided reader. The JSON
+ * text might be longer than what could fit in a single string value, since the
+ * processing is done in a streaming manner.
+ *
+ * Prefer using JSON.parse if you know the entire JSON text is always small
+ * enough to fit in a string value, as this would have better performance.
+ *
+ * @param reader the reader from which to consume JSON text.
+ *
+ * @returns the parsed JSON value as a JavaScript value.
+ */
+export async function parse(reader: Readable): Promise<any> {
+  const assembler = new Assembler();
+  const jsonParser = parser();
+  assembler.connectTo(jsonParser);
+  return asyncPipeline(reader, jsonParser).then(() => assembler.current);
+}
+
+/**
+ * Serializes a possibly large object into the provided writer. The object may
+ * be large enough that the JSON text cannot fit in a single string value.
+ *
+ * Prefer using JSON.stringify if you know the object is always small enough
+ * that the JSON text can fit in a single string value, as this would have
+ * better performance.
+ *
+ * @param value the value to be serialized.
+ * @param writer the writer to which the JSON text will be written.
+ */
+export async function stringify(value: any, writer: Writable): Promise<void> {
+  const reader = new Readable({ objectMode: true });
+  reader.push(value);
+  reader.push(null);
+
+  return asyncPipeline(reader, disassembler(), stringer(), writer);
+}
diff --git a/packages/jsii-rosetta/lib/tablets/tablets.ts b/packages/jsii-rosetta/lib/tablets/tablets.ts
index d44b071cff..386eb872bd 100644
--- a/packages/jsii-rosetta/lib/tablets/tablets.ts
+++ b/packages/jsii-rosetta/lib/tablets/tablets.ts
@@ -1,7 +1,9 @@
-import { existsSync, promises as fs } from 'fs';
+import { createReadStream, createWriteStream, existsSync, promises as fs } from 'fs';
 import * as path from 'path';
+import { Readable, Writable } from 'stream';
 import * as zlib from 'zlib';
 
+import { parse, stringify } from '../json';
 import { TargetLanguage } from '../languages';
 import * as logging from '../logging';
 import { TypeScriptSnippet, SnippetLocation, completeSource } from '../snippet';
@@ -141,15 +143,17 @@ export class LanguageTablet {
    * compressed and decompress accordingly.
    */
   public async load(filename: string) {
-    let data = await fs.readFile(filename);
-    // Gzip objects start with 1f 8b 08
-    if (data[0] === 0x1f && data[1] === 0x8b && data[2] === 0x08) {
-      // This is a gz object, so we decompress it now...
-      data = zlib.gunzipSync(data);
+    let readStream: Readable;
+    if (await isGzipped(filename)) {
+      const gunzip = zlib.createGunzip();
+      createReadStream(filename).pipe(gunzip, { end: true });
+      readStream = gunzip;
       this.compressedSource = true;
+    } else {
+      readStream = createReadStream(filename);
     }
 
-    const obj: TabletSchema = JSON.parse(data.toString('utf-8'));
+    const obj: TabletSchema = await parse(readStream);
 
     if (!obj.toolVersion || !obj.snippets) {
       throw new Error(`File '${filename}' does not seem to be a Tablet file`);
@@ -181,12 +185,11 @@
   public async save(filename: string, compress = false) {
     await fs.mkdir(path.dirname(filename), { recursive: true });
 
-    let schema = Buffer.from(JSON.stringify(this.toSchema(), null, 2));
-    if (compress) {
-      schema = zlib.gzipSync(schema);
-    }
+    const writeStream: Writable = createWriteStream(filename, { flags: 'w' });
+    const gzip = compress ? zlib.createGzip() : undefined;
+    gzip?.pipe(writeStream, { end: true });
 
-    await fs.writeFile(filename, schema);
+    return stringify(this.toSchema(), gzip ?? writeStream);
   }
 
   private toSchema(): TabletSchema {
@@ -316,3 +319,14 @@ export interface Translation {
   language: string;
   didCompile?: boolean;
 }
+
+async function isGzipped(filename: string) {
+  const openFile = await fs.open(filename, 'r');
+  try {
+    // Assumes that we can always read 3 bytes if there's that many in the file...
+    const { bytesRead, buffer } = await openFile.read(Buffer.alloc(4), 0, 3, 0);
+    return bytesRead >= 3 && buffer[0] === 0x1f && buffer[1] === 0x8b && buffer[2] === 0x08;
+  } finally {
+    await openFile.close();
+  }
+}
diff --git a/packages/jsii-rosetta/package.json b/packages/jsii-rosetta/package.json
index 9cc5e84171..ecc549abb4 100644
--- a/packages/jsii-rosetta/package.json
+++ b/packages/jsii-rosetta/package.json
@@ -18,6 +18,7 @@
   "devDependencies": {
     "@types/commonmark": "^0.27.6",
     "@types/mock-fs": "^4.13.1",
+    "@types/stream-json": "^1.7.3",
    "@types/workerpool": "^6.4.0",
     "@types/semver": "^7.3.13",
     "jsii-build-tools": "0.0.0",
@@ -33,6 +34,7 @@
     "@xmldom/xmldom": "^0.8.6",
     "workerpool": "^6.4.0",
     "yargs": "^16.2.0",
+    "stream-json": "^1.7.5",
     "semver": "^7.3.8",
     "semver-intersect": "^1.4.0",
     "fast-glob": "^3.2.12",
diff --git a/packages/jsii-rosetta/test/json.test.ts b/packages/jsii-rosetta/test/json.test.ts
new file mode 100644
index 0000000000..1c6aabe0ae
--- /dev/null
+++ b/packages/jsii-rosetta/test/json.test.ts
@@ -0,0 +1,121 @@
+import { kStringMaxLength } from 'node:buffer';
+import { PassThrough, Readable, Writable } from 'node:stream';
+
+import { parse, stringify } from '../lib/json';
+
+describe(parse, () => {
+  test('small value', async () => {
+    const value = { foo: 'bar', baz: 123 };
+    const jsonText = JSON.stringify(value);
+
+    const readable = new PassThrough();
+    readable.end(jsonText);
+
+    expect(await parse(readable)).toEqual(value);
+  });
+
+  test('value is too large to fit in a single string', async () => {
+    // We'll leverage the fact JSON can contain the same key multiple times...
+    const expected = { foo: 'bar', baz: 123, bool: true, null: null, long: 'X'.repeat(102_400) };
+
+    const readable = Readable.from(
+      (function* () {
+        const chunks = Object.entries(expected).map(
+          ([key, value]) => ` ${JSON.stringify(key)}: ${JSON.stringify(value)}`,
+        );
+
+        yield '{\n';
+        let counter = 2;
+        let emitComma = false;
+        while (counter < kStringMaxLength) {
+          for (const chunk of chunks) {
+            if (emitComma) {
+              yield ',\n';
+              counter += 2;
+            }
+            yield chunk;
+            counter += chunk.length;
+            emitComma = true;
+          }
+        }
+        yield '\n}\n';
+      })(),
+    );
+
+    const actual = await parse(readable);
+    expect(actual).toEqual(expected);
+  });
+
+  test('invalid JSON input', () => {
+    const readable = new PassThrough();
+    readable.end('{"bad": "JSON",');
+
+    return expect(parse(readable)).rejects.toThrowErrorMatchingInlineSnapshot(
+      `"Parser cannot parse input: expected an object key"`,
+    );
+  });
+});
+
+describe(stringify, () => {
+  test('small value', async () => {
+    const value = { foo: 'bar', baz: 123 };
+    const jsonText = JSON.stringify(value);
+
+    const chunks = new Array<Buffer>();
+    const writable = new Writable({
+      write: (chunk, _encoding, callback) => {
+        chunks.push(Buffer.from(chunk));
+        callback(null);
+      },
+    });
+
+    await stringify(value, writable);
+    expect(Buffer.concat(chunks).toString('utf-8')).toBe(jsonText);
+  });
+
+  test('value too large for JSON text to fit in a string', async () => {
+    const value = { key: 'X'.repeat(kStringMaxLength) };
+
+    const chunks = new Array<Buffer>();
+    const writable = new Writable({
+      write: (chunk, _encoding, callback) => {
+        chunks.push(Buffer.from(chunk));
+        callback(null);
+      },
+    });
+
+    await stringify(value, writable);
+
+    expect(headBytes(chunks, 10).toString('utf-8')).toBe('{"key":"XX');
+    expect(tailBytes(chunks, 10).toString('utf-8')).toBe('XXXXXXXX"}');
+  });
+});
+
+function headBytes(chunks: readonly Buffer[], count: number): Buffer {
+  if (chunks.length === 0) {
+    return Buffer.alloc(0);
+  }
+  const [head, ...tail] = chunks;
+  const headSlice = head.slice(0, count);
+  if (headSlice.length === count) {
+    return headSlice;
+  }
+
+  const tailSlice = headBytes(tail, count - headSlice.length);
+  return Buffer.concat([headSlice, tailSlice]);
+}
+
+function tailBytes(chunks: readonly Buffer[], count: number): Buffer {
+  if (chunks.length === 0) {
+    return Buffer.alloc(0);
+  }
+
+  const tail = chunks[chunks.length - 1];
+  const tailSlice = tail.slice(Math.max(0, tail.length - count), tail.length);
+  if (tailSlice.length === count) {
+    return tailSlice;
+  }
+
+  const headSlice = tailBytes(chunks.slice(0, chunks.length - 1), count - tailSlice.length);
+  return Buffer.concat([headSlice, tailSlice]);
+}
diff --git a/yarn.lock b/yarn.lock
index 9fc0ad9351..aa047d74cc 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1813,6 +1813,21 @@
   resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.1.tgz#20f18294f797f2209b5f65c8e3b5c8e8261d127c"
   integrity sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw==
 
+"@types/stream-chain@*":
+  version "2.0.1"
+  resolved "https://registry.yarnpkg.com/@types/stream-chain/-/stream-chain-2.0.1.tgz#4d3cc47a32609878bc188de0bae420bcfd3bf1f5"
+  integrity sha512-D+Id9XpcBpampptkegH7WMsEk6fUdf9LlCIX7UhLydILsqDin4L0QT7ryJR0oycwC7OqohIzdfcMHVZ34ezNGg==
+  dependencies:
+    "@types/node" "*"
+
+"@types/stream-json@^1.7.3":
+  version "1.7.3"
+  resolved "https://registry.yarnpkg.com/@types/stream-json/-/stream-json-1.7.3.tgz#5596405be23c7db1b6fb9cb977b942b9e1d219df"
+  integrity sha512-Jqsyq5VPOTWorvEmzWhEWH5tJnHA+bB8vt/Zzb11vSDj8esfSHDMj2rbVjP0mfJQzl3YBJSXBBq08iiyaBK3KA==
+  dependencies:
+    "@types/node" "*"
+    "@types/stream-chain" "*"
+
 "@types/tar-fs@^2.0.1":
   version "2.0.1"
   resolved "https://registry.yarnpkg.com/@types/tar-fs/-/tar-fs-2.0.1.tgz#6391dcad1b03dea2d79fac07371585ab54472bb1"
@@ -8226,6 +8241,18 @@ standard-version@^9.5.0:
     stringify-package "^1.0.1"
     yargs "^16.0.0"
 
+stream-chain@^2.2.5:
+  version "2.2.5"
+  resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
+  integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==
+
+stream-json@^1.7.5:
+  version "1.7.5"
+  resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.7.5.tgz#2ff0563011f22cea4f6a28dbfc0344a53c761fe4"
+  integrity sha512-NSkoVduGakxZ8a+pTPUlcGEeAGQpWL9rKJhOFCV+J/QtdQUEU5vtBgVg6eJXn8JB8RZvpbJWZGvXkhz70MLWoA==
+  dependencies:
+    stream-chain "^2.2.5"
+
 streamroller@^3.1.5:
   version "3.1.5"
   resolved "https://registry.yarnpkg.com/streamroller/-/streamroller-3.1.5.tgz#1263182329a45def1ffaef58d31b15d13d2ee7ff"
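
For illustration only, not part of the patch: a minimal TypeScript sketch of how the new streaming helpers from packages/jsii-rosetta/lib/json.ts are meant to be used, mirroring what LanguageTablet.load and save do above. The relative import path, the helper names readCompressedTablet/writeCompressedTablet, and the assumption that the file is gzip-compressed are illustrative.

import { createReadStream, createWriteStream } from 'node:fs';
import * as zlib from 'node:zlib';
// Relative path as used by the new tests; the published entry point may differ.
import { parse, stringify } from '../lib/json';

// Read a gzip-compressed tablet: decompress on the fly and hand the stream to
// the streaming parser, so the JSON text never has to fit into a single string.
async function readCompressedTablet(filename: string): Promise<any> {
  const gunzip = zlib.createGunzip();
  createReadStream(filename).pipe(gunzip, { end: true });
  return parse(gunzip);
}

// Write a schema object through gzip without materializing the whole JSON text,
// the same shape as LanguageTablet.save with compress = true.
async function writeCompressedTablet(filename: string, schema: any): Promise<void> {
  const gzip = zlib.createGzip();
  gzip.pipe(createWriteStream(filename), { end: true });
  return stringify(schema, gzip);
}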