Add processor scaling setting
davidkyle committed Aug 8, 2023
1 parent 36d60e2 commit 9388fc4
Showing 11 changed files with 183 additions and 54 deletions.
@@ -583,6 +583,26 @@ public void loadExtensions(ExtensionLoader loader) {
 
     public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";
 
+    /**
+     * For the NLP model assignment planner.
+     * The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be
+     * measured in hyper-threaded or virtual (logical) cores when the
+     * user would like the planner to consider physical cores.
+     *
+     * ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting. The
+     * default value of 1 leaves the attribute unchanged; a value of 2
+     * accounts for hyper-threaded cores with 2 threads per core.
+     * Increasing this setting above 1 reduces the number of model
+     * allocations that can be deployed on a node.
+     */
+    public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
+        "ml.allocated_processors_scale",
+        1,
+        1,
+        Property.OperatorDynamic,
+        Property.NodeScope
+    );
+
     public static final String ML_CONFIG_VERSION_NODE_ATTR = MlConfigVersion.ML_CONFIG_VERSION_NODE_ATTR;
 
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
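The two integer arguments to Setting.intSetting are the default and the minimum, so the scale can never drop below 1. As a rough illustration of the effect on the planner's view of a node, here is a minimal sketch assuming the elasticsearch Settings class on the classpath; the class name, the attribute value of 32, and the scale of 2 are hypothetical:

import org.elasticsearch.common.settings.Settings;

public class AllocatedProcessorsScaleSketch {
    public static void main(String[] args) {
        // Hypothetical operator setting: 2 threads per physical core.
        Settings settings = Settings.builder()
            .put("ml.allocated_processors_scale", 2)
            .build();
        int scale = settings.getAsInt("ml.allocated_processors_scale", 1);

        // A node advertising 32 logical processors via the
        // ml.allocated_processors_double attribute is planned as 16 cores.
        double advertised = 32.0;
        System.out.println(advertised / scale); // 16.0
    }
}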
@@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
         final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
         final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
-        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
+            mlContext.mlNodes,
+            configuration
+        );
 
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
             .setCurrentMlCapacity(
@@ -81,7 +81,9 @@ public static void getMlAutoscalingStats(
         long modelMemoryAvailableFirstNode = mlNodes.length > 0
             ? NativeMemoryCalculator.allowedBytesForMl(clusterState.nodes().get(mlNodes[0]), settings).orElse(0L)
             : 0L;
-        int processorsAvailableFirstNode = mlNodes.length > 0 ? MlProcessors.get(clusterState.nodes().get(mlNodes[0])).roundDown() : 0;
+        int processorsAvailableFirstNode = mlNodes.length > 0
+            ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), settings).roundDown()
+            : 0;
 
         // Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet
         int maxOpenJobsPerNode = MAX_OPEN_JOBS_PER_NODE.get(settings);
@@ -258,7 +258,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
         }
         // We should keep this check here as well as in the processor decider while cloud is not
         // reacting to processor autoscaling.
-        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
+        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
             logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
             return null;
         }
@@ -815,11 +815,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
         return newCapacity;
     }
 
-    static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
+    static boolean modelAssignmentsRequireMoreThanHalfCpu(
+        Collection<TrainedModelAssignment> assignments,
+        List<DiscoveryNode> mlNodes,
+        Settings settings
+    ) {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
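To make the down-scale guard concrete, here is a worked example in plain Java; the deployment sizes, node count, and scale value are invented for illustration and are not taken from the commit:

public class HalfCpuGuardSketch {
    public static void main(String[] args) {
        // Two deployments, each asking for 2 allocations x 2 threads.
        int totalRequiredProcessors = (2 * 2) + (2 * 2); // 8

        // Two ML nodes each advertise 8 logical processors; with
        // ml.allocated_processors_scale = 2 each counts as ceil(8 / 2) = 4.
        int scale = 2;
        int perNode = (int) Math.ceil(8.0 / scale);
        int totalMlProcessors = 2 * perNode; // 8

        // 8 * 2 > 8: assignments need more than half of the tier's CPU,
        // so the memory decider refuses to scale down.
        System.out.println(totalRequiredProcessors * 2 > totalMlProcessors); // true
    }
}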
@@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
             ).build();
         }
 
-        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
 
         final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();
 
@@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
 
         if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
             trainedModelAssignmentMetadata.allAssignments().values(),
-            mlContext.mlNodes
+            mlContext.mlNodes,
+            configuration
         )) {
             return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
                 .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
         );
     }
 
-    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = MlProcessors.get(node);
+            Processors nodeProcessors = MlProcessors.get(node, settings);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
            }
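For intuition about what computeCurrentCapacity now reports, here is a standalone sketch of the same max/sum walk with invented node sizes; it is not the Elasticsearch implementation, which works on Processors values rather than doubles:

public class CurrentCapacitySketch {
    public static void main(String[] args) {
        // Two nodes advertising 8 and 16 logical processors, scale = 2.
        double[] advertised = { 8.0, 16.0 };
        int scale = 2;

        double maxNodeProcessors = 0;
        double tierProcessors = 0;
        for (double a : advertised) {
            double nodeProcessors = a / scale; // 4.0, then 8.0
            maxNodeProcessors = Math.max(maxNodeProcessors, nodeProcessors);
            tierProcessors += nodeProcessors;
        }
        System.out.println(maxNodeProcessors); // 8.0 (largest single node)
        System.out.println(tierProcessors);    // 12.0 (whole ML tier)
    }
}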
@@ -488,7 +488,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
             nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
             modelToAdd
         );
-        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
+        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
         if (modelToAdd.isPresent()) {
             checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
         }
@@ -13,6 +13,7 @@
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer {
         this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
     }
 
-    TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
+    TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
         if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
             throw new ResourceAlreadyExistsException(
                 "[{}] assignment for deployment with model [{}] already exists",
@@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
             return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
         }
 
-        AssignmentPlan assignmentPlan = computeAssignmentPlan();
-        return buildAssignmentsFromPlan(assignmentPlan);
+        AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
+        return buildAssignmentsFromPlan(assignmentPlan, settings);
     }
 
     private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
         return true;
     }
 
-    AssignmentPlan computeAssignmentPlan() {
-        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
+    AssignmentPlan computeAssignmentPlan(Settings settings) {
+        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
         final Set<String> assignableNodeIds = nodesByZone.values()
             .stream()
             .flatMap(List::stream)
@@ -270,7 +271,7 @@ private Map<String, Integer> findFittingAssignments(
         return fittingAssignments;
     }
 
-    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
+    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
         return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
             Collection<DiscoveryNode> discoveryNodes = e.getValue();
             List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -284,7 +285,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
                         // We subtract native inference memory as the planner expects available memory for
                         // native inference including current assignments.
                         getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                        MlProcessors.get(discoveryNode).roundUp()
+                        MlProcessors.get(discoveryNode, settings).roundUp()
                     )
                 );
             } else {
@@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }
 
-    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
+    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
         TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
         for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
             TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
             }
             assignmentBuilder.calculateAndSetAssignmentState();
 
-            explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
+            explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
             builder.addNewAssignment(deployment.id(), assignmentBuilder);
         }
         return builder;
@@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
     private Optional<String> explainAssignments(
         AssignmentPlan assignmentPlan,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (assignmentPlan.satisfiesAllocations(deployment)) {
             return Optional.empty();
@@ -363,7 +365,7 @@ private Optional<String> explainAssignments(
 
         Map<String, String> nodeToReason = new TreeMap<>();
         for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
-            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
+            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
             reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
         }
 
@@ -382,7 +384,8 @@ private Optional<String> explainAssignment(
         AssignmentPlan assignmentPlan,
         DiscoveryNode node,
         NodeLoad load,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());
@@ -395,7 +398,7 @@ private Optional<String> explainAssignment(
         // But we should also check if we managed to assign a model during the rebalance for which
         // we check if the node has used up any of its allocated processors.
         boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
+            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
         long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
             ? 0
             : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -424,7 +427,7 @@ private Optional<String> explainAssignment(
                 "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                     + "processors required for each allocation of this model [{}]",
                 new Object[] {
-                    MlProcessors.get(node).roundUp(),
+                    MlProcessors.get(node, settings).roundUp(),
                     assignmentPlan.getRemainingNodeCores(node.getId()),
                     deployment.threadsPerAllocation() }
             )
@@ -8,14 +8,15 @@
 package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
 public final class MlProcessors {
 
     private MlProcessors() {}
 
-    public static Processors get(DiscoveryNode node) {
+    public static Processors get(DiscoveryNode node, Settings settings) {
         // Try getting the most modern setting, and if that's null then instead get the older setting. (If both are null then return zero.)
         String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
         if (allocatedProcessorsString == null) {
@@ -26,7 +27,19 @@ public static Processors get(DiscoveryNode node) {
         }
         try {
             double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
-            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
+            if (processorsAsDouble <= 0) {
+                return Processors.ZERO;
+            }
+
+            Integer scale = null;
+            if (settings != null) {
+                scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
+            }
+            if (scale != null) {
+                processorsAsDouble = processorsAsDouble / scale;
+            }
+            return Processors.of(processorsAsDouble);
+
         } catch (NumberFormatException e) {
             assert e == null
                 : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
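A minimal sketch of reading the setting the way the new overload does, assuming the x-pack ML classes on the classpath; the class name is hypothetical. Note the null check above means callers that pass a null Settings get the raw, unscaled attribute value:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;

public class MlProcessorsUsageSketch {
    public static void main(String[] args) {
        // Typed read through the Setting object, as MlProcessors.get does;
        // an empty Settings yields the declared default of 1 (no scaling).
        System.out.println(MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(Settings.EMPTY)); // 1

        Settings scaled = Settings.builder().put("ml.allocated_processors_scale", 2).build();
        System.out.println(MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(scaled)); // 2
    }
}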
@@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() {
                     )
                 ).build()
             ),
-            withMlNodes("ml_node_1", "ml_node_2")
+            withMlNodes("ml_node_1", "ml_node_2"),
+            Settings.EMPTY
         )
     );
     assertTrue(
@@ -1110,7 +1111,8 @@
                     )
                 ).build()
             ),
-            withMlNodes("ml_node_1", "ml_node_2")
+            withMlNodes("ml_node_1", "ml_node_2"),
+            Settings.EMPTY
         )
     );
     assertFalse(
@@ -1141,7 +1143,8 @@
                     )
                 ).build()
             ),
-            withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
+            withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
+            Settings.EMPTY
         )
     );
 }