Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promote replica on the highest version node #25277

Merged
merged 11 commits into from
Jun 29, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,23 @@ public ShardRouting activePrimary(ShardId shardId) {
/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with an older version of ES than
* the primary is, this will return replicas on the highest version of ES.
*
*/
public ShardRouting activeReplica(ShardId shardId) {
for (ShardRouting shardRouting : assignedShards(shardId)) {
if (!shardRouting.primary() && shardRouting.active()) {
return shardRouting;
}
}
return null;
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
// It's possible for replicaNodeVersion to be null, when deassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
return assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you re-add the comment explaining why we need to consider "null" here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-added this comment

.orElse(null);
}

/**
Expand Down Expand Up @@ -567,7 +576,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down Expand Up @@ -596,7 +605,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert failedShard.active();
if (failedShard.primary()) {
// promote active replica to primary if active replica exists
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,49 @@

package org.elasticsearch.cluster.routing.allocation;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -91,4 +120,131 @@ public void testSimpleFailedNodeTest() {
assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1));
}
}

/**
 * Randomized test for replica promotion in a cluster of nodes with mixed versions.
 *
 * Adds nodes produced by createNode() (each gets a random version), creates indices
 * with 2-4 replicas, starts the shards, and then fails randomly chosen started
 * primaries. After each failure it asserts that every other replica candidate sits
 * on a node whose version is on or before the version of the node holding the
 * newly promoted primary.
 */
public void testRandomClusterPromotesNewestReplica() throws InterruptedException {

ThreadPool threadPool = new TestThreadPool(getClass().getName());
ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
ClusterState state = randomInitialClusterState();

// randomly add nodes of mixed versions
logger.info("--> adding random nodes");
// NOTE(review): randomIntBetween(...) is re-evaluated on every pass of the loop
// condition, so the iteration count is not a single draw from [4, 8].
for (int i = 0; i < randomIntBetween(4, 8); i++) {
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
.add(createNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding a node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is stale (there are no nodes removed here)

}

// Log the node versions (for debugging if necessary)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log the node versions?
Can also be done directly in the loop where you are adding the nodes :-)

for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
Version nodeVer = cursor.value.getVersion();
logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
}

// randomly create some indices
logger.info("--> creating some indices");
// NOTE(review): as above, the loop bound is re-drawn on every condition check.
for (int i = 0; i < randomIntBetween(2, 5); i++) {
String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}
state = cluster.reroute(state, new ClusterRerouteRequest());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed. createIndex automatically reroutes.


// NOTE(review): previousState is not read anywhere in the visible part of this test.
ClusterState previousState = state;

logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
logger.info("--> starting replicas a random number of times");
for (int i = 0; i < randomIntBetween(1,10); i++) {
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}

logger.info("--> state before failing shards: {}", state);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// Walk the shards that are STARTED at this point and fail a random subset of the primaries.
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
if (shardRouting.primary() && randomBoolean()) {
ShardRouting replicaToBePromoted = state.getRoutingNodes()
.activeReplicaWithHighestVersion(shardRouting.shardId());
// lambdas below need an effectively final reference; state itself is reassigned later
final ClusterState currentState = state;
// List of potential candidate replicas for promotion
Set<ShardRouting> candidates = state.getRoutingNodes().shardsWithState(STARTED)
.stream()
.filter(s -> !s.primary() && s.active())
.filter(s -> s.shardId().equals(shardRouting.shardId()))
.filter(s -> !s.equals(replicaToBePromoted))
.filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null)
.collect(Collectors.toSet());
// If we find a replica and at least another candidate
if (replicaToBePromoted != null && candidates.size() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to determine replicaToBePromoted. Candidates also does not need to be filtered with !s.equals(replicaToBePromoted). It's ok to just check candidates.size() > 0 here to see whether there is going to be a new primary. In that case, we fail the primary + random(0, candidates.size() - 1) replicas and check afterwards that the new primary is on a node that is at least as high as all replicas.

logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
logger.info("--> other candidates: {}", candidates);

List<FailedShard> shardsToFail = new ArrayList<>();
logger.info("--> failing shard {}", shardRouting);
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
state = cluster.applyFailedShards(state, shardsToFail);
// look up whichever copy is the primary in the routing table after the failure was applied
ShardRouting newPrimary = state.routingTable().index(shardRouting.index())
.shard(shardRouting.id()).primaryShard();
Version newPrimaryVersion = getNodeVersion(newPrimary, state);

final ClusterState compareState = state;
logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary);
List<Version> candidateVersions = candidates.stream().map(sr -> {
Version version = getNodeVersion(sr, compareState);
logger.info("--> candidate on {} node; shard routing: {}", version, sr);
return version;
}).collect(Collectors.toList());
// no other candidate may be on a node newer than the one holding the new primary
for (Version candidateVer : candidateVersions) {
assertTrue("candidate was not on the newest version, new primary is on " +
newPrimaryVersion + " and there is a candidate on " + candidateVer,
candidateVer.onOrBefore(newPrimaryVersion));
}
}
}
// start whatever shards are INITIALIZING at this point before handling the next shard
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}
// NOTE(review): not in a finally block, so this line is skipped when an assertion above fails.
terminate(threadPool);
}

/**
 * Looks up the version of the node that the given shard is currently assigned to.
 *
 * Scans the data nodes of the given cluster state for the entry whose key equals
 * the shard's current node id and returns the version of the matching routing node.
 * Calls fail(...) when no data node matches.
 *
 * @param shardRouting the shard whose current node is looked up
 * @param state        the cluster state supplying the data nodes and routing nodes
 * @return the version of the node holding the shard
 */
private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
for (ObjectObjectCursor<String, DiscoveryNode> entry : state.getNodes().getDataNodes()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for iteration here, you can get the node directly by calling state.getNodes().get(shardRouting.currentNodeId()) (which will return null if no node found)

// entry.key is compared against the shard's current node id
if (entry.key.equals(shardRouting.currentNodeId())) {
return state.getRoutingNodes().node(entry.key).node().getVersion();
}
}
fail("shard is not assigned to a node");
// presumably never reached because fail(...) throws; the return is needed for the method to compile
return null;
}

private static final AtomicInteger nodeIdGenerator = new AtomicInteger();

/**
 * Builds the initial cluster state for the randomized tests.
 *
 * The state contains one node created with the MASTER role (used as the local node
 * and passed twice to ClusterStateCreationUtils.state), two nodes created with the
 * DATA role, and between 2 and 5 further nodes with random roles.
 *
 * @return the freshly created cluster state
 */
public ClusterState randomInitialClusterState() {
    List<DiscoveryNode> allNodes = new ArrayList<>();
    DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
    allNodes.add(localNode);
    // at least two nodes that have the data role so that we can allocate shards
    allNodes.add(createNode(DiscoveryNode.Role.DATA));
    allNodes.add(createNode(DiscoveryNode.Role.DATA));
    // Draw the number of extra nodes once. Calling randomIntBetween(...) inside the loop
    // condition would re-roll the bound on every iteration and skew the node count
    // towards the low end of the range.
    final int extraNodes = randomIntBetween(2, 5);
    for (int i = 0; i < extraNodes; i++) {
        allNodes.add(createNode());
    }
    return ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
}


/**
 * Creates a discovery node for use in the randomized tests.
 *
 * The node id (also passed as the first two constructor arguments) is "node_" followed
 * by a zero-padded counter value taken from nodeIdGenerator. The role set is a random
 * subset of all roles plus every role in mustHaveRoles. The version is drawn from
 * VersionUtils.randomVersionBetween starting at V_5_6_0 (the null upper bound
 * presumably means "no upper limit"; confirm against VersionUtils).
 *
 * @param mustHaveRoles roles the created node is guaranteed to have
 * @return the new discovery node
 */
protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
    final String nodeId = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
    // start from a random selection of roles, then force in the required ones
    Set<DiscoveryNode.Role> nodeRoles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
    Collections.addAll(nodeRoles, mustHaveRoles);
    return new DiscoveryNode(
        nodeId,
        nodeId,
        buildNewFakeTransportAddress(),
        Collections.emptyMap(),
        nodeRoles,
        VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
}

}
Loading