feat(rosetta): handle tablets with a streaming JSON codec #4034

Merged 2 commits on Mar 30, 2023

Changes from all commits
47 changes: 47 additions & 0 deletions packages/jsii-rosetta/lib/json.ts
@@ -0,0 +1,47 @@
import { Readable, Writable, pipeline } from 'node:stream';
import { promisify } from 'node:util';
import { parser } from 'stream-json';
import * as Assembler from 'stream-json/Assembler';
import { disassembler } from 'stream-json/Disassembler';
import { stringer } from 'stream-json/Stringer';

// NB: In node 15+, there is a node:stream.promises object that has this built-in.
const asyncPipeline = promisify(pipeline);

/**
* Asynchronously parses a single JSON value from the provided reader. The JSON
* text might be longer than what could fit in a single string value, since the
* processing is done in a streaming manner.
*
* Prefer using JSON.parse if you know the entire JSON text is always small
* enough to fit in a string value, as this would have better performance.
*
* @param reader the reader from which to consume JSON text.
*
* @returns the parsed JSON value as a JavaScript value.
*/
export async function parse(reader: Readable): Promise<any> {
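// stream-json's Assembler rebuilds the JavaScript value incrementally from
// the parser's token stream, so the full JSON text is never held in memory
// as a single string.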
const assembler = new Assembler();
const jsonParser = parser();
assembler.connectTo(jsonParser);
return asyncPipeline(reader, jsonParser).then(() => assembler.current);
}

/**
* Serializes a possibly large object into the provided writer. The object may
* be large enough that the JSON text cannot fit in a single string value.
*
* Prefer using JSON.stringify if you know the object is always small enough
* that the JSON text can fit in a single string value, as this would have
* better performance.
*
* @param value the value to be serialized.
* @param writer the writer to which the JSON text will be written.
*/
export async function stringify(value: any, writer: Writable): Promise<void> {
const reader = new Readable({ objectMode: true });
reader.push(value);
reader.push(null);

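// disassembler() turns the value into a token stream; stringer() renders
// those tokens back out as chunks of JSON text into the writer.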
return asyncPipeline(reader, disassembler(), stringer(), writer);
}
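For reference, a minimal usage sketch of the two helpers above; the file paths and the relative import path are illustrative, not part of this change:

```ts
import { createReadStream, createWriteStream } from 'node:fs';

import { parse, stringify } from './json'; // illustrative import path

async function copyDocument(source: string, destination: string): Promise<void> {
  // Stream-parse a JSON document that may be too large for JSON.parse...
  const value = await parse(createReadStream(source));
  // ...then stream-serialize it back out without ever materializing the
  // complete JSON text in memory.
  await stringify(value, createWriteStream(destination));
}
```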
38 changes: 26 additions & 12 deletions packages/jsii-rosetta/lib/tablets/tablets.ts
@@ -1,7 +1,9 @@
import { existsSync, promises as fs } from 'fs';
import { createReadStream, createWriteStream, existsSync, promises as fs } from 'fs';
import * as path from 'path';
import { Readable, Writable } from 'stream';
import * as zlib from 'zlib';

import { parse, stringify } from '../json';
import { TargetLanguage } from '../languages';
import * as logging from '../logging';
import { TypeScriptSnippet, SnippetLocation, completeSource } from '../snippet';
@@ -141,15 +143,17 @@ export class LanguageTablet {
* compressed and decompress accordingly.
*/
public async load(filename: string) {
let data = await fs.readFile(filename);
// Gzip objects start with 1f 8b 08
if (data[0] === 0x1f && data[1] === 0x8b && data[2] === 0x08) {
// This is a gz object, so we decompress it now...
data = zlib.gunzipSync(data);
let readStream: Readable;
if (await isGzipped(filename)) {
const gunzip = zlib.createGunzip();
createReadStream(filename).pipe(gunzip, { end: true });
readStream = gunzip;
this.compressedSource = true;
} else {
readStream = createReadStream(filename);
}

const obj: TabletSchema = JSON.parse(data.toString('utf-8'));
const obj: TabletSchema = await parse(readStream);

if (!obj.toolVersion || !obj.snippets) {
throw new Error(`File '${filename}' does not seem to be a Tablet file`);
@@ -181,12 +185,11 @@
public async save(filename: string, compress = false) {
await fs.mkdir(path.dirname(filename), { recursive: true });

let schema = Buffer.from(JSON.stringify(this.toSchema(), null, 2));
if (compress) {
schema = zlib.gzipSync(schema);
}
const writeStream: Writable = createWriteStream(filename, { flags: 'w' });
const gzip = compress ? zlib.createGzip() : undefined;
gzip?.pipe(writeStream, { end: true });

await fs.writeFile(filename, schema);
return stringify(this.toSchema(), gzip ?? writeStream);
}

private toSchema(): TabletSchema {
@@ -316,3 +319,14 @@ export interface Translation {
language: string;
didCompile?: boolean;
}

async function isGzipped(filename: string) {
const openFile = await fs.open(filename, 'r');
try {
// Assumes that we can always read 3 bytes if there are at least that many in the file...
const { bytesRead, buffer } = await openFile.read(Buffer.alloc(4), 0, 3, 0);
return bytesRead >= 3 && buffer[0] === 0x1f && buffer[1] === 0x8b && buffer[2] === 0x08;
} finally {
await openFile.close();
}
}
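Taken together, the new load() sniffs the gzip magic bytes via isGzipped() and streams the file through createGunzip(), while save() optionally pipes the streaming stringifier through createGzip(). A minimal round-trip sketch, assuming LanguageTablet is imported from this module (the import path and file names are illustrative):

```ts
import { LanguageTablet } from './tablets'; // illustrative import path

async function recompress(input: string, output: string): Promise<void> {
  const tablet = new LanguageTablet();
  // load() checks for the 1f 8b 08 gzip magic bytes and transparently
  // decompresses, so callers need not know how the tablet was stored.
  await tablet.load(input);
  // Passing compress = true routes the streamed JSON text through createGzip().
  await tablet.save(output, true);
}
```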
2 changes: 2 additions & 0 deletions packages/jsii-rosetta/package.json
@@ -18,6 +18,7 @@
"devDependencies": {
"@types/commonmark": "^0.27.6",
"@types/mock-fs": "^4.13.1",
"@types/stream-json": "^1.7.3",
"@types/workerpool": "^6.4.0",
"@types/semver": "^7.3.13",
"jsii-build-tools": "0.0.0",
@@ -33,6 +34,7 @@
"@xmldom/xmldom": "^0.8.6",
"workerpool": "^6.4.0",
"yargs": "^16.2.0",
"stream-json": "^1.7.5",
"semver": "^7.3.8",
"semver-intersect": "^1.4.0",
"fast-glob": "^3.2.12",
121 changes: 121 additions & 0 deletions packages/jsii-rosetta/test/json.test.ts
@@ -0,0 +1,121 @@
import { kStringMaxLength } from 'node:buffer';
import { PassThrough, Readable, Writable } from 'node:stream';

import { parse, stringify } from '../lib/json';

describe(parse, () => {
test('small value', async () => {
const value = { foo: 'bar', baz: 123 };
const jsonText = JSON.stringify(value);

const readable = new PassThrough();
readable.end(jsonText);

expect(await parse(readable)).toEqual(value);
});

test('value is too large to fit in a single string', async () => {
// We'll leverage the fact that JSON text may define the same key multiple times...
const expected = { foo: 'bar', baz: 123, bool: true, null: null, long: 'X'.repeat(102_400) };

const readable = Readable.from(
(function* () {
const chunks = Object.entries(expected).map(
([key, value]) => ` ${JSON.stringify(key)}: ${JSON.stringify(value)}`,
);

yield '{\n';
let counter = 2;
let emitComma = false;
while (counter < kStringMaxLength) {
for (const chunk of chunks) {
if (emitComma) {
yield ',\n';
counter += 2;
}
yield chunk;
counter += chunk.length;
emitComma = true;
}
}
yield '\n}\n';
})(),
);

const actual = await parse(readable);
expect(actual).toEqual(expected);
});

test('invalid JSON input', () => {
const readable = new PassThrough();
readable.end('{"bad": "JSON",');

return expect(parse(readable)).rejects.toThrowErrorMatchingInlineSnapshot(
`"Parser cannot parse input: expected an object key"`,
);
});
});

describe(stringify, () => {
test('small value', async () => {
const value = { foo: 'bar', baz: 123 };
const jsonText = JSON.stringify(value);

const chunks = new Array<Buffer>();
const writable = new Writable({
write: (chunk, _encoding, callback) => {
chunks.push(Buffer.from(chunk));
callback(null);
},
});

await stringify(value, writable);
expect(Buffer.concat(chunks).toString('utf-8')).toBe(jsonText);
});

test('value too large for JSON text to fit in a string', async () => {
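// The serialized text would exceed kStringMaxLength (the longest string
// the runtime can hold), so JSON.stringify could not produce it; the
// streaming stringify never builds the full text.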
const value = { key: 'X'.repeat(kStringMaxLength) };

const chunks = new Array<Buffer>();
const writable = new Writable({
write: (chunk, _encoding, callback) => {
chunks.push(Buffer.from(chunk));
callback(null);
},
});

await stringify(value, writable);

expect(headBytes(chunks, 10).toString('utf-8')).toBe('{"key":"XX');
expect(tailBytes(chunks, 10).toString('utf-8')).toBe('XXXXXXXX"}');
});
});

function headBytes(chunks: readonly Buffer[], count: number): Buffer {
if (chunks.length === 0) {
return Buffer.alloc(0);
}
const [head, ...tail] = chunks;
const headSlice = head.slice(0, count);
if (headSlice.length === count) {
return headSlice;
}

const tailSlice = headBytes(tail, count - headSlice.length);
return Buffer.concat([headSlice, tailSlice]);
}

function tailBytes(chunks: readonly Buffer[], count: number): Buffer {
if (chunks.length === 0) {
return Buffer.alloc(0);
}

const tail = chunks[chunks.length - 1];
const tailSlice = tail.slice(Math.max(0, tail.length - count), tail.length);
if (tailSlice.length === count) {
return tailSlice;
}

const headSlice = tailBytes(chunks.slice(0, chunks.length - 1), count - tailSlice.length);
return Buffer.concat([headSlice, tailSlice]);
}
27 changes: 27 additions & 0 deletions yarn.lock

Some generated files are not rendered by default.