diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1e42b15363fb36..41f1c52bc42288 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -140,8 +140,8 @@ function ReadableState(options, stream, isDuplex) { // Has it been destroyed this.destroyed = false; - // Indicates whether the stream has errored. - this.errored = false; + // Indicates whether the stream has errored. Contains the error. + this.errored = null; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 7753eaf1f1c674..7f0974fbb41602 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -171,7 +171,8 @@ function WritableState(options, stream, isDuplex) { // Indicates whether the stream has errored. When true all write() calls // should return false. This is needed since when autoDestroy // is disabled we need a way to tell whether the stream has failed. - this.errored = false; + // Contains the error. + this.errored = null; // Count buffered requests this.bufferedRequestCount = 0; @@ -488,7 +489,12 @@ function onwrite(stream, er) { state.writelen = 0; if (er) { - state.errored = true; + if (!state.errored) { + state.errored = er; + } + if (stream._readableState && !stream._readableState.errored) { + stream._readableState.errored = er; + } if (sync) { process.nextTick(onwriteError, stream, state, er, cb); } else { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index fb206c6c83d0a0..2e50ca9d1a70d8 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -8,21 +8,19 @@ function destroy(err, cb) { const w = this._writableState; if (err) { - if (w) { - w.errored = true; + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } } if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { - cb(err); - } else if (err) { - process.nextTick(emitErrorNT, this, err); + // TODO(ronag): Wait until 'close' is emitted (if not already emitted). + cb((w && w.errored) || (r && r.errored)); } - return this; } @@ -38,32 +36,30 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { if (err) { - if (w) { - w.errored = true; + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } } if (cb) { // Invoke callback before scheduling emitClose so that callback // can schedule before. - cb(err); + cb((w && w.errored) || (r && r.errored)); // Don't emit 'error' if passed a callback. process.nextTick(emitCloseNT, this); - } else if (err) { - process.nextTick(emitErrorCloseNT, this, err); } else { - process.nextTick(emitCloseNT, this); + process.nextTick(emitErrorCloseNT, this); } }); return this; } -function emitErrorCloseNT(self, err) { - emitErrorNT(self, err); +function emitErrorCloseNT(self) { + emitErrorNT(self); emitCloseNT(self); } @@ -76,10 +72,16 @@ function emitCloseNT(self) { } } -function emitErrorNT(self, err) { +function emitErrorNT(self) { const r = self._readableState; const w = self._writableState; + const err = (w && w.errored) || (r && r.errored); + + if (!err) { + return; + } + if ((w && w.errorEmitted) || (r && r.errorEmitted)) { return; } @@ -100,7 +102,7 @@ function undestroy() { if (r) { r.destroyed = false; - r.errored = false; + r.errored = null; r.reading = false; r.ended = false; r.endEmitted = false; @@ -109,7 +111,7 @@ function undestroy() { if (w) { w.destroyed = false; - w.errored = false; + w.errored = null; w.ended = false; w.ending = false; w.finalCalled = false; @@ -132,13 +134,13 @@ function errorOrDestroy(stream, err) { if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (err) { - if (w) { - w.errored = true; + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } - emitErrorNT(stream, err); + emitErrorNT(stream); } } diff --git a/test/parallel/test-stream-auto-destroy.js b/test/parallel/test-stream-auto-destroy.js index 2a1a5190debb57..22ca09bccaeb5f 100644 --- a/test/parallel/test-stream-auto-destroy.js +++ b/test/parallel/test-stream-auto-destroy.js @@ -86,13 +86,14 @@ const assert = require('assert'); { const r = new stream.Readable({ read() { - r2.emit('error', new Error('fail')); + r2.destroy(new Error('fail')); } }); const r2 = new stream.Readable({ autoDestroy: true, destroy: common.mustCall((err, cb) => cb()) }); + r2.on('error', common.mustCall()); r.pipe(r2); } @@ -100,13 +101,14 @@ const assert = require('assert'); { const r = new stream.Readable({ read() { - w.emit('error', new Error('fail')); + w.destroy(new Error('fail')); } }); const w = new stream.Writable({ autoDestroy: true, destroy: common.mustCall((err, cb) => cb()) }); + w.on('error', common.mustCall()); r.pipe(w); } diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 3c38d2c364051c..5c9b0937078e5e 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -76,8 +76,8 @@ const assert = require('assert'); duplex.on('end', common.mustNotCall('no end event')); duplex.on('finish', common.mustNotCall('no finish event')); - // Error is swallowed by the custom _destroy - duplex.on('error', common.mustNotCall('no error event')); + // Error is NOT swallowed by the custom _destroy + duplex.on('error', common.mustCall()); duplex.on('close', common.mustCall()); duplex.destroy(expected); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index d446dd3611781e..1ac77d4220c0a7 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -69,8 +69,8 @@ const assert = require('assert'); read.on('end', common.mustNotCall('no end event')); - // Error is swallowed by the custom _destroy - read.on('error', common.mustNotCall('no error event')); + // Error is NOT swallowed by the custom _destroy + read.on('error', common.mustCall()); read.on('close', common.mustCall()); read.destroy(expected); @@ -134,13 +134,13 @@ const assert = require('assert'); read.on('error', common.mustCall((err) => { assert.strictEqual(ticked, true); assert.strictEqual(read._readableState.errorEmitted, true); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(err, expected); })); read.destroy(); assert.strictEqual(read._readableState.errorEmitted, false); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(read.destroyed, true); ticked = true; } @@ -190,15 +190,15 @@ const assert = require('assert'); // destroy(err, callback); read.on('error', common.mustNotCall()); - assert.strictEqual(read._readableState.errored, false); + assert.strictEqual(read._readableState.errored, null); assert.strictEqual(read._readableState.errorEmitted, false); read.destroy(expected, common.mustCall(function(err) { - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(err, expected); })); assert.strictEqual(read._readableState.errorEmitted, false); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); ticked = true; } @@ -223,14 +223,15 @@ const assert = require('assert'); readable.destroy(); assert.strictEqual(readable.destroyed, true); - assert.strictEqual(readable._readableState.errored, false); + assert.strictEqual(readable._readableState.errored, null); assert.strictEqual(readable._readableState.errorEmitted, false); // Test case where `readable.destroy()` is called again with an error before // the `_destroy()` callback is called. - readable.destroy(new Error('kaboom 2')); + const expected = new Error('kaboom 2'); + readable.destroy(expected); assert.strictEqual(readable._readableState.errorEmitted, false); - assert.strictEqual(readable._readableState.errored, true); + assert.strictEqual(readable._readableState.errored, expected); ticked = true; } diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c594d9989ae4de..38764ebe0fb92b 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -72,8 +72,8 @@ const assert = require('assert'); transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); - // Error is swallowed by the custom _destroy - transform.on('error', common.mustNotCall('no error event')); + // Error is NOT swallowed by the custom _destroy + transform.on('error', common.mustCall()); transform.destroy(expected); } diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index d321d808199d56..6bbb04fbbb4c40 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -83,8 +83,33 @@ const assert = require('assert'); write.on('finish', common.mustNotCall('no finish event')); write.on('close', common.mustCall()); - // Error is swallowed by the custom _destroy - write.on('error', common.mustNotCall('no error event')); + // Error is NOT swallowed by the custom _destroy + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + write.destroy(expected); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); }, + destroy: common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(new Error('not this error')); + }) + }); + + const expected = new Error('kaboom'); + + write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); + + // Error is NOT overriden by the custom _destroy + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); write.destroy(expected); assert.strictEqual(write.destroyed, true); @@ -167,9 +192,10 @@ const assert = require('assert'); assert.strictEqual(write._writableState.errorEmitted, true); })); - write.destroy(new Error('kaboom 1')); + const expected = new Error('kaboom 1'); + write.destroy(expected); write.destroy(new Error('kaboom 2')); - assert.strictEqual(write._writableState.errored, true); + assert.strictEqual(write._writableState.errored, expected); assert.strictEqual(write._writableState.errorEmitted, false); assert.strictEqual(write.destroyed, true); ticked = true; @@ -198,14 +224,15 @@ const assert = require('assert'); writable.destroy(); assert.strictEqual(writable.destroyed, true); - assert.strictEqual(writable._writableState.errored, false); + assert.strictEqual(writable._writableState.errored, null); assert.strictEqual(writable._writableState.errorEmitted, false); // Test case where `writable.destroy()` is called again with an error before // the `_destroy()` callback is called. - writable.destroy(new Error('kaboom 2')); + const expected = new Error('kaboom 2'); + writable.destroy(expected); assert.strictEqual(writable._writableState.errorEmitted, false); - assert.strictEqual(writable._writableState.errored, true); + assert.strictEqual(writable._writableState.errored, expected); ticked = true; } @@ -249,6 +276,9 @@ const assert = require('assert'); write.destroy(expected, common.mustCall(function(err) { assert.strictEqual(err, expected); })); + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); } {