Skip to content

Commit

Permalink
stop monitoring task sabotage
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed Mar 6, 2021
1 parent 907d209 commit 65d752d
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 37 deletions.
11 changes: 3 additions & 8 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,9 @@ export class ReportingCore {
this.pluginStartDeps = startDeps; // cache

const { taskManager } = startDeps;
if (this.getConfig().get('queue', 'pollEnabled')) {
const { executeTask, monitorTask } = this;
// enable this instance to generate reports and to monitor for pending reports
await Promise.all([executeTask.init(taskManager), monitorTask.init(taskManager)]);
} else {
// enable this instance to request other instances to generate reports
await this.executeTask.init(taskManager);
}
const { executeTask, monitorTask } = this;
// enable this instance to generate reports and to monitor for pending reports
await Promise.all([executeTask.init(taskManager), monitorTask.init(taskManager)]);
}

/*
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/reporting/server/lib/enqueue_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export function enqueueJobFactory(
new Report({
jobtype: exportType.jobType,
created_by: user ? user.username : false,
max_attempts: config.get('capture', 'maxAttempts'), // NOTE: changing the capture.maxAttempts config setting does not affect existing pending reports
max_attempts: config.get('capture', 'maxAttempts'), // NOTE: since max attempts is stored in the document, changing the capture.maxAttempts setting does not affect existing pending reports
payload: job,
meta: {
objectType: jobParams.objectType,
Expand All @@ -63,7 +63,9 @@ export function enqueueJobFactory(

// 2. Schedule the report with Task Manager
const task = await reporting.scheduleTask(report.toReportTaskJSON());
logger.info(`Scheduled ${exportType.name} reporting task: ${task.id}`);
logger.info(
`Scheduled ${exportType.name} reporting task. Task ID: ${task.id}. Report ID: ${report._id}`
);

return report;
};
Expand Down
39 changes: 32 additions & 7 deletions x-pack/plugins/reporting/server/lib/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,27 @@ export class ReportingStore {
}
}

/*
 * Writes the "pending" status onto the stored document of the given report.
 *
 * The report is first passed through checkReportIsEditable. The update request
 * carries the report's _seq_no and _primary_term as if_seq_no / if_primary_term
 * conditions, and asks for an index refresh.
 *
 * Any error (from the editable check or from the update call) is logged and
 * then rethrown to the caller.
 */
public async setReportPending(report: Report) {
  try {
    checkReportIsEditable(report);

    const { _id: id, _index: index, _seq_no: seqNo, _primary_term: primaryTerm } = report;

    // awaited inside the try so that a rejected update is logged below
    const updateResult = await this.client.callAsInternalUser('update', {
      id,
      index,
      if_seq_no: seqNo,
      if_primary_term: primaryTerm,
      refresh: true,
      body: { doc: { status: statuses.JOB_STATUS_PENDING } },
    });

    return updateResult;
  } catch (error) {
    this.logger.error('Error in setting report pending status!');
    this.logger.error(error);
    throw error;
  }
}

public async setReportClaimed(report: Report, stats: Partial<Report>): Promise<ReportDocument> {
const doc = {
...stats,
Expand Down Expand Up @@ -283,9 +304,17 @@ export class ReportingStore {
}

/*
* Finds jobs stuck in pending status, or timed-out jobs stuck in processing status
* A zombie report document is one that isn't completed or failed, isn't
* being executed, and isn't scheduled to run. They arise:
* - when the cluster had processing documents in ESQueue before upgrading to v7.13, the version in which ESQueue was removed
* - if Kibana crashes while a report task is executing and it couldn't be rescheduled on its own
*
* Pending reports are not included in this search: they may be scheduled in TM just not run yet.
* TODO Should we get a list of the reports that are pending and scheduled in TM so we can exclude them from this query?
*/
public async findLongPendingReports(logger = this.logger): Promise<ReportRecordTimeout[] | null> {
public async findZombieReportDocuments(
logger = this.logger
): Promise<ReportRecordTimeout[] | null> {
const searchParams: SearchParams = {
index: this.indexPrefix + '-*',
filterPath: 'hits.hits',
Expand All @@ -298,11 +327,7 @@ export class ReportingStore {
bool: {
must: [
{ range: { process_expiration: { lt: `now-${this.queueTimeoutMins}m` } } },
{
terms: {
status: [statuses.JOB_STATUS_PENDING, statuses.JOB_STATUS_PROCESSING],
},
},
{ terms: { status: [statuses.JOB_STATUS_PROCESSING] } },
],
},
},
Expand Down
28 changes: 20 additions & 8 deletions x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,21 @@ export class ExecuteReportTask implements ReportingTask {
// if this is an ad-hoc report, there is a corresponding "pending" record in ReportingStore in need of updating
report = await store.findReportFromTask(task); // update seq_no
} else {
// if this is a scheduled report, the report object needs to be instantiated
// if this is a scheduled report (not implemented), the report object needs to be instantiated
throw new Error('scheduled reports are not supported!');
}

// Check if this is a completed job. This may happen if the `reports:monitor`
// task detected it to be a zombie job and rescheduled it, but it
// eventually completed on its own.
if (report.status === 'completed') {
throw new Error(`Can not claim the report job: it is already completed!`);
}

const m = moment();

// check if job has exceeded maxAttempts (stored in job params) and somehow hasn't been marked as failed yet
// NOTE: changing the capture.maxAttempts config setting does not affect existing pending reports
// NOTE: the max attempts value comes from the stored document, so changing the capture.maxAttempts config setting does not affect existing pending reports
const maxAttempts = task.max_attempts;
if (report.attempts >= maxAttempts) {
const err = new Error(`Max attempts reached (${maxAttempts}). Queue timeout reached.`);
Expand Down Expand Up @@ -231,7 +238,7 @@ export class ExecuteReportTask implements ReportingTask {

try {
await store.setReportCompleted(report, doc);
this.logger.info(`Saved ${report.jobtype} job ${docId}`);
this.logger.debug(`Saved ${report.jobtype} job ${docId}`);
} catch (err) {
if (err.statusCode === 409) return false;
errorLogger(this.logger, `Failure saving completed job ${docId}!`);
Expand Down Expand Up @@ -267,13 +274,16 @@ export class ExecuteReportTask implements ReportingTask {
if (!jobId) {
throw new Error('Invalid report data provided in scheduled task!');
}

this.reporting.trackReport(jobId);
this.logger.info(`Starting ${task.jobtype} report ${jobId}.`);
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);

// Update job status to claimed
report = await this._claimJob(task);

const { jobtype: jobType, attempts: attempt, max_attempts: maxAttempts } = task;
this.logger.info(
`Starting ${jobType} report ${jobId}: attempt ${attempt + 1} of ${maxAttempts}.`
);
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
} catch (failedToClaim) {
// error claiming report - log the error
// could be version conflict, or no longer connected to ES
Expand All @@ -294,7 +304,7 @@ export class ExecuteReportTask implements ReportingTask {
}

// untrack the report for concurrency awareness
this.logger.info(`Stopping ${jobId}.`);
this.logger.debug(`Stopping ${jobId}.`);
this.reporting.untrackReport(jobId);
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
} catch (failedToExecuteErr) {
Expand Down Expand Up @@ -375,7 +385,9 @@ export class ExecuteReportTask implements ReportingTask {
state: {},
params: task,
};
return await this.getTaskManagerStart().schedule(oldTaskInstance);
const newTask = await this.getTaskManagerStart().schedule(oldTaskInstance);
logger.debug(`Rescheduled ${task.id}`);
return newTask;
}

public getStatus() {
Expand Down
31 changes: 19 additions & 12 deletions x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,16 @@ export class MonitorReportsTask implements ReportingTask {
return () => {
return {
run: async () => {
if (!this.config.queue.pollEnabled) {
this.logger.debug(
`This instance is configured to not poll for pending reports. Exiting from the monitoring task.`
);
return;
}

const reportingStore = await this.getStore();

try {
const results = await reportingStore.findLongPendingReports();
const results = await reportingStore.findZombieReportDocuments();
if (results && results.length) {
this.logger.info(`Found ${results.length} pending reports to reschedule.`);
this.logger.info(
`Found ${results.length} reports to reschedule: ${results
.map((pending) => pending._id)
.join(',')}`
);
} else {
this.logger.debug(`Found 0 pending reports.`);
return;
Expand All @@ -97,7 +94,7 @@ export class MonitorReportsTask implements ReportingTask {
_id: jobId,
_source: { process_expiration: processExpiration, status },
} = pending;
const expirationTime = moment(processExpiration);
const expirationTime = moment(processExpiration); // If it is the start of the Epoch, something went wrong
const timeWaitValue = moment().valueOf() - expirationTime.valueOf();
const timeWaitTime = moment.duration(timeWaitValue);
this.logger.info(
Expand Down Expand Up @@ -134,19 +131,29 @@ export class MonitorReportsTask implements ReportingTask {
};
}

/*
 * Reschedules the task with Task Manager and updates the report document
 * status to "pending".
 *
 * Steps, in order:
 *  1. look up the report document that belongs to the task
 *  2. schedule a new EXECUTE task carrying the same params
 *  3. set the report document status to pending
 *
 * Returns the task instance created by Task Manager.
 * Throws if the task runner has not been initialized, or if any of the
 * steps above rejects.
 */
private async rescheduleTask(task: ReportTaskParams, logger: LevelLogger) {
  if (!this.taskManagerStart) {
    throw new Error('Reporting task runner has not been initialized!');
  }

  logger.info(`Rescheduling ${task.id} to retry after timeout expiration.`);

  const store = await this.getStore();

  const oldTaskInstance: ReportingExecuteTaskInstance = {
    taskType: REPORTING_EXECUTE_TYPE, // schedule a task to EXECUTE
    state: {},
    params: task,
  };

  // These two calls were previously wrapped as `Promise.all([await a, await b])`.
  // The inner awaits already forced them to run one after the other, so the
  // Promise.all added nothing. The order is kept explicit here: the report is
  // looked up first, so no new task gets scheduled when the lookup fails.
  const report = await store.findReportFromTask(task);
  const newTask = await this.taskManagerStart.schedule(oldTaskInstance);

  await store.setReportPending(report);

  return newTask;
}

public getStatus() {
Expand Down

0 comments on commit 65d752d

Please sign in to comment.