From 362bc78191c607e6b7c7f2b2e7e7ddb2fe53101c Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 21 Jun 2024 11:47:41 +0200 Subject: [PATCH] fix: properly call the send callback during upgrade The "drain" event (added in [1]) had two different meanings: - the transport is ready to be written - the packets are sent over the wire For the WebSocket and the WebTransport transports, those two events happen at the same time, but this is not the case for the HTTP long-polling transport: - the transport is ready to be written when the client sends a GET request - the packets are sent over the wire when the server responds to the GET request Which caused an issue with send callbacks during an upgrade, since the packets were written but the client would not open a new GET request. There are now two distinct events: "ready" and "drain" Related: https://github.com/socketio/engine.io/issues/695 [1]: https://github.com/socketio/engine.io/commit/2a93f06e276f3adb76e494ae9198da38799d493c --- lib/socket.ts | 5 +++-- lib/transports-uws/polling.ts | 3 ++- lib/transports-uws/websocket.ts | 3 ++- lib/transports/polling.ts | 3 ++- lib/transports/websocket.ts | 3 ++- lib/transports/webtransport.ts | 3 ++- lib/userver.ts | 2 +- test/server.js | 23 +++++++++++++++++++++++ 8 files changed, 37 insertions(+), 8 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 0febe90e..1dfa460b 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -221,18 +221,21 @@ export class Socket extends EventEmitter { */ private setTransport(transport) { const onError = this.onError.bind(this); + const onReady = () => this.flush(); const onPacket = this.onPacket.bind(this); const onDrain = this.onDrain.bind(this); const onClose = this.onClose.bind(this, "transport close"); this.transport = transport; this.transport.once("error", onError); + this.transport.on("ready", onReady); this.transport.on("packet", onPacket); this.transport.on("drain", onDrain); this.transport.once("close", onClose); this.cleanupFn.push(function () { transport.removeListener("error", onError); + transport.removeListener("ready", onReady); transport.removeListener("packet", onPacket); transport.removeListener("drain", onDrain); transport.removeListener("close", onClose); @@ -245,8 +248,6 @@ export class Socket extends EventEmitter { * @private */ private onDrain() { - this.flush(); - if (this.sentCallbackFn.length > 0) { debug("executing batch send callback"); const seqFn = this.sentCallbackFn.shift(); diff --git a/lib/transports-uws/polling.ts b/lib/transports-uws/polling.ts index 3fa30d90..01a82f6e 100644 --- a/lib/transports-uws/polling.ts +++ b/lib/transports-uws/polling.ts @@ -97,7 +97,7 @@ export class Polling extends Transport { res.onAborted(onClose); this.writable = true; - this.emit("drain"); + this.emit("ready"); // if we're still writable but had a pending close, trigger an empty send if (this.writable && this.shouldClose) { @@ -291,6 +291,7 @@ export class Polling extends Transport { debug('writing "%s"', data); this.doWrite(data, options, () => { this.req.cleanup(); + this.emit("drain"); }); } diff --git a/lib/transports-uws/websocket.ts b/lib/transports-uws/websocket.ts index 4de3c23e..d9aaf2ae 100644 --- a/lib/transports-uws/websocket.ts +++ b/lib/transports-uws/websocket.ts @@ -60,8 +60,9 @@ export class WebSocket extends Transport { this.socket.send(data, isBinary, compress); if (isLast) { - this.writable = true; this.emit("drain"); + this.writable = true; + this.emit("ready"); } }; diff --git a/lib/transports/polling.ts b/lib/transports/polling.ts index 598315b7..09af20fb 100644 --- a/lib/transports/polling.ts +++ b/lib/transports/polling.ts @@ -96,7 +96,7 @@ export class Polling extends Transport { req.on("close", onClose); this.writable = true; - this.emit("drain"); + this.emit("ready"); // if we're still writable but had a pending close, trigger an empty send if (this.writable && this.shouldClose) { @@ -258,6 +258,7 @@ export class Polling extends Transport { debug('writing "%s"', data); this.doWrite(data, options, () => { this.req.cleanup(); + this.emit("drain"); }); } diff --git a/lib/transports/websocket.ts b/lib/transports/websocket.ts index e4cc2623..788094f4 100644 --- a/lib/transports/websocket.ts +++ b/lib/transports/websocket.ts @@ -103,8 +103,9 @@ export class WebSocket extends Transport { if (err) { this.onError("write error", err.stack); } else { - this.writable = true; this.emit("drain"); + this.writable = true; + this.emit("ready"); } }; diff --git a/lib/transports/webtransport.ts b/lib/transports/webtransport.ts index 663a0f66..07852282 100644 --- a/lib/transports/webtransport.ts +++ b/lib/transports/webtransport.ts @@ -56,8 +56,9 @@ export class WebTransport extends Transport { debug("error while writing: %s", e.message); } - this.writable = true; this.emit("drain"); + this.writable = true; + this.emit("ready"); } doClose(fn) { diff --git a/lib/userver.ts b/lib/userver.ts index 9d1eeb77..c7c91c76 100644 --- a/lib/userver.ts +++ b/lib/userver.ts @@ -80,7 +80,7 @@ export class uServer extends BaseServer { const transport = ws.getUserData().transport; transport.socket = ws; transport.writable = true; - transport.emit("drain"); + transport.emit("ready"); }, message: (ws, message, isBinary) => { ws.getUserData().transport.onData( diff --git a/test/server.js b/test/server.js index c7c914bb..129b07c3 100644 --- a/test/server.js +++ b/test/server.js @@ -2701,6 +2701,29 @@ describe("server", () => { }); }); + it("should execute when message sent during polling upgrade window", (done) => { + const engine = listen((port) => { + const socket = new ClientSocket(`ws://localhost:${port}`, { + transports: ["polling", "websocket"], + }); + + const partialDone = createPartialDone(() => { + engine.httpServer?.close(); + socket.close(); + done(); + }, 2); + + engine.on("connection", (conn) => { + conn.on("upgrading", () => { + conn.send("a", partialDone); + }); + }); + socket.on("open", () => { + socket.on("message", partialDone); + }); + }); + }); + it("should execute when message sent (websocket)", (done) => { const engine = listen({ allowUpgrades: false }, (port) => { const socket = new ClientSocket(`ws://localhost:${port}`, {