From 194d5446800432755ab0ae5755adce9e26fffa2f Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Wed, 17 May 2023 22:49:53 -0400 Subject: [PATCH] Parse chunks with a running parser --- .../react-client/src/ReactFlightClient.js | 148 ++++++++++++++---- 1 file changed, 117 insertions(+), 31 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index dc9503037d922..a7fca07462753 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -52,6 +52,14 @@ export type JSONValue = | {+[key: string]: JSONValue} | $ReadOnlyArray; +const ROW_ID = 0; +const ROW_TAG = 1; +const ROW_LENGTH = 2; +const ROW_CHUNK_BY_NEWLINE = 3; +const ROW_CHUNK_BY_LENGTH = 4; + +type RowParserState = 0 | 1 | 2 | 3 | 4; + const PENDING = 'pending'; const BLOCKED = 'blocked'; const RESOLVED_MODEL = 'resolved_model'; @@ -165,9 +173,13 @@ export type Response = { _bundlerConfig: SSRManifest, _callServer: CallServerCallback, _chunks: Map>, - _partialRow: string, _fromJSON: (key: string, value: JSONValue) => any, _stringDecoder: StringDecoder, + _rowState: RowParserState, + _rowID: number, // parts of a row ID parsed so far + _rowTag: number, // 0 indicates that we're currently parsing the row ID + _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. + _buffer: Array, // chunks received so far as part of this row }; function readChunk(chunk: SomeChunk): T { @@ -665,9 +677,13 @@ export function createResponse( _bundlerConfig: bundlerConfig, _callServer: callServer !== undefined ? callServer : missingCall, _chunks: chunks, - _partialRow: '', _stringDecoder: createStringDecoder(), _fromJSON: (null: any), + _rowState: 0, + _rowID: 0, + _rowTag: 0, + _rowLength: 0, + _buffer: [], }; // Don't inline this call because it causes closure to outline the call above. response._fromJSON = createFromJSONCallback(response); @@ -806,29 +822,40 @@ function resolveHint( dispatchHint(code, hintModel); } -function processFullRow(response: Response, row: string): void { - if (row === '') { - return; +function processFullRow( + response: Response, + id: number, + tag: number, + buffer: Array, + lastChunk: string | Uint8Array, +): void { + let row = ''; + const stringDecoder = response._stringDecoder; + for (let i = 0; i < buffer.length; i++) { + const chunk = buffer[i]; + if (typeof chunk === 'string') { + row += chunk; + } else { + row += readPartialStringChunk(stringDecoder, chunk); + } + } + if (typeof lastChunk === 'string') { + row += lastChunk; + } else { + row += readFinalStringChunk(stringDecoder, lastChunk); } - const colon = row.indexOf(':', 0); - const id = parseInt(row.slice(0, colon), 16); - const tag = row[colon + 1]; - // When tags that are not text are added, check them here before - // parsing the row as text. - // switch (tag) { - // } switch (tag) { - case 'I': { - resolveModule(response, id, row.slice(colon + 2)); + case 73 /* "I" */: { + resolveModule(response, id, row); return; } - case 'H': { - const code = row[colon + 2]; - resolveHint(response, code, row.slice(colon + 3)); + case 72 /* "H" */: { + const code = row[0]; + resolveHint(response, code, row.slice(1)); return; } - case 'E': { - const errorInfo = JSON.parse(row.slice(colon + 2)); + case 69 /* "E" */: { + const errorInfo = JSON.parse(row); if (__DEV__) { resolveErrorDev( response, @@ -844,7 +871,7 @@ function processFullRow(response: Response, row: string): void { } default: { // We assume anything else is JSON. - resolveModel(response, id, row.slice(colon + 1)); + resolveModel(response, id, row); return; } } @@ -854,18 +881,77 @@ export function processBinaryChunk( response: Response, chunk: Uint8Array, ): void { - const stringDecoder = response._stringDecoder; - let linebreak = chunk.indexOf(10); // newline - while (linebreak > -1) { - const fullrow = - response._partialRow + - readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak)); - processFullRow(response, fullrow); - response._partialRow = ''; - chunk = chunk.subarray(linebreak + 1); - linebreak = chunk.indexOf(10); // newline + let i = 0; + while (i < chunk.length) { + let lastIdx = -1; + switch (response._rowState) { + case ROW_ID: { + const byte = chunk[i++]; + if (byte === 58 /* ":" */) { + // Finished the rowID, next we'll parse the tag. + response._rowState = ROW_TAG; + } else { + response._rowID = + (response._rowID << 4) | (byte > 96 ? byte - 87 : byte - 48); + } + continue; + } + case ROW_TAG: { + const resolvedRowTag = chunk[i]; + if (resolvedRowTag > 64 && resolvedRowTag < 91) { + response._rowTag = resolvedRowTag; + i++; + } else { + // This was an unknown tag so it was probably part of the data. + } + response._rowState = ROW_CHUNK_BY_NEWLINE; + continue; + } + case ROW_LENGTH: { + // TODO + continue; + } + case ROW_CHUNK_BY_NEWLINE: { + // We're looking for a newline + lastIdx = chunk.indexOf(10 /* "\n" */, i); + break; + } + case ROW_CHUNK_BY_LENGTH: { + // We're looking for the remaining byte length + const rowLength = response._rowLength; + if (i + rowLength <= chunk.length) { + lastIdx = i + rowLength; + } + break; + } + } + if (lastIdx > -1) { + // We found the last chunk of the row + const lastChunk = chunk.slice(i, lastIdx); + processFullRow( + response, + response._rowID, + response._rowTag, + response._buffer, + lastChunk, + ); + // Reset state machine for a new row + response._rowState = ROW_ID; + response._rowTag = 0; + response._rowID = 0; + response._rowLength = 0; + response._buffer.length = 0; + i = lastIdx + 1; + } else { + // The rest of this row is in a future chunk. We stash the rest of the + // current chunk until we can process the full row. + const remainingSlice = chunk.slice(i); + response._buffer.push(remainingSlice); + // Update how many bytes we're still waiting for. + response._rowLength -= remainingSlice.length; + break; + } } - response._partialRow += readPartialStringChunk(stringDecoder, chunk); } function parseModel(response: Response, json: UninitializedModel): T {