# Hook up discovery service to Task Manager health (#194113)
Resolves #192568

In this PR, I'm solving the issue where the task manager health API is
unable to determine how many Kibana nodes are running. Instead of deriving
the count from an aggregation on the `.kibana_task_manager` index that
counts unique `ownerId` values (which only works when tasks are running and
are sufficiently distributed across the Kibana nodes), the health API now
gets the count from the Kibana discovery service.

Note: this only works when `mget` is the task claim strategy.
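
For context, here is a minimal sketch of the aggregation-based approach being replaced, assuming a local Elasticsearch and the `@elastic/elasticsearch` client. The filter and cardinality aggregation mirror the ones removed from the workload statistics aggregator (last diff below); the surrounding function is illustrative, not the plugin's actual code:

```ts
import { Client, estypes } from '@elastic/elasticsearch';

const es = new Client({ node: 'http://localhost:9200' });

// Old approach (sketch): infer the number of Kibana nodes from the number of
// distinct `task.ownerId` values on recently started tasks. If few tasks are
// running, or they all ran on one node, this under-counts the deployment.
async function countKibanaNodesByOwnerId(): Promise<number> {
  const response = await es.search({
    index: '.kibana_task_manager',
    size: 0,
    query: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
    aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
  });
  const ownerIds = response.aggregations?.ownerIds as
    | estypes.AggregationsCardinalityAggregate
    | undefined;
  // If there are no active owners right now, assume there's at least 1.
  return Math.max(ownerIds?.value ?? 0, 1);
}
```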

## To verify
1. Set `xpack.task_manager.claim_strategy: mget` in kibana.yml
2. Start the PR branch locally with Elasticsearch and Kibana running
3. Navigate to the `/api/task_manager/_health` route and confirm
`observed_kibana_instances` is `1`
4. Apply the following diff and restart Kibana
```diff
diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
index 090847032bf..69dfb6d1b36 100644
--- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
+++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
@@ -59,6 +59,7 @@ export class KibanaDiscoveryService {
     const lastSeen = lastSeenDate.toISOString();
     try {
       await this.upsertCurrentNode({ id: this.currentNode, lastSeen });
+      await this.upsertCurrentNode({ id: `${this.currentNode}-2`, lastSeen });
       if (!this.started) {
         this.logger.info('Kibana Discovery Service has been started');
         this.started = true;
```
5. Navigate to the `/api/task_manager/_health` route and confirm
`observed_kibana_instances` is `2` (a scripted way to check the same field is
sketched right after this list)
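
If you prefer a script over the browser, a minimal sketch follows. It assumes Node 18+ (global `fetch`), a local Kibana on port 5601, and placeholder basic-auth credentials; adjust for your setup. The response path mirrors the health API fields exercised in the tests below, though the exact shape may differ slightly in your deployment:

```ts
// Print the number of Kibana nodes the task manager health API currently observes.
async function printObservedKibanaInstances(): Promise<void> {
  const auth = Buffer.from('elastic:changeme').toString('base64'); // placeholder credentials
  const res = await fetch('http://localhost:5601/api/task_manager/_health', {
    headers: { Authorization: `Basic ${auth}` },
  });
  const health = (await res.json()) as any;
  // `capacity_estimation.value.observed.observed_kibana_instances` is the field
  // checked in steps 3 and 5 above.
  console.log(health.stats.capacity_estimation.value.observed.observed_kibana_instances);
}

void printObservedKibanaInstances();
```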

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
mikecote and elasticmachine authored Oct 2, 2024
1 parent 4e68c21 commit d0d2032
Showing 12 changed files with 171 additions and 90 deletions.
`KibanaDiscoveryService` unit tests:

```diff
@@ -246,6 +246,7 @@ describe('KibanaDiscoveryService', () => {
       savedObjectsRepository.find.mockResolvedValueOnce(createFindResponse(mockActiveNodes));

     it('returns the active kibana nodes', async () => {
+      const onNodesCounted = jest.fn();
       const kibanaDiscoveryService = new KibanaDiscoveryService({
         savedObjectsRepository,
         logger,
@@ -254,6 +255,7 @@
           active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
           interval: DEFAULT_DISCOVERY_INTERVAL_MS,
         },
+        onNodesCounted,
       });

       const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();
@@ -265,6 +267,7 @@
         type: BACKGROUND_TASK_NODE_SO_NAME,
       });
       expect(activeNodes).toEqual(mockActiveNodes);
+      expect(onNodesCounted).toHaveBeenCalledWith(mockActiveNodes.length);
     });
   });
```
`KibanaDiscoveryService` (`x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts`):

```diff
@@ -16,6 +16,7 @@ interface DiscoveryServiceParams {
   currentNode: string;
   savedObjectsRepository: ISavedObjectsRepository;
   logger: Logger;
+  onNodesCounted?: (numOfNodes: number) => void;
 }

 interface DiscoveryServiceUpsertParams {
@@ -34,13 +35,15 @@ export class KibanaDiscoveryService {
   private logger: Logger;
   private stopped = false;
   private timer: NodeJS.Timeout | undefined;
+  private onNodesCounted?: (numOfNodes: number) => void;

-  constructor({ config, currentNode, savedObjectsRepository, logger }: DiscoveryServiceParams) {
-    this.activeNodesLookBack = config.active_nodes_lookback;
-    this.discoveryInterval = config.interval;
-    this.savedObjectsRepository = savedObjectsRepository;
-    this.logger = logger;
-    this.currentNode = currentNode;
+  constructor(opts: DiscoveryServiceParams) {
+    this.activeNodesLookBack = opts.config.active_nodes_lookback;
+    this.discoveryInterval = opts.config.interval;
+    this.savedObjectsRepository = opts.savedObjectsRepository;
+    this.logger = opts.logger;
+    this.currentNode = opts.currentNode;
+    this.onNodesCounted = opts.onNodesCounted;
   }

   private async upsertCurrentNode({ id, lastSeen }: DiscoveryServiceUpsertParams) {
@@ -106,6 +109,10 @@
       filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}`,
     });

+    if (this.onNodesCounted) {
+      this.onNodesCounted(activeNodes.length);
+    }
+
     return activeNodes;
   }
```
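
A hypothetical consumer of the new `onNodesCounted` callback (the plugin's actual wiring is not part of this excerpt): the caller keeps the most recent count so the health code can read it later. Only the callback's shape comes from the diff above; the rest is illustrative:

```ts
// Module-level holder for the latest discovery-based node count (illustrative).
let lastObservedKibanaNodes = 1; // assume at least one node until the first count arrives

export const onNodesCounted = (numOfNodes: number): void => {
  lastObservedKibanaNodes = Math.max(numOfNodes, 1);
};

export const getObservedKibanaNodes = (): number => lastObservedKibanaNodes;
```

In the actual change, the count ultimately reaches `summarizeMonitoringStats` and `estimateCapacity` (see the diffs below), which previously derived it from `workload.owner_ids`.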
`estimateCapacity` tests — each call now passes the assumed number of Kibana instances as an extra argument:

```diff
@@ -63,7 +63,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -119,7 +120,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -158,7 +160,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -214,7 +217,8 @@
             result_frequency_percent_as_number: {},
          },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -271,7 +275,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -327,7 +332,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      3
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 3,
@@ -396,7 +402,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      2
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: provisionedKibanaInstances,
@@ -477,7 +484,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      2
     ).value
   ).toMatchObject({
     observed: {
@@ -561,7 +569,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -626,7 +635,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -691,7 +701,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -755,7 +766,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -831,7 +843,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -905,7 +918,8 @@
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
```
Capacity estimation — `estimateCapacity` and `withCapacityEstimate` now take the instance count from the caller instead of deriving it from `workload.owner_ids`:

```diff
@@ -46,11 +46,10 @@ function isCapacityEstimationParams(

 export function estimateCapacity(
   logger: Logger,
-  capacityStats: CapacityEstimationParams
+  capacityStats: CapacityEstimationParams,
+  assumedKibanaInstances: number
 ): RawMonitoredStat<CapacityEstimationStat> {
   const workload = capacityStats.workload.value;
-  // if there are no active owners right now, assume there's at least 1
-  const assumedKibanaInstances = Math.max(workload.owner_ids, 1);

   const {
     load: { p90: averageLoadPercentage },
@@ -262,12 +261,13 @@

 export function withCapacityEstimate(
   logger: Logger,
-  monitoredStats: RawMonitoringStats['stats']
+  monitoredStats: RawMonitoringStats['stats'],
+  assumedKibanaInstances: number
 ): RawMonitoringStats['stats'] {
   if (isCapacityEstimationParams(monitoredStats)) {
     return {
       ...monitoredStats,
-      capacity_estimation: estimateCapacity(logger, monitoredStats),
+      capacity_estimation: estimateCapacity(logger, monitoredStats, assumedKibanaInstances),
     };
   }
   return monitoredStats;
```
`summarizeMonitoringStats` — the assumed instance count is threaded through to `withCapacityEstimate`:

```diff
@@ -158,42 +158,47 @@ export function summarizeMonitoringStats(
     last_update,
     stats: { runtime, workload, configuration, ephemeral, utilization },
   }: MonitoringStats,
-  config: TaskManagerConfig
+  config: TaskManagerConfig,
+  assumedKibanaInstances: number
 ): RawMonitoringStats {
-  const summarizedStats = withCapacityEstimate(logger, {
-    ...(configuration
-      ? {
-          configuration: {
-            ...configuration,
-            status: HealthStatus.OK,
-          },
-        }
-      : {}),
-    ...(runtime
-      ? {
-          runtime: {
-            timestamp: runtime.timestamp,
-            ...summarizeTaskRunStat(logger, runtime.value, config),
-          },
-        }
-      : {}),
-    ...(workload
-      ? {
-          workload: {
-            timestamp: workload.timestamp,
-            ...summarizeWorkloadStat(workload.value),
-          },
-        }
-      : {}),
-    ...(ephemeral
-      ? {
-          ephemeral: {
-            timestamp: ephemeral.timestamp,
-            ...summarizeEphemeralStat(ephemeral.value),
-          },
-        }
-      : {}),
-  });
+  const summarizedStats = withCapacityEstimate(
+    logger,
+    {
+      ...(configuration
+        ? {
+            configuration: {
+              ...configuration,
+              status: HealthStatus.OK,
+            },
+          }
+        : {}),
+      ...(runtime
+        ? {
+            runtime: {
+              timestamp: runtime.timestamp,
+              ...summarizeTaskRunStat(logger, runtime.value, config),
+            },
+          }
+        : {}),
+      ...(workload
+        ? {
+            workload: {
+              timestamp: workload.timestamp,
+              ...summarizeWorkloadStat(workload.value),
+            },
+          }
+        : {}),
+      ...(ephemeral
+        ? {
+            ephemeral: {
+              timestamp: ephemeral.timestamp,
+              ...summarizeEphemeralStat(ephemeral.value),
+            },
+          }
+        : {}),
+    },
+    assumedKibanaInstances
+  );

   return {
     last_update,
```
Workload Statistics Aggregator tests — the `ownerId` cardinality aggregation is removed from the expected query:

```diff
@@ -169,10 +169,6 @@ describe('Workload Statistics Aggregator', () => {
           missing: { field: 'task.schedule.interval' },
           aggs: { taskType: { terms: { size: 3, field: 'task.taskType' } } },
         },
-        ownerIds: {
-          filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
-          aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
-        },
         idleTasks: {
           filter: { term: { 'task.status': 'idle' } },
           aggs: {
```