Skip to content

Commit

Permalink
Alter handling of composite aggs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerry350 committed Jun 12, 2020
1 parent f0e11f6 commit 7806cf3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/infra/common/alerting/logs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ const GroupedSearchQueryResponseRT = rt.intersection([
doc_count: rt.number,
})
),
after_key: rt.string,
after_key: rt.record(rt.string, rt.string),
}),
}),
hits: rt.type({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import {
import { InfraBackendLibs } from '../../infra_types';
import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds';
import { InfraSource } from '../../../../common/http_api/source_api';
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
import { InfraDatabaseSearchResponse } from '../../../lib/adapters/framework';
import { decodeOrThrow } from '../../../../common/runtime_types';

const UNGROUPED_FACTORY_KEY = '*';
const COMPOSITE_GROUP_SIZE = 40;

const checkValueAgainstComparatorMap: {
[key: string]: (a: number, b: number) => boolean;
Expand Down Expand Up @@ -146,7 +144,7 @@ const getESQuery = (
sourceConfiguration: InfraSource['configuration'],
index: string
): object => {
const { timeSize, timeUnit, criteria, groupBy, count } = params;
const { timeSize, timeUnit, criteria, groupBy } = params;
const interval = `${timeSize}${timeUnit}`;
const intervalAsSeconds = getIntervalInSeconds(interval);
const to = Date.now();
Expand Down Expand Up @@ -182,7 +180,7 @@ const getESQuery = (
? {
groups: {
composite: {
size: 40,
size: COMPOSITE_GROUP_SIZE,
sources: groupBy.map((field, groupIndex) => ({
[`group-${groupIndex}-${field}`]: {
terms: { field },
Expand Down Expand Up @@ -330,28 +328,31 @@ const getUngroupedResults = async (
};

const getGroupedResults = async (
query: object,
query: any,
index: string,
callCluster: AlertServices['callCluster']
) => {
const bucketSelector = (
response: InfraDatabaseSearchResponse<any, GroupedSearchQueryResponse['aggregations']>
) => {
return response.aggregations?.groups?.buckets || [];
};

const afterKeyHandler = createAfterKeyHandler('aggs.groups.composite.after', (response) => {
return response.aggregations?.groups?.after_key;
});

const compositeBuckets = await getAllCompositeData(
(body) => callCluster('search', query),
query,
bucketSelector,
afterKeyHandler
);
let compositeGroupBuckets: GroupedSearchQueryResponse['aggregations']['groups']['buckets'] = [];
let lastAfterKey: GroupedSearchQueryResponse['aggregations']['groups']['after_key'] | undefined;

while (true) {
const queryWithAfterKey = { ...query };
queryWithAfterKey.body.aggregations.groups.composite.after = lastAfterKey;
const groupResponse: GroupedSearchQueryResponse = await callCluster(
'search',
queryWithAfterKey
);
compositeGroupBuckets = [
...compositeGroupBuckets,
...groupResponse.aggregations.groups.buckets,
];
lastAfterKey = groupResponse.aggregations.groups.after_key;
if (groupResponse.aggregations.groups.buckets.length < COMPOSITE_GROUP_SIZE) {
break;
}
}

return compositeBuckets;
return compositeGroupBuckets;
};

const createConditionsMessage = (criteria: LogDocumentCountAlertParams['criteria']) => {
Expand Down

0 comments on commit 7806cf3

Please sign in to comment.