diff --git a/package.json b/package.json index 70574c8..08a8ae4 100644 --- a/package.json +++ b/package.json @@ -11,10 +11,10 @@ "node": ">=12" }, "scripts": { - "build": "del dist && tsc", - "test": "xo && npm run build && nyc ava", + "build": "tsc --build --clean", + "test": "xo && ava", "bench": "node --loader=ts-node/esm bench.ts", - "prepare": "npm run build" + "prepare": "tsc --build --clean" }, "files": [ "dist" @@ -50,18 +50,15 @@ "@sindresorhus/tsconfig": "^2.0.0", "@types/benchmark": "^2.1.1", "@types/node": "^17.0.13", - "ava": "^4.0.1", + "ava": "^5.1.1", "benchmark": "^2.1.4", - "codecov": "^3.8.3", - "del-cli": "^4.0.1", "delay": "^5.0.0", "in-range": "^3.0.0", - "nyc": "^15.1.0", "p-defer": "^4.0.0", "random-int": "^3.0.0", "time-span": "^5.0.0", - "ts-node": "^10.4.0", - "typescript": "^4.5.5", + "ts-node": "^10.9.1", + "typescript": "^4.8.4", "xo": "^0.44.0" }, "ava": { diff --git a/source/index.ts b/source/index.ts index b91c61e..0e9bda5 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,8 +8,6 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -const timeoutError = new TimeoutError(); - /** The error thrown by `queue.add()` when a job is aborted before it is run. See `signal`. */ @@ -41,7 +39,7 @@ export default class PQueue QueueType; - #pendingCount = 0; + #pending = 0; // The `!` is needed because of https://github.com/microsoft/TypeScript/issues/32194 #concurrency!: number; @@ -96,23 +94,15 @@ export default class PQueue { + return new Promise((_resolve, reject) => { + signal.addEventListener('abort', () => { + // TODO: Reject with signal.throwIfAborted() when targeting Node.js 18 + // TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err) + reject(new AbortError('The task was aborted.')); + }, {once: true}); + }); + } + /** Adds a sync or async task to the queue. Always returns a promise. */ - async add(fn: Task, options: Partial = {}): Promise { - return new Promise((resolve, reject) => { - const run = async (): Promise => { - this.#pendingCount++; + async add(function_: Task, options?: Partial): Promise; + async add(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; + async add(function_: Task, options: Partial = {}): Promise { + options = { + timeout: this.timeout, + throwOnTimeout: this.#throwOnTimeout, + ...options, + }; + + return new Promise((resolve, reject) => { + this.#queue.enqueue(async () => { + this.#pending++; this.#intervalCount++; try { + // TODO: Use options.signal?.throwIfAborted() when targeting Node.js 18 if (options.signal?.aborted) { // TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err) - reject(new AbortError('The task was aborted.')); - return; + throw new AbortError('The task was aborted.'); } - const operation = (this.timeout === undefined && options.timeout === undefined) ? fn({signal: options.signal}) : pTimeout( - Promise.resolve(fn({signal: options.signal})), - (options.timeout === undefined ? this.timeout : options.timeout)!, - () => { - if (options.throwOnTimeout === undefined ? this.#throwOnTimeout : options.throwOnTimeout) { - reject(timeoutError); - } + let operation = function_({signal: options.signal}); - return undefined; - }, - ); + if (options.timeout) { + operation = pTimeout(Promise.resolve(operation), options.timeout); + } + + if (options.signal) { + operation = Promise.race([operation, this.#throwOnAbort(options.signal)]); + } const result = await operation; - resolve(result!); + resolve(result); this.emit('completed', result); } catch (error: unknown) { + if (error instanceof TimeoutError && !options.throwOnTimeout) { + resolve(); + return; + } + reject(error); this.emit('error', error); + } finally { + this.#next(); } + }, options); - this.#next(); - }; + this.emit('add'); - this.#queue.enqueue(run, options); this.#tryToStartAnother(); - this.emit('add'); }); } @@ -282,8 +297,16 @@ export default class PQueue( functions: ReadonlyArray>, - options?: EnqueueOptionsType, - ): Promise { + options?: Partial, + ): Promise>; + async addAll( + functions: ReadonlyArray>, + options?: {throwOnTimeout: true} & Partial>, + ): Promise + async addAll( + functions: ReadonlyArray>, + options?: Partial, + ): Promise> { return Promise.all(functions.map(async function_ => this.add(function_, options))); } @@ -352,7 +375,7 @@ export default class PQueue { // Instantly resolve if none pending and if nothing else is queued - if (this.#pendingCount === 0 && this.#queue.size === 0) { + if (this.#pending === 0 && this.#queue.size === 0) { return; } @@ -395,7 +418,7 @@ export default class PQueue { autoStart: false, }); - const values = [0, 1, 2, 3]; - const firstValue = [0, 1]; + const values = [0, 1, 2, 3]; + const firstValue = [0, 1]; const secondValue = [0, 1, 2, 3]; for (const value of values) { @@ -1115,3 +1115,22 @@ test('should pass AbortSignal instance to job', async t => { t.is(controller.signal, signal!); }, {signal: controller.signal}); }); + +test('aborting multiple jobs at the same time', async t => { + const queue = new PQueue({concurrency: 1}); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const task1 = queue.add(async () => new Promise(() => {}), {signal: controller1.signal}); // eslint-disable-line @typescript-eslint/no-empty-function + const task2 = queue.add(async () => new Promise(() => {}), {signal: controller2.signal}); // eslint-disable-line @typescript-eslint/no-empty-function + + setTimeout(() => { + controller1.abort(); + controller2.abort(); + }, 0); + + await t.throwsAsync(task1, {instanceOf: AbortError}); + await t.throwsAsync(task2, {instanceOf: AbortError}); + t.like(queue, {size: 0, pending: 0}); +}); diff --git a/tsconfig.json b/tsconfig.json index 19f10b8..032ec28 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,8 +1,7 @@ { "extends": "@sindresorhus/tsconfig", "compilerOptions": { - "outDir": "dist", - "noPropertyAccessFromIndexSignature": false + "outDir": "dist" }, "include": [ "source"