WIP main done
albertzaharovits committed Sep 3, 2024
1 parent a94a6ae commit 46b3c21
Showing 9 changed files with 67 additions and 27 deletions.
File 1 of 9
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
@@ -134,7 +135,7 @@ protected void masterOperation(
);
final String trialSourceIndexName = trialRolloverNames.sourceName();
final String trialRolloverIndexName = trialRolloverNames.rolloverName();
MetadataRolloverService.validateIndexName(clusterState, trialRolloverIndexName);
MetadataCreateIndexService.validateIndexName(trialRolloverIndexName, clusterState.metadata(), clusterState.routingTable());

assert metadata.dataStreams().containsKey(rolloverRequest.getRolloverTarget()) : "Auto-rollover applies only to data streams";

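Note on the change above: the call site now passes clusterState.metadata() and clusterState.routingTable() instead of the whole ClusterState. The target overload itself is not part of this diff; the following is only a rough sketch of its presumed shape, inferred from the updated call sites in this commit (parameter names and body are assumptions, not the actual MetadataCreateIndexService code):

    // Sketch only -- presumed shape of the narrowed overload; the previous variant took a ClusterState.
    public static void validateIndexName(String index, Metadata metadata, RoutingTable routingTable) {
        validateIndexOrAliasName(index, InvalidIndexNameException::new);
        if (index.toLowerCase(Locale.ROOT).equals(index) == false) {
            throw new InvalidIndexNameException(index, "must be lowercase");
        }
        // "fails if the index already exists": the existence checks read the metadata and
        // routing table passed in directly, rather than pulling them off a ClusterState
        if (routingTable.hasIndex(index)) {
            throw new ResourceAlreadyExistsException(routingTable.index(index).getIndex());
        }
        if (metadata.hasIndex(index)) {
            throw new ResourceAlreadyExistsException(metadata.index(index).getIndex());
        }
        if (metadata.hasAlias(index)) {
            throw new InvalidIndexNameException(index, "already exists as alias");
        }
    }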
File 2 of 9
@@ -179,10 +179,6 @@ public RolloverResult rolloverClusterState(
};
}

public static void validateIndexName(ClusterState state, String index) {
MetadataCreateIndexService.validateIndexName(index, state);
}

/**
* Returns the names that rollover would use, but does not perform the actual rollover
*/
@@ -252,7 +248,8 @@ private RolloverResult rolloverAlias(
final Boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.exists(createIndexRequest.settings())
? IndexMetadata.INDEX_HIDDEN_SETTING.get(createIndexRequest.settings())
: null;
MetadataCreateIndexService.validateIndexName(rolloverIndexName, currentState); // fails if the index already exists
MetadataCreateIndexService.validateIndexName(rolloverIndexName, metadata, currentState.routingTable()); // fails if the index
// already exists
checkNoDuplicatedAliasInIndexTemplate(metadata, rolloverIndexName, aliasName, isHidden);
if (onlyValidate) {
return new RolloverResult(rolloverIndexName, sourceIndexName, currentState);
@@ -328,7 +325,8 @@ private RolloverResult rolloverDataStream(
final Tuple<String, Long> nextIndexAndGeneration = dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices);
final String newWriteIndexName = nextIndexAndGeneration.v1();
final long newGeneration = nextIndexAndGeneration.v2();
MetadataCreateIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
MetadataCreateIndexService.validateIndexName(newWriteIndexName, metadata, currentState.routingTable()); // fails if the index
// already exists
if (onlyValidate) {
return new RolloverResult(newWriteIndexName, isLazyCreation ? NON_EXISTENT_SOURCE : originalWriteIndex.getName(), currentState);
}
File 3 of 9
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
@@ -179,7 +180,7 @@ protected void masterOperation(
);
final String trialSourceIndexName = trialRolloverNames.sourceName();
final String trialRolloverIndexName = trialRolloverNames.rolloverName();
MetadataRolloverService.validateIndexName(clusterState, trialRolloverIndexName);
MetadataCreateIndexService.validateIndexName(trialRolloverIndexName, metadata, clusterState.routingTable());

boolean isDataStream = metadata.dataStreams().containsKey(rolloverRequest.getRolloverTarget());
if (rolloverRequest.isLazy()) {
File 4 of 9
@@ -1100,7 +1100,7 @@ private ClusterState openIndices(final Index[] indices, final ClusterState currentState) {
}
}

shardLimitValidator.validateShardLimit(currentState, indices);
shardLimitValidator.validateShardLimit(currentState.nodes(), currentState.metadata(), indices);
if (indicesToOpen.isEmpty()) {
return currentState;
}
File 5 of 9
@@ -255,7 +255,12 @@ ClusterState execute(ClusterState currentState) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
shardLimitValidator.validateShardLimitOnReplicaUpdate(
currentState.nodes(),
currentState.metadata(),
request.indices(),
updatedNumberOfReplicas
);

/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
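The two ShardLimitValidator call sites above -- validateShardLimit in openIndices and validateShardLimitOnReplicaUpdate in the settings-update path -- now receive DiscoveryNodes and Metadata rather than a ClusterState. The new declarations are not shown in this diff; the sketch below gives only their presumed signatures, with the bodies summarized in comments (parameter names are assumptions inferred from these call sites):

    // Sketch only -- presumed shape of the narrowed ShardLimitValidator methods.
    public void validateShardLimit(DiscoveryNodes discoveryNodes, Metadata metadata, Index[] indicesToOpen) {
        // sums the shards that opening these indices would add and checks the total against
        // cluster.max_shards_per_node, using only the node and metadata views passed in
    }

    public void validateShardLimitOnReplicaUpdate(DiscoveryNodes discoveryNodes, Metadata metadata, Index[] indices, int replicas) {
        // same check, applied to the shards that raising the replica count to `replicas` would add
    }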
File 6 of 9
@@ -8,7 +8,8 @@

package org.elasticsearch.health.node;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.Setting;
@@ -124,8 +125,18 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
return mergeIndicators(
verbose,
calculateFrom(shardLimitsMetadata.maxShardsPerNode(), state, ShardLimitValidator::checkShardLimitForNormalNodes),
calculateFrom(shardLimitsMetadata.maxShardsPerNodeFrozen(), state, ShardLimitValidator::checkShardLimitForFrozenNodes)
calculateFrom(
shardLimitsMetadata.maxShardsPerNode(),
state.nodes(),
state.metadata(),
ShardLimitValidator::checkShardLimitForNormalNodes
),
calculateFrom(
shardLimitsMetadata.maxShardsPerNodeFrozen(),
state.nodes(),
state.metadata(),
ShardLimitValidator::checkShardLimitForFrozenNodes
)
);
}

@@ -173,13 +184,18 @@ private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult data
);
}

static StatusResult calculateFrom(int maxShardsPerNodeSetting, ClusterState state, ShardsCapacityChecker checker) {
var result = checker.check(maxShardsPerNodeSetting, 5, 1, state);
static StatusResult calculateFrom(
int maxShardsPerNodeSetting,
DiscoveryNodes discoveryNodes,
Metadata metadata,
ShardsCapacityChecker checker
) {
var result = checker.check(maxShardsPerNodeSetting, 5, 1, discoveryNodes, metadata);
if (result.canAddShards() == false) {
return new StatusResult(HealthStatus.RED, result);
}

result = checker.check(maxShardsPerNodeSetting, 10, 1, state);
result = checker.check(maxShardsPerNodeSetting, 10, 1, discoveryNodes, metadata);
if (result.canAddShards() == false) {
return new StatusResult(HealthStatus.YELLOW, result);
}
@@ -225,6 +241,12 @@ record StatusResult(HealthStatus status, ShardLimitValidator.Result result) {}

@FunctionalInterface
interface ShardsCapacityChecker {
ShardLimitValidator.Result check(int maxConfiguredShardsPerNode, int numberOfNewShards, int replicas, ClusterState state);
ShardLimitValidator.Result check(
int maxConfiguredShardsPerNode,
int numberOfNewShards,
int replicas,
DiscoveryNodes discoveryNodes,
Metadata metadata
);
}
}
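Because ShardsCapacityChecker.check now takes DiscoveryNodes and Metadata, the ShardLimitValidator methods bound to it via ShardLimitValidator::checkShardLimitForNormalNodes and ::checkShardLimitForFrozenNodes have to accept the same five parameters (and, since the interface's first parameter is an int, the references can only resolve to static methods). Their declarations are not part of this diff; below is only an illustrative call with the presumed new arity, using the same 5-new-shards / 1-replica probe that calculateFrom uses (argument names are assumptions):

    // Illustrative only -- invoking the checker directly with the new (DiscoveryNodes, Metadata) arguments.
    ShardLimitValidator.Result result = ShardLimitValidator.checkShardLimitForNormalNodes(
        maxShardsPerNode,        // value of the cluster.max_shards_per_node setting
        5,                       // shards the health check pretends to add
        1,                       // replicas per new shard
        clusterState.nodes(),
        clusterState.metadata()
    );
    if (result.canAddShards() == false) {
        // mirrors calculateFrom: not being able to add 5 more shards maps to a RED status
    }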
File 7 of 9
@@ -1343,7 +1343,7 @@ public ClusterState execute(ClusterState currentState) {
if (currentIndexMetadata == null) {
// Index doesn't exist - create it and start recovery
// Make sure that the index we are about to create has a valid name
ensureValidIndexName(currentState, snapshotIndexMetadata, renamedIndexName);
ensureValidIndexName(currentState.metadata(), currentState.routingTable(), snapshotIndexMetadata, renamedIndexName);
shardLimitValidator.validateShardLimit(
snapshotIndexMetadata.getSettings(),
currentState.nodes(),
@@ -1793,9 +1793,14 @@ private static IndexMetadata.Builder restoreOverClosedIndex(
return indexMdBuilder;
}

private void ensureValidIndexName(ClusterState currentState, IndexMetadata snapshotIndexMetadata, String renamedIndexName) {
private void ensureValidIndexName(
Metadata metadata,
RoutingTable routingTable,
IndexMetadata snapshotIndexMetadata,
String renamedIndexName
) {
final boolean isHidden = snapshotIndexMetadata.isHidden();
MetadataCreateIndexService.validateIndexName(renamedIndexName, currentState);
MetadataCreateIndexService.validateIndexName(renamedIndexName, metadata, routingTable);
createIndexService.validateDotIndex(renamedIndexName, isHidden);
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetadata.getSettings(), false);
}
File 8 of 9
@@ -341,8 +341,10 @@ public void testCalculateMethods() {
maxConfiguredShardsPerNode,
numberOfNewShards,
replicas,
state) -> {
assertEquals(mockedState, state);
discoveryNodes,
metadata) -> {
assertEquals(mockedState.nodes(), discoveryNodes);
assertEquals(mockedState.metadata(), metadata);
assertEquals(randomMaxShardsPerNodeSetting, maxConfiguredShardsPerNode);
return new ShardLimitValidator.Result(
numberOfNewShards != shardsToAdd && replicas == 1,
@@ -353,13 +355,19 @@
);
};

assertEquals(calculateFrom(randomMaxShardsPerNodeSetting, mockedState, checkerWrapper.apply(5)).status(), RED);
assertEquals(calculateFrom(randomMaxShardsPerNodeSetting, mockedState, checkerWrapper.apply(10)).status(), YELLOW);
assertEquals(
calculateFrom(randomMaxShardsPerNodeSetting, mockedState.nodes(), mockedState.metadata(), checkerWrapper.apply(5)).status(),
RED
);
assertEquals(
calculateFrom(randomMaxShardsPerNodeSetting, mockedState.nodes(), mockedState.metadata(), checkerWrapper.apply(10)).status(),
YELLOW
);

// Let's cover the holes :)
Stream.of(randomIntBetween(1, 4), randomIntBetween(6, 9), randomIntBetween(11, Integer.MAX_VALUE))
.map(checkerWrapper)
.map(checker -> calculateFrom(randomMaxShardsPerNodeSetting, mockedState, checker))
.map(checker -> calculateFrom(randomMaxShardsPerNodeSetting, mockedState.nodes(), mockedState.metadata(), checker))
.map(ShardsCapacityHealthIndicatorService.StatusResult::status)
.forEach(status -> assertEquals(status, GREEN));
}
File 9 of 9
@@ -168,11 +168,11 @@ public void testValidateShardLimitUpdateReplicas() {

final Index[] indices = getIndices(state);
final ShardLimitValidator shardLimitValidator = createTestShardLimitService(shardsPerNode, group);
shardLimitValidator.validateShardLimitOnReplicaUpdate(state, indices, nodesInCluster - 1);
shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, nodesInCluster - 1);

ValidationException exception = expectThrows(
ValidationException.class,
() -> shardLimitValidator.validateShardLimitOnReplicaUpdate(state, indices, nodesInCluster)
() -> shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, nodesInCluster)
);
assertEquals(
"Validation Failed: 1: this action would add ["
