diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js index 938324c12a3779..73253224bb45db 100644 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js @@ -4,9 +4,20 @@ * you may not use this file except in compliance with the Elastic License. */ +const { EventEmitter } = require('events'); + import { initRoutes } from './init_routes'; + +const once = function (emitter, event) { + return new Promise(resolve => { + emitter.once(event, resolve); + }); +}; + export default function TaskTestingAPI(kibana) { + const taskTestingEvents = new EventEmitter(); + return new kibana.Plugin({ name: 'sampleTask', require: ['elasticsearch', 'task_manager'], @@ -52,6 +63,10 @@ export default function TaskTestingAPI(kibana) { refresh: true, }); + if (params.waitForEvent) { + await once(taskTestingEvents, params.waitForEvent); + } + return { state: { count: (prevState.count || 0) + 1 }, runAt: millisecondsFromNow(params.nextRunMilliseconds), @@ -88,7 +103,7 @@ export default function TaskTestingAPI(kibana) { }, }); - initRoutes(server); + initRoutes(server, taskTestingEvents); }, }); } diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js index a9dfabae6d609f..7b9e265a15d6f8 100644 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -23,11 +23,44 @@ const taskManagerQuery = { } }; -export function initRoutes(server) { +export function initRoutes(server, taskTestingEvents) { const taskManager = server.plugins.task_manager; server.route({ - path: '/api/sample_tasks', + path: '/api/sample_tasks/schedule', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + task: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional() + }) + }), + }, + }, + async handler(request) { + try { + const { task: taskFields } = request.payload; + const task = { + ...taskFields, + scope: [scope], + }; + + const taskResult = await (taskManager.schedule(task, { request })); + + return taskResult; + } catch (err) { + return err; + } + }, + }); + + server.route({ + path: '/api/sample_tasks/ensure_scheduled', method: 'POST', config: { validate: { @@ -38,26 +71,19 @@ export function initRoutes(server) { params: Joi.object().required(), state: Joi.object().optional(), id: Joi.string().optional() - }), - ensureScheduled: Joi.boolean() - .default(false) - .optional(), + }) }), }, }, async handler(request) { try { - const { ensureScheduled = false, task: taskFields } = request.payload; + const { task: taskFields } = request.payload; const task = { ...taskFields, scope: [scope], }; - const taskResult = await ( - ensureScheduled - ? taskManager.ensureScheduled(task, { request }) - : taskManager.schedule(task, { request }) - ); + const taskResult = await (taskManager.ensureScheduled(task, { request })); return taskResult; } catch (err) { @@ -66,6 +92,27 @@ export function initRoutes(server) { }, }); + server.route({ + path: '/api/sample_tasks/event', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + event: Joi.string().required() + }), + }, + }, + async handler(request) { + try { + const { event } = request.payload; + taskTestingEvents.emit(event); + return { event }; + } catch (err) { + return err; + } + }, + }); + server.route({ path: '/api/sample_tasks', method: 'GET', diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 9b4297e995cbd8..986648f795da65 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -58,7 +58,7 @@ export default function ({ getService }) { } function scheduleTask(task) { - return supertest.post('/api/sample_tasks') + return supertest.post('/api/sample_tasks/schedule') .set('kbn-xsrf', 'xxx') .send({ task }) .expect(200) @@ -66,13 +66,20 @@ export default function ({ getService }) { } function scheduleTaskIfNotExists(task) { - return supertest.post('/api/sample_tasks') + return supertest.post('/api/sample_tasks/ensure_scheduled') .set('kbn-xsrf', 'xxx') - .send({ task, ensureScheduled: true }) + .send({ task }) .expect(200) .then((response) => response.body); } + function releaseTasksWaitingForEventToComplete(event) { + return supertest.post('/api/sample_tasks/event') + .set('kbn-xsrf', 'xxx') + .send({ event }) + .expect(200); + } + it('should support middleware', async () => { const historyItem = _.random(1, 100); @@ -204,5 +211,45 @@ export default function ({ getService }) { expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer); expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer); } + + it('should run tasks in parallel, allowing for long running tasks along side faster tasks', async () => { + /** + * It's worth noting this test relies on the /event endpoint that forces Task Manager to hold off + * on completing a task until a call is made by the test suite. + * If we begin testing with multiple Kibana instacnes in Parallel this will likely become flaky. + * If you end up here because the test is flaky, this might be why. + */ + const fastTask = await scheduleTask({ + taskType: 'sampleTask', + interval: `1s`, + params: { }, + }); + + const longRunningTask = await scheduleTask({ + taskType: 'sampleTask', + interval: `1s`, + params: { + waitForEvent: 'rescheduleHasHappened' + }, + }); + + function getTaskById(tasks, id) { + return tasks.filter(task => task.id === id)[0]; + } + + await retry.try(async () => { + const tasks = (await currentTasks()).docs; + expect(getTaskById(tasks, fastTask.id).state.count).to.eql(2); + }); + + await releaseTasksWaitingForEventToComplete('rescheduleHasHappened'); + + await retry.try(async () => { + const tasks = (await currentTasks()).docs; + + expect(getTaskById(tasks, fastTask.id).state.count).to.greaterThan(2); + expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1); + }); + }); }); }