Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Autoscaling for machine learning #59309

Merged
merged 33 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4ceb881
[ML] Adding ML autoscaling decider integration
benwtrent Jul 8, 2020
dacbffc
Merge branch 'master' into feature/ml-autoscaling-integration
benwtrent Sep 14, 2020
f5a1d80
Merge branch 'master' into feature/ml-autoscaling-integration
benwtrent Sep 22, 2020
bb639c6
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Sep 28, 2020
040b16d
adjusting autoscaling decider
benwtrent Sep 28, 2020
d199dca
fixing setting handling
benwtrent Sep 28, 2020
1394ca1
addressing scale up
benwtrent Sep 28, 2020
5b267e0
finalizing scale up logic
benwtrent Sep 29, 2020
8b8462e
adding downscale delay option
benwtrent Oct 13, 2020
0aa0301
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Oct 13, 2020
b2139a5
adding native memory calculator for special dynamic case
benwtrent Oct 14, 2020
1e8183b
adjusting how we calculate memory percentage
benwtrent Oct 14, 2020
453d4a7
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Oct 14, 2020
8e846e4
handling native size to node size and scale down
benwtrent Oct 15, 2020
28f2257
Merge branch 'master' into feature/ml-autoscaling-integration
benwtrent Oct 21, 2020
a665245
updating from master
benwtrent Oct 21, 2020
6e45e6a
undo bad delete
benwtrent Oct 21, 2020
d5e065b
minor adjustments
benwtrent Oct 21, 2020
a4a7f23
Merge branch 'master' into feature/ml-autoscaling-integration
benwtrent Nov 10, 2020
f4ed982
fixing tests
benwtrent Nov 10, 2020
a00e129
fixing tests
benwtrent Nov 11, 2020
e47a136
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Nov 11, 2020
7fbe30e
adding scaledown tests, refactoring nodeload class
benwtrent Nov 11, 2020
8cf46ed
adding tests and validations
benwtrent Nov 12, 2020
6684333
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Nov 12, 2020
6dc17aa
Apply suggestions from code review
benwtrent Nov 12, 2020
f023e77
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Nov 16, 2020
b181b40
fixing compilation
benwtrent Nov 16, 2020
94a969e
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Nov 16, 2020
15a8a7b
addressing PR concerns
benwtrent Nov 16, 2020
6536073
Merge remote-tracking branch 'upstream/master' into feature/ml-autosc…
benwtrent Nov 16, 2020
ac8ca59
adding logging
benwtrent Nov 17, 2020
29adb17
fixing test
benwtrent Nov 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public static String jobTaskId(String jobId) {
return JOB_TASK_ID_PREFIX + jobId;
}

/**
 * Recovers the job id from a job task id by dropping the first {@code JOB_TASK_ID_PREFIX.length()} characters.
 * This is the inverse of {@link #jobTaskId(String)}, which prepends that prefix to the job id.
 *
 * NOTE(review): the prefix is not verified; callers are presumably expected to pass an id produced by
 * {@link #jobTaskId(String)} - confirm.
 *
 * @param jobTaskId the prefixed task id
 * @return the remainder of {@code jobTaskId} after the prefix
 */
public static String jobId(String jobTaskId) {
    final int prefixLength = JOB_TASK_ID_PREFIX.length();
    return jobTaskId.substring(prefixLength);
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
Expand All @@ -61,6 +65,10 @@ public static String dataFrameAnalyticsTaskId(String id) {
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
}

/**
 * Recovers the data frame analytics id from its task id by dropping the first
 * {@code DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length()} characters.
 * This is the inverse of {@link #dataFrameAnalyticsTaskId(String)}, which prepends that prefix to the id.
 *
 * NOTE(review): the prefix is not verified; callers are presumably expected to pass an id produced by
 * {@link #dataFrameAnalyticsTaskId(String)} - confirm.
 *
 * @param taskId the prefixed task id
 * @return the remainder of {@code taskId} after the prefix
 */
public static String dataFrameAnalyticsId(String taskId) {
    final int prefixLength = DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length();
    return taskId.substring(prefixLength);
}

@Nullable
public static PersistentTasksCustomMetadata.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugin/ml/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ esplugin {
description 'Elasticsearch Expanded Pack Plugin - Machine Learning'
classname 'org.elasticsearch.xpack.ml.MachineLearning'
hasNativeController true
extendedPlugins = ['x-pack-core', 'lang-painless']
extendedPlugins = ['x-pack-autoscaling', 'lang-painless']
}


Expand Down Expand Up @@ -50,6 +50,7 @@ dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
testImplementation project(path: xpackModule('ilm'), configuration: 'default')
compileOnly project(path: xpackModule('autoscaling'), configuration: 'default')
testImplementation project(path: xpackModule('data-streams'), configuration: 'default')
// This should not be here
testImplementation project(path: xpackModule('security'), configuration: 'testArtifacts')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
Expand Down Expand Up @@ -221,6 +223,8 @@
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
Expand Down Expand Up @@ -422,7 +426,7 @@ public Set<DiscoveryNodeRole> getRoles() {
// controls the types of jobs that can be created, and each job alone is considerably smaller than what each node
// can handle.
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
/**
* This boolean value indicates if `max_machine_memory_percent` should be ignored and an automatic calculation is used instead.
*
Expand Down Expand Up @@ -478,6 +482,17 @@ public Set<DiscoveryNodeRole> getRoles() {
Property.NodeScope
);

/**
* The maximum possible node size for a machine learning node. It is useful when determining if a job could ever be
* opened on the cluster.
*
* The default is the special value {@code 0b} ({@link ByteSizeValue#ZERO}). When the setting is left at that default,
* the value is ignored when assigning jobs.
*
* Registered as a node-scoped setting under the key {@code xpack.ml.max_ml_node_size}.
*/
public static final Setting<ByteSizeValue> MAX_ML_NODE_SIZE = Setting.byteSizeSetting(
"xpack.ml.max_ml_node_size",
ByteSizeValue.ZERO,
Setting.Property.NodeScope);
benwtrent marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger logger = LogManager.getLogger(MachineLearning.class);

private final Settings settings;
Expand All @@ -491,6 +506,7 @@ public Set<DiscoveryNodeRole> getRoles() {
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
private final SetOnce<CircuitBreaker> inferenceModelBreaker = new SetOnce<>();
private final SetOnce<ModelLoadingService> modelLoadingService = new SetOnce<>();
private final SetOnce<MlAutoscalingDeciderService> mlAutoscalingDeciderService = new SetOnce<>();

public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
Expand Down Expand Up @@ -527,7 +543,8 @@ public List<Setting<?>> getSettings() {
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
USE_AUTO_MACHINE_MEMORY_PERCENT
USE_AUTO_MACHINE_MEMORY_PERCENT,
MAX_ML_NODE_SIZE
);
}

Expand Down Expand Up @@ -765,6 +782,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// Perform node startup operations
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();

mlAutoscalingDeciderService.set(new MlAutoscalingDeciderService(memoryTracker, settings, clusterService));

return Arrays.asList(
mlLifeCycleService,
new MlControllerHolder(mlController),
Expand Down Expand Up @@ -1101,6 +1120,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
namedWriteables.addAll(new AnalysisStatsNamedWriteablesProvider().getNamedWriteables());
namedWriteables.addAll(MlEvaluationNamedXContentProvider.getNamedWriteables());
namedWriteables.addAll(new MlInferenceNamedXContentProvider().getNamedWriteables());
namedWriteables.addAll(MlAutoscalingNamedWritableProvider.getNamedWriteables());
return namedWriteables;
}

Expand Down Expand Up @@ -1131,4 +1151,13 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME);
this.inferenceModelBreaker.set(circuitBreaker);
}

/**
 * Supplies the autoscaling decider services contributed by this plugin.
 *
 * @return an empty list when {@code enabled} is false; otherwise a single-element list holding the
 *         {@link MlAutoscalingDeciderService} stored in {@code mlAutoscalingDeciderService}
 */
public Collection<AutoscalingDeciderService<? extends AutoscalingDeciderConfiguration>> deciders() {
    if (enabled == false) {
        return List.of();
    }
    // The holder is populated in createComponents; it must have been set before deciders are requested.
    assert mlAutoscalingDeciderService.get() != null;
    return List.of(mlAutoscalingDeciderService.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
maxOpenJobs,
Integer.MAX_VALUE,
maxMachineMemoryPercent,
maxNodeMemory,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;

import java.io.IOException;
import java.util.Objects;

public class MlAutoscalingDeciderConfiguration implements AutoscalingDeciderConfiguration {
static final String NAME = "ml";

private static final int DEFAULT_ANOMALY_JOBS_IN_QUEUE = 0;
private static final int DEFAULT_ANALYTICS_JOBS_IN_QUEUE = 0;

private static final ParseField NUM_ANOMALY_JOBS_IN_QUEUE = new ParseField("num_anomaly_jobs_in_queue");
private static final ParseField NUM_ANALYTICS_JOBS_IN_QUEUE = new ParseField("num_analytics_jobs_in_queue");
private static final ParseField DOWN_SCALE_DELAY = new ParseField("down_scale_delay");

private static final ObjectParser<MlAutoscalingDeciderConfiguration.Builder, Void> PARSER = new ObjectParser<>(NAME,
MlAutoscalingDeciderConfiguration.Builder::new);

static {
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnomalyJobsInQueue, NUM_ANOMALY_JOBS_IN_QUEUE);
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnalyticsJobsInQueue, NUM_ANALYTICS_JOBS_IN_QUEUE);
PARSER.declareString(MlAutoscalingDeciderConfiguration.Builder::setDownScaleDelay, DOWN_SCALE_DELAY);
}

public static MlAutoscalingDeciderConfiguration parse(final XContentParser parser) {
return PARSER.apply(parser, null).build();
}

private final int numAnomalyJobsInQueue;
private final int numAnalyticsJobsInQueue;
private final TimeValue downScaleDelay;

MlAutoscalingDeciderConfiguration(int numAnomalyJobsInQueue, int numAnalyticsJobsInQueue, TimeValue downScaleDelay) {
if (numAnomalyJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
if (numAnalyticsJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
this.downScaleDelay = downScaleDelay;
}

public MlAutoscalingDeciderConfiguration(StreamInput in) throws IOException {
numAnomalyJobsInQueue = in.readVInt();
numAnalyticsJobsInQueue = in.readVInt();
downScaleDelay = in.readTimeValue();
}

@Override
public String name() {
return NAME;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numAnomalyJobsInQueue);
out.writeVInt(numAnalyticsJobsInQueue);
out.writeTimeValue(downScaleDelay);
}

public int getNumAnomalyJobsInQueue() {
return numAnomalyJobsInQueue;
}

public int getNumAnalyticsJobsInQueue() {
return numAnalyticsJobsInQueue;
}

public TimeValue getDownScaleDelay() {
return downScaleDelay;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MlAutoscalingDeciderConfiguration that = (MlAutoscalingDeciderConfiguration) o;
return numAnomalyJobsInQueue == that.numAnomalyJobsInQueue &&
numAnalyticsJobsInQueue == that.numAnalyticsJobsInQueue &&
Objects.equals(downScaleDelay, that.downScaleDelay);
}

@Override
public int hashCode() {
return Objects.hash(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUM_ANOMALY_JOBS_IN_QUEUE .getPreferredName(), numAnomalyJobsInQueue);
builder.field(NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName(), numAnalyticsJobsInQueue);
builder.field(DOWN_SCALE_DELAY.getPreferredName(), downScaleDelay.getStringRep());
builder.endObject();
return builder;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private int numAnomalyJobsInQueue = DEFAULT_ANOMALY_JOBS_IN_QUEUE;
private int numAnalyticsJobsInQueue = DEFAULT_ANALYTICS_JOBS_IN_QUEUE;
private TimeValue downScaleDelay = TimeValue.ZERO;

public Builder setNumAnomalyJobsInQueue(int numAnomalyJobsInQueue) {
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
return this;
}

public Builder setNumAnalyticsJobsInQueue(int numAnalyticsJobsInQueue) {
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
return this;
}

Builder setDownScaleDelay(String unparsedTimeValue) {
return setDownScaleDelay(TimeValue.parseTimeValue(unparsedTimeValue, DOWN_SCALE_DELAY.getPreferredName()));
}

public Builder setDownScaleDelay(TimeValue downScaleDelay) {
this.downScaleDelay = downScaleDelay;
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

public MlAutoscalingDeciderConfiguration build() {
return new MlAutoscalingDeciderConfiguration(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}
}

}
Loading