Skip to content

Commit

Permalink
Parse chunks with a running parser
Browse files Browse the repository at this point in the history
  • Loading branch information
sebmarkbage committed Jun 12, 2023
1 parent ce6842d commit 194d544
Showing 1 changed file with 117 additions and 31 deletions.
148 changes: 117 additions & 31 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export type JSONValue =
| {+[key: string]: JSONValue}
| $ReadOnlyArray<JSONValue>;

const ROW_ID = 0;
const ROW_TAG = 1;
const ROW_LENGTH = 2;
const ROW_CHUNK_BY_NEWLINE = 3;
const ROW_CHUNK_BY_LENGTH = 4;

type RowParserState = 0 | 1 | 2 | 3 | 4;

const PENDING = 'pending';
const BLOCKED = 'blocked';
const RESOLVED_MODEL = 'resolved_model';
Expand Down Expand Up @@ -165,9 +173,13 @@ export type Response = {
_bundlerConfig: SSRManifest,
_callServer: CallServerCallback,
_chunks: Map<number, SomeChunk<any>>,
_partialRow: string,
_fromJSON: (key: string, value: JSONValue) => any,
_stringDecoder: StringDecoder,
_rowState: RowParserState,
_rowID: number, // parts of a row ID parsed so far
_rowTag: number, // 0 indicates that we're currently parsing the row ID
_rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline.
_buffer: Array<string | Uint8Array>, // chunks received so far as part of this row
};

function readChunk<T>(chunk: SomeChunk<T>): T {
Expand Down Expand Up @@ -665,9 +677,13 @@ export function createResponse(
_bundlerConfig: bundlerConfig,
_callServer: callServer !== undefined ? callServer : missingCall,
_chunks: chunks,
_partialRow: '',
_stringDecoder: createStringDecoder(),
_fromJSON: (null: any),
_rowState: 0,
_rowID: 0,
_rowTag: 0,
_rowLength: 0,
_buffer: [],
};
// Don't inline this call because it causes closure to outline the call above.
response._fromJSON = createFromJSONCallback(response);
Expand Down Expand Up @@ -806,29 +822,40 @@ function resolveHint(
dispatchHint(code, hintModel);
}

function processFullRow(response: Response, row: string): void {
if (row === '') {
return;
function processFullRow(
response: Response,
id: number,
tag: number,
buffer: Array<string | Uint8Array>,
lastChunk: string | Uint8Array,
): void {
let row = '';
const stringDecoder = response._stringDecoder;
for (let i = 0; i < buffer.length; i++) {
const chunk = buffer[i];
if (typeof chunk === 'string') {
row += chunk;
} else {
row += readPartialStringChunk(stringDecoder, chunk);
}
}
if (typeof lastChunk === 'string') {
row += lastChunk;
} else {
row += readFinalStringChunk(stringDecoder, lastChunk);
}
const colon = row.indexOf(':', 0);
const id = parseInt(row.slice(0, colon), 16);
const tag = row[colon + 1];
// When tags that are not text are added, check them here before
// parsing the row as text.
// switch (tag) {
// }
switch (tag) {
case 'I': {
resolveModule(response, id, row.slice(colon + 2));
case 73 /* "I" */: {
resolveModule(response, id, row);
return;
}
case 'H': {
const code = row[colon + 2];
resolveHint(response, code, row.slice(colon + 3));
case 72 /* "H" */: {
const code = row[0];
resolveHint(response, code, row.slice(1));
return;
}
case 'E': {
const errorInfo = JSON.parse(row.slice(colon + 2));
case 69 /* "E" */: {
const errorInfo = JSON.parse(row);
if (__DEV__) {
resolveErrorDev(
response,
Expand All @@ -844,7 +871,7 @@ function processFullRow(response: Response, row: string): void {
}
default: {
// We assume anything else is JSON.
resolveModel(response, id, row.slice(colon + 1));
resolveModel(response, id, row);
return;
}
}
Expand All @@ -854,18 +881,77 @@ export function processBinaryChunk(
response: Response,
chunk: Uint8Array,
): void {
const stringDecoder = response._stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
const fullrow =
response._partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response._partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
let i = 0;
while (i < chunk.length) {
let lastIdx = -1;
switch (response._rowState) {
case ROW_ID: {
const byte = chunk[i++];
if (byte === 58 /* ":" */) {
// Finished the rowID, next we'll parse the tag.
response._rowState = ROW_TAG;
} else {
response._rowID =
(response._rowID << 4) | (byte > 96 ? byte - 87 : byte - 48);
}
continue;
}
case ROW_TAG: {
const resolvedRowTag = chunk[i];
if (resolvedRowTag > 64 && resolvedRowTag < 91) {
response._rowTag = resolvedRowTag;
i++;
} else {
// This was an unknown tag so it was probably part of the data.
}
response._rowState = ROW_CHUNK_BY_NEWLINE;
continue;
}
case ROW_LENGTH: {
// TODO
continue;
}
case ROW_CHUNK_BY_NEWLINE: {
// We're looking for a newline
lastIdx = chunk.indexOf(10 /* "\n" */, i);
break;
}
case ROW_CHUNK_BY_LENGTH: {
// We're looking for the remaining byte length
const rowLength = response._rowLength;
if (i + rowLength <= chunk.length) {
lastIdx = i + rowLength;
}
break;
}
}
if (lastIdx > -1) {
// We found the last chunk of the row
const lastChunk = chunk.slice(i, lastIdx);
processFullRow(
response,
response._rowID,
response._rowTag,
response._buffer,
lastChunk,
);
// Reset state machine for a new row
response._rowState = ROW_ID;
response._rowTag = 0;
response._rowID = 0;
response._rowLength = 0;
response._buffer.length = 0;
i = lastIdx + 1;
} else {
// The rest of this row is in a future chunk. We stash the rest of the
// current chunk until we can process the full row.
const remainingSlice = chunk.slice(i);
response._buffer.push(remainingSlice);
// Update how many bytes we're still waiting for.
response._rowLength -= remainingSlice.length;
break;
}
}
response._partialRow += readPartialStringChunk(stringDecoder, chunk);
}

function parseModel<T>(response: Response, json: UninitializedModel): T {
Expand Down

0 comments on commit 194d544

Please sign in to comment.