Skip to content

Commit

Permalink
[APM] Fixes duplicate ML job creation for existing environments (elas…
Browse files Browse the repository at this point in the history
…tic#85023) (elastic#93098)

* [APM] Fixes duplicate ML job creation for existing environments (elastic#85023)

* Removes commented out test code.

* Adds API integration tests

* clean up code for readability
  • Loading branch information
ogupte committed Mar 2, 2021
1 parent 495c259 commit dad6e6b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from '../../../common/elasticsearch_fieldnames';
import { APM_ML_JOB_GROUP, ML_MODULE_ID_APM_TRANSACTION } from './constants';
import { withApmSpan } from '../../utils/with_apm_span';
import { getAnomalyDetectionJobs } from './get_anomaly_detection_jobs';

export async function createAnomalyDetectionJobs(
setup: Setup,
Expand All @@ -38,14 +39,19 @@ export async function createAnomalyDetectionJobs(
throw Boom.forbidden(ML_ERRORS.ML_NOT_AVAILABLE_IN_SPACE);
}

const uniqueMlJobEnvs = await getUniqueMlJobEnvs(setup, environments, logger);
if (uniqueMlJobEnvs.length === 0) {
return [];
}

return withApmSpan('create_anomaly_detection_jobs', async () => {
logger.info(
`Creating ML anomaly detection jobs for environments: [${environments}].`
`Creating ML anomaly detection jobs for environments: [${uniqueMlJobEnvs}].`
);

const indexPatternName = indices['apm_oss.transactionIndices'];
const responses = await Promise.all(
environments.map((environment) =>
uniqueMlJobEnvs.map((environment) =>
createAnomalyDetectionJob({ ml, environment, indexPatternName })
)
);
Expand Down Expand Up @@ -105,3 +111,24 @@ async function createAnomalyDetectionJob({
});
});
}

async function getUniqueMlJobEnvs(
setup: Setup,
environments: string[],
logger: Logger
) {
// skip creation of duplicate ML jobs
const jobs = await getAnomalyDetectionJobs(setup, logger);
const existingMlJobEnvs = jobs.map(({ environment }) => environment);
const requestedExistingMlJobEnvs = environments.filter((env) =>
existingMlJobEnvs.includes(env)
);

if (requestedExistingMlJobEnvs.length) {
logger.warn(
`Skipping creation of existing ML jobs for environments: [${requestedExistingMlJobEnvs}]}`
);
}

return environments.filter((env) => !existingMlJobEnvs.includes(env));
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const createAnomalyDetectionJobsRoute = createRoute({
}

await createAnomalyDetectionJobs(setup, environments, context.logger);

notifyFeatureUsage({
licensingPlugin: context.licensing,
featureName: 'ml',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import expect from '@kbn/expect';
import { countBy } from 'lodash';
import { registry } from '../../../common/registry';
import { FtrProviderContext } from '../../../common/ftr_provider_context';

Expand Down Expand Up @@ -49,7 +50,26 @@ export default function apiTest({ getService }: FtrProviderContext) {

const { body } = await getJobs();
expect(body.hasLegacyJobs).to.be(false);
expect(body.jobs.map((job: any) => job.environment)).to.eql(['production', 'staging']);
expect(countBy(body.jobs, 'environment')).to.eql({
production: 1,
staging: 1,
});
});

describe('with existing ML jobs', () => {
before(async () => {
await createJobs(['production', 'staging']);
});
it('skips duplicate job creation', async () => {
await createJobs(['production', 'test']);

const { body } = await getJobs();
expect(countBy(body.jobs, 'environment')).to.eql({
production: 1,
staging: 1,
test: 1,
});
});
});
});
});
Expand Down

0 comments on commit dad6e6b

Please sign in to comment.