Skip to content

Commit

Permalink
stream: support dispose in writable
Browse files Browse the repository at this point in the history
Add support to Symbol.asyncDispose in writable streams.
Additionally add a test for writable, transform and duplex streams
who inherit from readable/writable to avoid breakage.
  • Loading branch information
benjamingr committed Jun 25, 2023
1 parent 71d7707 commit 63aad0a
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,17 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### `writable[Symbol.asyncDispose]()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
a promise that fulfills when the stream is finished.

##### `writable.write(chunk[, encoding][, callback])`

<!-- YAML
Expand Down
43 changes: 29 additions & 14 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ const {
ObjectDefineProperty,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
StringPrototypeToLowerCase,
Symbol,
SymbolAsyncDispose,
SymbolHasInstance,
} = primordials;

Expand All @@ -44,6 +46,7 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');

const {
addAbortSignal,
Expand All @@ -54,16 +57,19 @@ const {
getDefaultHighWaterMark,
} = require('internal/streams/state');
const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} = require('internal/errors').codes;
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
},
} = require('internal/errors');

const { errorOrDestroy } = destroyImpl;

Expand Down Expand Up @@ -477,7 +483,7 @@ function onwrite(stream, er) {
// rather just increase a counter, to improve performance and avoid
// memory allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
Expand Down Expand Up @@ -538,9 +544,9 @@ function errorBuffer(state) {
// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
return;
}

Expand Down Expand Up @@ -934,3 +940,12 @@ Writable.fromWeb = function(writableStream, options) {
Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};

Writable.prototype[SymbolAsyncDispose] = function() {
let error;
if (!this.destroyed) {
error = this.readableEnded ? null : new AbortError();
this.destroy(error);
}
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));
};
14 changes: 14 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,17 @@ const assert = require('assert');
duplex.on('close', common.mustCall());
controller.abort();
}
{
// Check Symbol.asyncDispose
const duplex = new Duplex({
write(chunk, enc, cb) { cb(); },
read() {},
});
let count = 0;
duplex.on('error', common.mustCall((e) => {
assert.strictEqual(count++, 0); // Ensure not called twice
assert.strictEqual(e.name, 'AbortError');
}));
duplex.on('close', common.mustCall());
duplex[Symbol.asyncDispose]().then(common.mustCall());
}
11 changes: 11 additions & 0 deletions test/parallel/test-stream-transform-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,14 @@ const assert = require('assert');

transform.destroy();
}

{
const transform = new Transform({
transform(chunk, enc, cb) {}
});
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
transform.on('clocse', common.mustCall());
transform[Symbol.asyncDispose]().then(common.mustCall());
}
12 changes: 12 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,15 @@ const assert = require('assert');
}));
s.destroy(_err);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write[Symbol.asyncDispose]().then(common.mustCall());
}

0 comments on commit 63aad0a

Please sign in to comment.