Skip to content

Commit

Permalink
[Task manager] Adds support for limited concurrency tasks (#90365)
Browse files Browse the repository at this point in the history
Adds support for limited concurrency on a Task Type.
  • Loading branch information
gmmorris authored Feb 11, 2021
1 parent 6bd0a7f commit 619db36
Show file tree
Hide file tree
Showing 34 changed files with 4,163 additions and 2,604 deletions.
8 changes: 4 additions & 4 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ export class Plugin {
// This defaults to what is configured at the task manager level.
maxAttempts: 5,

// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
// overridden by the `override_num_workers` config value, if specified.
numWorkers: 2,
// The maximum number of tasks of this type that can be run concurrently per Kibana instance.
// Setting this value will force Task Manager to poll for this task type separately from other task types, which
// can add significant load to the ES cluster, so please use this configuration only when absolutely necessary.
maxConcurrency: 1,

// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
Expand Down
10 changes: 4 additions & 6 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@ import { TaskStatus } from './task';

describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const taskStore = taskStoreMock.create();
taskStore.bulkUpdate.mockResolvedValue([]);
const bufferedStore = new BufferedTaskStore(taskStore, {});

expect(bufferedStore.maxAttempts).toEqual(10);

bufferedStore.remove('1');
expect(taskStore.remove).toHaveBeenCalledWith('1');
});

describe('update', () => {
test("proxies the TaskStore's `bulkUpdate`", async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});

const task = mockTask();
Expand All @@ -37,7 +35,7 @@ describe('Buffered Task Store', () => {
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});

const tasks = [mockTask(), mockTask(), mockTask()];
Expand All @@ -61,7 +59,7 @@ describe('Buffered Task Store', () => {
});

test('handles multiple items with the same id', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});

const duplicateIdTask = mockTask();
Expand Down
4 changes: 0 additions & 4 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ export class BufferedTaskStore implements Updatable {
);
}

public get maxAttempts(): number {
return this.taskStore.maxAttempts;
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
}
Expand Down
56 changes: 25 additions & 31 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,32 @@ import sinon from 'sinon';
import { fillPool, FillPoolResult } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';
import { asOk, Result } from './result_type';
import { ClaimOwnershipResult } from '../task_store';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { TaskManagerRunner } from '../task_running/task_runner';
import { from, Observable } from 'rxjs';
import { ClaimOwnershipResult } from '../queries/task_claiming';

jest.mock('../task_running/task_runner');

describe('fillPool', () => {
function mockFetchAvailableTasks(
tasksToMock: number[][]
): () => Promise<Result<ClaimOwnershipResult, FillPoolResult>> {
const tasks: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids));
let index = 0;
return async () =>
asOk({
stats: {
tasksUpdated: tasks[index + 1]?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks[index++] || [],
});
): () => Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
const claimCycles: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids));
return () =>
from(
claimCycles.map((tasks) =>
asOk({
stats: {
tasksUpdated: tasks?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
tasksRejected: 0,
},
docs: tasks,
})
)
);
}

const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] =>
Expand All @@ -51,7 +56,7 @@ describe('fillPool', () => {
ownerId: null,
}));

test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
test('fills task pool with all claimed tasks until fetchAvailableTasks stream closes', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
Expand All @@ -62,21 +67,7 @@ describe('fillPool', () => {

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3]));
});

test('stops filling when the pool has no more capacity', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3]));
expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3, 4, 5]));
});

test('calls the converter on the records prior to running', async () => {
Expand All @@ -91,7 +82,7 @@ describe('fillPool', () => {

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']);
expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3', '4', '5']);
});

describe('error handling', () => {
Expand All @@ -101,7 +92,10 @@ describe('fillPool', () => {
(instance.id as unknown) as TaskManagerRunner;

try {
const fetchAvailableTasks = async () => Promise.reject('fetch is not working');
const fetchAvailableTasks = () =>
new Observable<Result<ClaimOwnershipResult, FillPoolResult>>((obs) =>
obs.error('fetch is not working')
);

await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
Expand Down
132 changes: 87 additions & 45 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
*/

import { performance } from 'perf_hooks';
import { Observable } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';
import { ClaimOwnershipResult } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { WithTaskTiming, startTaskTimer } from '../task_events';
import { TaskPoolRunResult } from '../task_pool';
import { TaskManagerRunner } from '../task_running';
import { ClaimOwnershipResult } from '../task_store';
import { Result, map } from './result_type';
import { Result, map as mapResult, asErr, asOk } from './result_type';

export enum FillPoolResult {
Failed = 'Failed',
Expand All @@ -22,6 +24,17 @@ export enum FillPoolResult {
PoolFilled = 'PoolFilled',
}

/**
 * Intermediate result produced for each claim cycle before it is translated
 * into a `FillPoolResult`.
 *
 * - Ok side: the `TaskPoolRunResult` for the cycle, plus the claim `stats`
 *   when they are available.
 * - Err side: a `FillPoolResult` describing the failure, with optional `stats`.
 */
type FillPoolAndRunResult = Result<
{
result: TaskPoolRunResult;
stats?: ClaimOwnershipResult['stats'];
},
{
result: FillPoolResult;
stats?: ClaimOwnershipResult['stats'];
}
>;

/**
 * Outcome of a fill-pool attempt: the `FillPoolResult` together with the
 * (optional) `stats` taken from the `ClaimOwnershipResult`.
 */
export type ClaimAndFillPoolResult = Partial<Pick<ClaimOwnershipResult, 'stats'>> & {
result: FillPoolResult;
};
Expand All @@ -40,52 +53,81 @@ export type TimedFillPoolResult = WithTaskTiming<ClaimAndFillPoolResult>;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool(
fetchAvailableTasks: () => Promise<Result<ClaimOwnershipResult, FillPoolResult>>,
fetchAvailableTasks: () => Observable<Result<ClaimOwnershipResult, FillPoolResult>>,
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
performance.mark('fillPool.start');
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});
return map<ClaimOwnershipResult, FillPoolResult, Promise<TimedFillPoolResult>>(
await fetchAvailableTasks(),
async ({ docs, stats }) => {
if (!docs.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
}

const tasks = docs.map(converter);

switch (await run(tasks)) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
return new Promise((resolve, reject) => {
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});
fetchAvailableTasks()
.pipe(
// each ClaimOwnershipResult will be sequentially consumed and run using the `run` handler
concatMap(async (res) =>
mapResult<ClaimOwnershipResult, FillPoolResult, Promise<FillPoolAndRunResult>>(
res,
async ({ docs, stats }) => {
if (!docs.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats });
}
return asOk(
await run(docs.map(converter)).then((runResult) => ({
result: runResult,
stats,
}))
);
},
async (fillPoolResult) => asErr({ result: fillPoolResult })
)
),
// when the final call to `run` completes, we'll complete the stream and emit the
// final accumulated result
last()
)
.subscribe(
(claimResults) => {
resolve(
mapResult(
claimResults,
({ result, stats }) => {
switch (result) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
default:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
({ result, stats }) => augmentTimingTo(result, stats)
)
);
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
default:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
async (result) => augmentTimingTo(result)
);
},
(err) => reject(err)
);
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ describe('Task Run Statistics', () => {
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(asTaskManagerStatEvent('pollingDelay', asOk(0)));
events$.next(asTaskManagerStatEvent('claimDuration', asOk(10)));
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
Expand Down
Loading

0 comments on commit 619db36

Please sign in to comment.