-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Promote replica on the highest version node #25277
Changes from 10 commits
3aa3edc
1273e99
879eef0
cb42f65
54edaf8
ef8c79d
27469f5
4da03ea
56dfe80
3ca757b
9d9d4b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,20 +19,49 @@ | |
|
||
package org.elasticsearch.cluster.routing.allocation; | ||
|
||
import com.carrotsearch.hppc.cursors.ObjectCursor; | ||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.elasticsearch.action.support.ActiveShardCount; | ||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ESAllocationTestCase; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.cluster.routing.RoutingNode; | ||
import org.elasticsearch.cluster.routing.RoutingNodes; | ||
import org.elasticsearch.cluster.routing.RoutingTable; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.settings.Settings; | ||
|
||
import org.elasticsearch.common.util.set.Sets; | ||
import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase; | ||
import org.elasticsearch.indices.cluster.ClusterStateChanges; | ||
import org.elasticsearch.indices.cluster.IndicesClusterStateService; | ||
import org.elasticsearch.test.VersionUtils; | ||
import org.elasticsearch.threadpool.TestThreadPool; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; | ||
import static org.hamcrest.Matchers.equalTo; | ||
|
@@ -91,4 +120,131 @@ public void testSimpleFailedNodeTest() { | |
assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1)); | ||
} | ||
} | ||
|
||
public void testRandomClusterPromotesNewestReplica() throws InterruptedException { | ||
|
||
ThreadPool threadPool = new TestThreadPool(getClass().getName()); | ||
ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); | ||
ClusterState state = randomInitialClusterState(); | ||
|
||
// randomly add nodes of mixed versions | ||
logger.info("--> adding random nodes"); | ||
for (int i = 0; i < randomIntBetween(4, 8); i++) { | ||
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) | ||
.add(createNode()).build(); | ||
state = ClusterState.builder(state).nodes(newNodes).build(); | ||
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this comment is stale (there are no nodes removed here) |
||
} | ||
|
||
// Log the shard versions (for debugging if necessary) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log the node versions? |
||
for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) { | ||
Version nodeVer = cursor.value.getVersion(); | ||
logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); | ||
} | ||
|
||
// randomly create some indices | ||
logger.info("--> creating some indices"); | ||
for (int i = 0; i < randomIntBetween(2, 5); i++) { | ||
String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT); | ||
Settings.Builder settingsBuilder = Settings.builder() | ||
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4)) | ||
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4)); | ||
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); | ||
state = cluster.createIndex(state, request); | ||
assertTrue(state.metaData().hasIndex(name)); | ||
} | ||
state = cluster.reroute(state, new ClusterRerouteRequest()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not needed. createIndex automatically reroutes. |
||
|
||
ClusterState previousState = state; | ||
|
||
logger.info("--> starting shards"); | ||
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
logger.info("--> starting replicas a random number of times"); | ||
for (int i = 0; i < randomIntBetween(1,10); i++) { | ||
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
} | ||
|
||
logger.info("--> state before failing shards: {}", state); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { | ||
if (shardRouting.primary() && randomBoolean()) { | ||
ShardRouting replicaToBePromoted = state.getRoutingNodes() | ||
.activeReplicaWithHighestVersion(shardRouting.shardId()); | ||
final ClusterState currentState = state; | ||
// List of potential candidate replicas for promotion | ||
Set<ShardRouting> candidates = state.getRoutingNodes().shardsWithState(STARTED) | ||
.stream() | ||
.filter(s -> !s.primary() && s.active()) | ||
.filter(s -> s.shardId().equals(shardRouting.shardId())) | ||
.filter(s -> !s.equals(replicaToBePromoted)) | ||
.filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null) | ||
.collect(Collectors.toSet()); | ||
// If we find a replica and at least another candidate | ||
if (replicaToBePromoted != null && candidates.size() > 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need to determine |
||
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); | ||
logger.info("--> other candidates: {}", candidates); | ||
|
||
List<FailedShard> shardsToFail = new ArrayList<>(); | ||
logger.info("--> failing shard {}", shardRouting); | ||
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); | ||
state = cluster.applyFailedShards(state, shardsToFail); | ||
ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) | ||
.shard(shardRouting.id()).primaryShard(); | ||
Version newPrimaryVersion = getNodeVersion(newPrimary, state); | ||
|
||
final ClusterState compareState = state; | ||
logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary); | ||
List<Version> candidateVersions = candidates.stream().map(sr -> { | ||
Version version = getNodeVersion(sr, compareState); | ||
logger.info("--> candidate on {} node; shard routing: {}", version, sr); | ||
return version; | ||
}).collect(Collectors.toList()); | ||
for (Version candidateVer : candidateVersions) { | ||
assertTrue("candidate was not on the newest version, new primary is on " + | ||
newPrimaryVersion + " and there is a candidate on " + candidateVer, | ||
candidateVer.onOrBefore(newPrimaryVersion)); | ||
} | ||
} | ||
} | ||
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
} | ||
terminate(threadPool); | ||
} | ||
|
||
private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) { | ||
for (ObjectObjectCursor<String, DiscoveryNode> entry : state.getNodes().getDataNodes()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for iteration here, you can get the node directly by calling |
||
if (entry.key.equals(shardRouting.currentNodeId())) { | ||
return state.getRoutingNodes().node(entry.key).node().getVersion(); | ||
} | ||
} | ||
fail("shard is not assigned to a node"); | ||
return null; | ||
} | ||
|
||
private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); | ||
|
||
public ClusterState randomInitialClusterState() { | ||
List<DiscoveryNode> allNodes = new ArrayList<>(); | ||
DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master | ||
allNodes.add(localNode); | ||
// at least two nodes that have the data role so that we can allocate shards | ||
allNodes.add(createNode(DiscoveryNode.Role.DATA)); | ||
allNodes.add(createNode(DiscoveryNode.Role.DATA)); | ||
for (int i = 0; i < randomIntBetween(2, 5); i++) { | ||
allNodes.add(createNode()); | ||
} | ||
ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()])); | ||
return state; | ||
} | ||
|
||
|
||
protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) { | ||
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); | ||
for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { | ||
roles.add(mustHaveRole); | ||
} | ||
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); | ||
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, | ||
VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you readd the comment why we need to consider "null" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-added this comment