Skip to content

Commit

Permalink
Promote replica on the highest version node (#25277)
Browse files Browse the repository at this point in the history
* Promote replica on the highest version node

This changes the replica selection to prefer to return replicas on the highest
version when choosing a replacement to promote when the primary shard fails.

Consider this situation:

- A replica on a 5.6 node
- Another replica on a 6.0 node
- The primary on a 6.0 node

The primary shard is sending sequence numbers to the replica on the 6.0 node and
skipping sending them for the 5.6 node. Now assume that the primary shard fails
and (prior to this change) the replica on the 5.6 node gets promoted to primary. It
now has no knowledge of sequence numbers, and the replica on the 6.0 node will be
expecting sequence numbers but will never receive them.

Relates to #10708

* Switch from map of node to version to retrieving the version from the node

* Remove unneeded null check

* You can pretend you're a functional language Java, but you're not fooling me.

* Randomize node versions

* Add test with random cluster state with multiple versions that fails shards

* Re-add comment and remove extra import

* Remove unneeded stuff, randomly start replicas a few more times

* Move test into FailedNodeRoutingTests

* Make assertions actually test replica version promotion

* Rewrite test, taking Yannick's feedback into account
  • Loading branch information
dakrone committed Jun 29, 2017
1 parent 1f33576 commit 05889ca
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -320,14 +321,35 @@ public ShardRouting activePrimary(ShardId shardId) {
/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with a older version of ES than
* the primary is, this will return replicas on the highest version of ES.
*
*/
public ShardRouting activeReplica(ShardId shardId) {
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
Version highestVersionSeen = null;
ShardRouting candidate = null;
for (ShardRouting shardRouting : assignedShards(shardId)) {
if (!shardRouting.primary() && shardRouting.active()) {
return shardRouting;
// It's possible for replicaNodeVersion to be null, when deassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
RoutingNode replicaNode = node(shardRouting.currentNodeId());
if (replicaNode != null && replicaNode.node() != null) {
Version replicaNodeVersion = replicaNode.node().getVersion();
if (highestVersionSeen == null || replicaNodeVersion.after(highestVersionSeen)) {
highestVersionSeen = replicaNodeVersion;
candidate = shardRouting;
} else if (candidate == null) {
// Only use this replica if there are no other candidates
candidate = shardRouting;
}
}
}
}
return null;
return candidate;
}

/**
Expand Down Expand Up @@ -567,7 +589,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
assert activeReplica == null || IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings()) :
"initializing primary [" + failedShard + "] with active replicas [" + activeReplica + "] only expected when " +
"using shadow replicas";
Expand Down Expand Up @@ -599,7 +621,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert failedShard.active();
if (failedShard.primary()) {
// promote active replica to primary if active replica exists
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void dispatchRequest(final RestRequest request, final RestChannel channel, final

if (handler == null) {
if (request.method() == RestRequest.Method.OPTIONS) {
// when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
// when we have an OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)

channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,51 @@

package org.elasticsearch.cluster.routing.allocation;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.TemplateUpgradeServiceTests.buildNewFakeTransportAddress;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -91,4 +122,120 @@ public void testSimpleFailedNodeTest() {
assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1));
}
}

/**
 * Builds a random cluster of mixed-version nodes, creates a few indices with several
 * replicas each, starts their shards and then repeatedly fails a random subset of the
 * started primaries. After every round it asserts that no started copy of a failed
 * shard sits on a node with a newer version than the node holding the new primary.
 *
 * @throws InterruptedException declared because {@code terminate(threadPool)} may throw it
 */
public void testRandomClusterPromotesNewestReplica() throws InterruptedException {
    ThreadPool threadPool = new TestThreadPool(getClass().getName());
    try {
        ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
        ClusterState state = randomInitialClusterState();

        // randomly add nodes of mixed versions; the count is drawn once so the loop bound is stable
        logger.info("--> adding random nodes");
        final int nodesToAdd = randomIntBetween(4, 8);
        for (int i = 0; i < nodesToAdd; i++) {
            DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
                .add(createNode()).build();
            state = ClusterState.builder(state).nodes(newNodes).build();
            state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding node
        }

        // Log the node versions (for debugging if necessary)
        for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
            Version nodeVer = cursor.value.getVersion();
            logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
        }

        // randomly create some indices
        logger.info("--> creating some indices");
        final int indicesToCreate = randomIntBetween(2, 5);
        for (int i = 0; i < indicesToCreate; i++) {
            String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
            Settings.Builder settingsBuilder = Settings.builder()
                .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
                .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
            CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
            state = cluster.createIndex(state, request);
            assertTrue(state.metaData().hasIndex(name));
        }

        logger.info("--> starting shards");
        state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
        logger.info("--> starting replicas a random number of times");
        final int startRounds = randomIntBetween(1, 10);
        for (int i = 0; i < startRounds; i++) {
            state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
        }

        // Always run at least one failure round, then keep going on a coin flip
        do {
            List<ShardRouting> primaries = state.getRoutingNodes().shardsWithState(STARTED)
                .stream().filter(ShardRouting::primary).collect(Collectors.toList());

            // Pick a random subset of primaries to fail
            List<ShardRouting> failedPrimaries = randomSubsetOf(primaries);
            List<FailedShard> shardsToFail = failedPrimaries.stream()
                .map(sr -> new FailedShard(sr, "failed primary", new Exception()))
                .collect(Collectors.toList());

            logger.info("--> state before failing shards: {}", state);
            state = cluster.applyFailedShards(state, shardsToFail);

            for (ShardRouting shardRouting : failedPrimaries) {
                logger.info("--> verifying version for {}", shardRouting);

                ShardRouting newPrimary = state.routingTable().index(shardRouting.index())
                    .shard(shardRouting.id()).primaryShard();
                Version newPrimaryVersion = getNodeVersion(newPrimary, state);

                logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary);
                for (ShardRouting sr : state.routingTable().shardRoutingTable(newPrimary.shardId()).shardsWithState(STARTED)) {
                    Version candidateVer = getNodeVersion(sr, state);
                    // copies whose node is no longer part of the cluster state have no version to compare
                    if (candidateVer != null) {
                        logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr);
                        assertTrue("candidate was not on the newest version, new primary is on " +
                                newPrimaryVersion + " and there is a candidate on " + candidateVer,
                            candidateVer.onOrBefore(newPrimaryVersion));
                    }
                }
            }
        } while (randomBoolean());
    } finally {
        // release the pool even when an assertion above fails, so threads do not leak into other tests
        terminate(threadPool);
    }
}

/**
 * Looks up the version of the node that currently holds the given shard copy.
 *
 * @param shardRouting the shard copy whose current node is looked up
 * @param state        the cluster state providing the node lookup
 * @return the node's version, or {@code null} if the node is not present in {@code state}
 */
private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
    final DiscoveryNode holder = state.getNodes().get(shardRouting.currentNodeId());
    if (holder == null) {
        return null;
    }
    return holder.getVersion();
}

// Shared counter; createNode() increments it to derive a distinct "node_NNN" id for every node it builds.
private static final AtomicInteger nodeIdGenerator = new AtomicInteger();

/**
 * Creates the starting cluster state for the test: one node with the master role (used as
 * both local node and master), two nodes guaranteed to have the data role, and a random
 * number (2 to 5) of additional nodes with random roles.
 *
 * @return the freshly built cluster state
 */
public ClusterState randomInitialClusterState() {
    List<DiscoveryNode> allNodes = new ArrayList<>();
    DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
    allNodes.add(localNode);
    // at least two nodes that have the data role so that we can allocate shards
    allNodes.add(createNode(DiscoveryNode.Role.DATA));
    allNodes.add(createNode(DiscoveryNode.Role.DATA));
    // draw the count once; re-rolling it in the loop condition would skew the number of nodes
    final int extraNodes = randomIntBetween(2, 5);
    for (int i = 0; i < extraNodes; i++) {
        allNodes.add(createNode());
    }
    return ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
}


/**
 * Builds a node with a fresh sequential id, a random subset of all roles plus the
 * requested ones, and a random version starting from {@code Version.V_5_6_0_UNRELEASED}.
 *
 * @param mustHaveRoles roles the node is guaranteed to have in addition to the random ones
 * @return the newly created node
 */
protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
    final Set<DiscoveryNode.Role> nodeRoles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
    // make sure the required roles are present regardless of the random pick
    Collections.addAll(nodeRoles, mustHaveRoles);
    final String nodeId = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
    return new DiscoveryNode(
        nodeId,
        nodeId,
        buildNewFakeTransportAddress(),
        Collections.emptyMap(),
        nodeRoles,
        VersionUtils.randomVersionBetween(random(), Version.V_5_6_0_UNRELEASED, null));
}

}
Loading

0 comments on commit 05889ca

Please sign in to comment.