diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/AllocationBenchmark.java index 4472add51c59a..c89bf3d1b577c 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/AllocationBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/AllocationBenchmark.java @@ -31,16 +31,6 @@ package org.opensearch.benchmark.routing.allocation; -import org.opensearch.Version; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.common.settings.Settings; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -52,8 +42,20 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.common.settings.Settings; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; @Fork(3) @@ -71,75 +73,103 @@ public class AllocationBenchmark { // support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would // need its own main method and we cannot execute more than one class with a main method per JAR. 
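For orientation, each row of the revised @Param string packs six pipe-separated fields: indices, shards per index, replicas, source nodes (tagged tag_1, the ones being drained), target nodes (tagged tag_0), and per-node concurrent recoveries. The following is a minimal, illustrative sketch of how one such row decodes, mirroring the parsing done in setUp(); the local variable names are only for illustration and are not part of the patch.

```java
// Illustrative sketch only: decodes one @Param row the same way setUp() does.
String row = " 100| 3| 1| 10| 5| 8|"; // indices|shards|replicas|source|target|concurrentRecoveries
String[] p = row.split("\\|");
int numIndices = Integer.parseInt(p[0].trim());           // 100 indices
int numShards = Integer.parseInt(p[1].trim());            // 3 shards per index
int numReplicas = Integer.parseInt(p[2].trim());          // 1 replica per shard
int sourceNodes = Integer.parseInt(p[3].trim());          // 10 nodes tagged tag_1 (to be excluded/drained)
int targetNodes = Integer.parseInt(p[4].trim());          // 5 nodes tagged tag_0 (receive the shards)
int concurrentRecoveries = Integer.parseInt(p[5].trim()); // 8 concurrent recoveries per node
// 2 copies * 3 shards * 100 indices = 600 started shards that the benchmark relocates
int totalShardCount = (numReplicas + 1) * numShards * numIndices;
```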
@Param({ - // indices| shards| replicas| nodes - " 10| 1| 0| 1", - " 10| 3| 0| 1", - " 10| 10| 0| 1", - " 100| 1| 0| 1", - " 100| 3| 0| 1", - " 100| 10| 0| 1", - - " 10| 1| 0| 10", - " 10| 3| 0| 10", - " 10| 10| 0| 10", - " 100| 1| 0| 10", - " 100| 3| 0| 10", - " 100| 10| 0| 10", - - " 10| 1| 1| 10", - " 10| 3| 1| 10", - " 10| 10| 1| 10", - " 100| 1| 1| 10", - " 100| 3| 1| 10", - " 100| 10| 1| 10", - - " 10| 1| 2| 10", - " 10| 3| 2| 10", - " 10| 10| 2| 10", - " 100| 1| 2| 10", - " 100| 3| 2| 10", - " 100| 10| 2| 10", - - " 10| 1| 0| 50", - " 10| 3| 0| 50", - " 10| 10| 0| 50", - " 100| 1| 0| 50", - " 100| 3| 0| 50", - " 100| 10| 0| 50", - - " 10| 1| 1| 50", - " 10| 3| 1| 50", - " 10| 10| 1| 50", - " 100| 1| 1| 50", - " 100| 3| 1| 50", - " 100| 10| 1| 50", - - " 10| 1| 2| 50", - " 10| 3| 2| 50", - " 10| 10| 2| 50", - " 100| 1| 2| 50", - " 100| 3| 2| 50", - " 100| 10| 2| 50" }) - public String indicesShardsReplicasNodes = "10|1|0|1"; + // indices| shards| replicas| source| target| concurrentRecoveries + " 10| 2| 0| 1| 1| 1|", + " 10| 3| 0| 1| 1| 2|", + " 10| 10| 0| 1| 1| 5|", + " 100| 1| 0| 1| 1| 10|", + " 100| 3| 0| 1| 1| 10|", + " 100| 10| 0| 1| 1| 10|", + + " 10| 2| 0| 10| 10| 1|", + " 10| 3| 0| 10| 5| 2|", + " 10| 10| 0| 10| 5| 5|", + " 100| 1| 0| 5| 10| 5|", + " 100| 3| 0| 10| 5| 5|", + " 100| 10| 0| 10| 20| 6|", + + " 10| 1| 1| 10| 10| 1|", + " 10| 3| 1| 10| 3| 3|", + " 10| 10| 1| 5| 12| 5|", + " 100| 1| 1| 10| 10| 6|", + " 100| 3| 1| 10| 5| 8|", + " 100| 10| 1| 8| 17| 8|", + + " 10| 1| 2| 10| 10| 1|", + " 10| 3| 2| 10| 5| 3|", + " 10| 10| 2| 5| 10| 5|", + " 100| 1| 2| 10| 8| 7|", + " 100| 3| 2| 13| 17| 5|", + " 100| 10| 2| 10| 20| 8|", + + " 10| 2| 1| 20| 20| 1|", + " 10| 3| 1| 20| 30| 1|", + " 10| 10| 1| 20| 10| 3|", + " 100| 1| 1| 20| 5| 5|", + " 100| 3| 1| 20| 23| 6|", + " 100| 10| 1| 40| 20| 8|", + + " 10| 3| 2| 50| 30| 1|", + " 10| 3| 2| 50| 25| 1|", + " 10| 10| 1| 50| 33| 2|", + " 100| 1| 1| 40| 50| 2|", + " 100| 3| 1| 50| 70| 3|", + " 100| 10| 1| 60| 50| 3|", + + " 10| 10| 2| 50| 50| 1|", + " 10| 3| 2| 50| 30| 1|", + " 10| 10| 2| 50| 40| 2|", + " 100| 1| 2| 40| 50| 2|", + " 100| 3| 2| 50| 30| 6|", + " 100| 10| 2| 33| 55| 6|", + + " 500| 60| 1| 100| 100| 12|", + " 500| 60| 1| 100| 40| 12|", + " 500| 60| 1| 40| 100| 12|", + + " 50| 60| 1| 100| 100| 6|", + " 50| 60| 1| 100| 40| 6|", + " 50| 60| 1| 40| 100| 6|" }) + public String indicesShardsReplicasSourceTargetRecoveries = "10|1|0|1|1|1"; public int numTags = 2; + public int numZone = 3; + public int concurrentRecoveries; + public int numIndices; + public int numShards; + public int numReplicas; + public int sourceNodes; + public int targetNodes; + public int clusterConcurrentRecoveries; - private AllocationService strategy; + private AllocationService initialClusterStrategy; + private AllocationService clusterExcludeStrategy; + private AllocationService clusterZoneAwareExcludeStrategy; private ClusterState initialClusterState; @Setup public void setUp() throws Exception { - final String[] params = indicesShardsReplicasNodes.split("\\|"); + final String[] params = indicesShardsReplicasSourceTargetRecoveries.split("\\|"); + numIndices = toInt(params[0]); + numShards = toInt(params[1]); + numReplicas = toInt(params[2]); + sourceNodes = toInt(params[3]); + targetNodes = toInt(params[4]); + concurrentRecoveries = toInt(params[5]); - int numIndices = toInt(params[0]); - int numShards = toInt(params[1]); - int numReplicas = toInt(params[2]); - int numNodes = toInt(params[3]); + int totalShardCount = (numReplicas + 1) * numShards * 
numIndices; - strategy = Allocators.createAllocationService( - Settings.builder().put("cluster.routing.allocation.awareness.attributes", "tag").build() + initialClusterStrategy = Allocators.createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.node_concurrent_recoveries", "20") + .put("cluster.routing.allocation.exclude.tag", "tag_0") + .build() ); + // We'll try to move nodes from tag_1 to tag_0 + clusterConcurrentRecoveries = Math.min(sourceNodes, targetNodes) * concurrentRecoveries; + Metadata.Builder mb = Metadata.builder(); for (int i = 1; i <= numIndices; i++) { mb.put( @@ -155,15 +185,37 @@ public void setUp() throws Exception { rb.addAsNew(metadata.index("test_" + i)); } RoutingTable routingTable = rb.build(); - DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); - for (int i = 1; i <= numNodes; i++) { - nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags)))); - } initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metadata(metadata) .routingTable(routingTable) - .nodes(nb) + .nodes(setUpClusterNodes(sourceNodes, targetNodes)) .build(); + // Start all unassigned shards + initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute"); + while (initialClusterState.getRoutingNodes().hasUnassignedShards()) { + initialClusterState = initialClusterStrategy.applyStartedShards( + initialClusterState, + initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING) + ); + initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute"); + } + // Ensure all shards are started + while (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) { + initialClusterState = initialClusterStrategy.applyStartedShards( + initialClusterState, + initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING) + ); + } + + assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size() == totalShardCount); + assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 0); + assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size() == 0); + // make sure shards are only allocated on tag1 + for (ShardRouting startedShard : initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) { + assert (initialClusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals( + "tag_1" + ); + } } private int toInt(String v) { @@ -171,15 +223,58 @@ private int toInt(String v) { } @Benchmark - public ClusterState measureAllocation() { + public ClusterState measureExclusionOnZoneAwareStartedShard() throws Exception { ClusterState clusterState = initialClusterState; - while (clusterState.getRoutingNodes().hasUnassignedShards()) { - clusterState = strategy.applyStartedShards( + clusterZoneAwareExcludeStrategy = Allocators.createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries)) + .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries)) + .put("cluster.routing.allocation.exclude.tag", "tag_1") + .build() + ); + clusterState = 
clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute"); + return clusterState; + } + + @Benchmark + public ClusterState measureShardRelocationComplete() throws Exception { + ClusterState clusterState = initialClusterState; + clusterZoneAwareExcludeStrategy = Allocators.createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries)) + .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries)) + .put("cluster.routing.allocation.exclude.tag", "tag_1") + .build() + ); + clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute"); + while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) { + clusterState = clusterZoneAwareExcludeStrategy.applyStartedShards( clusterState, clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING) ); - clusterState = strategy.reroute(clusterState, "reroute"); + } + for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) { + assert (clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals("tag_0"); } return clusterState; } + + private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) { + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); + for (int i = 1; i <= sourceNodes; i++) { + Map attributes = new HashMap<>(); + attributes.put("tag", "tag_" + 1); + attributes.put("zone", "zone_" + (i % numZone)); + nb.add(Allocators.newNode("node_s_" + i, attributes)); + } + for (int j = 1; j <= targetNodes; j++) { + Map attributes = new HashMap<>(); + attributes.put("tag", "tag_" + 0); + attributes.put("zone", "zone_" + (j % numZone)); + nb.add(Allocators.newNode("node_t_" + j, attributes)); + } + return nb; + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java new file mode 100644 index 0000000000000..10e2514ef7455 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.opensearch.action.admin.indices.recovery.RecoveryAction; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsAction; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.test.transport.StubbableTransport; +import org.opensearch.transport.ReceiveTimeoutTransportException; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.containsString; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +public class ClientTimeoutIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + public void testNodesInfoTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + // Happy case + NodesInfoResponse response = dataNodeClient().admin().cluster().prepareNodesInfo().get(); + assertThat(response.getNodes().size(), equalTo(3)); + + //simulate timeout on bad node. 
+ simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesInfoAction.NAME); + + // One bad data node + response = dataNodeClient().admin().cluster().prepareNodesInfo().get(); + ArrayList nodes = new ArrayList(); + for(NodeInfo node : response.getNodes()) { + nodes.add(node.getNode().getName()); + } + assertThat(response.getNodes().size(), equalTo(2)); + assertThat(nodes.contains(masterNode), is(true)); + } + + public void testNodesStatsTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + TimeValue timeout = TimeValue.timeValueMillis(1000); + + // Happy case + NodesStatsResponse response1 = dataNodeClient().admin().cluster().prepareNodesStats().get(); + assertThat(response1.getNodes().size(), equalTo(3)); + + // One bad data node + simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME); + + NodesStatsResponse response = dataNodeClient().admin().cluster().prepareNodesStats().get(); + ArrayList nodes = new ArrayList(); + for(NodeStats node : response.getNodes()) { + nodes.add(node.getNode().getName()); + } + assertThat(response.getNodes().size(), equalTo(2)); + assertThat(nodes.contains(masterNode), is(true)); + } + + public void testListTasksTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + TimeValue timeout = TimeValue.timeValueMillis(1000); + + // Happy case + ListTasksResponse response1 = dataNodeClient().admin().cluster().prepareListTasks().get(); + assertThat(response1.getPerNodeTasks().keySet().size(), equalTo(3)); + + // One bad data node + simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME); + + ListTasksResponse response = dataNodeClient().admin().cluster().prepareListTasks().get(); + assertNull(response.getPerNodeTasks().get(anotherDataNode)); + } + + public void testRecoveriesWithTimeout(){ + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + int numShards = 4; + assertAcked(prepareCreate("test-index", 0, Settings.builder(). + put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2). + put("number_of_replicas", 0))); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + refresh("test-index"); + ensureSearchable("test-index"); + + // Happy case + RecoveryResponse recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get(); + assertThat(recoveryResponse.getTotalShards(), equalTo(numShards)); + assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards)); + + //simulate timeout on bad node. + simulateTimeoutAtTransport(dataNode, anotherDataNode, RecoveryAction.NAME); + + //verify response with bad node. 
+ recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get(); + assertThat(recoveryResponse.getTotalShards(), equalTo(numShards)); + assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards/2)); + assertThat(recoveryResponse.getFailedShards(), equalTo(numShards/2)); + assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); + } + + public void testStatsWithTimeout(){ + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + int numShards = 4; + logger.info("--> creating index"); + assertAcked(prepareCreate("test-index", 0, Settings.builder(). + put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2). + put("number_of_replicas", 0))); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + refresh("test-index"); + ensureSearchable("test-index"); + + //happy case + IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get(); + assertThat(indicesStats.getTotalShards(), equalTo(numShards)); + assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards)); + + // simulate timeout on bad node. + simulateTimeoutAtTransport(dataNode, anotherDataNode, IndicesStatsAction.NAME); + + // verify indices state response with bad node. + indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get(); + assertThat(indicesStats.getTotalShards(), equalTo(numShards)); + assertThat(indicesStats.getFailedShards(), equalTo(numShards/2)); + assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards/2)); + assertThat(indicesStats.getTotal().getDocs().getCount(), lessThan(numDocs)); + assertThat(indicesStats.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); + } + + private void simulateTimeoutAtTransport(String dataNode, String anotherDataNode, String transportActionName) { + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, + dataNode)); + StubbableTransport.SendRequestBehavior sendBehaviour = (connection, requestId, action, request, options) -> { + if (action.startsWith(transportActionName)) { + throw new ReceiveTimeoutTransportException(connection.getNode(), action, "simulate timeout"); + } + connection.sendRequest(requestId, action, request, options); + }; + mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, anotherDataNode), sendBehaviour); + MockTransportService mockTransportServiceAnotherNode = ((MockTransportService) internalCluster().getInstance(TransportService.class, + anotherDataNode)); + mockTransportServiceAnotherNode.addSendBehavior(internalCluster().getInstance(TransportService.class, dataNode), sendBehaviour); + + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index e84ef3e92b1a1..0f98b15ecc86e 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -34,6 +34,7 @@ import 
org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.opensearch.client.OpenSearchClient; +import org.opensearch.common.unit.TimeValue; /** * A request to get indices level stats. Allow to enable different stats to be returned. @@ -67,6 +68,14 @@ public IndicesStatsRequestBuilder clear() { return this; } + /** + * Sets timeout of request. + */ + public final IndicesStatsRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + /** * Document types to return stats for. Mainly affects {@link #setIndexing(boolean)} when * enabled, returning specific indexing stats for those types. diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java index 0551eb7e65a1a..457f97acbe98b 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java @@ -38,6 +38,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; import java.io.IOException; @@ -66,6 +67,9 @@ public String[] indices() { return indices; } + private final TimeValue DEFAULT_TIMEOUT_SECONDS = TimeValue.timeValueSeconds(30); + private TimeValue timeout; + @SuppressWarnings("unchecked") @Override public final Request indices(String... indices) { @@ -73,6 +77,22 @@ public final Request indices(String... indices) { return (Request) this; } + public TimeValue timeout() { + return this.timeout; + } + + @SuppressWarnings("unchecked") + public final Request timeout(TimeValue timeout) { + this.timeout = timeout; + return (Request) this; + } + + @SuppressWarnings("unchecked") + public final Request timeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECONDS, getClass().getSimpleName() + ".timeout"); + return (Request) this; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index f2889e76cf3dc..c0d3a78fec4f4 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -66,6 +66,7 @@ import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; +import org.opensearch.transport.TransportRequestOptions; import java.io.IOException; import java.util.ArrayList; @@ -334,7 +335,12 @@ private void sendNodeRequest(final DiscoveryNode node, List shards if (task != null) { nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); } - transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler() { + TransportRequestOptions transportRequestOptions = TransportRequestOptions.EMPTY; + if (request != null && request.timeout() != null) { + transportRequestOptions = TransportRequestOptions.builder().withTimeout(request.timeout()).build(); + } + transportService.sendRequest( + node, 
transportNodeBroadcastAction, nodeRequest, transportRequestOptions, new TransportResponseHandler() { @Override public NodeResponse read(StreamInput in) throws IOException { return new NodeResponse(in); diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 7bc9d9dc58c2d..7e4ff6a6f25b7 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -61,6 +61,7 @@ public abstract class BaseNodesRequest * will be ignored and this will be used. * */ private DiscoveryNode[] concreteNodes; + private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -102,7 +103,7 @@ public final Request timeout(TimeValue timeout) { @SuppressWarnings("unchecked") public final Request timeout(String timeout) { - this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"); + this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECS, getClass().getSimpleName() + ".timeout"); return (Request) this; } public DiscoveryNode[] concreteNodes() { diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index ec87b43414832..5bd0a661c381d 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -57,6 +57,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -244,6 +245,7 @@ public static Collection createAllocationDeciders(Settings se addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider()); addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); + addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeVersionAllocationDecider()); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index abd9d2febc9b2..a7b4baa302aab 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -256,6 +256,7 @@ public static class Balancer { private final Metadata metadata; private final float avgShardsPerNode; private final NodeSorter sorter; + private final 
Set<RoutingNode> inEligibleTargetNode;

        public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
            this.logger = logger;
@@ -267,6 +268,7 @@ public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weig
            avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
            nodes = Collections.unmodifiableMap(buildModelFromAssigned());
            sorter = newNodeSorter();
+            inEligibleTargetNode = new HashSet<>();
        }

        /**
@@ -632,6 +634,16 @@ protected int comparePivot(int j) {
            return indices;
        }

+        /**
+         * Checks if the target node is ineligible and, if so, adds it to the set
+         * of ineligible target nodes
+         */
+        private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
+            Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation);
+            if (nodeLevelAllocationDecision.type() != Decision.Type.YES) {
+                inEligibleTargetNode.add(targetNode);
+            }
+        }
        /**
         * Move started shards that can not be allocated to a node anymore
         *
@@ -646,8 +658,37 @@ public void moveShards() {
            // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
            // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
            // offloading the shards.
+
+            // Try to eliminate ineligible target nodes up front so that we do not unnecessarily iterate over source nodes
+            // when no target is eligible
+            for (ModelNode currentNode : sorter.modelNodes) {
+                checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
+            }
            for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
+                // Verify whether the cluster concurrent recoveries limit has been reached.
+                if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) {
+                    logger.info("Cannot move any shard in the cluster as the cluster concurrent recoveries limit has been breached" + ". Skipping shard iteration");
+                    return;
+                }
+                // Terminate the node-interleaved shard iteration early when no eligible target nodes are available
+                if (sorter.modelNodes.length == inEligibleTargetNode.size()) {
+                    logger.info("Cannot move any shard in the cluster as there is no node on which shards can be allocated"
+                        + ". Skipping shard iteration");
+                    return;
+                }
+
                ShardRouting shardRouting = it.next();
+
+                // Verify that the shard is allowed to move away, i.e. outgoing recovery on the node hosting the primary shard
+                // is not being throttled.
+                Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation);
+                if (canMoveAwayDecision.type() != Decision.Type.YES) {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Cannot move away shard [{}]. Skipping this shard", shardRouting);
+                    continue;
+                }
+
                final MoveDecision moveDecision = decideMove(shardRouting);
                if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
                    final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
@@ -660,6 +701,11 @@ public void moveShards() {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
                    }
+
+                    // Verify whether this node can be considered ineligible for further iterations
+                    if (targetNode != null) {
+                        checkAndAddInEligibleTargetNode(targetNode.getRoutingNode());
+                    }
                } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
                    logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
                }
@@ -704,9 +750,22 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
            RoutingNode targetNode = null;
            final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
            int weightRanking = 0;
+            int targetNodeProcessed = 0;
            for (ModelNode currentNode : sorter.modelNodes) {
                if (currentNode != sourceNode) {
                    RoutingNode target = currentNode.getRoutingNode();
+                    if (!explain && inEligibleTargetNode.contains(target))
+                        continue;
+                    // don't use canRebalance as we want hard filtering rules to apply. See #17698
+                    if (!explain) {
+                        // If we cannot allocate any shard to the node, mark it as ineligible
+                        Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation);
+                        if (nodeLevelAllocationDecision.type() != Decision.Type.YES) {
+                            inEligibleTargetNode.add(currentNode.getRoutingNode());
+                            continue;
+                        }
+                    }
+                    targetNodeProcessed++;
                    // don't use canRebalance as we want hard filtering rules to apply. See #17698
                    Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
                    if (explain) {
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
index ee29a9c248662..9b08cb9921685 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
@@ -128,4 +128,32 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
             return decision;
         }
     }
+
+    /**
+     * Returns a {@link Decision} whether the given shard can be moved away from its current node
+     * for the given {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
+     */
+    public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
+        return Decision.ALWAYS;
+    }
+
+    /**
+     * Returns a {@link Decision} whether any shard in the cluster can be moved away from its current node
+     * for the given {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
+     */
+    public Decision canMoveAnyShard(RoutingAllocation allocation) {
+        return Decision.ALWAYS;
+    }
+
+    /**
+     * Returns a {@link Decision} whether any shard on the given
+     * {@link RoutingNode} can be allocated. The default is {@link Decision#ALWAYS}.
+     * All implementations that override this behaviour must return a
+     * {@link Decision} that is used to decide whether to skip iterating over the remaining
+     * deciders for this node.
+ */ + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + return Decision.ALWAYS; + } + } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java index cb83afbb45fcc..e39b704bb1cf3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -64,7 +64,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca Decision decision = allocationDecider.canRebalance(shardRouting, allocation); // short track if a NO is returned. if (decision == Decision.NO) { - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -91,7 +91,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing shardRouting, node.node(), allocationDecider.getClass().getSimpleName()); } // short circuit only if debugging is not enabled - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -120,7 +120,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl logger.trace("Shard [{}] can not remain on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName()); } - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -139,7 +139,7 @@ public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, Routi Decision decision = allocationDecider.canAllocate(indexMetadata, node, allocation); // short track if a NO is returned. if (decision == Decision.NO) { - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -158,7 +158,7 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetadata, node, allocation); // short track if a NO is returned. if (decision == Decision.NO) { - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -177,7 +177,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat Decision decision = allocationDecider.canAllocate(shardRouting, allocation); // short track if a NO is returned. if (decision == Decision.NO) { - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -196,7 +196,7 @@ public Decision canRebalance(RoutingAllocation allocation) { Decision decision = allocationDecider.canRebalance(allocation); // short track if a NO is returned. 
if (decision == Decision.NO) { - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); @@ -224,7 +224,70 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].", shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName()); } - if (!allocation.debugDecision()) { + if (allocation.debugDecision() == false) { + return decision; + } else { + ret.add(decision); + } + } else { + addDecision(ret, decision, allocation); + } + } + return ret; + } + + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider decider : allocations) { + Decision decision = decider.canAllocateAnyShardToNode(node, allocation); + if (decision.type().canPremptivelyReturn()) { + if (logger.isTraceEnabled()) { + logger.trace("Shard can not be allocated on node [{}] due to [{}]", node.nodeId(), decider.getClass().getSimpleName()); + } + if (allocation.debugDecision() == false) { + return decision; + } else { + ret.add(decision); + } + } else { + addDecision(ret, decision, allocation); + } + } + return ret; + } + + + @Override + public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) { + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider decider : allocations) { + Decision decision = decider.canMoveAway(shardRouting, allocation); + // short track if a NO is returned. + if (decision.type().canPremptivelyReturn()) { + if (logger.isTraceEnabled()) { + logger.trace("Shard [{}] can not be moved away due to [{}]", shardRouting, decider.getClass().getSimpleName()); + } + if (allocation.debugDecision() == false) { + return decision; + } else { + ret.add(decision); + } + } else { + addDecision(ret, decision, allocation); + } + } + return ret; + } + + @Override + public Decision canMoveAnyShard(RoutingAllocation allocation) { + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider decider : allocations) { + Decision decision = decider.canMoveAnyShard(allocation); + // short track if a NO is returned. + if (decision.type().canPremptivelyReturn()) { + if (allocation.debugDecision() == false) { return decision; } else { ret.add(decision); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java new file mode 100644 index 0000000000000..2bcbdf300875d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */
+
+package org.opensearch.cluster.routing.allocation.decider;
+
+import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.allocation.RoutingAllocation;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Setting.Property;
+import org.opensearch.common.settings.Settings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * This {@link AllocationDecider} controls the number of currently in-progress
+ * relocation (recovery) operations cluster-wide and restricts shard relocations
+ * and allocations if the configured threshold is reached.
+ *
+ * Relocation operations can be controlled in real-time via the cluster update API using
+ * cluster.routing.allocation.cluster_concurrent_recoveries. Iff this
+ * setting is set to -1 the number of concurrent recovery operations
+ * is unlimited.
+ */
+public class ConcurrentRecoveriesAllocationDecider extends AllocationDecider {
+
+    private static final Logger logger = LogManager.getLogger(ConcurrentRecoveriesAllocationDecider.class);
+
+    public static final String NAME = "cluster_concurrent_recoveries";
+
+
+    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING =
+        Setting.intSetting("cluster.routing.allocation.cluster_concurrent_recoveries", -1, -1,
+            Property.Dynamic, Property.NodeScope);
+
+    private volatile int clusterConcurrentRecoveries;
+
+    public ConcurrentRecoveriesAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
+        this.clusterConcurrentRecoveries = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.get(settings);
+        logger.debug("using [cluster_concurrent_recoveries] with [{}]", clusterConcurrentRecoveries);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING,
+            this::setClusterConcurrentRecoveries);
+    }
+
+    private void setClusterConcurrentRecoveries(int clusterConcurrentRecoveries) {
+        this.clusterConcurrentRecoveries = clusterConcurrentRecoveries;
+    }
+
+    @Override
+    public Decision canMoveAnyShard(RoutingAllocation allocation) {
+        if (clusterConcurrentRecoveries == -1) {
+            return allocation.decision(Decision.YES, NAME, "undefined cluster concurrent recoveries");
+        }
+        int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
+        if (relocatingShards >= clusterConcurrentRecoveries) {
+            return allocation.decision(Decision.THROTTLE, NAME,
+                "too many shards are concurrently relocating [%d], limit: [%d] cluster setting [%s=%d]",
+                relocatingShards, clusterConcurrentRecoveries, CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.getKey(),
+                clusterConcurrentRecoveries);
+        }
+        return allocation.decision(Decision.YES, NAME,
+            "below threshold [%d] for concurrent recoveries, current relocating shard count [%d]",
+            clusterConcurrentRecoveries, relocatingShards);
+    }
+
+    @Override
+    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return canMoveAnyShard(allocation);
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java
index 959eda2e5b780..5b8e5d228f412 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java
@@ -137,6 +137,10 @@ public boolean higherThan(Type other) {
             return false;
         }

+        public boolean canPremptivelyReturn() {
+            return this == THROTTLE || this == NO;
+        }
+
     }

     /**
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
index b60f050e752ef..c3f3abce7c013 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
@@ -147,6 +147,12 @@ public Decision
shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node.node(), allocation); + return decision != null && decision == Decision.NO ? decision : Decision.ALWAYS; + } + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); if (decision != null) return decision; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index e0ee91e3850a6..d8df077eadddb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -218,4 +218,44 @@ private ShardRouting initializingShard(ShardRouting shardRouting, String current assert initializingShard.initializing(); return initializingShard; } + + @Override + public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) { + int outgoingRecoveries = 0; + if (!shardRouting.primary()) { + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); + outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); + } else { + outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(shardRouting.currentNodeId()); + } + if (outgoingRecoveries >= concurrentOutgoingRecoveries) { + return allocation.decision( + THROTTLE, NAME, + "too many outgoing shards are currently recovering [%d], limit: [%d] cluster setting [%s=%d]", + outgoingRecoveries, concurrentOutgoingRecoveries, + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), + concurrentOutgoingRecoveries + ); + } else { + return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d]", outgoingRecoveries, + concurrentOutgoingRecoveries); + } + } + + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + int incomingRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); + if (incomingRecoveries >= concurrentIncomingRecoveries) { + return allocation.decision( + THROTTLE, NAME, + "too many incoming shards are currently recovering [%d], limit: [%d] cluster setting [%s=%d]", + incomingRecoveries, concurrentIncomingRecoveries, + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), + concurrentIncomingRecoveries + ); + } else { + return allocation.decision(YES, NAME, "below shard recovery limit of incoming: [%d < %d]", incomingRecoveries, + concurrentIncomingRecoveries); + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index d1174acca5099..af78113335db0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -67,6 +67,7 @@ import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import 
org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; @@ -226,6 +227,7 @@ public void apply(Settings value, Settings current, Settings previous) { BreakerSettings.CIRCUIT_BREAKER_TYPE, ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + ConcurrentRecoveriesAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING, DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index ec381036518d5..0a65256511be0 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -78,6 +78,7 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { final NodesInfoRequest nodesInfoRequest = prepareRequest(request); + nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java index 7d114bc2e6b7b..d8f18c9a37fda 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -90,6 +90,7 @@ public boolean allowSystemIndexAccessByDefault() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.timeout(request.param("timeout")); boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true); IndicesOptions defaultIndicesOption = forbidClosedIndices ? 
indicesStatsRequest.indicesOptions() : IndicesOptions.strictExpandOpen(); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java index ed288eedd8597..319e13b19b74e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java @@ -85,7 +85,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } static Set RESPONSE_PARAMS = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList("format", "h", "v", "ts", "pri", "bytes", "size", "time", "s"))); + Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "format", "h", "v", "ts", "pri", "bytes", "size", "time", "s", "timeout"))); @Override protected Set responseParams() { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java index 9bdcf29e8f0d7..1cfcd5ce41331 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java @@ -88,6 +88,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli @Override public void processResponse(final ClusterStateResponse state) { NodesStatsRequest statsRequest = new NodesStatsRequest(nodes); + statsRequest.timeout(request.param("timeout")); statsRequest.clear().addMetric(NodesStatsRequest.Metric.FS.metricName()) .indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java index 13d2be388772a..606212baf6e60 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java @@ -84,6 +84,7 @@ protected void documentation(StringBuilder sb) { @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index"))); + recoveryRequest.timeout(request.param("timeout")); recoveryRequest.detailed(request.paramAsBoolean("detailed", false)); recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false)); recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions())); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java index 0677ad9515fef..bafc62c4b25c9 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java @@ -82,6 +82,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); nodesInfoRequest.clear() .addMetric(NodesInfoRequest.Metric.PROCESS.metricName()); client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener(channel) { diff --git 
a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 9c1964d9aa518..a9e77c371847a 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -116,6 +116,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); nodesInfoRequest.clear().addMetrics( NodesInfoRequest.Metric.JVM.metricName(), NodesInfoRequest.Metric.OS.metricName(), @@ -125,6 +126,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { @Override public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.timeout(request.param("timeout")); nodesStatsRequest.clear().indices(true).addMetrics( NodesStatsRequest.Metric.JVM.metricName(), NodesStatsRequest.Metric.OS.metricName(), diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java index bf354cbb26935..89797074dbd01 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java @@ -81,6 +81,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli @Override public void processResponse(final ClusterStateResponse clusterStateResponse) throws Exception { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); nodesInfoRequest.clear() .addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener(channel) { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java index 993d5b4695ee1..44f737b4bd9cd 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java @@ -97,6 +97,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); nodesInfoRequest.clear().addMetrics( NodesInfoRequest.Metric.PROCESS.metricName(), NodesInfoRequest.Metric.THREAD_POOL.metricName()); @@ -104,6 +105,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { @Override public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.timeout(request.param("timeout")); nodesStatsRequest.clear().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()); client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java 
index 38eba2b3338a0..941c801ccf137 100644
--- a/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
@@ -71,6 +71,7 @@
 import org.opensearch.test.transport.CapturingTransport;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.ReceiveTimeoutTransportException;
 import org.opensearch.transport.TestTransportChannel;
 import org.opensearch.transport.TransportResponse;
 import org.opensearch.transport.TransportService;
@@ -488,4 +489,60 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
         assertEquals("failed shards", totalFailedShards, response.getFailedShards());
         assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length);
     }
+
+    public void testResultWithTimeouts() throws ExecutionException, InterruptedException {
+        Request request = new Request(new String[]{TEST_INDEX});
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        action.new AsyncAction(null, request, listener).start();
+        Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
+
+        ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX});
+        Map<String, List<ShardRouting>> map = new HashMap<>();
+        for (ShardRouting shard : shardIt) {
+            if (!map.containsKey(shard.currentNodeId())) {
+                map.put(shard.currentNodeId(), new ArrayList<>());
+            }
+            map.get(shard.currentNodeId()).add(shard);
+        }
+
+        int totalShards = 0;
+        int totalSuccessfulShards = 0;
+        int totalFailedShards = 0;
+        String failedNodeId = "node_" + randomIntBetween(0, capturedRequests.size());
+        for (Map.Entry<String, List<CapturingTransport.CapturedRequest>> entry : capturedRequests.entrySet()) {
+            List<BroadcastShardOperationFailedException> exceptions = new ArrayList<>();
+            long requestId = entry.getValue().get(0).requestId;
+            if (entry.getKey().equals(failedNodeId)) {
+                // simulate node timeout
+                totalShards += map.get(entry.getKey()).size();
+                totalFailedShards += map.get(entry.getKey()).size();
+                transport.handleError(requestId,
+                    new ReceiveTimeoutTransportException(clusterService.state().getRoutingNodes().node(entry.getKey()).node(),
+                        "indices:admin/test", "time_out_simulated"));
+            } else {
+                List<ShardRouting> shards = map.get(entry.getKey());
+                List<TransportBroadcastByNodeAction.EmptyResult> shardResults = new ArrayList<>();
+                for (ShardRouting shard : shards) {
+                    totalShards++;
+                    if (rarely()) {
+                        // simulate operation failure
+                        totalFailedShards++;
+                        exceptions.add(new BroadcastShardOperationFailedException(shard.shardId(), "operation indices:admin/test failed"));
+                    } else {
+                        shardResults.add(TransportBroadcastByNodeAction.EmptyResult.INSTANCE);
+                    }
+                }
+                totalSuccessfulShards += shardResults.size();
+                TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(),
+                    shardResults, exceptions);
+                transport.handleResponse(requestId, nodeResponse);
+            }
+        }
+
+        Response response = listener.get();
+        assertEquals("total shards", totalShards, response.getTotalShards());
+        assertEquals("successful shards", totalSuccessfulShards, response.getSuccessfulShards());
+        assertEquals("failed shards", totalFailedShards, response.getFailedShards());
+        assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length);
+    }
 }
diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java
index 24490a91db0f4..cdfcc56ded249 100644
--- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java
+++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java
@@ -43,6 +43,7 @@
 import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
 import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
+import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
 import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
@@ -219,6 +220,7 @@ public void testAllocationDeciderOrder() {
             RebalanceOnlyWhenActiveAllocationDecider.class,
             ClusterRebalanceAllocationDecider.class,
             ConcurrentRebalanceAllocationDecider.class,
+            ConcurrentRecoveriesAllocationDecider.class,
             EnableAllocationDecider.class,
             NodeVersionAllocationDecider.class,
             SnapshotInProgressAllocationDecider.class,
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecidersTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecidersTests.java
index 3c6775c12c29b..a0cc58dd4d27d 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecidersTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecidersTests.java
@@ -103,6 +103,18 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
             public Decision canRebalance(RoutingAllocation allocation) {
                 return Decision.YES;
             }
+
+            public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
+                return Decision.YES;
+            }
+
+            public Decision canMoveAnyShard(RoutingAllocation allocation) {
+                return Decision.YES;
+            }
+
+            public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
+                return Decision.YES;
+            }
         }));

         ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build();
@@ -125,6 +137,9 @@ public Decision canRebalance(RoutingAllocation allocation) {
         verify(deciders.canRemain(shardRouting, routingNode, allocation), matcher);
         verify(deciders.canForceAllocatePrimary(shardRouting, routingNode, allocation), matcher);
         verify(deciders.shouldAutoExpandToNode(idx, null, allocation), matcher);
+        verify(deciders.canMoveAway(shardRouting, allocation), matcher);
+        verify(deciders.canMoveAnyShard(allocation), matcher);
+        verify(deciders.canAllocateAnyShardToNode(routingNode, allocation), matcher);
     }

     private void verify(Decision decision, Matcher<Collection<Decision>> matcher) {
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDeciderTests.java
new file mode 100644
index 0000000000000..1dbcd6a226d8d
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDeciderTests.java
@@ -0,0 +1,207 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.cluster.routing.allocation.decider;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.OpenSearchAllocationTestCase;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.ShardRoutingState;
+import org.opensearch.cluster.routing.allocation.AllocationService;
+import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.settings.Settings;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+public class ConcurrentRecoveriesAllocationDeciderTests extends OpenSearchAllocationTestCase {
+
+    public void testClusterConcurrentRecoveries() {
+        int primaryShards = 5, replicaShards = 1, numberIndices = 12;
+        int clusterConcurrentRecoveries = -1;
+        int nodeConcurrentRecoveries = 4;
+        AllocationService initialStrategy = createAllocationService(
+            Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
+                .put("cluster.routing.allocation.node_initial_primaries_recoveries", "8")
+                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
+                .put("cluster.routing.allocation.exclude.tag", "tag_0").build());
+
+        AllocationService excludeStrategy = null;
+
+        logger.info("Building initial routing table");
+
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        for (int i = 0; i < numberIndices; i++) {
+            metadataBuilder.put(IndexMetadata.builder("test_" + i).settings(settings(Version.CURRENT)).numberOfShards(primaryShards)
+                .numberOfReplicas(replicaShards));
+        }
+        RoutingTable.Builder initialRoutingTableBuilder = RoutingTable.builder();
+        Metadata metadata = metadataBuilder.build();
+        for (int i = 0; i < numberIndices; i++) {
+            initialRoutingTableBuilder.addAsNew(metadata.index("test_" + i));
+        }
+        RoutingTable routingTable = initialRoutingTableBuilder.build();
+
+        logger.info("--> adding nodes and starting shards");
+
+        List<Tuple<Integer, Integer>> srcTargetNodes = Collections.unmodifiableList(Arrays.<Tuple<Integer, Integer>>asList(
+            new Tuple<Integer, Integer>(10, 4),
+            new Tuple<Integer, Integer>(4, 10),
+            new Tuple<Integer, Integer>(10, 10))
+        );
+
+        for (Tuple<Integer, Integer> srcTargetNode : srcTargetNodes) {
+
+            int srcNodes = srcTargetNode.v1();
+            int targetNodes = srcTargetNode.v2();
+
+            logger.info("Setting up tests for src node {} and target node {}", srcNodes, targetNodes);
+
+            ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metadata(metadata)
+                .routingTable(routingTable).nodes(setUpClusterNodes(srcNodes, targetNodes)).build();
+
+            clusterState = initialStrategy.reroute(clusterState, "reroute");
+
+            // Initialize shards
+
+            logger.info("--> Starting primary shards");
+            while (clusterState.getRoutingNodes().hasUnassignedShards()) {
+                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
+            }
+
+            logger.info("--> Starting replica shards");
+            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
+                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
+            }
+
+            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(),
+                equalTo((replicaShards + 1) * primaryShards * numberIndices));
+            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0));
+            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
+
+            clusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
+            excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
+                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
+                .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
+                .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
+
+            for (int counter = 0; counter < 3; counter++) {
+                logger.info("--> Performing a reroute ");
+                clusterState = excludeStrategy.reroute(clusterState, "reroute");
+                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
+                    equalTo(clusterConcurrentRecoveries));
+                for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
+                    assertThat(clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag"),
+                        equalTo("tag_1"));
+                }
+            }
+
+            // Ensure all shards are started
+            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
+                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
+            }
+
+            clusterConcurrentRecoveries = clusterConcurrentRecoveries - randomInt(5);
+            excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
+                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
+                .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
+                .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
+
+            for (int counter = 0; counter < 3; counter++) {
+                logger.info("--> Performing a reroute ");
+                clusterState = excludeStrategy.reroute(clusterState, "reroute");
+                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
+                    equalTo(clusterConcurrentRecoveries));
+                for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
+                    assertThat(clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag"),
+                        equalTo("tag_1"));
+                }
+            }
+
+            // Ensure all shards are started
+            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
+                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
+            }
+
+            logger.info("--> Disabling cluster_concurrent_recoveries and re-routing ");
+            clusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
+
+            for (int counter = 0; counter < 3; counter++) {
+                logger.info("--> Performing a reroute ");
+                excludeStrategy = createAllocationService(
+                    Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
+                        .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
+                        .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
+
+                clusterState = excludeStrategy.reroute(clusterState, "reroute");
+                // When srcNodes < targetNodes, relocations go beyond the Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries limit
+                // because outgoing recoveries happen on target nodes, which are not throttled on incoming recoveries
+                if (srcNodes >= targetNodes) {
+                    assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
+                        equalTo(clusterConcurrentRecoveries));
+                } else {
+                    assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
+                        greaterThanOrEqualTo(clusterConcurrentRecoveries));
+                }
+
+            }
+            // Ensure all shards are started
+            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
+                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
+            }
+
+            logger.info("--> Bumping cluster_concurrent_recoveries up and re-routing ");
+            clusterConcurrentRecoveries = clusterConcurrentRecoveries + randomInt(5);
+            int expectedClusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
+            for (int counter = 0; counter < 3; counter++) {
+                logger.info("--> Performing a reroute ");
+                excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
+                    .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
+                    .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
+                clusterState = excludeStrategy.reroute(clusterState, "reroute");
+                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
+                    equalTo(expectedClusterConcurrentRecoveries));
+
+            }
+        }
+    }
+
+    private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
+        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
+        for (int i = 1; i <= sourceNodes; i++) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put("tag", "tag_" + 1);
+            attributes.put("zone", "zone_" + (i % 2));
+            nb.add(newNode("node_s_" + i, attributes));
+        }
+        for (int j = 1; j <= targetNodes; j++) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put("tag", "tag_" + 0);
+            attributes.put("zone", "zone_" + (j % 2));
+            nb.add(newNode("node_t_" + j, attributes));
+        }
+        return nb;
+    }
+}
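
For reference, a minimal sketch of how the per-node and cluster-wide recovery throttles exercised by the test above would be combined in settings (setting keys are taken from the test; the variable name and numeric values are illustrative examples only):

    // per-node limit on concurrent shard recoveries (existing setting)
    Settings recoverySettings = Settings.builder()
        .put("cluster.routing.allocation.node_concurrent_recoveries", 4)
        // cluster-wide cap applied by ConcurrentRecoveriesAllocationDecider; when it is omitted,
        // as in the "Disabling" phase of the test, relocations are throttled only per node,
        // up to roughly min(sourceNodes, targetNodes) * node_concurrent_recoveries
        .put("cluster.routing.allocation.cluster_concurrent_recoveries", 20)
        .build();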