From d0d2032f18a37e4c458a26d92092665453b737b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?=
Date: Wed, 2 Oct 2024 07:19:06 -0400
Subject: [PATCH] Hook up discovery service to Task Manager health (#194113)

Resolves https://github.com/elastic/kibana/issues/192568

In this PR, I'm solving the issue where the task manager health API is unable
to determine how many Kibana nodes are running. I'm doing so by leveraging the
Kibana discovery service to get a count, instead of deriving it from an
aggregation on the `.kibana_task_manager` index that counts the unique number
of `ownerId` values. That aggregation only yields an accurate count when tasks
are actively running and are sufficiently distributed across the Kibana nodes
(see the sketch below).

Note: This will only work when `mget` is the task claim strategy.
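For reference, the counting approach being replaced looked roughly like the
sketch below. The aggregation body is copied from the hunks removed from
`workload_statistics.ts` further down; the standalone client setup around it
is illustrative, not part of the actual code.

```ts
// Illustrative sketch: the ownerId cardinality aggregation that previously
// backed observed_kibana_instances.
import { Client } from '@elastic/elasticsearch';

const esClient = new Client({ node: 'http://localhost:9200' }); // placeholder endpoint

async function countOwnerIds(): Promise<number> {
  const response = await esClient.search({
    index: '.kibana_task_manager',
    size: 0,
    aggs: {
      ownerIds: {
        // only tasks that started within the window carry a visible ownerId...
        filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
        aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
      },
    },
  });
  const aggs = response.aggregations as unknown as {
    ownerIds: { ownerIds: { value: number } };
  };
  // ...so a node that hasn't claimed a task recently is invisible here, which
  // is why this approach can undercount the number of Kibana instances.
  return aggs.ownerIds.ownerIds.value;
}
```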
## To verify

1. Set `xpack.task_manager.claim_strategy: mget` in kibana.yml.
2. Start this PR locally with Elasticsearch and Kibana running.
3. Navigate to the `/api/task_manager/_health` route and confirm `observed_kibana_instances` is `1`.
4. Apply the following code and restart Kibana:

```diff
diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
index 090847032bf..69dfb6d1b36 100644
--- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
+++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
@@ -59,6 +59,7 @@ export class KibanaDiscoveryService {
     const lastSeen = lastSeenDate.toISOString();
     try {
       await this.upsertCurrentNode({ id: this.currentNode, lastSeen });
+      await this.upsertCurrentNode({ id: `${this.currentNode}-2`, lastSeen });
       if (!this.started) {
         this.logger.info('Kibana Discovery Service has been started');
         this.started = true;
```

5. Navigate to the `/api/task_manager/_health` route and confirm `observed_kibana_instances` is `2`.
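For context, the data flow the diff below introduces can be reduced to this
minimal sketch (the names match `plugin.ts` and `routes/health.ts`; the
scaffolding around them is assumed):

```ts
// The discovery service reports a node count on every poll; the health route
// keeps the latest value and feeds it into capacity estimation.
import { BehaviorSubject } from 'rxjs';

const numOfKibanaInstances$ = new BehaviorSubject<number>(1);

// plugin.ts: passed to KibanaDiscoveryService, invoked after each fetch of active nodes
const onNodesCounted = (numOfNodes: number) => numOfKibanaInstances$.next(numOfNodes);

// routes/health.ts: remember the latest count, never assuming fewer than one node
let numOfKibanaInstances = 1;
numOfKibanaInstances$.subscribe((updatedNumber) => {
  numOfKibanaInstances = Math.max(updatedNumber, 1);
});

onNodesCounted(3); // discovery service found three active nodes
console.log(numOfKibanaInstances); // 3
```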
---------

Co-authored-by: Elastic Machine
---
 .../kibana_discovery_service.test.ts           |  3 +
 .../kibana_discovery_service.ts                | 19 +++--
 .../monitoring/capacity_estimation.test.ts     | 42 +++++++----
 .../server/monitoring/capacity_estimation.ts   | 10 +--
 .../monitoring/monitoring_stats_stream.ts      | 75 ++++++++++---------
 .../monitoring/workload_statistics.test.ts     |  4 -
 .../server/monitoring/workload_statistics.ts   | 24 +-----
 x-pack/plugins/task_manager/server/plugin.ts   |  5 +-
 .../task_manager/server/routes/health.test.ts  | 62 ++++++++++++++-
 .../task_manager/server/routes/health.ts       | 15 +++-
 .../test_suites/task_manager/health_route.ts   |  1 -
 .../test_suites/task_manager/health_route.ts   |  1 -
 12 files changed, 171 insertions(+), 90 deletions(-)

diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts
index 4b9af1f77270bc..3a4870b6a47637 100644
--- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts
+++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts
@@ -246,6 +246,7 @@ describe('KibanaDiscoveryService', () => {
       savedObjectsRepository.find.mockResolvedValueOnce(createFindResponse(mockActiveNodes));

     it('returns the active kibana nodes', async () => {
+      const onNodesCounted = jest.fn();
       const kibanaDiscoveryService = new KibanaDiscoveryService({
         savedObjectsRepository,
         logger,
@@ -254,6 +255,7 @@ describe('KibanaDiscoveryService', () => {
           active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
           interval: DEFAULT_DISCOVERY_INTERVAL_MS,
         },
+        onNodesCounted,
       });

       const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();
@@ -265,6 +267,7 @@ describe('KibanaDiscoveryService', () => {
         type: BACKGROUND_TASK_NODE_SO_NAME,
       });
       expect(activeNodes).toEqual(mockActiveNodes);
+      expect(onNodesCounted).toHaveBeenCalledWith(mockActiveNodes.length);
     });
   });

diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
index 9318a09e3fbab3..1c4fcb00981a06 100644
--- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
+++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
@@ -16,6 +16,7 @@ interface DiscoveryServiceParams {
   currentNode: string;
   savedObjectsRepository: ISavedObjectsRepository;
   logger: Logger;
+  onNodesCounted?: (numOfNodes: number) => void;
 }

 interface DiscoveryServiceUpsertParams {
@@ -34,13 +35,15 @@ export class KibanaDiscoveryService {
   private logger: Logger;
   private stopped = false;
   private timer: NodeJS.Timeout | undefined;
+  private onNodesCounted?: (numOfNodes: number) => void;

-  constructor({ config, currentNode, savedObjectsRepository, logger }: DiscoveryServiceParams) {
-    this.activeNodesLookBack = config.active_nodes_lookback;
-    this.discoveryInterval = config.interval;
-    this.savedObjectsRepository = savedObjectsRepository;
-    this.logger = logger;
-    this.currentNode = currentNode;
+  constructor(opts: DiscoveryServiceParams) {
+    this.activeNodesLookBack = opts.config.active_nodes_lookback;
+    this.discoveryInterval = opts.config.interval;
+    this.savedObjectsRepository = opts.savedObjectsRepository;
+    this.logger = opts.logger;
+    this.currentNode = opts.currentNode;
+    this.onNodesCounted = opts.onNodesCounted;
   }

   private async upsertCurrentNode({ id, lastSeen }: DiscoveryServiceUpsertParams) {
@@ -106,6 +109,10 @@ export class KibanaDiscoveryService {
       filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}`,
     });

+    if (this.onNodesCounted) {
+      this.onNodesCounted(activeNodes.length);
+    }
+
     return activeNodes;
   }
diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
index 94b11171f9e040..23ef344c197fc9 100644
--- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
@@ -63,7 +63,8 @@ describe('estimateCapacity', () => {
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,
@@ -119,7 +120,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,
@@ -158,7 +160,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
      ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,
@@ -214,7 +217,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,
@@ -271,7 +275,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,
@@ -327,7 +332,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        3
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 3,
@@ -396,7 +402,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        2
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: provisionedKibanaInstances,
@@ -477,7 +484,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        2
       ).value
     ).toMatchObject({
       observed: {
@@ -561,7 +569,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       )
     ).toMatchObject({
       status: 'OK',
@@ -626,7 +635,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       )
     ).toMatchObject({
       status: 'OK',
@@ -691,7 +701,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       )
     ).toMatchObject({
       status: 'OK',
@@ -755,7 +766,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       )
     ).toMatchObject({
       status: 'OK',
@@ -831,7 +843,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       )
     ).toMatchObject({
       status: 'OK',
@@ -905,7 +918,8 @@
               result_frequency_percent_as_number: {},
             },
           }
-        )
+        ),
+        1
       ).value.observed
     ).toMatchObject({
       observed_kibana_instances: 1,

diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
index d1c2f3591ea22a..acbf1284b21b72 100644
--- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
@@ -46,11 +46,10 @@ function isCapacityEstimationParams(
 export function estimateCapacity(
   logger: Logger,
-  capacityStats: CapacityEstimationParams
+  capacityStats: CapacityEstimationParams,
+  assumedKibanaInstances: number
 ): RawMonitoredStat<CapacityEstimationStat> {
   const workload = capacityStats.workload.value;
-  // if there are no active owners right now, assume there's at least 1
-  const assumedKibanaInstances = Math.max(workload.owner_ids, 1);

   const {
     load: { p90: averageLoadPercentage },
@@ -262,12 +261,13 @@ function getHealthStatus(
 export function withCapacityEstimate(
   logger: Logger,
-  monitoredStats: RawMonitoringStats['stats']
+  monitoredStats: RawMonitoringStats['stats'],
+  assumedKibanaInstances: number
 ): RawMonitoringStats['stats'] {
   if (isCapacityEstimationParams(monitoredStats)) {
     return {
       ...monitoredStats,
-      capacity_estimation: estimateCapacity(logger, monitoredStats),
+      capacity_estimation: estimateCapacity(logger, monitoredStats, assumedKibanaInstances),
     };
   }
   return monitoredStats;
diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
index e1bffb55d54fa1..1237af9e68ebc3 100644
--- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
@@ -158,42 +158,47 @@ export function summarizeMonitoringStats(
     last_update,
     stats: { runtime, workload, configuration, ephemeral, utilization },
   }: MonitoringStats,
-  config: TaskManagerConfig
+  config: TaskManagerConfig,
+  assumedKibanaInstances: number
 ): RawMonitoringStats {
-  const summarizedStats = withCapacityEstimate(logger, {
-    ...(configuration
-      ? {
-          configuration: {
-            ...configuration,
-            status: HealthStatus.OK,
-          },
-        }
-      : {}),
-    ...(runtime
-      ? {
-          runtime: {
-            timestamp: runtime.timestamp,
-            ...summarizeTaskRunStat(logger, runtime.value, config),
-          },
-        }
-      : {}),
-    ...(workload
-      ? {
-          workload: {
-            timestamp: workload.timestamp,
-            ...summarizeWorkloadStat(workload.value),
-          },
-        }
-      : {}),
-    ...(ephemeral
-      ? {
-          ephemeral: {
-            timestamp: ephemeral.timestamp,
-            ...summarizeEphemeralStat(ephemeral.value),
-          },
-        }
-      : {}),
-  });
+  const summarizedStats = withCapacityEstimate(
+    logger,
+    {
+      ...(configuration
+        ? {
+            configuration: {
+              ...configuration,
+              status: HealthStatus.OK,
+            },
+          }
+        : {}),
+      ...(runtime
+        ? {
+            runtime: {
+              timestamp: runtime.timestamp,
+              ...summarizeTaskRunStat(logger, runtime.value, config),
+            },
+          }
+        : {}),
+      ...(workload
+        ? {
+            workload: {
+              timestamp: workload.timestamp,
+              ...summarizeWorkloadStat(workload.value),
+            },
+          }
+        : {}),
+      ...(ephemeral
+        ? {
+            ephemeral: {
+              timestamp: ephemeral.timestamp,
+              ...summarizeEphemeralStat(ephemeral.value),
+            },
+          }
+        : {}),
+    },
+    assumedKibanaInstances
+  );

   return {
     last_update,
diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts
index cd37c6661ec005..0326e07de6f489 100644
--- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts
@@ -169,10 +169,6 @@ describe('Workload Statistics Aggregator', () => {
           missing: { field: 'task.schedule.interval' },
           aggs: { taskType: { terms: { size: 3, field: 'task.taskType' } } },
         },
-        ownerIds: {
-          filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
-          aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
-        },
         idleTasks: {
           filter: { term: { 'task.status': 'idle' } },
           aggs: {

diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
index 92cca0bf9a4f90..37f12911325475 100644
--- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
@@ -16,7 +16,6 @@ import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
 import { parseIntervalAsSecond, asInterval, parseIntervalAsMillisecond } from '../lib/intervals';
 import { HealthStatus } from './monitoring_stats_stream';
 import { TaskStore } from '../task_store';
-import { createRunningAveragedStat } from './task_run_calculators';
 import { TaskTypeDictionary } from '../task_type_dictionary';
 import { TaskCost } from '../task';

@@ -45,12 +44,8 @@ interface RawWorkloadStat extends JsonObject {
   capacity_requirements: CapacityRequirements;
 }

-export interface WorkloadStat extends RawWorkloadStat {
-  owner_ids: number[];
-}
-export interface SummarizedWorkloadStat extends RawWorkloadStat {
-  owner_ids: number;
-}
+export type WorkloadStat = RawWorkloadStat;
+export type SummarizedWorkloadStat = RawWorkloadStat;
 export interface CapacityRequirements extends JsonObject {
   per_minute: number;
   per_hour: number;
@@ -143,7 +138,6 @@ export function createWorkloadAggregator({
   const totalNumTaskDefinitions = taskDefinitions.getAllTypes().length;
   const taskTypeTermAggSize = Math.min(totalNumTaskDefinitions, 10000);
-  const ownerIdsQueue = createRunningAveragedStat(scheduleDensityBuckets);

   return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe(
     filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable),
@@ -163,10 +157,6 @@ export function createWorkloadAggregator({
           taskType: { terms: { size: taskTypeTermAggSize, field: 'task.taskType' } },
         },
       },
-      ownerIds: {
-        filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
-        aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
-      },
       idleTasks: {
         filter: { term: { 'task.status': 'idle' } },
         aggs: {
@@ -225,7 +215,6 @@ export function createWorkloadAggregator({
         const taskTypes = aggregations.taskType.buckets;
         const nonRecurring = aggregations.nonRecurringTasks.doc_count;
         const nonRecurringTaskTypes = aggregations.nonRecurringTasks.taskType.buckets;
-        const ownerIds = aggregations.ownerIds.ownerIds.value;

         const {
           overdue: {
@@ -297,7 +286,6 @@ export function createWorkloadAggregator({
           task_types: taskTypeSummary,
           non_recurring: nonRecurring,
           non_recurring_cost: totalNonRecurringCost,
-          owner_ids: ownerIdsQueue(ownerIds),
           schedule: schedules
             .sort((scheduleLeft, scheduleRight) => scheduleLeft.asSeconds - scheduleRight.asSeconds)
             .map((schedule) => [schedule.interval, schedule.count]),
@@ -452,12 +440,7 @@ export function summarizeWorkloadStat(workloadStats: WorkloadStat): {
   status: HealthStatus;
 } {
   return {
-    value: {
-      ...workloadStats,
-      // assume the largest number we've seen of active owner IDs
-      // matches the number of active Task Managers in the cluster
-      owner_ids: Math.max(...workloadStats.owner_ids),
-    },
+    value: workloadStats,
     status: HealthStatus.OK,
   };
 }
@@ -477,7 +460,6 @@ export interface WorkloadAggregationResponse {
   schedule: ScheduleAggregation;
   idleTasks: IdleTasksAggregation;
   nonRecurringTasks: { doc_count: number; taskType: TaskTypeAggregation };
-  ownerIds: { ownerIds: { value: number } };
   [otherAggs: string]: estypes.AggregationsAggregate;
 }
diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts
index d7c3cd7e941fc8..87acf096d007cf 100644
--- a/x-pack/plugins/task_manager/server/plugin.ts
+++ b/x-pack/plugins/task_manager/server/plugin.ts
@@ -5,7 +5,7 @@
  * 2.0.
  */

-import { combineLatest, Observable, Subject } from 'rxjs';
+import { combineLatest, Observable, Subject, BehaviorSubject } from 'rxjs';
 import { map, distinctUntilChanged } from 'rxjs';
 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
 import { UsageCollectionSetup, UsageCounter } from '@kbn/usage-collection-plugin/server';
@@ -106,6 +106,7 @@ export class TaskManagerPlugin
   private nodeRoles: PluginInitializerContext['node']['roles'];
   private kibanaDiscoveryService?: KibanaDiscoveryService;
   private heapSizeLimit: number = 0;
+  private numOfKibanaInstances$: Subject<number> = new BehaviorSubject(1);

   constructor(private readonly initContext: PluginInitializerContext) {
     this.initContext = initContext;
@@ -169,6 +170,7 @@ export class TaskManagerPlugin
         startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
       shouldRunTasks: this.shouldRunBackgroundTasks,
       docLinks: core.docLinks,
+      numOfKibanaInstances$: this.numOfKibanaInstances$,
     });
     const monitoredUtilization$ = backgroundTaskUtilizationRoute({
       router,
@@ -260,6 +262,7 @@ export class TaskManagerPlugin
       logger: this.logger,
       currentNode: this.taskManagerId!,
       config: this.config.discovery,
+      onNodesCounted: (numOfNodes: number) => this.numOfKibanaInstances$.next(numOfNodes),
     });

     if (this.shouldRunBackgroundTasks) {

diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts
index e76d218911dc1f..e3a7eb278d2251 100644
--- a/x-pack/plugins/task_manager/server/routes/health.test.ts
+++ b/x-pack/plugins/task_manager/server/routes/health.test.ts
@@ -5,7 +5,7 @@
  * 2.0.
  */

-import { firstValueFrom, of, Subject } from 'rxjs';
+import { firstValueFrom, of, Subject, BehaviorSubject } from 'rxjs';
 import { merge } from 'lodash';
 import { v4 as uuidv4 } from 'uuid';
 import { httpServiceMock, docLinksServiceMock } from '@kbn/core/server/mocks';
@@ -87,6 +87,7 @@ describe('healthRoute', () => {
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const [config] = router.get.mock.calls[0];
@@ -111,6 +112,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const [, handler] = router.get.mock.calls[0];
@@ -153,6 +155,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const [, handler] = router.get.mock.calls[0];
@@ -200,6 +203,7 @@
       getClusterClient: () => Promise.resolve(mockClusterClient),
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const [, handler] = router.get.mock.calls[0];
@@ -242,6 +246,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     stats$.next(mockStat);
@@ -319,6 +324,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const serviceStatus = firstValueFrom(serviceStatus$);
@@ -421,6 +427,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const serviceStatus = firstValueFrom(serviceStatus$);
@@ -509,6 +516,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const serviceStatus = firstValueFrom(serviceStatus$);
@@ -602,6 +610,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });

     const serviceStatus = firstValueFrom(serviceStatus$);
@@ -683,6 +692,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: true,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });
     const serviceStatus = firstValueFrom(serviceStatus$);
     await sleep(0);
@@ -782,6 +792,7 @@
       usageCounter: mockUsageCounter,
       shouldRunTasks: false,
       docLinks,
+      numOfKibanaInstances$: new BehaviorSubject(1),
     });
     const serviceStatus = firstValueFrom(serviceStatus$);
     await sleep(0);
@@ -809,6 +820,55 @@
       },
     });
   });
+
+  it('calls summarizeMonitoringStats with the latest number of Kibana nodes', async () => {
+    const router = httpServiceMock.createRouter();
+    const stats$ = new Subject<MonitoringStats>();
+    const numOfKibanaInstances$ = new BehaviorSubject(1);
+
+    const id = uuidv4();
+    const config = getTaskManagerConfig({
+      monitored_stats_required_freshness: 1000,
+      monitored_stats_health_verbose_log: {
+        enabled: true,
+        level: 'debug',
+        warn_delayed_task_start_in_seconds: 100,
+      },
+      monitored_aggregated_stats_refresh_rate: 60000,
+    });
+    healthRoute({
+      router,
+      monitoringStats$: stats$,
+      logger,
+      taskManagerId: id,
+      config,
+      kibanaVersion: '8.0',
+      kibanaIndexName: '.kibana',
+      getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
+      usageCounter: mockUsageCounter,
+      shouldRunTasks: true,
+      docLinks,
+      numOfKibanaInstances$,
+    });
+
+    stats$.next(mockHealthStats());
+    expect(summarizeMonitoringStats).toHaveBeenCalledWith(
+      expect.anything(),
+      expect.anything(),
+      expect.anything(),
+      1
+    );
+
+    await sleep(1000);
+    numOfKibanaInstances$.next(2);
+    stats$.next(mockHealthStats());
+    expect(summarizeMonitoringStats).toHaveBeenCalledWith(
+      expect.anything(),
+      expect.anything(),
+      expect.anything(),
+      2
+    );
+  });
 });

 function ignoreCapacityEstimation(stats: RawMonitoringStats) {
diff --git a/x-pack/plugins/task_manager/server/routes/health.ts b/x-pack/plugins/task_manager/server/routes/health.ts
index 38fd2e3b675ca2..7bcebfabdca60c 100644
--- a/x-pack/plugins/task_manager/server/routes/health.ts
+++ b/x-pack/plugins/task_manager/server/routes/health.ts
@@ -62,6 +62,7 @@ export interface HealthRouteParams {
   getClusterClient: () => Promise<IClusterClient>;
   usageCounter?: UsageCounter;
   docLinks: DocLinksServiceSetup;
+  numOfKibanaInstances$: Observable<number>;
 }

 export function healthRoute(params: HealthRouteParams): {
@@ -80,14 +81,26 @@ export function healthRoute(params: HealthRouteParams): {
     usageCounter,
     shouldRunTasks,
     docLinks,
+    numOfKibanaInstances$,
   } = params;

+  let numOfKibanaInstances = 1;
+  numOfKibanaInstances$.subscribe((updatedNumber) => {
+    // if there are no active nodes right now, assume there's at least 1
+    numOfKibanaInstances = Math.max(updatedNumber, 1);
+  });
+
   // if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
   // consider the system unhealthy
   const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;

   function getHealthStatus(monitoredStats: MonitoringStats) {
-    const summarizedStats = summarizeMonitoringStats(logger, monitoredStats, config);
+    const summarizedStats = summarizeMonitoringStats(
+      logger,
+      monitoredStats,
+      config,
+      numOfKibanaInstances
+    );
     const { status, reason } = calculateHealthStatus(
       summarizedStats,
       config,

diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts
index c4d9c4b720d61a..d1f55918547f3c 100644
--- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts
+++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts
@@ -237,7 +237,6 @@ export default function ({ getService }: FtrProviderContext) {

       expect(typeof workload.overdue).to.eql('number');
       expect(typeof workload.non_recurring).to.eql('number');
-      expect(typeof workload.owner_ids).to.eql('number');

       expect(typeof workload.capacity_requirements.per_minute).to.eql('number');
       expect(typeof workload.capacity_requirements.per_hour).to.eql('number');

diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts
index ea0de3e6db9a8e..241bd8adcd40d6 100644
--- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts
+++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts
@@ -237,7 +237,6 @@ export default function ({ getService }: FtrProviderContext) {

       expect(typeof workload.overdue).to.eql('number');
       expect(typeof workload.non_recurring).to.eql('number');
-      expect(typeof workload.owner_ids).to.eql('number');

       expect(typeof workload.capacity_requirements.per_minute).to.eql('number');
       expect(typeof workload.capacity_requirements.per_hour).to.eql('number');