Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metrics UI] Increase composite size to 10K for Metric Threshold Rule and optimize processing #121904

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/**
 * Defers execution of `fn` to a later tick of the event loop so that long
 * synchronous loops (e.g. processing thousands of composite-aggregation
 * buckets) yield control back to the runtime between work items.
 *
 * @param fn - The synchronous unit of work to run.
 * @returns A promise resolving with `fn`'s return value; rejects if `fn`
 *   throws. (Previously a throwing `fn` raised an uncaught exception inside
 *   the timer callback and the promise never settled.)
 */
export const doWork = async <T>(fn: () => T): Promise<T> => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      try {
        resolve(fn());
      } catch (err) {
        // Convert a synchronous throw into a rejection so callers awaiting
        // this promise (e.g. inside Promise.all) can handle the failure.
        reject(err);
      }
    });
  });
};
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import moment from 'moment';
import { ElasticsearchClient } from 'kibana/server';
import { mapValues, first, last, isNaN, isNumber, isObject, has } from 'lodash';
import { difference, mapValues, first, last, isNaN, isNumber, isObject, has } from 'lodash';
import {
isTooManyBucketsPreviewException,
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
Expand All @@ -22,6 +22,7 @@ import { UNGROUPED_FACTORY_KEY } from '../../common/utils';
import { MetricExpressionParams, Comparator, Aggregators } from '../types';
import { getElasticsearchMetricQuery } from './metric_query';
import { createTimerange } from './create_timerange';
import { doWork } from './do_work';

interface AggregationWithoutIntervals {
aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
Expand Down Expand Up @@ -66,6 +67,7 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
params: Params,
config: InfraSource['configuration'],
prevGroups: string[],
compositeSize: number,
timeframe?: { start?: number; end: number }
) => {
const { criteria, groupBy, filterQuery, shouldDropPartialBuckets } = params;
Expand All @@ -78,6 +80,7 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
config.metricAlias,
groupBy,
filterQuery,
compositeSize,
timeframe,
shouldDropPartialBuckets
);
Expand All @@ -96,27 +99,22 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
// If any previous groups are no longer being reported, backfill them with null values
const currentGroups = Object.keys(currentValues);

const missingGroups = prevGroups.filter((g) => !currentGroups.includes(g));
const missingGroups = difference(prevGroups, currentGroups);
Copy link
Contributor

@klacabane klacabane Dec 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If currentGroups is large enough, we could convert it to a hash/set for constant-time reads, bringing this down to linear execution — but that's probably what lodash does already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gains with using Lodash's difference is probably sufficient plus it's easy enough to read. If the new code was in place when I was doing the performance testing, I probably wouldn't have even bothered with it.


if (currentGroups.length === 0 && missingGroups.length === 0) {
missingGroups.push(UNGROUPED_FACTORY_KEY);
}
const backfillTimestamp =
last(last(Object.values(currentValues)))?.key ?? new Date().toISOString();
const backfilledPrevGroups: Record<
string,
Array<{ key: string; value: number }>
> = missingGroups.reduce(
(result, group) => ({
...result,
[group]: [
{
key: backfillTimestamp,
value: criterion.aggType === Aggregators.COUNT ? 0 : null,
},
],
}),
{}
);
const backfilledPrevGroups: Record<string, Array<{ key: string; value: number | null }>> = {};
for (const group of missingGroups) {
backfilledPrevGroups[group] = [
{
key: backfillTimestamp,
value: criterion.aggType === Aggregators.COUNT ? 0 : null,
},
];
}
const currentValuesWithBackfilledPrevGroups = {
...currentValues,
...backfilledPrevGroups,
Expand Down Expand Up @@ -150,6 +148,7 @@ const getMetric: (
index: string,
groupBy: string | undefined | string[],
filterQuery: string | undefined,
compositeSize: number,
timeframe?: { start?: number; end: number },
shouldDropPartialBuckets?: boolean
) => Promise<Record<string, Array<{ key: string; value: number }>>> = async function (
Expand All @@ -158,6 +157,7 @@ const getMetric: (
index,
groupBy,
filterQuery,
compositeSize,
timeframe,
shouldDropPartialBuckets
) {
Expand All @@ -172,6 +172,7 @@ const getMetric: (
const searchBody = getElasticsearchMetricQuery(
params,
calculatedTimerange,
compositeSize,
hasGroupBy ? groupBy : undefined,
filterQuery
);
Expand Down Expand Up @@ -202,20 +203,23 @@ const getMetric: (
bucketSelector,
afterKeyHandler
)) as Array<Aggregation & { key: Record<string, string>; doc_count: number }>;
const groupedResults = compositeBuckets.reduce(
(result, bucket) => ({
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getValuesFromAggregations(
bucket,
aggType,
dropPartialBucketsOptions,
calculatedTimerange,
bucket.doc_count
),
}),
{}
const groupedResults: Record<string, any> = {};
await Promise.all(
simianhacker marked this conversation as resolved.
Show resolved Hide resolved
compositeBuckets.map((bucket) => {
return doWork(() => {
const key = Object.values(bucket.key)
.map((value) => value)
simianhacker marked this conversation as resolved.
Show resolved Hide resolved
.join(', ');
const value = getValuesFromAggregations(
bucket,
aggType,
dropPartialBucketsOptions,
calculatedTimerange,
bucket.doc_count
);
groupedResults[key] = value;
});
})
);
return groupedResults;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
};

describe('when passed no filterQuery', () => {
const searchBody = getElasticsearchMetricQuery(expressionParams, timeframe, groupBy);
const searchBody = getElasticsearchMetricQuery(expressionParams, timeframe, 100, groupBy);
test('includes a range filter', () => {
expect(
searchBody.query.bool.filter.find((filter) => filter.hasOwnProperty('range'))
Expand All @@ -47,6 +47,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const searchBody = getElasticsearchMetricQuery(
expressionParams,
timeframe,
100,
groupBy,
filterQuery
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import { MetricExpressionParams, Aggregators } from '../types';
import { createPercentileAggregation } from './create_percentile_aggregation';
import { calculateDateHistogramOffset } from '../../../metrics/lib/calculate_date_histogram_offset';

const COMPOSITE_RESULTS_PER_PAGE = 100;

const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string, any> | null = (
filterQuery
) => {
Expand All @@ -23,6 +21,7 @@ const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string,
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timeframe: { start: number; end: number },
compositeSize: number,
groupBy?: string | string[],
filterQuery?: string
) => {
Expand Down Expand Up @@ -73,7 +72,7 @@ export const getElasticsearchMetricQuery = (
? {
groupings: {
composite: {
size: COMPOSITE_RESULTS_PER_PAGE,
size: compositeSize,
sources: Array.isArray(groupBy)
? groupBy.map((field, index) => ({
[`groupBy${index}`]: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,9 @@ describe('The metric threshold alert type', () => {
});

const createMockStaticConfiguration = (sources: any) => ({
alerting: {
group_by_page_size: 100,
},
inventory: {
compositeSize: 2000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
sourceId || 'default'
);
const config = source.configuration;
const compositeSize = libs.configuration.alerting.group_by_page_size;

const previousGroupBy = state.groupBy;
const previousFilterQuery = state.filterQuery;
Expand All @@ -135,7 +136,8 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
services.scopedClusterClient.asCurrentUser,
params as EvaluatedRuleParams,
config,
prevGroups
prevGroups,
compositeSize
);

// Because each alert result has the same group definitions, just grab the groups from the first one.
Expand Down Expand Up @@ -248,7 +250,6 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
});
}
}

return { groups, groupBy: params.groupBy, filterQuery: params.filterQuery };
});

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/infra/server/lib/infra_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { handleEsError } from '../../../../../src/plugins/es_ui_shared/server';
import { InfraConfig } from '../plugin';
import { InfraConfig } from '../types';
import { GetLogQueryFields } from '../services/log_queries/get_log_query_fields';
import { RulesServiceSetup } from '../services/rules';
import { KibanaFramework } from './adapters/framework/kibana_framework_adapter';
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/infra/server/lib/sources/sources.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ describe('the InfraSources lib', () => {
});

const createMockStaticConfiguration = (sources: any) => ({
alerting: {
group_by_page_size: 10000,
},
enabled: true,
inventory: {
compositeSize: 2000,
Expand Down
11 changes: 7 additions & 4 deletions x-pack/plugins/infra/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { Server } from '@hapi/hapi';
import { schema, TypeOf } from '@kbn/config-schema';
import { schema } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { Logger } from '@kbn/logging';
import {
Expand Down Expand Up @@ -35,15 +35,18 @@ import { InfraBackendLibs, InfraDomainLibs } from './lib/infra_types';
import { infraSourceConfigurationSavedObjectType, InfraSources } from './lib/sources';
import { InfraSourceStatus } from './lib/source_status';
import { LogEntriesService } from './services/log_entries';
import { InfraPluginRequestHandlerContext } from './types';
import { InfraPluginRequestHandlerContext, InfraConfig } from './types';
import { UsageCollector } from './usage/usage_collector';
import { createGetLogQueryFields } from './services/log_queries/get_log_query_fields';
import { handleEsError } from '../../../../src/plugins/es_ui_shared/server';
import { RulesService } from './services/rules';
import { configDeprecations, getInfraDeprecationsFactory } from './deprecations';

export const config: PluginConfigDescriptor = {
export const config: PluginConfigDescriptor<InfraConfig> = {
schema: schema.object({
alerting: schema.object({
group_by_page_size: schema.number({ defaultValue: 10000 }),
}),
inventory: schema.object({
compositeSize: schema.number({ defaultValue: 2000 }),
}),
Expand All @@ -64,7 +67,7 @@ export const config: PluginConfigDescriptor = {
deprecations: configDeprecations,
};

export type InfraConfig = TypeOf<typeof config.schema>;
export type { InfraConfig };

export interface KbnServer extends Server {
usage: any;
Expand Down
16 changes: 16 additions & 0 deletions x-pack/plugins/infra/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,19 @@ export interface InfraPluginRequestHandlerContext extends RequestHandlerContext
infra: InfraRequestHandlerContext;
search: SearchRequestHandlerContext;
}

/**
 * Static configuration for the infra plugin, matching the `schema.object`
 * declared in `plugin.ts` (`PluginConfigDescriptor<InfraConfig>`).
 */
export interface InfraConfig {
  /** Settings consumed by the metric threshold rule executor. */
  alerting: {
    // Page size for the composite aggregation used when a rule has a
    // `groupBy`; the config schema defaults this to 10000 (see plugin.ts).
    group_by_page_size: number;
  };
  /** Settings for inventory views/rules. */
  inventory: {
    // Composite aggregation page size for inventory; schema default is 2000.
    compositeSize: number;
  };
  // Optional defaults for the source configuration.
  sources?: {
    default?: {
      fields?: {
        // Presumably the field name(s) holding the log message for the
        // default source — TODO confirm against source-resolution code.
        message?: string[];
      };
    };
  };
}
Loading