Skip to content

Commit

Permalink
[ML] Filtering runtime mappings in anomaly detection wizards (#91534)
Browse files Browse the repository at this point in the history
* [ML] Filtering runtime mappings in anomaly detection wizards

* updating tests

* adding check for null when parsing aggs

* removing async from tests
  • Loading branch information
jgowdyelastic authored Feb 17, 2021
1 parent a32f86d commit d890d22
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
SHARED_RESULTS_INDEX_NAME,
} from '../../../../../../common/constants/new_job';
import { collectAggs } from './util/general';
import { filterRuntimeMappings } from './util/filter_runtime_mappings';
import { parseInterval } from '../../../../../../common/util/parse_interval';
import { Calendar } from '../../../../../../common/types/calendars';
import { mlCalendarService } from '../../../../services/calendar_service';
Expand Down Expand Up @@ -65,6 +66,7 @@ export class JobCreator {
protected _scriptFields: Field[] = [];
protected _runtimeFields: Field[] = [];
protected _runtimeMappings: RuntimeMappings | null = null;
protected _filterRuntimeMappingsOnSave: boolean = true;
protected _aggregationFields: Field[] = [];
protected _sparseData: boolean = false;
private _stopAllRefreshPolls: {
Expand Down Expand Up @@ -546,7 +548,8 @@ export class JobCreator {

public async createDatafeed(): Promise<object> {
try {
return await mlJobService.saveNewDatafeed(this._datafeed_config, this._job_config.job_id);
const tempDatafeed = this._getDatafeedWithFilteredRuntimeMappings();
return await mlJobService.saveNewDatafeed(tempDatafeed, this._job_config.job_id);
} catch (error) {
throw error;
}
Expand All @@ -559,6 +562,23 @@ export class JobCreator {
return jobRunner;
}

private _getDatafeedWithFilteredRuntimeMappings(): Datafeed {
if (this._filterRuntimeMappingsOnSave === false) {
return this._datafeed_config;
}

const { runtime_mappings: filteredRuntimeMappings } = filterRuntimeMappings(
this._job_config,
this._datafeed_config
);

return {
...this._datafeed_config,
runtime_mappings:
Object.keys(filteredRuntimeMappings).length > 0 ? filteredRuntimeMappings : undefined,
};
}

public subscribeToProgress(func: ProgressSubscriber) {
this._subscribers.push(func);
}
Expand Down Expand Up @@ -645,6 +665,14 @@ export class JobCreator {
return JSON.stringify(this._datafeed_config, null, 2);
}

public set filterRuntimeMappingsOnSave(filter: boolean) {
this._filterRuntimeMappingsOnSave = filter;
}

public get filterRuntimeMappingsOnSave(): boolean {
return this._filterRuntimeMappingsOnSave;
}

protected _initPerPartitionCategorization() {
if (this._job_config.analysis_config.per_partition_categorization === undefined) {
this._job_config.analysis_config.per_partition_categorization = {};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Job, Datafeed } from '../../../../../../../common/types/anomaly_detection_jobs';
import { filterRuntimeMappings } from './filter_runtime_mappings';

function getJob(): Job {
return {
job_id: 'test',
description: '',
groups: [],
analysis_config: {
bucket_span: '15m',
detectors: [
{
function: 'mean',
field_name: 'responsetime',
},
],
influencers: [],
},
data_description: {
time_field: '@timestamp',
},
analysis_limits: {
model_memory_limit: '11MB',
},
model_plot_config: {
enabled: false,
annotations_enabled: false,
},
};
}

function getDatafeed(): Datafeed {
return {
datafeed_id: 'datafeed-test',
job_id: 'dds',
indices: ['farequote-*'],
query: {
bool: {
must: [
{
match_all: {},
},
],
},
},
runtime_mappings: {
responsetime_big: {
type: 'double',
script: {
source: "emit(doc['responsetime'].value * 100.0)",
},
},
airline_lower: {
type: 'keyword',
script: {
source: "emit(doc['airline'].value.toLowerCase())",
},
},
},
};
}

function getAggs() {
return {
buckets: {
date_histogram: {
field: '@timestamp',
fixed_interval: '90000ms',
},
aggregations: {
responsetime: {
avg: {
field: 'responsetime_big',
},
},
'@timestamp': {
max: {
field: '@timestamp',
},
},
},
},
};
}

describe('filter_runtime_mappings', () => {
describe('filterRuntimeMappings()', () => {
let job: Job;
let datafeed: Datafeed;
beforeEach(() => {
job = getJob();
datafeed = getDatafeed();
});

test('returns no runtime mappings, no mappings in aggs', () => {
const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(0);

expect(Object.keys(resp.discarded_mappings).length).toEqual(2);
expect(resp.discarded_mappings.responsetime_big).not.toEqual(undefined);
expect(resp.discarded_mappings.airline_lower).not.toEqual(undefined);
});

test('returns no runtime mappings, no runtime mappings in datafeed', () => {
datafeed.runtime_mappings = undefined;
const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(0);
expect(resp.runtime_mappings.responsetime_big).toEqual(undefined);

expect(Object.keys(resp.discarded_mappings).length).toEqual(0);
expect(resp.discarded_mappings.airline_lower).toEqual(undefined);
});

test('return one runtime mapping and one unused mapping, mappings in aggs', () => {
datafeed.aggregations = getAggs();
const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(1);
expect(resp.runtime_mappings.responsetime_big).not.toEqual(undefined);

expect(Object.keys(resp.discarded_mappings).length).toEqual(1);
expect(resp.discarded_mappings.airline_lower).not.toEqual(undefined);
});

test('return no runtime mappings, no mappings in aggs', () => {
datafeed.aggregations = getAggs();
datafeed.aggregations!.buckets!.aggregations!.responsetime!.avg!.field! = 'responsetime';

const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(0);

expect(Object.keys(resp.discarded_mappings).length).toEqual(2);
expect(resp.discarded_mappings.responsetime_big).not.toEqual(undefined);
expect(resp.discarded_mappings.airline_lower).not.toEqual(undefined);
});

test('return one runtime mapping and one unused mapping, no mappings in aggs', () => {
// set the detector field to be a runtime mapping
job.analysis_config.detectors[0].field_name = 'responsetime_big';
const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(1);
expect(resp.runtime_mappings.responsetime_big).not.toEqual(undefined);

expect(Object.keys(resp.discarded_mappings).length).toEqual(1);
expect(resp.discarded_mappings.airline_lower).not.toEqual(undefined);
});

test('return two runtime mappings, no mappings in aggs', () => {
// set the detector field to be a runtime mapping
job.analysis_config.detectors[0].field_name = 'responsetime_big';
// set the detector by field to be a runtime mapping
job.analysis_config.detectors[0].by_field_name = 'airline_lower';
const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(2);
expect(resp.runtime_mappings.responsetime_big).not.toEqual(undefined);
expect(resp.runtime_mappings.airline_lower).not.toEqual(undefined);

expect(Object.keys(resp.discarded_mappings).length).toEqual(0);
});

test('return two runtime mappings, no mappings in aggs, categorization job', () => {
job.analysis_config.detectors[0].function = 'count';
// set the detector field to be a runtime mapping
job.analysis_config.detectors[0].field_name = undefined;
// set the detector by field to be a runtime mapping
job.analysis_config.detectors[0].by_field_name = 'mlcategory';
job.analysis_config.categorization_field_name = 'airline_lower';

const resp = filterRuntimeMappings(job, datafeed);
expect(Object.keys(resp.runtime_mappings).length).toEqual(1);
expect(resp.runtime_mappings.airline_lower).not.toEqual(undefined);

expect(Object.keys(resp.discarded_mappings).length).toEqual(1);
expect(resp.discarded_mappings.responsetime_big).not.toEqual(undefined);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { RuntimeMappings } from '../../../../../../../common/types/fields';
import type { Datafeed, Job } from '../../../../../../../common/types/anomaly_detection_jobs';

interface Response {
runtime_mappings: RuntimeMappings;
discarded_mappings: RuntimeMappings;
}

export function filterRuntimeMappings(job: Job, datafeed: Datafeed): Response {
if (datafeed.runtime_mappings === undefined) {
return {
runtime_mappings: {},
discarded_mappings: {},
};
}

const usedFields = findFieldsInJob(job, datafeed);

const { runtimeMappings, discardedMappings } = createMappings(
datafeed.runtime_mappings,
usedFields
);

return { runtime_mappings: runtimeMappings, discarded_mappings: discardedMappings };
}

function findFieldsInJob(job: Job, datafeed: Datafeed) {
const usedFields = new Set<string>();
job.analysis_config.detectors.forEach((d) => {
if (d.field_name !== undefined) {
usedFields.add(d.field_name);
}
if (d.by_field_name !== undefined) {
usedFields.add(d.by_field_name);
}
if (d.over_field_name !== undefined) {
usedFields.add(d.over_field_name);
}
if (d.partition_field_name !== undefined) {
usedFields.add(d.partition_field_name);
}
});

if (job.analysis_config.categorization_field_name !== undefined) {
usedFields.add(job.analysis_config.categorization_field_name);
}

if (job.analysis_config.summary_count_field_name !== undefined) {
usedFields.add(job.analysis_config.summary_count_field_name);
}

if (job.analysis_config.influencers !== undefined) {
job.analysis_config.influencers.forEach((i) => usedFields.add(i));
}

const aggs = datafeed.aggregations ?? datafeed.aggs;
if (aggs !== undefined) {
findFieldsInAgg(aggs).forEach((f) => usedFields.add(f));
}

return [...usedFields];
}

function findFieldsInAgg(obj: Record<string, any>) {
const fields: string[] = [];
Object.entries(obj).forEach(([key, val]) => {
if (typeof val === 'object' && val !== null) {
fields.push(...findFieldsInAgg(val));
} else if (typeof val === 'string' && key === 'field') {
fields.push(val);
}
});
return fields;
}

function createMappings(rm: RuntimeMappings, usedFieldNames: string[]) {
return {
runtimeMappings: usedFieldNames.reduce((acc, cur) => {
if (rm[cur] !== undefined) {
acc[cur] = rm[cur];
}
return acc;
}, {} as RuntimeMappings),
discardedMappings: Object.keys(rm).reduce((acc, cur) => {
if (usedFieldNames.includes(cur) === false && rm[cur] !== undefined) {
acc[cur] = rm[cur];
}
return acc;
}, {} as RuntimeMappings),
};
}

0 comments on commit d890d22

Please sign in to comment.