Skip to content

Commit

Permalink
[Task Manager] Adding list of explicitly de-registered task types (#1…
Browse files Browse the repository at this point in the history
…23963)

* Adding REMOVED_TYPES to task manager and only marking those types as unrecognized

* Adding unit tests

* Fixing functional test

* Throwing error when registering a removed task type

* Adding migration

* Adding functional tests

* Cleanup

* Adding disabled siem signals rule type

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
ymao1 and kibanamachine authored Feb 15, 2022
1 parent 21cef49 commit c2a0103
Show file tree
Hide file tree
Showing 15 changed files with 483 additions and 38 deletions.
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { TaskManagerConfig } from './config';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { removeIfExists } from './lib/remove_if_exists';
import { setupSavedObjects } from './saved_objects';
import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary';
import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary';
import { FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
Expand Down Expand Up @@ -189,6 +189,7 @@ export class TaskManagerPlugin
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
unusedTypes: REMOVED_TYPES,
logger: this.logger,
executionContext,
taskStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ describe('TaskPollingLifecycle', () => {
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
unusedTypes: [],
definitions: new TaskTypeDictionary(taskManagerLogger),
middleware: createInitialMiddleware(),
maxWorkersConfiguration$: of(100),
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming';
export type TaskPollingLifecycleOpts = {
logger: Logger;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
Expand Down Expand Up @@ -106,6 +107,7 @@ export class TaskPollingLifecycle {
config,
taskStore,
definitions,
unusedTypes,
executionContext,
usageCounter,
}: TaskPollingLifecycleOpts) {
Expand Down Expand Up @@ -134,6 +136,7 @@ export class TaskPollingLifecycle {
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
unusedTypes,
logger: this.logger,
getCapacity: (taskType?: string) =>
taskType && this.definitions.get(taskType)?.maxConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@ describe('mark_available_tasks_as_claimed', () => {
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
),
script: updateFieldsAndMarkAsFailed(
script: updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById || [],
definitions.getAllTypes(),
[],
Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
claimTasksById: claimTasksById || [],
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {})
),
}, {}),
}),
sort: SortByRunAtAndRetryAt,
}).toEqual({
query: {
Expand Down Expand Up @@ -126,7 +127,7 @@ if (doc['task.runAt'].size()!=0) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) {
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
Expand All @@ -140,6 +141,7 @@ if (doc['task.runAt'].size()!=0) {
claimTasksById: [],
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
Expand All @@ -164,9 +166,16 @@ if (doc['task.runAt'].size()!=0) {
];

expect(
updateFieldsAndMarkAsFailed(fieldUpdates, claimTasksById, ['foo', 'bar'], [], {
foo: 5,
bar: 2,
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
})
).toMatchObject({
source: `
Expand All @@ -182,7 +191,7 @@ if (doc['task.runAt'].size()!=0) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) {
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
Expand All @@ -196,6 +205,7 @@ if (doc['task.runAt'].size()!=0) {
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
Expand All @@ -213,9 +223,16 @@ if (doc['task.runAt'].size()!=0) {
};

expect(
updateFieldsAndMarkAsFailed(fieldUpdates, [], ['foo', 'bar'], [], {
foo: 5,
bar: 2,
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
}).source
).toMatch(/ctx.op = "noop"/);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,25 @@ if (doc['task.runAt'].size()!=0) {
};
export const SortByRunAtAndRetryAt = SortByRunAtAndRetryAtScript as estypes.SortCombinations;

export const updateFieldsAndMarkAsFailed = (
export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
},
claimTasksById: string[],
claimableTaskTypes: string[],
skippedTaskTypes: string[],
taskMaxAttempts: { [field: string]: number }
): ScriptClause => {
};
claimTasksById: string[];
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
taskMaxAttempts: { [field: string]: number };
}

export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
}: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => {
const markAsClaimingScript = `ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}`;
Expand All @@ -126,7 +136,7 @@ export const updateFieldsAndMarkAsFailed = (
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
${markAsClaimingScript}
} else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) {
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
Expand All @@ -137,6 +147,7 @@ export const updateFieldsAndMarkAsFailed = (
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
},
};
Expand Down
125 changes: 125 additions & 0 deletions x-pack/plugins/task_manager/server/queries/task_claiming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ describe('TaskClaiming', () => {
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
Expand All @@ -127,12 +128,14 @@ describe('TaskClaiming', () => {
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
unusedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
unusedTaskTypes?: string[];
}) {
const definitions = storeOpts.definitions ?? taskDefinitions;
const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId });
Expand Down Expand Up @@ -161,6 +164,7 @@ describe('TaskClaiming', () => {
definitions,
taskStore: store,
excludedTaskTypes,
unusedTypes: unusedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
...taskClaimingOpts,
Expand All @@ -176,19 +180,22 @@ describe('TaskClaiming', () => {
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
unusedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
claimingOpts: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
unusedTaskTypes?: string[];
}) {
const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10);
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
unusedTaskTypes,
hits,
versionConflicts,
});
Expand Down Expand Up @@ -496,6 +503,7 @@ if (doc['task.runAt'].size()!=0) {
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
Expand Down Expand Up @@ -614,6 +622,7 @@ if (doc['task.runAt'].size()!=0) {
'anotherLimitedToOne',
'limitedToTwo',
],
unusedTaskTypes: [],
taskMaxAttempts: {
unlimited: maxAttempts,
},
Expand Down Expand Up @@ -871,6 +880,121 @@ if (doc['task.runAt'].size()!=0) {
expect(firstCycle).not.toMatchObject(secondCycle);
});

test('it passes any unusedTaskTypes to script', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const taskManagerId = uuid.v1();
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: new Date(Date.now()),
};
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
foobar: {
title: 'foobar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});

const [
{
args: {
updateByQuery: [{ query, script }],
},
},
] = await testClaimAvailableTasks({
storeOpts: {
definitions,
taskManagerId,
},
taskClaimingOpts: {
maxAttempts,
},
claimingOpts: {
claimOwnershipUntil: new Date(),
},
excludedTaskTypes: ['foobar'],
unusedTaskTypes: ['barfoo'],
});
expect(query).toMatchObject({
bool: {
must: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
],
filter: [
{
bool: {
must_not: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
must: { range: { 'task.retryAt': { gt: 'now' } } },
},
},
],
},
},
],
},
});
expect(script).toMatchObject({
source: expect.any(String),
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: ['foobar'],
unusedTaskTypes: ['barfoo'],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
},
},
});
});

test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
Expand Down Expand Up @@ -1263,6 +1387,7 @@ if (doc['task.runAt'].size()!=0) {
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore,
maxAttempts: 2,
getCapacity,
Expand Down
Loading

0 comments on commit c2a0103

Please sign in to comment.