From 76010a2eba0f9732b98ec23290413e8f9f3bdb2c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 25 Sep 2020 18:50:49 +0200 Subject: [PATCH] fix: Client.stream writableNeedDrain Fixes: https://github.com/nodejs/undici/issues/441 Refs: https://github.com/nodejs/node/pull/35348 Refs: https://github.com/nodejs/node/issues/35341 --- lib/client-stream.js | 6 ++++++ lib/core/request.js | 1 + test/client-stream.js | 45 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/lib/client-stream.js b/lib/client-stream.js index d3188cfd956..6d9f75d4035 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource { }) this.res = res + + const needDrain = res.writableNeedDrain !== undefined + ? res.writableNeedDrain + : res._writableState && res._writableState.needDrain + + return needDrain !== true } onData (chunk) { diff --git a/lib/core/request.js b/lib/core/request.js index 5ebc83eef56..d3828e6ec45 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -156,6 +156,7 @@ class Request { onBody (chunk, offset, length) { assert(!this.aborted) + assert(!this[kPaused]) if (this[kTimeout] && this[kTimeout].refresh) { this[kTimeout].refresh() diff --git a/test/client-stream.js b/test/client-stream.js index d36d9fc1a10..7ca8f38d4b7 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -657,3 +657,48 @@ test('stream body destroyed on invalid callback', (t) => { } }) }) + +test('stream needDrain', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.tearDown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.tearDown(() => { + console.error(3) + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + while (dst.write(Buffer.alloc(4096))) { + + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + return dst + }) + + setTimeout(() => { + dst.write = (...args) => { + console.error("ASD") + orgWrite.call(dst, ...args) + } + dst.resume() + }, 1e3) + + await p + + t.pass() + }) +})