diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index bb4c461758f963..b58b0665c10c0f 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -22,7 +22,7 @@ import { TaskManagerConfig } from './config'; import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware'; import { removeIfExists } from './lib/remove_if_exists'; import { setupSavedObjects } from './saved_objects'; -import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary'; +import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary'; import { FetchResult, SearchOpts, TaskStore } from './task_store'; import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; @@ -189,6 +189,7 @@ export class TaskManagerPlugin this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, + unusedTypes: REMOVED_TYPES, logger: this.logger, executionContext, taskStore, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index b6a93b14f578b7..cf29d1f475c6ce 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -70,6 +70,7 @@ describe('TaskPollingLifecycle', () => { }, taskStore: mockTaskStore, logger: taskManagerLogger, + unusedTypes: [], definitions: new TaskTypeDictionary(taskManagerLogger), middleware: createInitialMiddleware(), maxWorkersConfiguration$: of(100), diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index b61891d732f5e3..a452c8a3f82fbe 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -50,6 +50,7 @@ import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming'; export type TaskPollingLifecycleOpts = { logger: Logger; definitions: TaskTypeDictionary; + unusedTypes: string[]; taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; @@ -106,6 +107,7 @@ export class TaskPollingLifecycle { config, taskStore, definitions, + unusedTypes, executionContext, usageCounter, }: TaskPollingLifecycleOpts) { @@ -134,6 +136,7 @@ export class TaskPollingLifecycle { maxAttempts: config.max_attempts, excludedTaskTypes: config.unsafe.exclude_task_types, definitions, + unusedTypes, logger: this.logger, getCapacity: (taskType?: string) => taskType && this.definitions.get(taskType)?.maxConcurrency diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 9e31ab9f0cb4e2..18ed1a58025384 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -47,15 +47,16 @@ describe('mark_available_tasks_as_claimed', () => { // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt) ), - script: updateFieldsAndMarkAsFailed( + script: updateFieldsAndMarkAsFailed({ fieldUpdates, - claimTasksById || [], - definitions.getAllTypes(), - [], - Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => { + claimTasksById: claimTasksById || [], + claimableTaskTypes: definitions.getAllTypes(), + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => { return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts }; - }, {}) - ), + }, {}), + }), sort: SortByRunAtAndRetryAt, }).toEqual({ query: { @@ -126,7 +127,7 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -140,6 +141,7 @@ if (doc['task.runAt'].size()!=0) { claimTasksById: [], claimableTaskTypes: ['sampleTask', 'otherTask'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { sampleTask: 5, otherTask: 1, @@ -164,9 +166,16 @@ if (doc['task.runAt'].size()!=0) { ]; expect( - updateFieldsAndMarkAsFailed(fieldUpdates, claimTasksById, ['foo', 'bar'], [], { - foo: 5, - bar: 2, + updateFieldsAndMarkAsFailed({ + fieldUpdates, + claimTasksById, + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: { + foo: 5, + bar: 2, + }, }) ).toMatchObject({ source: ` @@ -182,7 +191,7 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -196,6 +205,7 @@ if (doc['task.runAt'].size()!=0) { ], claimableTaskTypes: ['foo', 'bar'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { foo: 5, bar: 2, @@ -213,9 +223,16 @@ if (doc['task.runAt'].size()!=0) { }; expect( - updateFieldsAndMarkAsFailed(fieldUpdates, [], ['foo', 'bar'], [], { - foo: 5, - bar: 2, + updateFieldsAndMarkAsFailed({ + fieldUpdates, + claimTasksById: [], + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: { + foo: 5, + bar: 2, + }, }).source ).toMatch(/ctx.op = "noop"/); }); diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index b1ccb191bdce08..5f2aa25253b0cd 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -104,15 +104,25 @@ if (doc['task.runAt'].size()!=0) { }; export const SortByRunAtAndRetryAt = SortByRunAtAndRetryAtScript as estypes.SortCombinations; -export const updateFieldsAndMarkAsFailed = ( +export interface UpdateFieldsAndMarkAsFailedOpts { fieldUpdates: { [field: string]: string | number | Date; - }, - claimTasksById: string[], - claimableTaskTypes: string[], - skippedTaskTypes: string[], - taskMaxAttempts: { [field: string]: number } -): ScriptClause => { + }; + claimTasksById: string[]; + claimableTaskTypes: string[]; + skippedTaskTypes: string[]; + unusedTaskTypes: string[]; + taskMaxAttempts: { [field: string]: number }; +} + +export const updateFieldsAndMarkAsFailed = ({ + fieldUpdates, + claimTasksById, + claimableTaskTypes, + skippedTaskTypes, + unusedTaskTypes, + taskMaxAttempts, +}: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => { const markAsClaimingScript = `ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')}`; @@ -126,7 +136,7 @@ export const updateFieldsAndMarkAsFailed = ( } } else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) { ${markAsClaimingScript} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -137,6 +147,7 @@ export const updateFieldsAndMarkAsFailed = ( claimTasksById, claimableTaskTypes, skippedTaskTypes, + unusedTaskTypes, taskMaxAttempts, }, }; diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index ed656b51449561..7b46f10adaabc5 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -109,6 +109,7 @@ describe('TaskClaiming', () => { logger: taskManagerLogger, definitions, excludedTaskTypes: [], + unusedTypes: [], taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getCapacity: () => 10, @@ -127,12 +128,14 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], + unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; + unusedTaskTypes?: string[]; }) { const definitions = storeOpts.definitions ?? taskDefinitions; const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId }); @@ -161,6 +164,7 @@ describe('TaskClaiming', () => { definitions, taskStore: store, excludedTaskTypes, + unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getCapacity: taskClaimingOpts.getCapacity ?? (() => 10), ...taskClaimingOpts, @@ -176,6 +180,7 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], + unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; @@ -183,12 +188,14 @@ describe('TaskClaiming', () => { hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; + unusedTaskTypes?: string[]; }) { const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10); const { taskClaiming, store } = initialiseTestClaiming({ storeOpts, taskClaimingOpts, excludedTaskTypes, + unusedTaskTypes, hits, versionConflicts, }); @@ -496,6 +503,7 @@ if (doc['task.runAt'].size()!=0) { ], claimableTaskTypes: ['foo', 'bar'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { bar: customMaxAttempts, foo: maxAttempts, @@ -614,6 +622,7 @@ if (doc['task.runAt'].size()!=0) { 'anotherLimitedToOne', 'limitedToTwo', ], + unusedTaskTypes: [], taskMaxAttempts: { unlimited: maxAttempts, }, @@ -871,6 +880,121 @@ if (doc['task.runAt'].size()!=0) { expect(firstCycle).not.toMatchObject(secondCycle); }); + test('it passes any unusedTaskTypes to script', async () => { + const maxAttempts = _.random(2, 43); + const customMaxAttempts = _.random(44, 100); + const taskManagerId = uuid.v1(); + const fieldUpdates = { + ownerId: taskManagerId, + retryAt: new Date(Date.now()), + }; + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + createTaskRunner: jest.fn(), + }, + bar: { + title: 'bar', + maxAttempts: customMaxAttempts, + createTaskRunner: jest.fn(), + }, + foobar: { + title: 'foobar', + maxAttempts: customMaxAttempts, + createTaskRunner: jest.fn(), + }, + }); + + const [ + { + args: { + updateByQuery: [{ query, script }], + }, + }, + ] = await testClaimAvailableTasks({ + storeOpts: { + definitions, + taskManagerId, + }, + taskClaimingOpts: { + maxAttempts, + }, + claimingOpts: { + claimOwnershipUntil: new Date(), + }, + excludedTaskTypes: ['foobar'], + unusedTaskTypes: ['barfoo'], + }); + expect(query).toMatchObject({ + bool: { + must: [ + { + bool: { + should: [ + { + bool: { + must: [ + { term: { 'task.status': 'idle' } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + { + bool: { + must: [ + { + bool: { + should: [ + { term: { 'task.status': 'running' } }, + { term: { 'task.status': 'claiming' } }, + ], + }, + }, + { range: { 'task.retryAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + ], + filter: [ + { + bool: { + must_not: [ + { + bool: { + should: [ + { term: { 'task.status': 'running' } }, + { term: { 'task.status': 'claiming' } }, + ], + must: { range: { 'task.retryAt': { gt: 'now' } } }, + }, + }, + ], + }, + }, + ], + }, + }); + expect(script).toMatchObject({ + source: expect.any(String), + lang: 'painless', + params: { + fieldUpdates, + claimTasksById: [], + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: ['foobar'], + unusedTaskTypes: ['barfoo'], + taskMaxAttempts: { + bar: customMaxAttempts, + foo: maxAttempts, + }, + }, + }); + }); + test('it claims tasks by setting their ownerId, status and retryAt', async () => { const taskManagerId = uuid.v1(); const claimOwnershipUntil = new Date(Date.now()); @@ -1263,6 +1387,7 @@ if (doc['task.runAt'].size()!=0) { logger: taskManagerLogger, definitions, excludedTaskTypes: [], + unusedTypes: [], taskStore, maxAttempts: 2, getCapacity, diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index b45591a233e192..1b4f0fdb73683c 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -57,6 +57,7 @@ import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; export interface TaskClaimingOpts { logger: Logger; definitions: TaskTypeDictionary; + unusedTypes: string[]; taskStore: TaskStore; maxAttempts: number; excludedTaskTypes: string[]; @@ -121,6 +122,7 @@ export class TaskClaiming { private readonly taskClaimingBatchesByType: TaskClaimingBatches; private readonly taskMaxAttempts: Record; private readonly excludedTaskTypes: string[]; + private readonly unusedTypes: string[]; /** * Constructs a new TaskStore. @@ -137,6 +139,7 @@ export class TaskClaiming { this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions); this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions)); this.excludedTaskTypes = opts.excludedTaskTypes; + this.unusedTypes = opts.unusedTypes; this.events$ = new Subject(); } @@ -225,7 +228,7 @@ export class TaskClaiming { return of(accumulatedResult); } return from( - this.executClaimAvailableTasks({ + this.executeClaimAvailableTasks({ claimOwnershipUntil, claimTasksById: claimTasksById.splice(0, capacity), size: capacity, @@ -249,7 +252,7 @@ export class TaskClaiming { ); } - private executClaimAvailableTasks = async ({ + private executeClaimAvailableTasks = async ({ claimOwnershipUntil, claimTasksById = [], size, @@ -403,16 +406,17 @@ export class TaskClaiming { : queryForScheduledTasks, filterDownBy(InactiveTasks) ); - const script = updateFieldsAndMarkAsFailed( - { + const script = updateFieldsAndMarkAsFailed({ + fieldUpdates: { ownerId: this.taskStore.taskManagerId, retryAt: claimOwnershipUntil, }, - claimTasksById || [], - taskTypesToClaim, - taskTypesToSkip, - pick(this.taskMaxAttempts, taskTypesToClaim) - ); + claimTasksById: claimTasksById || [], + claimableTaskTypes: taskTypesToClaim, + skippedTaskTypes: taskTypesToSkip, + unusedTaskTypes: this.unusedTypes, + taskMaxAttempts: pick(this.taskMaxAttempts, taskTypesToClaim), + }); const apmTrans = apm.startTransaction( TASK_MANAGER_MARK_AS_CLAIMED, diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts index e912eda2580907..cfd0f874f58ff6 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts @@ -169,6 +169,62 @@ describe('successful migrations', () => { expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); }); + + test('resets "unrecognized" status to "idle" when task type is not in REMOVED_TYPES list', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someValidTask', + status: 'unrecognized', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual({ + ...taskInstance, + attributes: { + ...taskInstance.attributes, + status: 'idle', + }, + }); + }); + + test('does not modify "unrecognized" status when task type is in REMOVED_TYPES list', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'sampleTaskRemovedType', + status: 'unrecognized', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "running"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'running', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "idle"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'idle', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "failed"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'failed', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); }); }); diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts index f50b3d6a927ade..6e527918f2a7e5 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts @@ -13,6 +13,7 @@ import { SavedObjectsUtils, SavedObjectUnsanitizedDoc, } from '../../../../../src/core/server'; +import { REMOVED_TYPES } from '../task_type_dictionary'; import { ConcreteTaskInstance, TaskStatus } from '../task'; interface TaskInstanceLogMeta extends LogMeta { @@ -38,7 +39,7 @@ export function getMigrations(): SavedObjectMigrationMap { '8.0.0' ), '8.2.0': executeMigrationWithErrorHandling( - pipeMigrations(resetAttemptsAndStatusForTheTasksWithoutSchedule), + pipeMigrations(resetAttemptsAndStatusForTheTasksWithoutSchedule, resetUnrecognizedStatus), '8.2.0' ), }; @@ -143,6 +144,29 @@ function moveIntervalIntoSchedule({ }; } +function resetUnrecognizedStatus( + doc: SavedObjectUnsanitizedDoc +): SavedObjectUnsanitizedDoc { + const status = doc?.attributes?.status; + if (status && status === 'unrecognized') { + const taskType = doc.attributes.taskType; + // If task type is in the REMOVED_TYPES list, maintain "unrecognized" status + if (REMOVED_TYPES.indexOf(taskType) >= 0) { + return doc; + } + + return { + ...doc, + attributes: { + ...doc.attributes, + status: 'idle', + }, + } as SavedObjectUnsanitizedDoc; + } + + return doc; +} + function pipeMigrations(...migrations: TaskInstanceMigration[]): TaskInstanceMigration { return (doc: SavedObjectUnsanitizedDoc) => migrations.reduce((migratedDoc, nextMigration) => nextMigration(migratedDoc), doc); diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts index d682d40a1d811f..cb2f436fa86763 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts @@ -7,7 +7,12 @@ import { get } from 'lodash'; import { RunContext, TaskDefinition } from './task'; -import { sanitizeTaskDefinitions, TaskDefinitionRegistry } from './task_type_dictionary'; +import { mockLogger } from './test_utils'; +import { + sanitizeTaskDefinitions, + TaskDefinitionRegistry, + TaskTypeDictionary, +} from './task_type_dictionary'; interface Opts { numTasks: number; @@ -40,6 +45,12 @@ const getMockTaskDefinitions = (opts: Opts) => { }; describe('taskTypeDictionary', () => { + let definitions: TaskTypeDictionary; + + beforeEach(() => { + definitions = new TaskTypeDictionary(mockLogger()); + }); + describe('sanitizeTaskDefinitions', () => {}); it('provides tasks with defaults', () => { const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); @@ -154,4 +165,49 @@ describe('taskTypeDictionary', () => { `"Invalid timeout \\"1.5h\\". Timeout must be of the form \\"{number}{cadance}\\" where number is an integer. Example: 5m."` ); }); + + describe('registerTaskDefinitions', () => { + it('registers a valid task', () => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + maxConcurrency: 2, + createTaskRunner: jest.fn(), + }, + }); + expect(definitions.has('foo')).toBe(true); + }); + + it('throws error when registering duplicate task type', () => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + maxConcurrency: 2, + createTaskRunner: jest.fn(), + }, + }); + + expect(() => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo2', + createTaskRunner: jest.fn(), + }, + }); + }).toThrowErrorMatchingInlineSnapshot(`"Task foo is already defined!"`); + }); + + it('throws error when registering removed task type', () => { + expect(() => { + definitions.registerTaskDefinitions({ + sampleTaskRemovedType: { + title: 'removed', + createTaskRunner: jest.fn(), + }, + }); + }).toThrowErrorMatchingInlineSnapshot( + `"Task sampleTaskRemovedType has been removed from registration!"` + ); + }); + }); }); diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index 3bc60284efc8fd..a2ea46122acf8f 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -8,6 +8,17 @@ import { TaskDefinition, taskDefinitionSchema, TaskRunCreatorFunction } from './task'; import { Logger } from '../../../../src/core/server'; +/** + * Types that are no longer registered and will be marked as unregistered + */ +export const REMOVED_TYPES: string[] = [ + // for testing + 'sampleTaskRemovedType', + + // deprecated in https://github.com/elastic/kibana/pull/121442 + 'alerting:siem.signals', +]; + /** * Defines a task which can be scheduled and run by the Kibana * task manager. @@ -109,6 +120,11 @@ export class TaskTypeDictionary { throw new Error(`Task ${duplicate} is already defined!`); } + const removed = Object.keys(taskDefinitions).find((type) => REMOVED_TYPES.indexOf(type) >= 0); + if (removed) { + throw new Error(`Task ${removed} has been removed from registration!`); + } + try { for (const definition of sanitizeTaskDefinitions(taskDefinitions)) { this.definitions.set(definition.type, definition); diff --git a/x-pack/test/functional/es_archives/task_manager_removed_types/data.json b/x-pack/test/functional/es_archives/task_manager_removed_types/data.json index 8594e9d567b8a1..3fc1a2cad2d286 100644 --- a/x-pack/test/functional/es_archives/task_manager_removed_types/data.json +++ b/x-pack/test/functional/es_archives/task_manager_removed_types/data.json @@ -1,3 +1,34 @@ +{ + "type": "doc", + "value": { + "id": "task:ce7e1250-3322-11eb-94c1-db6995e83f6b", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.6.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"originalParams\":{},\"superFly\":\"My middleware param!\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "idle", + "taskType": "sampleTaskNotRegisteredType" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } +} + { "type": "doc", "value": { diff --git a/x-pack/test/functional/es_archives/task_manager_tasks/data.json b/x-pack/test/functional/es_archives/task_manager_tasks/data.json index 3431419dda17e1..2b92c18dcd47bd 100644 --- a/x-pack/test/functional/es_archives/task_manager_tasks/data.json +++ b/x-pack/test/functional/es_archives/task_manager_tasks/data.json @@ -90,3 +90,65 @@ } } } + +{ + "type": "doc", + "value": { + "id": "task:ce7e1250-3322-11eb-94c1-db6995e84f6d", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"spaceId\":\"user1\",\"alertId\":\"0359d7fcc04da9878ee9aadbda38ba55\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "unrecognized", + "taskType": "alerting:0359d7fcc04da9878ee9aadbda38ba55" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } +} + +{ + "type": "doc", + "value": { + "id": "task:fe7e1250-3322-11eb-94c1-db6395e84f6e", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"spaceId\":\"user1\",\"alertId\":\"0359d7fcc04da9878ee9aadbda38ba55\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "unrecognized", + "taskType": "sampleTaskRemovedType" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } +} \ No newline at end of file diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts index 1e6bb11c13583a..1b0ffdedb00779 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts @@ -104,5 +104,37 @@ export default function createGetTests({ getService }: FtrProviderContext) { expect(hit!._source!.task.attempts).to.be(0); expect(hit!._source!.task.status).to.be(TaskStatus.Idle); }); + + it('8.2.0 migrates tasks with unrecognized status to idle if task type is removed', async () => { + const response = await es.get<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + id: 'task:ce7e1250-3322-11eb-94c1-db6995e84f6d', + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + expect(response.body._source?.task.taskType).to.eql( + `alerting:0359d7fcc04da9878ee9aadbda38ba55` + ); + expect(response.body._source?.task.status).to.eql(`idle`); + }); + + it('8.2.0 does not migrate tasks with unrecognized status if task type is valid', async () => { + const response = await es.get<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + id: 'task:fe7e1250-3322-11eb-94c1-db6395e84f6e', + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + expect(response.body._source?.task.taskType).to.eql(`sampleTaskRemovedType`); + expect(response.body._source?.task.status).to.eql(`unrecognized`); + }); }); } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts index 61223b8b67e641..90590f1e3e5722 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts @@ -45,9 +45,10 @@ export default function ({ getService }: FtrProviderContext) { const config = getService('config'); const request = supertest(url.format(config.get('servers.kibana'))); + const UNREGISTERED_TASK_TYPE_ID = 'ce7e1250-3322-11eb-94c1-db6995e83f6b'; const REMOVED_TASK_TYPE_ID = 'be7e1250-3322-11eb-94c1-db6995e83f6a'; - describe('removed task types', () => { + describe('not registered task types', () => { before(async () => { await esArchiver.load('x-pack/test/functional/es_archives/task_manager_removed_types'); }); @@ -76,7 +77,7 @@ export default function ({ getService }: FtrProviderContext) { .then((response) => response.body); } - it('should successfully schedule registered tasks and mark unregistered tasks as unrecognized', async () => { + it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => { const scheduledTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `1s` }, @@ -85,16 +86,21 @@ export default function ({ getService }: FtrProviderContext) { await retry.try(async () => { const tasks = (await currentTasks()).docs; - expect(tasks.length).to.eql(2); + expect(tasks.length).to.eql(3); const taskIds = tasks.map((task) => task.id); expect(taskIds).to.contain(scheduledTask.id); + expect(taskIds).to.contain(UNREGISTERED_TASK_TYPE_ID); expect(taskIds).to.contain(REMOVED_TASK_TYPE_ID); const scheduledTaskInstance = tasks.find((task) => task.id === scheduledTask.id); + const unregisteredTaskInstance = tasks.find( + (task) => task.id === UNREGISTERED_TASK_TYPE_ID + ); const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID); expect(scheduledTaskInstance?.status).to.eql('claiming'); + expect(unregisteredTaskInstance?.status).to.eql('idle'); expect(removedTaskInstance?.status).to.eql('unrecognized'); }); });