Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
aponb committed May 20, 2021
2 parents c6779c6 + 51e25ec commit 5de25dd
Show file tree
Hide file tree
Showing 28 changed files with 991 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@

package org.opensearch.benchmark.routing.allocation;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -52,8 +42,20 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Fork(3)
Expand All @@ -71,75 +73,103 @@ public class AllocationBenchmark {
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({
// indices| shards| replicas| nodes
" 10| 1| 0| 1",
" 10| 3| 0| 1",
" 10| 10| 0| 1",
" 100| 1| 0| 1",
" 100| 3| 0| 1",
" 100| 10| 0| 1",

" 10| 1| 0| 10",
" 10| 3| 0| 10",
" 10| 10| 0| 10",
" 100| 1| 0| 10",
" 100| 3| 0| 10",
" 100| 10| 0| 10",

" 10| 1| 1| 10",
" 10| 3| 1| 10",
" 10| 10| 1| 10",
" 100| 1| 1| 10",
" 100| 3| 1| 10",
" 100| 10| 1| 10",

" 10| 1| 2| 10",
" 10| 3| 2| 10",
" 10| 10| 2| 10",
" 100| 1| 2| 10",
" 100| 3| 2| 10",
" 100| 10| 2| 10",

" 10| 1| 0| 50",
" 10| 3| 0| 50",
" 10| 10| 0| 50",
" 100| 1| 0| 50",
" 100| 3| 0| 50",
" 100| 10| 0| 50",

" 10| 1| 1| 50",
" 10| 3| 1| 50",
" 10| 10| 1| 50",
" 100| 1| 1| 50",
" 100| 3| 1| 50",
" 100| 10| 1| 50",

" 10| 1| 2| 50",
" 10| 3| 2| 50",
" 10| 10| 2| 50",
" 100| 1| 2| 50",
" 100| 3| 2| 50",
" 100| 10| 2| 50" })
public String indicesShardsReplicasNodes = "10|1|0|1";
// indices| shards| replicas| source| target| concurrentRecoveries
" 10| 2| 0| 1| 1| 1|",
" 10| 3| 0| 1| 1| 2|",
" 10| 10| 0| 1| 1| 5|",
" 100| 1| 0| 1| 1| 10|",
" 100| 3| 0| 1| 1| 10|",
" 100| 10| 0| 1| 1| 10|",

" 10| 2| 0| 10| 10| 1|",
" 10| 3| 0| 10| 5| 2|",
" 10| 10| 0| 10| 5| 5|",
" 100| 1| 0| 5| 10| 5|",
" 100| 3| 0| 10| 5| 5|",
" 100| 10| 0| 10| 20| 6|",

" 10| 1| 1| 10| 10| 1|",
" 10| 3| 1| 10| 3| 3|",
" 10| 10| 1| 5| 12| 5|",
" 100| 1| 1| 10| 10| 6|",
" 100| 3| 1| 10| 5| 8|",
" 100| 10| 1| 8| 17| 8|",

" 10| 1| 2| 10| 10| 1|",
" 10| 3| 2| 10| 5| 3|",
" 10| 10| 2| 5| 10| 5|",
" 100| 1| 2| 10| 8| 7|",
" 100| 3| 2| 13| 17| 5|",
" 100| 10| 2| 10| 20| 8|",

" 10| 2| 1| 20| 20| 1|",
" 10| 3| 1| 20| 30| 1|",
" 10| 10| 1| 20| 10| 3|",
" 100| 1| 1| 20| 5| 5|",
" 100| 3| 1| 20| 23| 6|",
" 100| 10| 1| 40| 20| 8|",

" 10| 3| 2| 50| 30| 1|",
" 10| 3| 2| 50| 25| 1|",
" 10| 10| 1| 50| 33| 2|",
" 100| 1| 1| 40| 50| 2|",
" 100| 3| 1| 50| 70| 3|",
" 100| 10| 1| 60| 50| 3|",

" 10| 10| 2| 50| 50| 1|",
" 10| 3| 2| 50| 30| 1|",
" 10| 10| 2| 50| 40| 2|",
" 100| 1| 2| 40| 50| 2|",
" 100| 3| 2| 50| 30| 6|",
" 100| 10| 2| 33| 55| 6|",

" 500| 60| 1| 100| 100| 12|",
" 500| 60| 1| 100| 40| 12|",
" 500| 60| 1| 40| 100| 12|",

" 50| 60| 1| 100| 100| 6|",
" 50| 60| 1| 100| 40| 6|",
" 50| 60| 1| 40| 100| 6|" })
public String indicesShardsReplicasSourceTargetRecoveries = "10|1|0|1|1|1";

public int numTags = 2;
public int numZone = 3;
public int concurrentRecoveries;
public int numIndices;
public int numShards;
public int numReplicas;
public int sourceNodes;
public int targetNodes;
public int clusterConcurrentRecoveries;

private AllocationService strategy;
private AllocationService initialClusterStrategy;
private AllocationService clusterExcludeStrategy;
private AllocationService clusterZoneAwareExcludeStrategy;
private ClusterState initialClusterState;

@Setup
public void setUp() throws Exception {
final String[] params = indicesShardsReplicasNodes.split("\\|");
final String[] params = indicesShardsReplicasSourceTargetRecoveries.split("\\|");
numIndices = toInt(params[0]);
numShards = toInt(params[1]);
numReplicas = toInt(params[2]);
sourceNodes = toInt(params[3]);
targetNodes = toInt(params[4]);
concurrentRecoveries = toInt(params[5]);

int numIndices = toInt(params[0]);
int numShards = toInt(params[1]);
int numReplicas = toInt(params[2]);
int numNodes = toInt(params[3]);
int totalShardCount = (numReplicas + 1) * numShards * numIndices;

strategy = Allocators.createAllocationService(
Settings.builder().put("cluster.routing.allocation.awareness.attributes", "tag").build()
initialClusterStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", "20")
.put("cluster.routing.allocation.exclude.tag", "tag_0")
.build()
);

// We'll try to move nodes from tag_1 to tag_0
clusterConcurrentRecoveries = Math.min(sourceNodes, targetNodes) * concurrentRecoveries;

Metadata.Builder mb = Metadata.builder();
for (int i = 1; i <= numIndices; i++) {
mb.put(
Expand All @@ -155,31 +185,96 @@ public void setUp() throws Exception {
rb.addAsNew(metadata.index("test_" + i));
}
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(nb)
.nodes(setUpClusterNodes(sourceNodes, targetNodes))
.build();
// Start all unassigned shards
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
while (initialClusterState.getRoutingNodes().hasUnassignedShards()) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
}
// Ensure all shards are started
while (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
}

assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size() == totalShardCount);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 0);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size() == 0);
// make sure shards are only allocated on tag1
for (ShardRouting startedShard : initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (initialClusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals(
"tag_1"
);
}
}

private int toInt(String v) {
return Integer.valueOf(v.trim());
}

@Benchmark
public ClusterState measureAllocation() {
public ClusterState measureExclusionOnZoneAwareStartedShard() throws Exception {
ClusterState clusterState = initialClusterState;
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
clusterState = strategy.applyStartedShards(
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
return clusterState;
}

@Benchmark
public ClusterState measureShardRelocationComplete() throws Exception {
ClusterState clusterState = initialClusterState;
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
clusterState = clusterZoneAwareExcludeStrategy.applyStartedShards(
clusterState,
clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
clusterState = strategy.reroute(clusterState, "reroute");
}
for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals("tag_0");
}
return clusterState;
}

private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= sourceNodes; i++) {
Map<String, String> attributes = new HashMap<>();
attributes.put("tag", "tag_" + 1);
attributes.put("zone", "zone_" + (i % numZone));
nb.add(Allocators.newNode("node_s_" + i, attributes));
}
for (int j = 1; j <= targetNodes; j++) {
Map<String, String> attributes = new HashMap<>();
attributes.put("tag", "tag_" + 0);
attributes.put("zone", "zone_" + (j % numZone));
nb.add(Allocators.newNode("node_t_" + j, attributes));
}
return nb;
}
}
Loading

0 comments on commit 5de25dd

Please sign in to comment.