Skip to content

Commit

Permalink
Fix .add() and .addAll()
Browse files Browse the repository at this point in the history
Fixes #158, fixes #168

Signed-off-by: Richie Bendall <richiebendall@gmail.com>
  • Loading branch information
Richienb committed Feb 3, 2023
1 parent c905aaf commit 8184655
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 55 deletions.
15 changes: 6 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
"node": ">=12"
},
"scripts": {
"build": "del dist && tsc",
"test": "xo && npm run build && nyc ava",
"build": "tsc --build --clean",
"test": "xo && ava",
"bench": "node --loader=ts-node/esm bench.ts",
"prepare": "npm run build"
"prepare": "tsc --build --clean"
},
"files": [
"dist"
Expand Down Expand Up @@ -50,18 +50,15 @@
"@sindresorhus/tsconfig": "^2.0.0",
"@types/benchmark": "^2.1.1",
"@types/node": "^17.0.13",
"ava": "^4.0.1",
"ava": "^5.1.1",
"benchmark": "^2.1.4",
"codecov": "^3.8.3",
"del-cli": "^4.0.1",
"delay": "^5.0.0",
"in-range": "^3.0.0",
"nyc": "^15.1.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
"time-span": "^5.0.0",
"ts-node": "^10.4.0",
"typescript": "^4.5.5",
"ts-node": "^10.9.1",
"typescript": "^4.8.4",
"xo": "^0.44.0"
},
"ava": {
Expand Down
107 changes: 65 additions & 42 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ type Task<TaskResultType> =
| ((options: TaskOptions) => PromiseLike<TaskResultType>)
| ((options: TaskOptions) => TaskResultType);

const timeoutError = new TimeoutError();

/**
The error thrown by `queue.add()` when a job is aborted before it is run. See `signal`.
*/
Expand Down Expand Up @@ -41,7 +39,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

readonly #queueClass: new () => QueueType;

#pendingCount = 0;
#pending = 0;

// The `!` is needed because of https://github.com/microsoft/TypeScript/issues/32194
#concurrency!: number;
Expand Down Expand Up @@ -96,23 +94,15 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}

get #doesConcurrentAllowAnother(): boolean {
return this.#pendingCount < this.#concurrency;
return this.#pending < this.#concurrency;
}

#next(): void {
this.#pendingCount--;
this.#pending--;
this.#tryToStartAnother();
this.emit('next');
}

#emitEvents(): void {
this.emit('empty');

if (this.#pendingCount === 0) {
this.emit('idle');
}
}

#onResumeInterval(): void {
this.#onInterval();
this.#initializeIntervalIfNeeded();
Expand All @@ -127,7 +117,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
if (delay < 0) {
// Act as the interval was done
// We don't need to resume it here because it will be resumed on line 160
this.#intervalCount = (this.#carryoverConcurrencyCount) ? this.#pendingCount : 0;
this.#intervalCount = (this.#carryoverConcurrencyCount) ? this.#pending : 0;
} else {
// Act as the interval is pending
if (this.#timeoutId === undefined) {
Expand Down Expand Up @@ -156,7 +146,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

this.#intervalId = undefined;

this.#emitEvents();
this.emit('empty');

if (this.#pending === 0) {
this.emit('idle');
}

return false;
}
Expand Down Expand Up @@ -199,12 +193,12 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}

#onInterval(): void {
if (this.#intervalCount === 0 && this.#pendingCount === 0 && this.#intervalId) {
if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) {
clearInterval(this.#intervalId);
this.#intervalId = undefined;
}

this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pendingCount : 0;
this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pending : 0;
this.#processQueue();
}

Expand All @@ -230,48 +224,69 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#processQueue();
}

async #throwOnAbort(signal: AbortSignal): Promise<never> {
return new Promise((_resolve, reject) => {
signal.addEventListener('abort', () => {
// TODO: Reject with signal.throwIfAborted() when targeting Node.js 18
// TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err)
reject(new AbortError('The task was aborted.'));
}, {once: true});
});
}

/**
Adds a sync or async task to the queue. Always returns a promise.
*/
async add<TaskResultType>(fn: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType> {
return new Promise<TaskResultType>((resolve, reject) => {
const run = async (): Promise<void> => {
this.#pendingCount++;
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
options = {
timeout: this.timeout,
throwOnTimeout: this.#throwOnTimeout,
...options,
};

return new Promise((resolve, reject) => {
this.#queue.enqueue(async () => {
this.#pending++;
this.#intervalCount++;

try {
// TODO: Use options.signal?.throwIfAborted() when targeting Node.js 18
if (options.signal?.aborted) {
// TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err)
reject(new AbortError('The task was aborted.'));
return;
throw new AbortError('The task was aborted.');
}

const operation = (this.timeout === undefined && options.timeout === undefined) ? fn({signal: options.signal}) : pTimeout(
Promise.resolve(fn({signal: options.signal})),
(options.timeout === undefined ? this.timeout : options.timeout)!,
() => {
if (options.throwOnTimeout === undefined ? this.#throwOnTimeout : options.throwOnTimeout) {
reject(timeoutError);
}
let operation = function_({signal: options.signal});

return undefined;
},
);
if (options.timeout) {
operation = pTimeout(Promise.resolve(operation), options.timeout);
}

if (options.signal) {
operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
}

const result = await operation;
resolve(result!);
resolve(result);
this.emit('completed', result);
} catch (error: unknown) {
if (error instanceof TimeoutError && !options.throwOnTimeout) {
resolve();
return;
}

reject(error);
this.emit('error', error);
} finally {
this.#next();
}
}, options);

this.#next();
};
this.emit('add');

this.#queue.enqueue(run, options);
this.#tryToStartAnother();
this.emit('add');
});
}

Expand All @@ -282,8 +297,16 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
*/
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: EnqueueOptionsType,
): Promise<TaskResultsType[]> {
options?: Partial<EnqueueOptionsType>,
): Promise<Array<TaskResultsType | void>>;
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: {throwOnTimeout: true} & Partial<Exclude<EnqueueOptionsType, 'throwOnTimeout'>>,
): Promise<TaskResultsType[]>
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: Partial<EnqueueOptionsType>,
): Promise<Array<TaskResultsType | void>> {
return Promise.all(functions.map(async function_ => this.add(function_, options)));
}

Expand Down Expand Up @@ -352,7 +375,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
*/
async onIdle(): Promise<void> {
// Instantly resolve if none pending and if nothing else is queued
if (this.#pendingCount === 0 && this.#queue.size === 0) {
if (this.#pending === 0 && this.#queue.size === 0) {
return;
}

Expand Down Expand Up @@ -395,7 +418,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
Number of running items (no longer in the queue).
*/
get pending(): number {
return this.#pendingCount;
return this.#pending;
}

/**
Expand Down
23 changes: 21 additions & 2 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ test('pause should work when throttled', async t => {
autoStart: false,
});

const values = [0, 1, 2, 3];
const firstValue = [0, 1];
const values = [0, 1, 2, 3];
const firstValue = [0, 1];
const secondValue = [0, 1, 2, 3];

for (const value of values) {
Expand Down Expand Up @@ -1115,3 +1115,22 @@ test('should pass AbortSignal instance to job', async t => {
t.is(controller.signal, signal!);
}, {signal: controller.signal});
});

test('aborting multiple jobs at the same time', async t => {
const queue = new PQueue({concurrency: 1});

const controller1 = new AbortController();
const controller2 = new AbortController();

const task1 = queue.add(async () => new Promise(() => {}), {signal: controller1.signal}); // eslint-disable-line @typescript-eslint/no-empty-function
const task2 = queue.add(async () => new Promise(() => {}), {signal: controller2.signal}); // eslint-disable-line @typescript-eslint/no-empty-function

setTimeout(() => {
controller1.abort();
controller2.abort();
}, 0);

await t.throwsAsync(task1, {instanceOf: AbortError});
await t.throwsAsync(task2, {instanceOf: AbortError});
t.like(queue, {size: 0, pending: 0});
});
3 changes: 1 addition & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"extends": "@sindresorhus/tsconfig",
"compilerOptions": {
"outDir": "dist",
"noPropertyAccessFromIndexSignature": false
"outDir": "dist"
},
"include": [
"source"
Expand Down

0 comments on commit 8184655

Please sign in to comment.