Skip to content

Commit

Permalink
stream: use synchronous error validation & validate abort signal option
Browse files Browse the repository at this point in the history
made sure top level methods aren't async/generators
so that validation errors could be caught synchronously
also added validation for the abort signal option
  • Loading branch information
iMoses committed Jan 30, 2022
1 parent 253f934 commit 49b4925
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 81 deletions.
189 changes: 130 additions & 59 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const {
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const {
validateAbortSignal,
validateInteger,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');

Expand Down Expand Up @@ -38,6 +41,10 @@ function map(fn, options) {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

let concurrency = 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
Expand Down Expand Up @@ -161,17 +168,35 @@ function map(fn, options) {
}.call(this);
}

async function* asIndexedPairs(options) {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
function asIndexedPairs(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

return async function* asIndexedPairs() {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
}
}.call(this);
}

async function some(fn, options) {
function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
Expand All @@ -185,27 +210,32 @@ async function some(fn, options) {
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
return async function some() {
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
}
}
return false;
return false;
}.call(this);
}

async function every(fn, options) {
function every(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (...args) => {
const somePromise = some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}, options);
return async function every() {
return !(await somePromise);
}.call(this);
}

async function forEach(fn, options) {
function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
Expand All @@ -214,8 +244,11 @@ async function forEach(fn, options) {
await fn(value, options);
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of this.map(forEachFn, options));
const values = this.map(forEachFn, options);
return async function forEach() {
// eslint-disable-next-line no-unused-vars
for await (const unused of values);
}.call(this);
}

function filter(fn, options) {
Expand All @@ -241,56 +274,78 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
}
}

async function reduce(reducer, initialValue, options) {
function reduce(reducer, initialValue, options) {
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once('error', () => {}); // The error is already propagated
await finished(this.destroy(err));
throw err;

if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
const ac = new AbortController();
const signal = ac.signal;
if (options?.signal) {
const opts = { once: true, [kWeakHandler]: this };
options.signal.addEventListener('abort', () => ac.abort(), opts);

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();

let hasInitialValue = arguments.length > 1;

return async function reduce() {
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once('error', () => {}); // The error is already propagated
await finished(this.destroy(err));
throw err;
}
const ac = new AbortController();
const signal = ac.signal;
if (options?.signal) {
const opts = { once: true, [kWeakHandler]: this };
options.signal.addEventListener('abort', () => ac.abort(), opts);
}
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
}
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
}
} finally {
ac.abort();
}
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
}
} finally {
ac.abort();
}
return initialValue;
return initialValue;
}.call(this);
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
function toArray(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
return result;

return async function toArray() {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
return result;
}.call(this);
}

function flatMap(fn, options) {
Expand All @@ -316,6 +371,14 @@ function toIntegerOrInfinity(number) {
}

function drop(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
Expand All @@ -334,6 +397,14 @@ function drop(number, options) {


function take(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
Expand Down
8 changes: 7 additions & 1 deletion test/parallel/test-stream-asIndexedPairs.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import '../common/index.mjs';
import { Readable } from 'stream';
import { deepStrictEqual, rejects } from 'assert';
import { deepStrictEqual, rejects, throws } from 'assert';

{
// asIndexedPairs with a synchronous stream
Expand Down Expand Up @@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
}, /AbortError/);
}

{
// Error cases
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
}
6 changes: 6 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}

throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);

throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
1 change: 1 addition & 0 deletions test/parallel/test-stream-flatMap.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ function oneTo5() {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
17 changes: 6 additions & 11 deletions test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,12 @@ const { setTimeout } = require('timers/promises');

{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).forEach(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.throws(() => Readable.from([1]).forEach(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).forEach((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).forEach((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).forEach((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Promise
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
8 changes: 6 additions & 2 deletions test/parallel/test-stream-reduce.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const {throws} = require("assert");

function sum(p, c) {
return p + c;
Expand Down Expand Up @@ -119,8 +120,11 @@ function sum(p, c) {

{
// Error cases
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
assert.throws(() => Readable.from([]).reduce(1), /TypeError/);
assert.throws(() => Readable.from([]).reduce('5'), /TypeError/);

assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
Expand Down
21 changes: 13 additions & 8 deletions test/parallel/test-stream-some-every.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,17 @@ function oneTo5Async() {
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.throws(() => Readable.from([1]).some(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).some((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).some((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).some((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);

assert.throws(() => Readable.from([1]).every(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).every((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).every((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
Loading

0 comments on commit 49b4925

Please sign in to comment.