diff --git a/docs/changelog/84242.yaml b/docs/changelog/84242.yaml
new file mode 100644
index 0000000000000..5aa98fbd2babc
--- /dev/null
+++ b/docs/changelog/84242.yaml
@@ -0,0 +1,6 @@
+pr: 84242
+summary: Allow autoscaling to work when vertical scaling is possible
+area: Machine Learning
+type: bug
+issues:
+ - 84198
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java
index da3d5a8f8800e..7675c63daf0c1 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -62,6 +63,7 @@
 import org.elasticsearch.xpack.ml.inference.allocation.TrainedModelAllocationService;
 import org.elasticsearch.xpack.ml.inference.persistence.ChunkedTrainedModelRestorer;
 import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelDefinitionDoc;
+import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
 import java.util.Collections;
@@ -70,6 +72,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -89,6 +92,7 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
     private final NamedXContentRegistry xContentRegistry;
     private final MlMemoryTracker memoryTracker;
     protected volatile int maxLazyMLNodes;
+    protected volatile long maxMLNodeSize;
 
     @Inject
     public TransportStartTrainedModelDeploymentAction(
@@ -121,13 +125,19 @@ public TransportStartTrainedModelDeploymentAction(
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
         this.trainedModelAllocationService = Objects.requireNonNull(trainedModelAllocationService);
         this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
+        this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
     }
 
     private void setMaxLazyMLNodes(int value) {
         this.maxLazyMLNodes = value;
     }
 
+    private void setMaxMLNodeSize(ByteSizeValue value) {
+        this.maxMLNodeSize = value.getBytes();
+    }
+
     @Override
     protected void masterOperation(
         Task task,
@@ -241,7 +251,7 @@ private void waitForDeploymentState(
         AllocationStatus.State state,
         ActionListener listener
     ) {
-        DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes);
+        DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes, maxMLNodeSize);
         trainedModelAllocationService.waitForAllocationCondition(
             modelId,
             predicate,
@@ -402,11 +412,13 @@ private static class DeploymentStartedPredicate implements Predicate nodesShuttingDown.contains(d.getId()) == false)
                 .filter(TaskParams::mayAllocateToNode)
                 .collect(Collectors.toList());
+            OptionalLong smallestMLNode = nodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min();
 
             // No nodes allocated at all!
-            if (nodesAndState.isEmpty() && maxLazyMLNodes <= nodes.size()) {
+            if (nodesAndState.isEmpty()
+                // We cannot scale horizontally
+                && maxLazyMLNodes <= nodes.size()
+                // We cannot scale vertically
+                && (smallestMLNode.isEmpty() || smallestMLNode.getAsLong() >= maxMLNodeSize)) {
                 String msg = "Could not start deployment because no suitable nodes were found, allocation explanation ["
                     + trainedModelAllocation.getReason()
                     + "]";
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
index 1c7d34f83b06b..359ebca8facb6 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
@@ -27,6 +27,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -272,7 +273,8 @@ public PersistentTasksCustomMetadata.Assignment selectNode(
             reasons.values(),
             maxNodeSize > 0L
                 ? NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage)
-                : Long.MAX_VALUE
+                : Long.MAX_VALUE,
+            maxNodeSize
         );
     }
 
@@ -280,18 +282,19 @@ PersistentTasksCustomMetadata.Assignment createAssignment(
         long estimatedMemoryUsage,
         DiscoveryNode minLoadedNode,
         Collection reasons,
-        long biggestPossibleJob
+        long mostAvailableMemoryForML,
+        long maxNodeSize
     ) {
         if (minLoadedNode == null) {
             String explanation = String.join("|", reasons);
             PersistentTasksCustomMetadata.Assignment currentAssignment = new PersistentTasksCustomMetadata.Assignment(null, explanation);
             logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation);
-            if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > biggestPossibleJob) {
+            if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > mostAvailableMemoryForML) {
                 ParameterizedMessage message = new ParameterizedMessage(
                     "[{}] not waiting for node assignment as estimated job size [{}] is greater than largest possible job size [{}]",
                     jobId,
                     MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage,
-                    biggestPossibleJob
+                    mostAvailableMemoryForML
                 );
                 logger.info(message);
                 List newReasons = new ArrayList<>(reasons);
@@ -299,13 +302,16 @@ PersistentTasksCustomMetadata.Assignment createAssignment(
                 explanation = String.join("|", newReasons);
                 return new PersistentTasksCustomMetadata.Assignment(null, explanation);
             }
-            return considerLazyAssignment(currentAssignment);
+            return considerLazyAssignment(currentAssignment, maxNodeSize);
         }
         logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId);
         return new PersistentTasksCustomMetadata.Assignment(minLoadedNode.getId(), "");
     }
 
-    PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksCustomMetadata.Assignment currentAssignment) {
+    PersistentTasksCustomMetadata.Assignment considerLazyAssignment(
+        PersistentTasksCustomMetadata.Assignment currentAssignment,
+        long maxNodeSize
+    ) {
 
         assert currentAssignment.getExecutorNode() == null;
 
@@ -316,10 +322,21 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
             }
         }
 
+        // Can we scale horizontally?
         if (numMlNodes < maxLazyNodes) { // Means we have lazy nodes left to allocate
             return AWAITING_LAZY_ASSIGNMENT;
         }
-
+        // Can we scale vertically and is scaling possible?
+        if (maxNodeSize > 0L && maxLazyNodes > 0) {
+            OptionalLong smallestMLNode = candidateNodes.stream()
+                .filter(MachineLearning::isMlNode)
+                .map(NodeLoadDetector::getNodeSize)
+                .flatMapToLong(OptionalLong::stream)
+                .min();
+            if (smallestMLNode.isPresent() && smallestMLNode.getAsLong() < maxNodeSize) {
+                return AWAITING_LAZY_ASSIGNMENT;
+            }
+        }
         return currentAssignment;
     }
 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
index 2a8ef46d8e5c9..8652f9bbda37c 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
@@ -29,10 +29,27 @@
 import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR;
+
 public class NodeLoadDetector {
 
     private final MlMemoryTracker mlMemoryTracker;
 
+    /**
+     * Returns the node's total memory size in bytes, as read from its {@code MACHINE_MEMORY_NODE_ATTR} node attribute.
+     * @param node The node whose memory size is wanted
+     * @return the size in bytes; empty if the attribute is missing or is not a valid long
+     */
+    public static OptionalLong getNodeSize(DiscoveryNode node) {
+        String memoryString = node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR);
+        try {
+            return OptionalLong.of(Long.parseLong(memoryString));
+        } catch (NumberFormatException e) {
+            assert e == null : "ml.machine_memory should parse because we set it internally: invalid value was " + memoryString;
+            return OptionalLong.empty();
+        }
+    }
+
     public NodeLoadDetector(MlMemoryTracker memoryTracker) {
         this.mlMemoryTracker = memoryTracker;
     }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
index 38fd059691334..02b33d3c6c16c 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
@@ -1009,7 +1009,8 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() {
             node -> nodeFilter(node, job)
         );
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
-            new PersistentTasksCustomMetadata.Assignment(null, "foo")
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(1).getBytes()
         );
         assertEquals("foo", result.getExplanation());
         assertNull(result.getExecutorNode());
@@ -1053,7 +1054,53 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
             node -> nodeFilter(node, job)
         );
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
-            new PersistentTasksCustomMetadata.Assignment(null, "foo")
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(1).getBytes()
+        );
+        assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
+        assertNull(result.getExecutorNode());
+    }
+
+    public void testConsiderLazyAssignmentWithFilledLazyNodesAndVerticalScale() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(
+                new DiscoveryNode(
+                    "_node_name1",
+                    "_node_id1",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                    Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
+                    ROLES_WITH_ML,
+                    Version.CURRENT
+                )
+            )
+            .add(
+                new DiscoveryNode(
+                    "_node_name2",
+                    "_node_id2",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                    Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
+                    ROLES_WITH_ML,
+                    Version.CURRENT
+                )
+            )
+            .build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        cs.nodes(nodes);
+
+        Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            randomIntBetween(1, 2), // no more lazy nodes than the 2 existing ML nodes, so only vertical scaling can apply
+            node -> nodeFilter(node, job)
+        );
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(64).getBytes()
         );
         assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
         assertNull(result.getExecutorNode());