diff --git a/x-pack/performance/journeys/ecommerce_dashboard_saved_search_only.ts b/x-pack/performance/journeys/ecommerce_dashboard_saved_search_only.ts
index 9c454e890af294..c63f239c5a4918 100644
--- a/x-pack/performance/journeys/ecommerce_dashboard_saved_search_only.ts
+++ b/x-pack/performance/journeys/ecommerce_dashboard_saved_search_only.ts
@@ -19,7 +19,7 @@ export const journey = new Journey({
     await page.waitForSelector('#dashboardListingHeading');
   })
-  .step('Go to Ecommerce Dashboard with Saved Search only', async ({ page }) => {
+  .step('Go to Ecommerce Dashboard with Saved Search only', async ({ page, log }) => {
     await page.click(subj('dashboardListingTitleLink-[eCommerce]-Saved-Search-Dashboard'));
-    await waitForVisualizations(page, 1);
+    await waitForVisualizations(page, log, 1);
   });
diff --git a/x-pack/performance/journeys/ecommerce_dashboard_tsvb_gauge_only.ts b/x-pack/performance/journeys/ecommerce_dashboard_tsvb_gauge_only.ts
index 1988112b9a397f..ad252cfb16e88b 100644
--- a/x-pack/performance/journeys/ecommerce_dashboard_tsvb_gauge_only.ts
+++ b/x-pack/performance/journeys/ecommerce_dashboard_tsvb_gauge_only.ts
@@ -19,7 +19,7 @@ export const journey = new Journey({
     await page.waitForSelector('#dashboardListingHeading');
   })
-  .step('Go to Ecommerce Dashboard with TSVB Gauge only', async ({ page }) => {
+  .step('Go to Ecommerce Dashboard with TSVB Gauge only', async ({ page, log }) => {
     await page.click(subj('dashboardListingTitleLink-[eCommerce]-TSVB-Gauge-Only-Dashboard'));
-    await waitForVisualizations(page, 1);
+    await waitForVisualizations(page, log, 1);
   });
diff --git a/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.test.ts b/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.test.ts
new file mode 100644
index 00000000000000..d8cb08127b592c
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.test.ts
@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { AdHocTaskCounter } from './adhoc_task_counter';
+
+describe('AdHocTaskCounter', () => {
+  const counter = new AdHocTaskCounter();
+
+  afterAll(() => {
+    counter.reset();
+  });
+
+  it('increments counter', () => {
+    counter.increment(10);
+    expect(counter.count).toEqual(10);
+  });
+
+  it('resets counter', () => {
+    counter.increment(10);
+    counter.reset();
+    expect(counter.count).toEqual(0);
+  });
+});
diff --git a/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.ts b/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.ts
new file mode 100644
index 00000000000000..dc035ba890175e
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/lib/adhoc_task_counter.ts
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+/**
+ * Keeps track of how many tasks have been created.
+ *
+ * @export
+ * @class AdHocTaskCounter
+ *
+ */
+export class AdHocTaskCounter {
+  /**
+   * Gets the number of created tasks.
+   */
+  public get count() {
+    return this._count;
+  }
+
+  private _count: number;
+
+  constructor() {
+    this._count = 0;
+  }
+
+  public increment(by: number = 1) {
+    this._count += by;
+  }
+
+  public reset() {
+    this._count = 0;
+  }
+}
diff --git a/x-pack/plugins/task_manager/server/lib/intervals.test.ts b/x-pack/plugins/task_manager/server/lib/intervals.test.ts
index 416be4c1946e57..0d139fe5dfccfd 100644
--- a/x-pack/plugins/task_manager/server/lib/intervals.test.ts
+++ b/x-pack/plugins/task_manager/server/lib/intervals.test.ts
@@ -16,6 +16,7 @@ import {
   secondsFromDate,
   asInterval,
   maxIntervalFromDate,
+  parseIntervalAsMinute,
 } from './intervals';

 let fakeTimer: sinon.SinonFakeTimers;
@@ -65,6 +66,44 @@ describe('taskIntervals', () => {
     });
   });

+  describe('parseIntervalAsMinute', () => {
+    test('it accepts intervals in the form `Nm`', () => {
+      expect(() => parseIntervalAsMinute(`${_.random(1, 1000)}m`)).not.toThrow();
+    });
+
+    test('it accepts intervals in the form `Ns`', () => {
+      expect(() => parseIntervalAsMinute(`${_.random(1, 1000)}s`)).not.toThrow();
+    });
+
+    test('it rejects 0 based intervals', () => {
+      expect(() => parseIntervalAsMinute('0m')).toThrow(
+        /Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/
+      );
+      expect(() => parseIntervalAsMinute('0s')).toThrow(
+        /Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/
+      );
+    });
+
+    test('it rejects intervals that are not of the form `Nm` or `Ns`', () => {
+      expect(() => parseIntervalAsMinute(`5m 2s`)).toThrow(
+        /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/
+      );
+      expect(() => parseIntervalAsMinute(`hello`)).toThrow(
+        /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/
+      );
+    });
+
+    test('returns the interval in minutes', () => {
+      expect(parseIntervalAsMinute('5s')).toEqual(5 / 60);
+      expect(parseIntervalAsMinute('15s')).toEqual(15 / 60);
+      expect(parseIntervalAsMinute('20m')).toEqual(20);
+      expect(parseIntervalAsMinute('61m')).toEqual(61);
+      expect(parseIntervalAsMinute('90m')).toEqual(90);
+      expect(parseIntervalAsMinute('2h')).toEqual(2 * 60);
+      expect(parseIntervalAsMinute('9d')).toEqual(9 * 60 * 24);
+    });
+  });
+
   describe('parseIntervalAsMillisecond', () => {
     test('it accepts intervals in the form `Nm`', () => {
       expect(() => parseIntervalAsMillisecond(`${_.random(1, 1000)}m`)).not.toThrow();
diff --git a/x-pack/plugins/task_manager/server/lib/intervals.ts b/x-pack/plugins/task_manager/server/lib/intervals.ts
index f876c60fe5435f..4ebb1e70a18bff 100644
--- a/x-pack/plugins/task_manager/server/lib/intervals.ts
+++ b/x-pack/plugins/task_manager/server/lib/intervals.ts
@@ -114,6 +114,10 @@ export const parseIntervalAsSecond = memoize((interval: Interval): number => {
   return Math.round(parseIntervalAsMillisecond(interval) / 1000);
 });

+export const parseIntervalAsMinute = memoize((interval: Interval): number => {
+  return parseIntervalAsMillisecond(interval) / (1000 * 60);
+});
+
 export const parseIntervalAsMillisecond = memoize((interval: Interval): number => {
   const numericAsStr: string = interval.slice(0, -1);
   const numeric: number = parseInt(numericAsStr, 10);
diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts
new file mode 100644
index 00000000000000..875c7cf735836a
--- /dev/null
+++
b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts @@ -0,0 +1,542 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import uuid from 'uuid'; +import { Subject, Observable } from 'rxjs'; +import { take, bufferCount, skip, map } from 'rxjs/operators'; +import { ConcreteTaskInstance, TaskStatus } from '../task'; +import { asTaskRunEvent, TaskTiming, TaskPersistence } from '../task_events'; +import { asOk } from '../lib/result_type'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { TaskRunResult } from '../task_running'; +import { AggregatedStat } from './runtime_statistics_aggregator'; +import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; +import { + BackgroundTaskUtilizationStat, + createBackgroundTaskUtilizationAggregator, + SummarizedBackgroundTaskUtilizationStat, + summarizeUtilizationStat, +} from './background_task_utilization_statistics'; +import { parseIntervalAsMinute } from '../lib/intervals'; +import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; +import { sum } from 'lodash'; + +describe('Task Run Statistics', () => { + const pollInterval = 3000; + + beforeAll(() => { + jest.resetAllMocks(); + }); + + test('returns a running count of adhoc actual service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.actual).toEqual(sum(window)); + } + + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(3, 8)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({}, event)); + } + }); + }); + + test('returns a 
running count of adhoc adjusted service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.adjusted).toEqual(sum(window)); + } + + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); + expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); + expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); + expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); + expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(1, 6), 3)); + expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(2, 7), 3)); + expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(3, 8), 3)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({}, event)); + } + }); + }); + + test('returns a running count of adhoc task_counter', async () => { + const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.task_counter).toEqual(window.length); + } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 
2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8)); + resolve(); + }); + + for (const task of tasks) { + events$.next(mockTaskRunEvent({}, { start: task, stop: task })); + } + }); + }); + + test('returns a running count of adhoc created counter', async () => { + const tasks = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.created.counter).toEqual(sum(window)); + } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8)); + resolve(); + }); + + for (const task of tasks) { + adHocTaskCounter.increment(task); + events$.next(mockTaskRunEvent({}, { start: 0, stop: 0 })); + } + }); + }); + + test('returns a running count of recurring actual service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.actual).toEqual(sum(window)); + } + + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on 
combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(3, 8)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + } + }); + }); + + test('returns a running count of recurring adjusted service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.adjusted).toEqual(sum(window)); + } + + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); + expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); + expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); + expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); + expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(1, 6), 3)); + expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(2, 7), 3)); + expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(3, 8), 3)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + } + }); + }); + + test('returns a running count of recurring task_counter', async () => { + const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; + const events$ = new Subject(); + const 
taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.task_counter).toEqual(window.length); + } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8)); + resolve(); + }); + + for (const task of tasks) { + events$.next( + mockTaskRunEvent({ schedule: { interval: '1h' } }, { start: task, stop: task }) + ); + } + }); + }); + + test('returns a running count of recurring tasks_per_min', async () => { + const intervals = ['1h', '5m', '2h', '30m', '10m', '1m', '5h', '120m']; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const runningAverageWindowSize = 5; + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + runningAverageWindowSize, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.tasks_per_min).toEqual(sum(window)); + } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeUtilizationStat(value).value, + })), + take(intervals.length), + bufferCount(intervals.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], mapInterval(intervals.slice(0, 1))); + expectWindowEqualsUpdate(taskStats[1], mapInterval(intervals.slice(0, 2))); + expectWindowEqualsUpdate(taskStats[2], mapInterval(intervals.slice(0, 3))); + expectWindowEqualsUpdate(taskStats[3], mapInterval(intervals.slice(0, 4))); + expectWindowEqualsUpdate(taskStats[4], mapInterval(intervals.slice(0, 5))); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], mapInterval(intervals.slice(1, 6))); + expectWindowEqualsUpdate(taskStats[6], mapInterval(intervals.slice(2, 7))); + 
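+        // Each interval contributes 1 / parseIntervalAsMinute(interval) runs per minute
+        // (e.g. '5m' -> 0.2, '2h' -> 1/120), so the summarized tasks_per_min equals the
+        // sum of these rates over the sliding window of the last 5 task runs.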
expectWindowEqualsUpdate(taskStats[7], mapInterval(intervals.slice(3, 8))); + resolve(); + }); + + for (const i of intervals) { + events$.next(mockTaskRunEvent({ schedule: { interval: i } }, { start: 0, stop: 0 })); + } + }); + }); +}); + +function runAtMillisecondsAgo(now: number, ms: number): Date { + return new Date(now - ms); +} + +function roundUpToNearestSec(duration: number[], s: number): number[] { + const pollInterval = s * 1000; + return duration.map((d) => Math.ceil(d / pollInterval) * pollInterval); +} + +function mapInterval(intervals: string[]): number[] { + return intervals.map((i) => { + const interval = parseIntervalAsMinute(i); + return 1 / interval; + }); +} + +const mockTaskRunEvent = ( + overrides: Partial = {}, + timing: TaskTiming, + result: TaskRunResult = TaskRunResult.Success, + persistence?: TaskPersistence +) => { + const task = mockTaskInstance(overrides); + return asTaskRunEvent( + task.id, + asOk({ + task, + persistence: + persistence ?? (task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring), + result, + }), + timing + ); +}; + +const mockTaskInstance = (overrides: Partial = {}): ConcreteTaskInstance => ({ + id: uuid.v4(), + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: 'alerting:test', + params: { + alertId: '1', + }, + ownerId: null, + ...overrides, +}); diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts new file mode 100644 index 00000000000000..3d096cd366e722 --- /dev/null +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts @@ -0,0 +1,276 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+import { JsonObject } from '@kbn/utility-types';
+import { get } from 'lodash';
+import { combineLatest, filter, map, Observable, startWith } from 'rxjs';
+import { AdHocTaskCounter } from '../lib/adhoc_task_counter';
+import { parseIntervalAsMinute } from '../lib/intervals';
+import { unwrap } from '../lib/result_type';
+import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
+import { ConcreteTaskInstance } from '../task';
+import { isTaskRunEvent, TaskRun, TaskTiming } from '../task_events';
+import { MonitoredStat } from './monitoring_stats_stream';
+import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
+import { createRunningAveragedStat } from './task_run_calcultors';
+
+export interface BackgroundTaskUtilizationStat extends JsonObject {
+  adhoc: AdhocTaskStat;
+  recurring: RecurringTaskStat;
+}
+
+interface TaskStat extends JsonObject {
+  ran: {
+    service_time: {
+      actual: number[]; // total service time for running tasks
+      adjusted: number[]; // total service time adjusted for the polling interval
+      task_counter: number[]; // task run counter, only increases for the lifetime of the process
+    };
+  };
+}
+
+interface AdhocTaskStat extends TaskStat {
+  created: {
+    counter: number[]; // counter for number of ad hoc tasks created
+  };
+}
+
+interface RecurringTaskStat extends TaskStat {
+  tasks_per_min: number[];
+}
+
+export interface SummarizedBackgroundTaskUtilizationStat extends JsonObject {
+  adhoc: {
+    created: {
+      counter: number;
+    };
+    ran: {
+      service_time: {
+        actual: number;
+        adjusted: number;
+        task_counter: number;
+      };
+    };
+  };
+  recurring: {
+    tasks_per_min: number;
+    ran: {
+      service_time: {
+        actual: number;
+        adjusted: number;
+        task_counter: number;
+      };
+    };
+  };
+}
+
+export function createBackgroundTaskUtilizationAggregator(
+  taskPollingLifecycle: TaskPollingLifecycle,
+  runningAverageWindowSize: number,
+  adHocTaskCounter: AdHocTaskCounter,
+  pollInterval: number
+): AggregatedStatProvider<BackgroundTaskUtilizationStat> {
+  const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat(runningAverageWindowSize);
+  const taskRunAdhocEvents$: Observable<Pick<BackgroundTaskUtilizationStat, 'adhoc'>> =
+    taskPollingLifecycle.events.pipe(
+      filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)),
+      map((taskEvent: TaskLifecycleEvent) => ({
+        taskEvent,
+        ...unwrap((taskEvent as TaskRun).event),
+      })),
+      filter(({ task }) => get(task, 'schedule.interval', null) == null),
+      map(({ taskEvent }) => {
+        return taskRunEventToAdhocStat(taskEvent.timing!, adHocTaskCounter, pollInterval);
+      })
+    );
+
+  const taskRunEventToRecurringStat = createTaskRunEventToRecurringStat(runningAverageWindowSize);
+  const taskRunRecurringEvents$: Observable<Pick<BackgroundTaskUtilizationStat, 'recurring'>> =
+    taskPollingLifecycle.events.pipe(
+      filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)),
+      map((taskEvent: TaskLifecycleEvent) => ({
+        taskEvent,
+        ...unwrap((taskEvent as TaskRun).event),
+      })),
+      filter(({ task }) => get(task, 'schedule.interval', null) != null),
+      map(({ taskEvent, task }) => {
+        return taskRunEventToRecurringStat(taskEvent.timing!, task, pollInterval);
+      })
+    );
+
+  return combineLatest([
+    taskRunAdhocEvents$.pipe(
+      startWith({
+        adhoc: {
+          created: {
+            counter: [],
+          },
+          ran: {
+            service_time: {
+              actual: [],
+              adjusted: [],
+              task_counter: [],
+            },
+          },
+        },
+      })
+    ),
+    taskRunRecurringEvents$.pipe(
+      startWith({
+        recurring: {
+          tasks_per_min: [],
+          ran: {
+            service_time: {
+              actual: [],
+              adjusted: [],
+              task_counter: [],
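+              // Seeding both pipes with empty arrays via startWith lets combineLatest
+              // emit an initial stat immediately, instead of waiting for both an ad hoc
+              // and a recurring task run to be observed.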
+            },
+          },
+        },
+      })
+    ),
+  ]).pipe(
+    map(
+      ([adhoc, recurring]: [
+        Pick<BackgroundTaskUtilizationStat, 'adhoc'>,
+        Pick<BackgroundTaskUtilizationStat, 'recurring'>
+      ]) => {
+        return {
+          key: 'utilization',
+          value: {
+            ...adhoc,
+            ...recurring,
+          },
+        } as AggregatedStat<BackgroundTaskUtilizationStat>;
+      }
+    )
+  );
+}
+
+function hasTiming(taskEvent: TaskLifecycleEvent) {
+  return !!taskEvent?.timing;
+}
+
+export function summarizeUtilizationStat({ adhoc, recurring }: BackgroundTaskUtilizationStat): {
+  value: SummarizedBackgroundTaskUtilizationStat;
+} {
+  return {
+    value: {
+      adhoc: {
+        created: {
+          counter: calculateSum(adhoc.created.counter),
+        },
+        ran: {
+          service_time: {
+            actual: calculateSum(adhoc.ran.service_time.actual),
+            adjusted: calculateSum(adhoc.ran.service_time.adjusted),
+            task_counter: calculateSum(adhoc.ran.service_time.task_counter),
+          },
+        },
+      },
+      recurring: {
+        tasks_per_min: calculateSum(recurring.tasks_per_min),
+        ran: {
+          service_time: {
+            actual: calculateSum(recurring.ran.service_time.actual),
+            adjusted: calculateSum(recurring.ran.service_time.adjusted),
+            task_counter: calculateSum(recurring.ran.service_time.task_counter),
+          },
+        },
+      },
+    },
+  };
+}
+
+export function summarizeUtilizationStats({
+  // eslint-disable-next-line @typescript-eslint/naming-convention
+  last_update,
+  stats,
+}: {
+  last_update: string;
+  stats: MonitoredStat<BackgroundTaskUtilizationStat> | undefined;
+}): {
+  last_update: string;
+  stats: MonitoredStat<SummarizedBackgroundTaskUtilizationStat> | null;
+} {
+  return {
+    last_update,
+    stats: stats
+      ? {
+          timestamp: stats.timestamp,
+          ...summarizeUtilizationStat(stats.value),
+        }
+      : null,
+  };
+}
+
+function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) {
+  const createdCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const actualQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const adjustedQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const taskCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  return (
+    timing: TaskTiming,
+    adHocTaskCounter: AdHocTaskCounter,
+    pollInterval: number
+  ): Pick<BackgroundTaskUtilizationStat, 'adhoc'> => {
+    const { duration, adjusted } = getServiceTimeStats(timing, pollInterval);
+    const created = adHocTaskCounter.count;
+    adHocTaskCounter.reset();
+    return {
+      adhoc: {
+        created: {
+          counter: createdCounterQueue(created),
+        },
+        ran: {
+          service_time: {
+            actual: actualQueue(duration),
+            adjusted: adjustedQueue(adjusted),
+            task_counter: taskCounterQueue(1),
+          },
+        },
+      },
+    };
+  };
+}
+
+function createTaskRunEventToRecurringStat(runningAverageWindowSize: number) {
+  const tasksPerMinQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const actualQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const adjustedQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  const taskCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
+  return (
+    timing: TaskTiming,
+    task: ConcreteTaskInstance,
+    pollInterval: number
+  ): Pick<BackgroundTaskUtilizationStat, 'recurring'> => {
+    const { duration, adjusted } = getServiceTimeStats(timing, pollInterval);
+    const interval = parseIntervalAsMinute(task.schedule?.interval!);
+    return {
+      recurring: {
+        tasks_per_min: tasksPerMinQueue(1 / interval),
+        ran: {
+          service_time: {
+            actual: actualQueue(duration),
+            adjusted: adjustedQueue(adjusted),
+            task_counter: taskCounterQueue(1),
+          },
+        },
+      },
+    };
+  };
+}
+
+function getServiceTimeStats(timing: TaskTiming, pollInterval: number) {
+  const duration = timing!.stop - timing!.start;
+  const adjusted = Math.ceil(duration / pollInterval) * pollInterval;
+  return { duration, adjusted };
+}
+
+function calculateSum(arr: number[]) {
+  return arr.reduce((acc, s) => acc + s, 0);
+}
diff --git a/x-pack/plugins/task_manager/server/monitoring/index.ts b/x-pack/plugins/task_manager/server/monitoring/index.ts
index de1bea796c0385..9ee32e97d7758a 100644
--- a/x-pack/plugins/task_manager/server/monitoring/index.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/index.ts
@@ -17,6 +17,7 @@ import { TaskStore } from '../task_store';
 import { TaskPollingLifecycle } from '../polling_lifecycle';
 import { ManagedConfiguration } from '../lib/create_managed_configuration';
 import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
+import { AdHocTaskCounter } from '../lib/adhoc_task_counter';

 export type { MonitoringStats, RawMonitoringStats } from './monitoring_stats_stream';
 export {
@@ -32,6 +33,7 @@ export function createMonitoringStats(
   config: TaskManagerConfig,
   managedConfig: ManagedConfiguration,
   logger: Logger,
+  adHocTaskCounter: AdHocTaskCounter,
   taskPollingLifecycle?: TaskPollingLifecycle,
   ephemeralTaskLifecycle?: EphemeralTaskLifecycle
 ): Observable<MonitoringStats> {
@@ -42,6 +44,7 @@ export function createMonitoringStats(
       config,
       managedConfig,
       logger,
+      adHocTaskCounter,
       taskPollingLifecycle,
       ephemeralTaskLifecycle
     ),
diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
index a16e01189f4c7d..19485e41c2ae22 100644
--- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
@@ -30,12 +30,18 @@ import {
   TaskRunStat,
   SummarizedTaskRunStat,
 } from './task_run_statistics';
+import {
+  BackgroundTaskUtilizationStat,
+  createBackgroundTaskUtilizationAggregator,
+} from './background_task_utilization_statistics';
+
 import { ConfigStat, createConfigurationAggregator } from './configuration_statistics';
 import { TaskManagerConfig } from '../config';
 import { AggregatedStatProvider } from './runtime_statistics_aggregator';
 import { ManagedConfiguration } from '../lib/create_managed_configuration';
 import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
 import { CapacityEstimationStat, withCapacityEstimate } from './capacity_estimation';
+import { AdHocTaskCounter } from '../lib/adhoc_task_counter';

 export type { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';

@@ -46,6 +52,7 @@ export interface MonitoringStats {
     workload?: MonitoredStat<WorkloadStat>;
     runtime?: MonitoredStat<TaskRunStat>;
     ephemeral?: MonitoredStat<EphemeralTaskStat>;
+    utilization?: MonitoredStat<BackgroundTaskUtilizationStat>;
   };
 }

@@ -55,7 +62,7 @@ export enum HealthStatus {
   Error = 'error',
 }

-interface MonitoredStat<T> {
+export interface MonitoredStat<T> {
   timestamp: string;
   value: T;
 }
@@ -80,6 +87,7 @@ export function createAggregators(
   config: TaskManagerConfig,
   managedConfig: ManagedConfiguration,
   logger: Logger,
+  adHocTaskCounter: AdHocTaskCounter,
   taskPollingLifecycle?: TaskPollingLifecycle,
   ephemeralTaskLifecycle?: EphemeralTaskLifecycle
 ): AggregatedStatProvider {
@@ -96,7 +104,13 @@ export function createAggregators(
   ];
   if (taskPollingLifecycle) {
     aggregators.push(
-      createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window)
+      createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window),
+      createBackgroundTaskUtilizationAggregator(
+        taskPollingLifecycle,
+        config.monitored_stats_running_average_window,
+        adHocTaskCounter,
+        config.poll_interval
+      )
     );
   }
   if (ephemeralTaskLifecycle && ephemeralTaskLifecycle.enabled) {
@@ -145,7
+159,7 @@ export function summarizeMonitoringStats( { // eslint-disable-next-line @typescript-eslint/naming-convention last_update, - stats: { runtime, workload, configuration, ephemeral }, + stats: { runtime, workload, configuration, ephemeral, utilization }, }: MonitoringStats, config: TaskManagerConfig ): RawMonitoringStats { diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index d2f93903cc7dd0..275f1e845bcd87 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -29,12 +29,13 @@ import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './tas import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store'; import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; -import { healthRoute } from './routes'; +import { backgroundTaskUtilizationRoute, healthRoute } from './routes'; import { createMonitoringStats, MonitoringStats } from './monitoring'; import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; import { EphemeralTask, ConcreteTaskInstance } from './task'; import { registerTaskManagerUsageCollector } from './usage'; import { TASK_MANAGER_INDEX } from './constants'; +import { AdHocTaskCounter } from './lib/adhoc_task_counter'; export interface TaskManagerSetupContract { /** @@ -84,6 +85,7 @@ export class TaskManagerPlugin private monitoringStats$ = new Subject(); private shouldRunBackgroundTasks: boolean; private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version']; + private adHocTaskCounter: AdHocTaskCounter; constructor(private readonly initContext: PluginInitializerContext) { this.initContext = initContext; @@ -92,6 +94,7 @@ export class TaskManagerPlugin this.definitions = new TaskTypeDictionary(this.logger); this.kibanaVersion = initContext.env.packageInfo.version; this.shouldRunBackgroundTasks = initContext.node.roles.backgroundTasks; + this.adHocTaskCounter = new AdHocTaskCounter(); } public setup( @@ -133,6 +136,18 @@ export class TaskManagerPlugin startServicesPromise.then(({ elasticsearch }) => elasticsearch.client), shouldRunTasks: this.shouldRunBackgroundTasks, }); + const monitoredUtilization$ = backgroundTaskUtilizationRoute({ + router, + monitoringStats$: this.monitoringStats$, + logger: this.logger, + taskManagerId: this.taskManagerId, + config: this.config!, + usageCounter: this.usageCounter!, + kibanaVersion: this.kibanaVersion, + kibanaIndexName: core.savedObjects.getKibanaIndex(), + getClusterClient: () => + startServicesPromise.then(({ elasticsearch }) => elasticsearch.client), + }); core.status.derivedStatus$.subscribe((status) => this.logger.debug(`status core.status.derivedStatus now set to ${status.level}`) @@ -155,6 +170,7 @@ export class TaskManagerPlugin registerTaskManagerUsageCollector( usageCollection, monitoredHealth$, + monitoredUtilization$, this.config.ephemeral_tasks.enabled, this.config.ephemeral_tasks.request_capacity, this.config.unsafe.exclude_task_types @@ -195,6 +211,7 @@ export class TaskManagerPlugin index: TASK_MANAGER_INDEX, definitions: this.definitions, taskManagerId: `kibana:${this.taskManagerId!}`, + adHocTaskCounter: this.adHocTaskCounter, }); const managedConfiguration = createManagedConfiguration({ @@ -237,6 +254,7 @@ export class TaskManagerPlugin this.config!, managedConfiguration, this.logger, + this.adHocTaskCounter, this.taskPollingLifecycle, 
this.ephemeralTaskLifecycle ).subscribe((stat) => this.monitoringStats$.next(stat)); diff --git a/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts b/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts new file mode 100644 index 00000000000000..a10952ddd04635 --- /dev/null +++ b/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts @@ -0,0 +1,206 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { of, Subject } from 'rxjs'; +import uuid from 'uuid'; +import { httpServiceMock } from '@kbn/core/server/mocks'; +import { mockHandlerArguments } from './_mock_handler_arguments'; +import { sleep } from '../test_utils'; +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock'; +import { MonitoringStats } from '../monitoring'; +import { configSchema, TaskManagerConfig } from '../config'; +import { backgroundTaskUtilizationRoute } from './background_task_utilization'; +import { SecurityHasPrivilegesResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; + +const mockUsageCountersSetup = usageCountersServiceMock.createSetupContract(); +const mockUsageCounter = mockUsageCountersSetup.createUsageCounter('test'); + +const createMockClusterClient = (response: SecurityHasPrivilegesResponse) => { + const mockScopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + mockScopedClusterClient.asCurrentUser.security.hasPrivileges.mockResponse(response); + + const mockClusterClient = elasticsearchServiceMock.createClusterClient(); + mockClusterClient.asScoped.mockReturnValue(mockScopedClusterClient); + + return { mockClusterClient, mockScopedClusterClient }; +}; + +describe('backgroundTaskUtilizationRoute', () => { + const logger = loggingSystemMock.create().get(); + + beforeEach(() => { + jest.resetAllMocks(); + }); + + it('registers the route', async () => { + const router = httpServiceMock.createRouter(); + backgroundTaskUtilizationRoute({ + router, + monitoringStats$: of(), + logger, + taskManagerId: uuid.v4(), + config: getTaskManagerConfig(), + kibanaVersion: '8.0', + kibanaIndexName: '.kibana', + getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()), + usageCounter: mockUsageCounter, + }); + + const [config] = router.get.mock.calls[0]; + + expect(config.path).toMatchInlineSnapshot( + `"/internal/task_manager/_background_task_utilization"` + ); + }); + + it('checks user privileges and increments usage counter when API is accessed', async () => { + const { mockClusterClient, mockScopedClusterClient } = createMockClusterClient({ + has_all_requested: false, + } as SecurityHasPrivilegesResponse); + const router = httpServiceMock.createRouter(); + backgroundTaskUtilizationRoute({ + router, + monitoringStats$: of(), + logger, + taskManagerId: uuid.v4(), + config: getTaskManagerConfig(), + kibanaVersion: '8.0', + kibanaIndexName: 'foo', + getClusterClient: () => Promise.resolve(mockClusterClient), + usageCounter: mockUsageCounter, + }); + + const [, handler] = router.get.mock.calls[0]; + const [context, req, res] = mockHandlerArguments({}, {}, ['ok']); + await handler(context, req, res); + + 
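+    // The route scopes the ES client to the request and checks the versioned Kibana
+    // application privilege (`api:<kibanaVersion>:taskManager`) before counting usage.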
expect(mockScopedClusterClient.asCurrentUser.security.hasPrivileges).toHaveBeenCalledWith({ + body: { + application: [ + { + application: `kibana-foo`, + resources: ['*'], + privileges: [`api:8.0:taskManager`], + }, + ], + }, + }); + expect(mockUsageCounter.incrementCounter).toHaveBeenCalledTimes(1); + expect(mockUsageCounter.incrementCounter).toHaveBeenNthCalledWith(1, { + counterName: `taskManagerBackgroundTaskUtilApiAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + }); + + it('checks user privileges and increments admin usage counter when API is accessed when user has access to task manager feature', async () => { + const { mockClusterClient, mockScopedClusterClient } = createMockClusterClient({ + has_all_requested: true, + } as SecurityHasPrivilegesResponse); + const router = httpServiceMock.createRouter(); + backgroundTaskUtilizationRoute({ + router, + monitoringStats$: of(), + logger, + taskManagerId: uuid.v4(), + config: getTaskManagerConfig(), + kibanaVersion: '8.0', + kibanaIndexName: 'foo', + getClusterClient: () => Promise.resolve(mockClusterClient), + usageCounter: mockUsageCounter, + }); + + const [, handler] = router.get.mock.calls[0]; + const [context, req, res] = mockHandlerArguments({}, {}, ['ok']); + await handler(context, req, res); + + expect(mockScopedClusterClient.asCurrentUser.security.hasPrivileges).toHaveBeenCalledWith({ + body: { + application: [ + { + application: `kibana-foo`, + resources: ['*'], + privileges: [`api:8.0:taskManager`], + }, + ], + }, + }); + + expect(mockUsageCounter.incrementCounter).toHaveBeenCalledTimes(2); + expect(mockUsageCounter.incrementCounter).toHaveBeenNthCalledWith(1, { + counterName: `taskManagerBackgroundTaskUtilApiAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + expect(mockUsageCounter.incrementCounter).toHaveBeenNthCalledWith(2, { + counterName: `taskManagerBackgroundTaskUtilApiAdminAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + }); + + it('skips checking user privileges if usage counter is undefined', async () => { + const { mockClusterClient, mockScopedClusterClient } = createMockClusterClient({ + has_all_requested: false, + } as SecurityHasPrivilegesResponse); + const router = httpServiceMock.createRouter(); + backgroundTaskUtilizationRoute({ + router, + monitoringStats$: of(), + logger, + taskManagerId: uuid.v4(), + config: getTaskManagerConfig(), + kibanaVersion: '8.0', + kibanaIndexName: 'foo', + getClusterClient: () => Promise.resolve(mockClusterClient), + }); + + const [, handler] = router.get.mock.calls[0]; + const [context, req, res] = mockHandlerArguments({}, {}, ['ok']); + await handler(context, req, res); + + expect(mockScopedClusterClient.asCurrentUser.security.hasPrivileges).not.toHaveBeenCalled(); + }); + + it(`logs an error if the utilization stats are null`, async () => { + const router = httpServiceMock.createRouter(); + const stats$ = new Subject(); + const id = uuid.v4(); + backgroundTaskUtilizationRoute({ + router, + monitoringStats$: stats$, + logger, + taskManagerId: id, + config: getTaskManagerConfig(), + kibanaVersion: '8.0', + kibanaIndexName: '.kibana', + getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()), + usageCounter: mockUsageCounter, + }); + + stats$.next({ stats: {} } as MonitoringStats); + await sleep(1001); + + expect(logger.debug).toHaveBeenNthCalledWith( + 1, + 'Unable to get Task Manager background task utilization metrics.' 
+    );
+  });
+});
+
+const getTaskManagerConfig = (overrides: Partial<TaskManagerConfig> = {}) =>
+  configSchema.validate(
+    overrides.monitored_stats_required_freshness
+      ? {
+          // use `monitored_stats_required_freshness` as poll interval otherwise we might
+          // fail validation as it must be greater than the poll interval
+          poll_interval: overrides.monitored_stats_required_freshness,
+          ...overrides,
+        }
+      : overrides
+  );
diff --git a/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts b/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts
new file mode 100644
index 00000000000000..a5ceaf5e5ca77b
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts
@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import {
+  IRouter,
+  RequestHandlerContext,
+  KibanaRequest,
+  IKibanaResponse,
+  KibanaResponseFactory,
+  Logger,
+} from '@kbn/core/server';
+import { IClusterClient } from '@kbn/core/server';
+import { Observable, Subject } from 'rxjs';
+import { throttleTime, tap, map } from 'rxjs/operators';
+import { UsageCounter } from '@kbn/usage-collection-plugin/server';
+import { MonitoringStats } from '../monitoring';
+import { TaskManagerConfig } from '../config';
+import {
+  SummarizedBackgroundTaskUtilizationStat,
+  summarizeUtilizationStats,
+} from '../monitoring/background_task_utilization_statistics';
+import { MonitoredStat } from '../monitoring/monitoring_stats_stream';
+
+export interface MonitoredUtilization {
+  process_uuid: string;
+  timestamp: string;
+  last_update: string;
+  stats: MonitoredStat<SummarizedBackgroundTaskUtilizationStat> | null;
+}
+
+export interface BackgroundTaskUtilRouteParams {
+  router: IRouter;
+  monitoringStats$: Observable<MonitoringStats>;
+  logger: Logger;
+  taskManagerId: string;
+  config: TaskManagerConfig;
+  kibanaVersion: string;
+  kibanaIndexName: string;
+  getClusterClient: () => Promise<IClusterClient>;
+  usageCounter?: UsageCounter;
+}
+
+export function backgroundTaskUtilizationRoute(
+  params: BackgroundTaskUtilRouteParams
+): Observable<MonitoredUtilization> {
+  const {
+    router,
+    monitoringStats$,
+    logger,
+    taskManagerId,
+    config,
+    kibanaVersion,
+    kibanaIndexName,
+    getClusterClient,
+    usageCounter,
+  } = params;
+
+  const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
+
+  function getBackgroundTaskUtilization(monitoredStats: MonitoringStats) {
+    const summarizedStats = summarizeUtilizationStats({
+      last_update: monitoredStats.last_update,
+      stats: monitoredStats.stats.utilization,
+    });
+    const now = Date.now();
+    const timestamp = new Date(now).toISOString();
+    return { process_uuid: taskManagerId, timestamp, ...summarizedStats };
+  }
+
+  const monitoredUtilization$: Subject<MonitoredUtilization> = new Subject<MonitoredUtilization>();
+  /* keep track of the last utilization summary, as we'll return that to the next call to _background_task_utilization */
+  let lastMonitoredStats: MonitoringStats | null = null;
+
+  monitoringStats$
+    .pipe(
+      throttleTime(requiredHotStatsFreshness),
+      tap((stats) => {
+        lastMonitoredStats = stats;
+      }),
+      // Only calculate the summarized stats (calculates all running averages and evaluates state)
+      // when needed by throttling down to the requiredHotStatsFreshness
+      map((stats) => getBackgroundTaskUtilization(stats))
+    )
+    .subscribe((utilizationStats) => {
+      monitoredUtilization$.next(utilizationStats);
+      if (utilizationStats.stats
== null) { + logger.debug('Unable to get Task Manager background task utilization metrics.'); + } + }); + + router.get( + { + path: '/internal/task_manager/_background_task_utilization', + // Uncomment when we determine that we can restrict API usage to Global admins based on telemetry + // options: { tags: ['access:taskManager'] }, + validate: false, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise { + // If we are able to count usage, we want to check whether the user has access to + // the `taskManager` feature, which is only available as part of the Global All privilege. + if (usageCounter) { + const clusterClient = await getClusterClient(); + const hasPrivilegesResponse = await clusterClient + .asScoped(req) + .asCurrentUser.security.hasPrivileges({ + body: { + application: [ + { + application: `kibana-${kibanaIndexName}`, + resources: ['*'], + privileges: [`api:${kibanaVersion}:taskManager`], + }, + ], + }, + }); + + // Keep track of total access vs admin access + usageCounter.incrementCounter({ + counterName: `taskManagerBackgroundTaskUtilApiAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + if (hasPrivilegesResponse.has_all_requested) { + usageCounter.incrementCounter({ + counterName: `taskManagerBackgroundTaskUtilApiAdminAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + } + } + + return res.ok({ + body: lastMonitoredStats + ? getBackgroundTaskUtilization(lastMonitoredStats) + : { process_uuid: taskManagerId, timestamp: new Date().toISOString(), stats: {} }, + }); + } + ); + + return monitoredUtilization$; +} diff --git a/x-pack/plugins/task_manager/server/routes/index.ts b/x-pack/plugins/task_manager/server/routes/index.ts index a3b39bc8eb7522..f3ba539323f8e9 100644 --- a/x-pack/plugins/task_manager/server/routes/index.ts +++ b/x-pack/plugins/task_manager/server/routes/index.ts @@ -6,3 +6,4 @@ */ export { healthRoute } from './health'; +export { backgroundTaskUtilizationRoute } from './background_task_utilization'; diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 7bc731a0d8b6b4..dfc21a7142ecef 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -21,9 +21,11 @@ import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks'; import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/server'; import { TaskTypeDictionary } from './task_type_dictionary'; import { mockLogger } from './test_utils'; +import { AdHocTaskCounter } from './lib/adhoc_task_counter'; const savedObjectsClient = savedObjectsRepositoryMock.create(); const serializer = savedObjectsServiceMock.createSerializer(); +const adHocTaskCounter = new AdHocTaskCounter(); const randomId = () => `id-${_.random(1, 20)}`; @@ -69,9 +71,14 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); + afterEach(() => { + adHocTaskCounter.reset(); + }); + async function testSchedule(task: unknown) { savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({ id: 'testid', @@ -189,6 +196,31 @@ describe('TaskStore', () => { await expect(store.schedule(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); expect(await 
firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); + + test('increments adHocTaskCounter', async () => { + const task: TaskInstance = { + id: 'id', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + + await testSchedule(task); + expect(adHocTaskCounter.count).toEqual(1); + }); + + test('does not increment adHocTaskCounter if the task is recurring', async () => { + const task: TaskInstance = { + id: 'id', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + schedule: { interval: '1m' }, + }; + + await testSchedule(task); + expect(adHocTaskCounter.count).toEqual(0); + }); }); describe('fetch', () => { @@ -204,6 +236,7 @@ describe('TaskStore', () => { esClient, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -272,6 +305,7 @@ describe('TaskStore', () => { esClient, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -369,6 +403,7 @@ describe('TaskStore', () => { esClient, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -471,6 +506,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -511,6 +547,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -544,6 +581,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -577,6 +615,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); @@ -661,6 +700,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); expect(await store.getLifecycle(task.id)).toEqual(status); @@ -680,6 +720,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound); @@ -697,6 +738,7 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request'); @@ -714,9 +756,14 @@ describe('TaskStore', () => { esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, }); }); + afterEach(() => { + adHocTaskCounter.reset(); + }); + async function testBulkSchedule(task: unknown) { savedObjectsClient.bulkCreate.mockImplementation(async () => ({ saved_objects: [ @@ -837,5 +884,30 @@ describe('TaskStore', () => { ); expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); + + test('increments adHocTaskCounter', async () => { + 
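+      // A task without `schedule.interval` counts as ad hoc, so scheduling it should
+      // bump the shared AdHocTaskCounter by exactly one.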
const task: TaskInstance = { + id: 'id', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + + const result = await testBulkSchedule([task]); + expect(adHocTaskCounter.count).toEqual(result.length); + }); + + test('does not increment adHocTaskCounter if the task is recurring', async () => { + const task: TaskInstance = { + id: 'id', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + schedule: { interval: '1m' }, + }; + + await testBulkSchedule([task]); + expect(adHocTaskCounter.count).toEqual(0); + }); }); }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index c2c003ff6f7bb6..e810ea5c1ef3e0 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -9,7 +9,7 @@ * This module contains helpers for managing the task manager storage layer. */ import { Subject } from 'rxjs'; -import { omit, defaults } from 'lodash'; +import { omit, defaults, get } from 'lodash'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server'; @@ -34,6 +34,7 @@ import { } from './task'; import { TaskTypeDictionary } from './task_type_dictionary'; +import { AdHocTaskCounter } from './lib/adhoc_task_counter'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -42,6 +43,7 @@ export interface StoreOpts { definitions: TaskTypeDictionary; savedObjectsRepository: ISavedObjectsRepository; serializer: ISavedObjectsSerializer; + adHocTaskCounter: AdHocTaskCounter; } export interface SearchOpts { @@ -95,6 +97,7 @@ export class TaskStore { private definitions: TaskTypeDictionary; private savedObjectsRepository: ISavedObjectsRepository; private serializer: ISavedObjectsSerializer; + private adHocTaskCounter: AdHocTaskCounter; /** * Constructs a new TaskStore. 
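The hunks below wire the counter into schedule() and bulkSchedule(): a task is counted as ad hoc exactly when it has no `schedule.interval`. As a minimal standalone sketch of that discriminator (illustrative only, not part of the diff; the pared-down `TaskInstance` shape and the `isAdHoc` helper are assumptions):

    import { get } from 'lodash';

    interface TaskInstance {
      taskType: string;
      schedule?: { interval: string };
    }

    // Mirrors the check used below: a task with no schedule.interval is a
    // one-shot (ad hoc) task; anything with an interval is recurring.
    function isAdHoc(task: TaskInstance): boolean {
      return get(task, 'schedule.interval', null) == null;
    }

    isAdHoc({ taskType: 'report' }); // true  -> counted as ad hoc
    isAdHoc({ taskType: 'report', schedule: { interval: '1m' } }); // false -> not counted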
@@ -112,6 +115,7 @@ export class TaskStore {
     this.definitions = opts.definitions;
     this.serializer = opts.serializer;
     this.savedObjectsRepository = opts.savedObjectsRepository;
+    this.adHocTaskCounter = opts.adHocTaskCounter;
   }
 
   /**
@@ -140,6 +144,9 @@ export class TaskStore {
         taskInstanceToAttributes(taskInstance),
         { id: taskInstance.id, refresh: false }
       );
+      if (get(taskInstance, 'schedule.interval', null) == null) {
+        this.adHocTaskCounter.increment();
+      }
     } catch (e) {
       this.errors$.next(e);
       throw e;
     }
@@ -169,6 +176,11 @@ export class TaskStore {
         objects,
         { refresh: false }
       );
+      this.adHocTaskCounter.increment(
+        taskInstances.filter((task) => {
+          return get(task, 'schedule.interval', null) == null;
+        }).length
+      );
     } catch (e) {
       this.errors$.next(e);
       throw e;
     }
diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
index 9fdd0c4988575e..7931de8aaa48e8 100644
--- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
+++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
@@ -18,6 +18,7 @@ import { TaskPersistence } from '../task_events';
 import { registerTaskManagerUsageCollector } from './task_manager_usage_collector';
 import { sleep } from '../test_utils';
 import { TaskManagerUsage } from './types';
+import { MonitoredUtilization } from '../routes/background_task_utilization';
 
 describe('registerTaskManagerUsageCollector', () => {
   let collector: Collector<unknown>;
 
@@ -25,6 +26,7 @@ it('should report telemetry on the ephemeral queue', async () => {
     const monitoringStats$ = new Subject<MonitoredHealth>();
+    const monitoringUtilization$ = new Subject<MonitoredUtilization>();
     const usageCollectionMock = createUsageCollectionSetupMock();
     const fetchContext = createCollectorFetchContextMock();
     usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
@@ -32,10 +34,19 @@ describe('registerTaskManagerUsageCollector', () => {
       return createUsageCollectionSetupMock().makeUsageCollector(config);
     });
 
-    registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, []);
+    registerTaskManagerUsageCollector(
+      usageCollectionMock,
+      monitoringStats$,
+      monitoringUtilization$,
+      true,
+      10,
+      []
+    );
 
     const mockHealth = getMockMonitoredHealth();
     monitoringStats$.next(mockHealth);
+    const mockUtilization = getMockMonitoredUtilization();
+    monitoringUtilization$.next(mockUtilization);
     await sleep(1001);
 
     expect(usageCollectionMock.makeUsageCollector).toBeCalled();
@@ -52,6 +63,7 @@ describe('registerTaskManagerUsageCollector', () => {
   it('should report telemetry on the excluded task types', async () => {
     const monitoringStats$ = new Subject<MonitoredHealth>();
+    const monitoringUtilization$ = new Subject<MonitoredUtilization>();
     const usageCollectionMock = createUsageCollectionSetupMock();
     const fetchContext = createCollectorFetchContextMock();
     usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
@@ -59,18 +71,92 @@ describe('registerTaskManagerUsageCollector', () => {
       return createUsageCollectionSetupMock().makeUsageCollector(config);
     });
 
-    registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, [
-      'actions:*',
-    ]);
+    registerTaskManagerUsageCollector(
+      usageCollectionMock,
+      monitoringStats$,
+      monitoringUtilization$,
+      true,
+      10,
+      ['actions:*']
+    );
 
     const mockHealth = getMockMonitoredHealth();
     monitoringStats$.next(mockHealth);
+    const mockUtilization = getMockMonitoredUtilization();
+    monitoringUtilization$.next(mockUtilization);
     await sleep(1001);
 
     expect(usageCollectionMock.makeUsageCollector).toBeCalled();
     const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
     expect(telemetry.task_type_exclusion).toEqual(['actions:*']);
   });
+
+  it('should report telemetry on background task utilization', async () => {
+    const monitoringStats$ = new Subject<MonitoredHealth>();
+    const monitoringUtilization$ = new Subject<MonitoredUtilization>();
+    const usageCollectionMock = createUsageCollectionSetupMock();
+    const fetchContext = createCollectorFetchContextMock();
+    usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
+      collector = new Collector(logger, config);
+      return createUsageCollectionSetupMock().makeUsageCollector(config);
+    });
+
+    registerTaskManagerUsageCollector(
+      usageCollectionMock,
+      monitoringStats$,
+      monitoringUtilization$,
+      true,
+      10,
+      ['actions:*']
+    );
+
+    const mockHealth = getMockMonitoredHealth();
+    monitoringStats$.next(mockHealth);
+    const mockUtilization = getMockMonitoredUtilization();
+    monitoringUtilization$.next(mockUtilization);
+    await sleep(1001);
+
+    expect(usageCollectionMock.makeUsageCollector).toBeCalled();
+    const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
+    expect(telemetry.recurring_tasks).toEqual({
+      actual_service_time: mockUtilization.stats?.value.recurring.ran.service_time.actual,
+      adjusted_service_time: mockUtilization.stats?.value.recurring.ran.service_time.adjusted,
+    });
+    expect(telemetry.adhoc_tasks).toEqual({
+      actual_service_time: mockUtilization.stats?.value.adhoc.ran.service_time.actual,
+      adjusted_service_time: mockUtilization.stats?.value.adhoc.ran.service_time.adjusted,
+    });
+  });
+
+  it('should report telemetry on capacity', async () => {
+    const monitoringStats$ = new Subject<MonitoredHealth>();
+    const monitoringUtilization$ = new Subject<MonitoredUtilization>();
+    const usageCollectionMock = createUsageCollectionSetupMock();
+    const fetchContext = createCollectorFetchContextMock();
+    usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
+      collector = new Collector(logger, config);
+      return createUsageCollectionSetupMock().makeUsageCollector(config);
+    });
+
+    registerTaskManagerUsageCollector(
+      usageCollectionMock,
+      monitoringStats$,
+      monitoringUtilization$,
+      true,
+      10,
+      ['actions:*']
+    );
+
+    const mockHealth = getMockMonitoredHealth();
+    monitoringStats$.next(mockHealth);
+    const mockUtilization = getMockMonitoredUtilization();
+    monitoringUtilization$.next(mockUtilization);
+    await sleep(1001);
+
+    expect(usageCollectionMock.makeUsageCollector).toBeCalled();
+    const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
+    expect(telemetry.capacity).toEqual(10);
+  });
 });
 
 function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
@@ -187,7 +273,65 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
         },
       },
     },
+    capacity_estimation: {
+      timestamp: new Date().toISOString(),
+      status: HealthStatus.OK,
+      value: {
+        observed: {
+          observed_kibana_instances: 10,
+          max_throughput_per_minute: 10,
+          max_throughput_per_minute_per_kibana: 10,
+          minutes_to_drain_overdue: 10,
+          avg_required_throughput_per_minute: 10,
+          avg_required_throughput_per_minute_per_kibana: 10,
+          avg_recurring_required_throughput_per_minute: 10,
+          avg_recurring_required_throughput_per_minute_per_kibana: 10,
+        },
+        proposed: {
+          provisioned_kibana: 10,
+          min_required_kibana: 10,
+          avg_recurring_required_throughput_per_minute_per_kibana: 10,
+          avg_required_throughput_per_minute_per_kibana: 10,
+        },
+      },
+    },
   },
 };
 return merge(stub, overrides) as unknown as MonitoredHealth;
}
+
+function getMockMonitoredUtilization(overrides = {}): MonitoredUtilization {
+  const stub: MonitoredUtilization = {
+    process_uuid: '1',
+    timestamp: new Date().toISOString(),
+    last_update: new Date().toISOString(),
+    stats: {
+      timestamp: new Date().toISOString(),
+      value: {
+        adhoc: {
+          created: {
+            counter: 5,
+          },
+          ran: {
+            service_time: {
+              actual: 3000,
+              adjusted: 2500,
+              task_counter: 10,
+            },
+          },
+        },
+        recurring: {
+          tasks_per_min: 2500,
+          ran: {
+            service_time: {
+              actual: 1000,
+              adjusted: 2000,
+              task_counter: 10,
+            },
+          },
+        },
+      },
+    },
+  };
+  return merge(stub, overrides) as unknown as MonitoredUtilization;
+}
diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts
index 82e9eb95dd7602..a4bd3049a0b499 100644
--- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts
+++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts
@@ -4,22 +4,28 @@
  * 2.0; you may not use this file except in compliance with the Elastic License
  * 2.0.
  */
-import { Observable } from 'rxjs';
+import { combineLatest, Observable } from 'rxjs';
 import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
 import { MonitoredHealth } from '../routes/health';
 import { TaskManagerUsage } from './types';
+import { MonitoredUtilization } from '../routes/background_task_utilization';
 
 export function createTaskManagerUsageCollector(
   usageCollection: UsageCollectionSetup,
   monitoringStats$: Observable<MonitoredHealth>,
+  monitoredUtilization$: Observable<MonitoredUtilization>,
   ephemeralTasksEnabled: boolean,
   ephemeralRequestCapacity: number,
   excludeTaskTypes: string[]
 ) {
   let lastMonitoredHealth: MonitoredHealth | null = null;
-  monitoringStats$.subscribe((health) => {
-    lastMonitoredHealth = health;
-  });
+  let lastMonitoredUtilization: MonitoredUtilization | null = null;
+  combineLatest([monitoringStats$, monitoredUtilization$])
+    .pipe()
+    .subscribe(([health, utilization]) => {
+      lastMonitoredHealth = health;
+      lastMonitoredUtilization = utilization;
+    });
 
   return usageCollection.makeUsageCollector<TaskManagerUsage>({
     type: 'task_manager',
@@ -61,6 +67,21 @@ export function createTaskManagerUsageCollector(
         },
         0
       ),
+      recurring_tasks: {
+        actual_service_time:
+          lastMonitoredUtilization?.stats?.value.recurring.ran.service_time.actual ?? 0,
+        adjusted_service_time:
+          lastMonitoredUtilization?.stats?.value.recurring.ran.service_time.adjusted ?? 0,
+      },
+      adhoc_tasks: {
+        actual_service_time:
+          lastMonitoredUtilization?.stats?.value.adhoc.ran.service_time.actual ?? 0,
+        adjusted_service_time:
+          lastMonitoredUtilization?.stats?.value.adhoc.ran.service_time.adjusted ?? 0,
+      },
+      capacity:
+        lastMonitoredHealth?.stats.capacity_estimation?.value.observed
+          .max_throughput_per_minute_per_kibana ?? 0,
     };
   },
 schema: {
@@ -89,6 +110,15 @@ export function createTaskManagerUsageCollector(
       },
       task_type_exclusion: { type: 'array', items: { type: 'keyword' } },
       failed_tasks: { type: 'long' },
+      recurring_tasks: {
+        actual_service_time: { type: 'long' },
+        adjusted_service_time: { type: 'long' },
+      },
+      adhoc_tasks: {
+        actual_service_time: { type: 'long' },
+        adjusted_service_time: { type: 'long' },
+      },
+      capacity: { type: 'long' },
     },
   });
 }
@@ -96,6 +126,7 @@
 export function registerTaskManagerUsageCollector(
   usageCollection: UsageCollectionSetup,
   monitoringStats$: Observable<MonitoredHealth>,
+  monitoredUtilization$: Observable<MonitoredUtilization>,
   ephemeralTasksEnabled: boolean,
   ephemeralRequestCapacity: number,
   excludeTaskTypes: string[]
@@ -103,6 +134,7 @@
   const collector = createTaskManagerUsageCollector(
     usageCollection,
     monitoringStats$,
+    monitoredUtilization$,
     ephemeralTasksEnabled,
     ephemeralRequestCapacity,
     excludeTaskTypes
diff --git a/x-pack/plugins/task_manager/server/usage/types.ts b/x-pack/plugins/task_manager/server/usage/types.ts
index f9ac823a581245..0e98d1d0685a00 100644
--- a/x-pack/plugins/task_manager/server/usage/types.ts
+++ b/x-pack/plugins/task_manager/server/usage/types.ts
@@ -31,4 +31,13 @@ export interface TaskManagerUsage {
     };
   };
   failed_tasks: number;
+  recurring_tasks: {
+    actual_service_time: number;
+    adjusted_service_time: number;
+  };
+  adhoc_tasks: {
+    actual_service_time: number;
+    adjusted_service_time: number;
+  };
+  capacity: number;
 }
diff --git a/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json b/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json
index af39095245086f..bae65335dfac30 100644
--- a/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json
+++ b/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json
@@ -13336,6 +13336,29 @@
         },
         "failed_tasks": {
           "type": "long"
+        },
+        "recurring_tasks": {
+          "properties": {
+            "actual_service_time": {
+              "type": "long"
+            },
+            "adjusted_service_time": {
+              "type": "long"
+            }
+          }
+        },
+        "adhoc_tasks": {
+          "properties": {
+            "actual_service_time": {
+              "type": "long"
+            },
+            "adjusted_service_time": {
+              "type": "long"
+            }
+          }
+        },
+        "capacity": {
+          "type": "long"
         }
       }
     },
diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts
new file mode 100644
index 00000000000000..18314fe94fe44b
--- /dev/null
+++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts
@@ -0,0 +1,107 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import expect from '@kbn/expect';
+import url from 'url';
+import supertest from 'supertest';
+import { FtrProviderContext } from '../../ftr_provider_context';
+
+interface MonitoringStats {
+  last_update: string;
+  status: string;
+  stats: {
+    timestamp: string;
+    value: {
+      adhoc: {
+        created: {
+          counter: number;
+        };
+        ran: {
+          service_time: {
+            actual: number;
+            adjusted: number;
+            task_counter: number;
+          };
+        };
+      };
+      recurring: {
+        tasks_per_min: number;
+        ran: {
+          service_time: {
+            actual: number;
+            adjusted: number;
+            task_counter: number;
+          };
+        };
+      };
+    };
+  };
+}
+
+export default function ({ getService }: FtrProviderContext) {
+  const config = getService('config');
+  const retry = getService('retry');
+  const request = supertest(url.format(config.get('servers.kibana')));
+
+  const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
+
+  function getUtilizationRequest() {
+    return request
+      .get('/internal/task_manager/_background_task_utilization')
+      .set('kbn-xsrf', 'foo');
+  }
+
+  function getUtilization(): Promise<MonitoringStats> {
+    return getUtilizationRequest()
+      .expect(200)
+      .then((response) => response.body);
+  }
+
+  function getBackgroundTaskUtilization(): Promise<MonitoringStats> {
+    return retry.try(async () => {
+      const utilization = await getUtilization();
+
+      if (utilization.stats) {
+        return utilization;
+      }
+
+      await delay(500);
+      throw new Error('Stats have not run yet');
+    });
+  }
+
+  describe('background task utilization', () => {
+    it('should return the task manager background task utilization for recurring stats', async () => {
+      const {
+        value: {
+          // eslint-disable-next-line @typescript-eslint/naming-convention
+          recurring: { tasks_per_min, ran },
+        },
+      } = (await getBackgroundTaskUtilization()).stats;
+      const serviceTime = ran.service_time;
+
+      expect(typeof tasks_per_min).to.eql('number');
+
+      expect(typeof serviceTime.actual).to.eql('number');
+      expect(typeof serviceTime.adjusted).to.eql('number');
+      expect(typeof serviceTime.task_counter).to.eql('number');
+    });
+
+    it('should return the task manager background task utilization for adhoc stats', async () => {
+      const {
+        value: {
+          adhoc: { created, ran },
+        },
+      } = (await getBackgroundTaskUtilization()).stats;
+      const serviceTime = ran.service_time;
+
+      expect(typeof created.counter).to.eql('number');
+
+      expect(typeof serviceTime.actual).to.eql('number');
+      expect(typeof serviceTime.adjusted).to.eql('number');
+      expect(typeof serviceTime.task_counter).to.eql('number');
+    });
+  });
+}
diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts
index 6fd4f3e529dc61..e0a31a0f72fe8e 100644
--- a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts
+++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts
@@ -9,6 +9,7 @@ import { FtrProviderContext } from '../../ftr_provider_context';
 
 export default function ({ loadTestFile }: FtrProviderContext) {
   describe('task_manager', function taskManagerSuite() {
+    loadTestFile(require.resolve('./background_task_utilization_route'));
     loadTestFile(require.resolve('./health_route'));
     loadTestFile(require.resolve('./task_management'));
     loadTestFile(require.resolve('./task_management_scheduled_at'));
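A note on the collector wiring above: createTaskManagerUsageCollector caches the latest health and utilization values so the collector's fetch() can read them synchronously, and combineLatest only begins emitting once every source observable has emitted at least once. A minimal sketch of that caching pattern in isolation (the Health and Utilization shapes here are simplified stand-ins, not the plugin's real types):

    import { combineLatest, Subject } from 'rxjs';

    interface Health {
      status: string;
    }
    interface Utilization {
      adhoc: { created: { counter: number } };
    }

    const health$ = new Subject<Health>();
    const utilization$ = new Subject<Utilization>();

    let lastHealth: Health | null = null;
    let lastUtilization: Utilization | null = null;

    // Cache the most recent emission from each stream; nothing is cached
    // until both streams have produced at least one value.
    combineLatest([health$, utilization$]).subscribe(([health, utilization]) => {
      lastHealth = health;
      lastUtilization = utilization;
    });

    health$.next({ status: 'OK' });
    utilization$.next({ adhoc: { created: { counter: 5 } } });
    // From here on, a synchronous reader (like the collector's fetch()) can
    // use lastHealth / lastUtilization without awaiting either stream.

Until both streams have emitted, the cached values stay null, which is why fetch() in the diff above falls back to the `?? 0` defaults.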