Skip to content

Commit

Permalink
[Task Manager] Tests for the ability to run tasks of varying duration…
Browse files Browse the repository at this point in the history
…s in parallel (elastic#51572) (elastic#51701)

This PR adds a test that ensures Task Manager is capable of picking up new tasks in parallel to a long running tasks that might otherwise hold up task execution.

This doesn't add functionality - just a missing test case.
  • Loading branch information
gmmorris authored Nov 27, 2019
1 parent ef43d3d commit 89250be
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 16 deletions.
17 changes: 16 additions & 1 deletion x-pack/test/plugin_api_integration/plugins/task_manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,20 @@
* you may not use this file except in compliance with the Elastic License.
*/

const { EventEmitter } = require('events');

import { initRoutes } from './init_routes';


const once = function (emitter, event) {
return new Promise(resolve => {
emitter.once(event, resolve);
});
};

export default function TaskTestingAPI(kibana) {
const taskTestingEvents = new EventEmitter();

return new kibana.Plugin({
name: 'sampleTask',
require: ['elasticsearch', 'task_manager'],
Expand Down Expand Up @@ -52,6 +63,10 @@ export default function TaskTestingAPI(kibana) {
refresh: true,
});

if (params.waitForEvent) {
await once(taskTestingEvents, params.waitForEvent);
}

return {
state: { count: (prevState.count || 0) + 1 },
runAt: millisecondsFromNow(params.nextRunMilliseconds),
Expand Down Expand Up @@ -88,7 +103,7 @@ export default function TaskTestingAPI(kibana) {
},
});

initRoutes(server);
initRoutes(server, taskTestingEvents);
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,44 @@ const taskManagerQuery = {
}
};

export function initRoutes(server) {
export function initRoutes(server, taskTestingEvents) {
const taskManager = server.plugins.task_manager;

server.route({
path: '/api/sample_tasks',
path: '/api/sample_tasks/schedule',
method: 'POST',
config: {
validate: {
payload: Joi.object({
task: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional()
})
}),
},
},
async handler(request) {
try {
const { task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
};

const taskResult = await (taskManager.schedule(task, { request }));

return taskResult;
} catch (err) {
return err;
}
},
});

server.route({
path: '/api/sample_tasks/ensure_scheduled',
method: 'POST',
config: {
validate: {
Expand All @@ -38,26 +71,19 @@ export function initRoutes(server) {
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional()
}),
ensureScheduled: Joi.boolean()
.default(false)
.optional(),
})
}),
},
},
async handler(request) {
try {
const { ensureScheduled = false, task: taskFields } = request.payload;
const { task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
};

const taskResult = await (
ensureScheduled
? taskManager.ensureScheduled(task, { request })
: taskManager.schedule(task, { request })
);
const taskResult = await (taskManager.ensureScheduled(task, { request }));

return taskResult;
} catch (err) {
Expand All @@ -66,6 +92,27 @@ export function initRoutes(server) {
},
});

server.route({
path: '/api/sample_tasks/event',
method: 'POST',
config: {
validate: {
payload: Joi.object({
event: Joi.string().required()
}),
},
},
async handler(request) {
try {
const { event } = request.payload;
taskTestingEvents.emit(event);
return { event };
} catch (err) {
return err;
}
},
});

server.route({
path: '/api/sample_tasks',
method: 'GET',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,28 @@ export default function ({ getService }) {
}

function scheduleTask(task) {
return supertest.post('/api/sample_tasks')
return supertest.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
.then((response) => response.body);
}

function scheduleTaskIfNotExists(task) {
return supertest.post('/api/sample_tasks')
return supertest.post('/api/sample_tasks/ensure_scheduled')
.set('kbn-xsrf', 'xxx')
.send({ task, ensureScheduled: true })
.send({ task })
.expect(200)
.then((response) => response.body);
}

function releaseTasksWaitingForEventToComplete(event) {
return supertest.post('/api/sample_tasks/event')
.set('kbn-xsrf', 'xxx')
.send({ event })
.expect(200);
}

it('should support middleware', async () => {
const historyItem = _.random(1, 100);

Expand Down Expand Up @@ -204,5 +211,45 @@ export default function ({ getService }) {
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer);
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer);
}

it('should run tasks in parallel, allowing for long running tasks along side faster tasks', async () => {
/**
* It's worth noting this test relies on the /event endpoint that forces Task Manager to hold off
* on completing a task until a call is made by the test suite.
* If we begin testing with multiple Kibana instacnes in Parallel this will likely become flaky.
* If you end up here because the test is flaky, this might be why.
*/
const fastTask = await scheduleTask({
taskType: 'sampleTask',
interval: `1s`,
params: { },
});

const longRunningTask = await scheduleTask({
taskType: 'sampleTask',
interval: `1s`,
params: {
waitForEvent: 'rescheduleHasHappened'
},
});

function getTaskById(tasks, id) {
return tasks.filter(task => task.id === id)[0];
}

await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, fastTask.id).state.count).to.eql(2);
});

await releaseTasksWaitingForEventToComplete('rescheduleHasHappened');

await retry.try(async () => {
const tasks = (await currentTasks()).docs;

expect(getTaskById(tasks, fastTask.id).state.count).to.greaterThan(2);
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
});
});
});
}

0 comments on commit 89250be

Please sign in to comment.