Skip to content

Commit

Permalink
refactor: Use WritableStream and ReadableStream in WebSocket exam…
Browse files Browse the repository at this point in the history
…ples
  • Loading branch information
pojntfx committed Feb 1, 2024
1 parent 93bd6dc commit e56efbd
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 128 deletions.
135 changes: 71 additions & 64 deletions ts/bin/panrpc-example-websocket-client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/* eslint-disable no-console */
// eslint-disable-next-line import/no-extraneous-dependencies
import { JSONParser } from "@streamparser/json-node";
import { JSONParser } from "@streamparser/json-whatwg";
import { env, exit, stdin, stdout } from "process";
import { createInterface } from "readline/promises";
import { Readable, Transform, TransformCallback, Writable } from "stream";
import { parse } from "url";
// eslint-disable-next-line import/no-extraneous-dependencies
import { WebSocket, WebSocketServer } from "ws";
Expand Down Expand Up @@ -111,49 +110,53 @@ if (listen) {
console.error("Client disconnected with error:", e);
});

const encoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(JSON.stringify(chunk));
callback();
}
})({
objectMode: true,
});
encoder.addListener("data", (chunk) => socket.send(chunk));

const decoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(chunk?.value);
callback();
}
})({
objectMode: true,
const encoder = new WritableStream({
write(chunk) {
socket.send(JSON.stringify(chunk));
},
});

const parser = new JSONParser({
paths: ["$"],
separator: "",
});
parser.pipe(decoder);

socket.addEventListener("message", (m) => parser.write(m.data));
const parserReader = parser.readable.getReader();
const decoder = new ReadableStream({
start(controller) {
parserReader
.read()
.then(function process({
done,
value: message,
}: {
done: boolean;
value?: any;
}) {
if (done) {
controller.close();

return;
}

controller.enqueue(message?.value);

parserReader.read().then(process);
});
},
});

const parserWriter = parser.writable.getWriter();
socket.addEventListener("message", (m) => parserWriter.write(m.data));

socket.addEventListener("close", () => {
encoder.destroy();
decoder.destroy();
encoder.close();
decoder.cancel();
});

registry.linkStream(
Writable.toWeb(encoder),
Readable.toWeb(decoder),
encoder,
decoder,

(v) => v,
(v) => v
Expand All @@ -176,49 +179,53 @@ if (listen) {
socket.addEventListener("error", rej);
});

const encoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(JSON.stringify(chunk));
callback();
}
})({
objectMode: true,
});
encoder.addListener("data", (chunk) => socket.send(chunk));

const decoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(chunk?.value);
callback();
}
})({
objectMode: true,
const encoder = new WritableStream({
write(chunk) {
socket.send(JSON.stringify(chunk));
},
});

const parser = new JSONParser({
paths: ["$"],
separator: "",
});
parser.pipe(decoder);

socket.addEventListener("message", (m) => parser.write(m.data));
const parserReader = parser.readable.getReader();
const decoder = new ReadableStream({
start(controller) {
parserReader
.read()
.then(function process({
done,
value: message,
}: {
done: boolean;
value?: any;
}) {
if (done) {
controller.close();

return;
}

controller.enqueue(message?.value);

parserReader.read().then(process);
});
},
});

const parserWriter = parser.writable.getWriter();
socket.addEventListener("message", (m) => parserWriter.write(m.data));

socket.addEventListener("close", () => {
encoder.destroy();
decoder.destroy();
encoder.close();
decoder.cancel();
});

registry.linkStream(
Writable.toWeb(encoder),
Readable.toWeb(decoder),
encoder,
decoder,

(v) => v,
(v) => v
Expand Down
135 changes: 71 additions & 64 deletions ts/bin/panrpc-example-websocket-server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/* eslint-disable no-console */
// eslint-disable-next-line import/no-extraneous-dependencies
import { JSONParser } from "@streamparser/json-node";
import { JSONParser } from "@streamparser/json-whatwg";
import { env, exit, stdin, stdout } from "process";
import { createInterface } from "readline/promises";
import { Readable, Transform, TransformCallback, Writable } from "stream";
import { parse } from "url";
// eslint-disable-next-line import/no-extraneous-dependencies
import { WebSocket, WebSocketServer } from "ws";
Expand Down Expand Up @@ -107,49 +106,53 @@ if (listen) {
console.error("Client disconnected with error:", e);
});

const encoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(JSON.stringify(chunk));
callback();
}
})({
objectMode: true,
});
encoder.addListener("data", (chunk) => socket.send(chunk));

const decoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(chunk?.value);
callback();
}
})({
objectMode: true,
const encoder = new WritableStream({
write(chunk) {
socket.send(JSON.stringify(chunk));
},
});

const parser = new JSONParser({
paths: ["$"],
separator: "",
});
parser.pipe(decoder);

socket.addEventListener("message", (m) => parser.write(m.data));
const parserReader = parser.readable.getReader();
const decoder = new ReadableStream({
start(controller) {
parserReader
.read()
.then(function process({
done,
value: message,
}: {
done: boolean;
value?: any;
}) {
if (done) {
controller.close();

return;
}

controller.enqueue(message?.value);

parserReader.read().then(process);
});
},
});

const parserWriter = parser.writable.getWriter();
socket.addEventListener("message", (m) => parserWriter.write(m.data));

socket.addEventListener("close", () => {
encoder.destroy();
decoder.destroy();
encoder.close();
decoder.cancel();
});

registry.linkStream(
Writable.toWeb(encoder),
Readable.toWeb(decoder),
encoder,
decoder,

(v) => v,
(v) => v
Expand All @@ -172,49 +175,53 @@ if (listen) {
socket.addEventListener("error", rej);
});

const encoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(JSON.stringify(chunk));
callback();
}
})({
objectMode: true,
});
encoder.addListener("data", (chunk) => socket.send(chunk));

const decoder = new (class extends Transform {
_transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback
) {
this.push(chunk?.value);
callback();
}
})({
objectMode: true,
const encoder = new WritableStream({
write(chunk) {
socket.send(JSON.stringify(chunk));
},
});

const parser = new JSONParser({
paths: ["$"],
separator: "",
});
parser.pipe(decoder);

socket.addEventListener("message", (m) => parser.write(m.data));
const parserReader = parser.readable.getReader();
const decoder = new ReadableStream({
start(controller) {
parserReader
.read()
.then(function process({
done,
value: message,
}: {
done: boolean;
value?: any;
}) {
if (done) {
controller.close();

return;
}

controller.enqueue(message?.value);

parserReader.read().then(process);
});
},
});

const parserWriter = parser.writable.getWriter();
socket.addEventListener("message", (m) => parserWriter.write(m.data));

socket.addEventListener("close", () => {
encoder.destroy();
decoder.destroy();
encoder.close();
decoder.cancel();
});

registry.linkStream(
Writable.toWeb(encoder),
Readable.toWeb(decoder),
encoder,
decoder,

(v) => v,
(v) => v
Expand Down
Binary file modified ts/bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"homepage": "https://github.com/pojntfx/panrpc#readme",
"devDependencies": {
"@streamparser/json-node": "^0.0.20",
"@streamparser/json-whatwg": "^0.0.20",
"@typescript-eslint/eslint-plugin": "^5.30.6",
"@typescript-eslint/parser": "^5.30.6",
"bun-types": "^1.0.7",
Expand Down

0 comments on commit e56efbd

Please sign in to comment.