Commit 821b850

[7.x] [ML] Update cloning for jobs to use exclude_generated (elastic#…

qn895 authored Feb 2, 2021
1 parent 2ba8627 commit 821b850

Showing 17 changed files with 206 additions and 104 deletions.
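The thrust of the commit: instead of fetching a full job and stripping server-generated fields on the client (the old cloneJob in job_service.js, removed below), the cloning flow now asks the server for configurations that are already safe to resubmit, via the exclude_generated option named in the title. A minimal sketch of the new flow, using only names that appear in the hunks below:

// Sketch of the new cloning flow (helpers are defined later in this diff).
const [{ job: cloneableJob, datafeed }, originalJob] = await Promise.all([
  loadJobForCloning(jobId), // clone-ready config, generated fields excluded
  loadFullJob(jobId, false), // full job, still needed for data_counts and calendars
]);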
@@ -4,7 +4,6 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { cloneDeep } from 'lodash';
 import { Datafeed } from './datafeed';
 import { DatafeedStats } from './datafeed_stats';
 import { Job } from './job';
@@ -25,16 +24,6 @@ export interface CombinedJobWithStats extends JobWithStats {
   datafeed_config: DatafeedWithStats;
 }
 
-export function expandCombinedJobConfig(combinedJob: CombinedJob) {
-  const combinedJobClone = cloneDeep(combinedJob);
-  const job = combinedJobClone;
-  const datafeed = combinedJobClone.datafeed_config;
-  // @ts-expect-error
-  delete job.datafeed_config;
-
-  return { job, datafeed };
-}
-
 export function isCombinedJobWithStats(arg: any): arg is CombinedJobWithStats {
   return typeof arg.job_id === 'string';
 }
@@ -62,7 +62,7 @@ export const Page: FC<Props> = ({ jobId }) => {
     if (currentIndexPattern) {
       (async function () {
         if (jobId !== undefined) {
-          const analyticsConfigs = await ml.dataFrameAnalytics.getDataFrameAnalytics(jobId);
+          const analyticsConfigs = await ml.dataFrameAnalytics.getDataFrameAnalytics(jobId, true);
           if (
             Array.isArray(analyticsConfigs.data_frame_analytics) &&
             analyticsConfigs.data_frame_analytics.length > 0
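The new true argument is the excludeGenerated flag added to getDataFrameAnalytics later in this diff; the service forwards it as a query parameter, so the call above roughly issues the following against the Kibana ML API:

// GET {basePath()}/data_frame/analytics/{jobId}?excludeGenerated=true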
@@ -310,9 +310,6 @@ export type CloneDataFrameAnalyticsConfig = Omit<
  */
 export function extractCloningConfig({
   id,
-  version,
-  // eslint-disable-next-line @typescript-eslint/naming-convention
-  create_time,
   ...configToClone
 }: DeepReadonly<DataFrameAnalyticsConfig>): CloneDataFrameAnalyticsConfig {
   return (cloneDeep({
@@ -36,6 +36,23 @@ export function loadFullJob(jobId) {
   });
 }
 
+export function loadJobForCloning(jobId) {
+  return new Promise((resolve, reject) => {
+    ml.jobs
+      .jobForCloning(jobId)
+      .then((resp) => {
+        if (resp) {
+          resolve(resp);
+        } else {
+          throw new Error(`Could not find job ${jobId}`);
+        }
+      })
+      .catch((error) => {
+        reject(error);
+      });
+  });
+}
+
 export function isStartable(jobs) {
   return jobs.some(
     (j) => j.datafeedState === DATAFEED_STATE.STOPPED && j.jobState !== JOB_STATE.CLOSING
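loadJobForCloning wraps the jobForCloning client method added to the jobs API service at the end of this diff, rejecting when the endpoint returns nothing. A hedged usage sketch — the job id is a placeholder:

// 'sample-job' is hypothetical; the response carries a clone-ready job/datafeed pair
const { job, datafeed } = await loadJobForCloning('sample-job');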
@@ -180,44 +197,56 @@ function showResults(resp, action) {
 
 export async function cloneJob(jobId) {
   try {
-    const job = await loadFullJob(jobId);
-    if (job.custom_settings && job.custom_settings.created_by) {
+    const [{ job: cloneableJob, datafeed }, originalJob] = await Promise.all([
+      loadJobForCloning(jobId),
+      loadFullJob(jobId, false),
+    ]);
+    if (cloneableJob !== undefined && originalJob?.custom_settings?.created_by !== undefined) {
       // if the job is from a wizard, i.e. contains a created_by property
       // use tempJobCloningObjects to temporarily store the job
-      mlJobService.tempJobCloningObjects.job = job;
+      mlJobService.tempJobCloningObjects.createdBy = originalJob?.custom_settings?.created_by;
+      mlJobService.tempJobCloningObjects.job = cloneableJob;
 
       if (
-        job.data_counts.earliest_record_timestamp !== undefined &&
-        job.data_counts.latest_record_timestamp !== undefined &&
-        job.data_counts.latest_bucket_timestamp !== undefined
+        originalJob.data_counts.earliest_record_timestamp !== undefined &&
+        originalJob.data_counts.latest_record_timestamp !== undefined &&
+        originalJob.data_counts.latest_bucket_timestamp !== undefined
       ) {
         // if the job has run before, use the earliest and latest record timestamp
         // as the cloned job's time range
-        let start = job.data_counts.earliest_record_timestamp;
-        let end = job.data_counts.latest_record_timestamp;
+        let start = originalJob.data_counts.earliest_record_timestamp;
+        let end = originalJob.data_counts.latest_record_timestamp;
 
-        if (job.datafeed_config.aggregations !== undefined) {
+        if (originalJob.datafeed_config.aggregations !== undefined) {
           // if the datafeed uses aggregations the earliest and latest record timestamps may not be the same
           // as the start and end of the data in the index.
-          const bucketSpanMs = parseInterval(job.analysis_config.bucket_span).asMilliseconds();
+          const bucketSpanMs = parseInterval(
+            originalJob.analysis_config.bucket_span
+          ).asMilliseconds();
           // round down to the start of the nearest bucket
           start =
-            Math.floor(job.data_counts.earliest_record_timestamp / bucketSpanMs) * bucketSpanMs;
+            Math.floor(originalJob.data_counts.earliest_record_timestamp / bucketSpanMs) *
+            bucketSpanMs;
           // use latest_bucket_timestamp and add two bucket spans minus one ms
-          end = job.data_counts.latest_bucket_timestamp + bucketSpanMs * 2 - 1;
+          end = originalJob.data_counts.latest_bucket_timestamp + bucketSpanMs * 2 - 1;
         }
 
         mlJobService.tempJobCloningObjects.start = start;
         mlJobService.tempJobCloningObjects.end = end;
       }
     } else {
       // otherwise use the tempJobCloningObjects
-      mlJobService.tempJobCloningObjects.job = job;
+      mlJobService.tempJobCloningObjects.job = cloneableJob;
+      // resets the createdBy field in case it still retains previous settings
+      mlJobService.tempJobCloningObjects.createdBy = undefined;
     }
+    if (datafeed !== undefined) {
+      mlJobService.tempJobCloningObjects.datafeed = datafeed;
+    }
 
-    if (job.calendars) {
+    if (originalJob.calendars) {
       mlJobService.tempJobCloningObjects.calendars = await mlCalendarService.fetchCalendarsByIds(
-        job.calendars
+        originalJob.calendars
       );
     }
 
@@ -8,19 +8,19 @@ import { ApplicationStart } from 'kibana/public';
 import { IndexPatternsContract } from '../../../../../../../../../src/plugins/data/public';
 import { mlJobService } from '../../../../services/job_service';
 import { loadIndexPatterns, getIndexPatternIdFromName } from '../../../../util/index_utils';
-import { CombinedJob } from '../../../../../../common/types/anomaly_detection_jobs';
+import { Datafeed, Job } from '../../../../../../common/types/anomaly_detection_jobs';
 import { CREATED_BY_LABEL, JOB_TYPE } from '../../../../../../common/constants/new_job';
 
 export async function preConfiguredJobRedirect(
   indexPatterns: IndexPatternsContract,
   basePath: string,
   navigateToUrl: ApplicationStart['navigateToUrl']
 ) {
-  const { job } = mlJobService.tempJobCloningObjects;
-  if (job) {
+  const { createdBy, job, datafeed } = mlJobService.tempJobCloningObjects;
+  if (job && datafeed) {
     try {
       await loadIndexPatterns(indexPatterns);
-      const redirectUrl = getWizardUrlFromCloningJob(job);
+      const redirectUrl = getWizardUrlFromCloningJob(createdBy, job, datafeed);
       await navigateToUrl(`${basePath}/app/ml/${redirectUrl}`);
       return Promise.reject();
     } catch (error) {
@@ -33,8 +33,8 @@ }
   }
 }
 
-function getWizardUrlFromCloningJob(job: CombinedJob) {
-  const created = job?.custom_settings?.created_by;
+function getWizardUrlFromCloningJob(createdBy: string | undefined, job: Job, datafeed: Datafeed) {
+  const created = createdBy;
   let page = '';
 
   switch (created) {
@@ -55,7 +55,7 @@
       break;
   }
 
-  const indexPatternId = getIndexPatternIdFromName(job.datafeed_config.indices.join());
+  const indexPatternId = getIndexPatternIdFromName(datafeed.indices.join());
 
   return `jobs/new_job/${page}?index=${indexPatternId}&_g=()`;
 }
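With the combined job gone, the wizard type (createdBy), job, and datafeed now travel separately. A hedged example call — CREATED_BY_LABEL.SINGLE_METRIC is assumed to be one of the imported label values, and job/datafeed are clone-ready configs:

// Returns something like 'jobs/new_job/single_metric?index=<indexPatternId>&_g=()'
const url = getWizardUrlFromCloningJob(CREATED_BY_LABEL.SINGLE_METRIC, job, datafeed);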
@@ -37,7 +37,6 @@ import { useMlContext } from '../../../../contexts/ml';
 import { getTimeFilterRange } from '../../../../components/full_time_range_selector';
 import { getTimeBucketsFromCache } from '../../../../util/time_buckets';
 import { ExistingJobsAndGroups, mlJobService } from '../../../../services/job_service';
-import { expandCombinedJobConfig } from '../../../../../../common/types/anomaly_detection_jobs';
 import { newJobCapsService } from '../../../../services/new_job_capabilities_service';
 import { EVENT_RATE_FIELD_ID } from '../../../../../../common/types/fields';
 import { getNewJobDefaults } from '../../../../services/ml_server_info';
@@ -74,10 +73,11 @@ export const Page: FC<PageProps> = ({ existingJobsAndGroups, jobType }) => {
 
   if (mlJobService.tempJobCloningObjects.job !== undefined) {
     // cloning a job
-    const clonedJob = mlJobService.cloneJob(mlJobService.tempJobCloningObjects.job);
-    const { job, datafeed } = expandCombinedJobConfig(clonedJob);
+    const clonedJob = mlJobService.tempJobCloningObjects.job;
+    const clonedDatafeed = mlJobService.cloneDatafeed(mlJobService.tempJobCloningObjects.datafeed);
 
     initCategorizationSettings();
-    jobCreator.cloneFromExistingJob(job, datafeed);
+    jobCreator.cloneFromExistingJob(clonedJob, clonedDatafeed);
 
     // if we're not skipping the time range, this is a standard job clone, so wipe the jobId
     if (mlJobService.tempJobCloningObjects.skipTimeRangeStep === false) {
@@ -6,7 +6,7 @@
 
 import { SearchResponse } from 'elasticsearch';
 import { TimeRange } from 'src/plugins/data/common/query/timefilter/types';
-import { CombinedJob } from '../../../common/types/anomaly_detection_jobs';
+import { CombinedJob, Datafeed } from '../../../common/types/anomaly_detection_jobs';
 import { Calendar } from '../../../common/types/calendars';
 
 export interface ExistingJobsAndGroups {
@@ -18,6 +18,8 @@ declare interface JobService {
   jobs: CombinedJob[];
   createResultsUrlForJobs: (jobs: any[], target: string, timeRange?: TimeRange) => string;
   tempJobCloningObjects: {
+    createdBy?: string;
+    datafeed?: Datafeed;
     job: any;
     skipTimeRangeStep: boolean;
     start?: number;
@@ -26,7 +28,7 @@
   };
   skipTimeRangeStep: boolean;
   saveNewJob(job: any): Promise<any>;
-  cloneJob(job: any): any;
+  cloneDatafeed(datafeed: any): Datafeed;
   openJob(jobId: string): Promise<any>;
   saveNewDatafeed(datafeedConfig: any, jobId: string): Promise<any>;
   startDatafeed(
66 changes: 8 additions & 58 deletions x-pack/plugins/ml/public/application/services/job_service.js
@@ -28,6 +28,8 @@ class JobService {
     // if populated when loading the job management page, the start datafeed modal
     // is automatically opened.
     this.tempJobCloningObjects = {
+      createdBy: undefined,
+      datafeed: undefined,
       job: undefined,
       skipTimeRangeStep: false,
       start: undefined,
@@ -325,67 +327,15 @@
     return ml.addJob({ jobId: job.job_id, job }).then(func).catch(func);
   }
 
-  cloneJob(job) {
-    // create a deep copy of a job object
-    // also remove items from the job which are set by the server and not needed
-    // in the future this formatting could be optional
-    const tempJob = cloneDeep(job);
-
-    // remove all of the items which should not be copied
-    // such as counts, state and times
-    delete tempJob.state;
-    delete tempJob.job_version;
-    delete tempJob.data_counts;
-    delete tempJob.create_time;
-    delete tempJob.finished_time;
-    delete tempJob.last_data_time;
-    delete tempJob.model_size_stats;
-    delete tempJob.node;
-    delete tempJob.average_bucket_processing_time_ms;
-    delete tempJob.model_snapshot_id;
-    delete tempJob.open_time;
-    delete tempJob.established_model_memory;
-    delete tempJob.calendars;
-    delete tempJob.timing_stats;
-    delete tempJob.forecasts_stats;
-    delete tempJob.assignment_explanation;
-
-    delete tempJob.analysis_config.use_per_partition_normalization;
-
-    each(tempJob.analysis_config.detectors, (d) => {
-      delete d.detector_index;
-    });
+  cloneDatafeed(datafeed) {
+    const tempDatafeed = cloneDeep(datafeed);
 
-    // remove parts of the datafeed config which should not be copied
-    if (tempJob.datafeed_config) {
-      delete tempJob.datafeed_config.datafeed_id;
-      delete tempJob.datafeed_config.job_id;
-      delete tempJob.datafeed_config.state;
-      delete tempJob.datafeed_config.node;
-      delete tempJob.datafeed_config.timing_stats;
-      delete tempJob.datafeed_config.assignment_explanation;
-
-      // remove query_delay if it's between 60s and 120s
-      // the back-end produces a random value between 60 and 120 and so
-      // by deleting it, the back-end will produce a new random value
-      if (tempJob.datafeed_config.query_delay) {
-        const interval = parseInterval(tempJob.datafeed_config.query_delay);
-        if (interval !== null) {
-          const queryDelay = interval.asSeconds();
-          if (queryDelay > 60 && queryDelay < 120) {
-            delete tempJob.datafeed_config.query_delay;
-          }
-        }
-      }
+    if (tempDatafeed) {
+      delete tempDatafeed.datafeed_id;
+      delete tempDatafeed.job_id;
     }
 
-    // when jumping from a wizard to the advanced job creation,
-    // the wizard's created_by information should be stripped.
-    if (tempJob.custom_settings && tempJob.custom_settings.created_by) {
-      delete tempJob.custom_settings.created_by;
-    }
-
-    return tempJob;
+    return tempDatafeed;
   }
 
   // find a job based on the id
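Note how much client-side stripping disappears here: the counts, state, timing, and created_by deletions, plus the special case that removed a randomly generated query_delay between 60s and 120s so the back end would produce a new one. The assumption in this commit appears to be that configurations fetched with exclude_generated already omit those fields, leaving the client to drop only the identifiers. Usage from the wizard page earlier in this diff:

const clonedDatafeed = mlJobService.cloneDatafeed(mlJobService.tempJobCloningObjects.datafeed);
jobCreator.cloneFromExistingJob(clonedJob, clonedDatafeed);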
@@ -52,11 +52,12 @@ interface JobsExistsResponse {
 }
 
 export const dataFrameAnalytics = {
-  getDataFrameAnalytics(analyticsId?: string) {
+  getDataFrameAnalytics(analyticsId?: string, excludeGenerated?: boolean) {
     const analyticsIdString = analyticsId !== undefined ? `/${analyticsId}` : '';
     return http<GetDataFrameAnalyticsResponse>({
       path: `${basePath()}/data_frame/analytics${analyticsIdString}`,
       method: 'GET',
+      ...(excludeGenerated ? { query: { excludeGenerated } } : {}),
     });
   },
   getDataFrameAnalyticsStats(analyticsId?: string) {
@@ -13,6 +13,8 @@ import {
   MlJobWithTimeRange,
   MlSummaryJobs,
   CombinedJobWithStats,
+  Job,
+  Datafeed,
 } from '../../../../common/types/anomaly_detection_jobs';
 import { JobMessage } from '../../../../common/types/audit_message';
 import { AggFieldNamePair } from '../../../../common/types/fields';
@@ -48,6 +50,15 @@ export const jobsApiProvider = (httpService: HttpService) => ({
     });
   },
 
+  jobForCloning(jobId: string) {
+    const body = JSON.stringify({ jobId });
+    return httpService.http<{ job?: Job; datafeed?: Datafeed } | undefined>({
+      path: `${basePath()}/jobs/job_for_cloning`,
+      method: 'POST',
+      body,
+    });
+  },
+
   jobs(jobIds: string[]) {
     const body = JSON.stringify({ jobIds });
     return httpService.http<CombinedJobWithStats[]>({