Promote replica on the highest version node #25277

Merged (11 commits) on Jun 29, 2017
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
Contributor: unused import?

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -320,14 +321,18 @@ public ShardRouting activePrimary(ShardId shardId) {
/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could be on nodes with an older version of ES than
* the primary, this will return the replica on the node with the highest ES version.
*
*/
public ShardRouting activeReplica(ShardId shardId) {
for (ShardRouting shardRouting : assignedShards(shardId)) {
if (!shardRouting.primary() && shardRouting.active()) {
return shardRouting;
}
}
return null;
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
return assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
Contributor: can you re-add the comment why we need to consider "null" here?

Member (author): Re-added this comment

.orElse(null);
}
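For context on the nullsFirst wrapper above: node(shr.currentNodeId()).node() can return a null DiscoveryNode, and nullsFirst ranks such entries lowest so a replica on an unknown node can never win the max(). A minimal standalone sketch of that comparator behavior (illustrative only, with plain integers standing in for node versions):

import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class NullsFirstSketch {
    public static void main(String[] args) {
        // null models a node that is not present in the cluster state;
        // nullsFirst sorts it lowest, so it never wins max()
        Optional<Integer> highest = Stream.of(50600, null, 60000)
            .max(Comparator.nullsFirst(Comparator.naturalOrder()));
        System.out.println(highest.get()); // prints 60000
    }
}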

/**
@@ -567,7 +572,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
@@ -596,7 +601,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert failedShard.active();
if (failedShard.primary()) {
// promote active replica to primary if active replica exists
ShardRouting activeReplica = activeReplica(failedShard.shardId());
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
@@ -19,12 +19,14 @@

package org.elasticsearch.cluster.routing.allocation;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +37,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.VersionUtils;

import java.util.ArrayList;
import java.util.Collections;
@@ -499,7 +502,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() {
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);


// fail the primary shard, check replicas get removed as well...
@@ -556,4 +559,119 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
}

public void testReplicaOnNewestVersionIsPromoted() {
Contributor: This test checks one specific scenario. I think it can easily be generalized along the lines of IndicesClusterStateServiceRandomUpdatesTests so that it simulates a large range of scenarios.
Essentially it would boil down to creating a few nodes with random versions (see randomInitialClusterState of IndicesClusterStateServiceRandomUpdatesTests), allocating a few shards to the nodes (see ClusterStateChanges.createIndex), then failing some of the shards, incl. the primary (see ClusterStateChanges.applyFailedShards), or failing some of the nodes, incl. the primary's (see ClusterStateChanges.deassociateDeadNodes), and then checking that the new primary is on the node with the highest version.

AllocationService allocation = createAllocationService(Settings.builder().build());

MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)).build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();

ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(initialRoutingTable).build();

ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);

// add a single node
clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder()
.add(newNode("node1-5.x", Version.V_5_6_0)))
Contributor: Can you generalize the test to use two arbitrary (but distinct) versions? i.e. VersionUtils.randomVersion()

Member (author): No? Currently the only situation that is valid for a mixed-major-version cluster is 5.6 and 6.0; we don't support mixed clusters of any other versions, and 5.6.1 isn't out yet. I'm not sure how randomization would help here, other than triggering some other version-related failures :)

Contributor: this PR does more than just an ordering on 5.6/6.x. It also orders among 6.0 and 6.1 nodes, which is left untested here. Either we restrict the "Promote replica on the highest version node" logic to only order 6.x nodes before 5.6 (and leave 6.0 and 6.1 unordered), or we test that this logic also properly orders 6.0 and 6.1. I agree there is no need to test 5.1 and 6.2.

Member (author): Okay, I randomized the versions

.build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));

// start primary shard
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));

// add another 5.6 node
clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder(clusterState.nodes())
.add(newNode("node2-5.x", Version.V_5_6_0)))
.build();

// start the shards, should have 1 primary and 1 replica available
clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));

clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder(clusterState.nodes())
.add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))
.add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))))
.build();

// start all the replicas
clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));

ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);

// fail the primary shard again and make sure the correct replica is promoted
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
// the primary gets allocated on another node
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));

ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));

Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
assertNotNull(replicaNodeVersion);
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);

for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
if ("node1".equals(cursor.value.getId())) {
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
continue;
}
Version nodeVer = cursor.value.getVersion();
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer));
}

startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
logger.info("--> failing primary shard a second time, should select: {}", startedReplica);

// fail the primary shard again, and ensure the same thing happens
ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
// the primary gets allocated on another node
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));

newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail)));
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));

replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
assertNotNull(replicaNodeVersion);
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);

for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) ||
secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
continue;
}
Version nodeVer = cursor.value.getVersion();
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer));
}
}
}
@@ -19,6 +19,7 @@

package org.elasticsearch.indices.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
@@ -50,6 +51,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -72,6 +74,8 @@
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -139,6 +143,87 @@ public void testRandomClusterStateUpdates() {
logger.info("Final cluster state: {}", state);
}

public void testRandomClusterPromotesNewestReplica() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new);

// randomly add nodes of mixed versions
logger.info("--> adding random nodes");
for (int i = 0; i < randomIntBetween(4, 8); i++) {
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
.add(createRandomVersionNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // reroute after adding a node
updateNodes(state, clusterStateServiceMap, MockIndicesService::new);
}

// Log the node versions (for debugging if necessary)
for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
Version nodeVer = cursor.value.getVersion();
logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
}

// randomly create some indices
logger.info("--> creating some indices");
for (int i = 0; i < randomIntBetween(2, 5); i++) {
String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT);
Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3))
.put("index.routing.allocation.total_shards_per_node", 1);
Contributor: why this?

Member (author): I removed this :)

CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}
state = cluster.reroute(state, new ClusterRerouteRequest());

ClusterState previousState = state;
// apply cluster state to nodes (incl. master)
for (DiscoveryNode node : state.nodes()) {
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
Contributor: This test does not require IndicesClusterStateService, only the ClusterStateChanges class. All the code in this block can go away; it does not add anything to the test.
The test can be put into FailedShardsRoutingTests.

Member (author, @dakrone, Jun 27, 2017): it does use the randomInitialClusterState method, which I'm not sure we want to duplicate; is it worth coupling the tests just to put it in the other location? (edit: I misread and thought two methods were used, only one is)

Contributor: randomInitialClusterState is 5 lines. I think we can duplicate that :)

Member (author): Okay, I moved it.

ClusterState localState = adaptClusterStateToLocalNode(state, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
final ClusterChangedEvent event = new ClusterChangedEvent("simulating change", localState, previousLocalState);
indicesClusterStateService.applyClusterState(event);

// check that cluster state has been properly applied to node
assertClusterStateMatchesNodeState(localState, indicesClusterStateService);
}

logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
Contributor: extra semicolon

state = cluster.reroute(state, new ClusterRerouteRequest());
Contributor: reroute happens as part of applyStartedShards in the line above

logger.info("--> starting replicas");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
Contributor: there is no guarantee that all replicas are started (as we have throttling). It's good to test the situation where not all replicas are started though, so maybe we can call applyStartedShards a random number of times.
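A sketch of that suggestion, reusing the test's existing state and cluster fields (hypothetical, not the merged change):

// throttling can leave some replicas INITIALIZING after one pass, so run
// a random number of start/reroute rounds instead of assuming all started
for (int round = 0; round < randomIntBetween(1, 4); round++) {
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
    state = cluster.reroute(state, new ClusterRerouteRequest());
}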

state = cluster.reroute(state, new ClusterRerouteRequest());

logger.info("--> state before failing shards: {}", state);
for (int i = 0; i < randomIntBetween(5, 10); i++) {
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
if (shardRouting.primary() && randomBoolean()) {
ShardRouting replicaToBePromoted = state.getRoutingNodes()
.activeReplicaWithHighestVersion(shardRouting.shardId());
Contributor: you're testing the method activeReplicaWithHighestVersion here using the method itself? I see no checks here that the primary is indeed on the node with the highest version. I think for the purpose of the test it is sufficient to check that

  1. if there was at least one active replica while the primary was failed, that a new active primary got assigned
  2. That the new active primary is on a node with higher or equal version than the replicas.
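Sketched against the surrounding test's variables, those two checks could look like this (illustrative only, not the merged code):

ShardRouting promoted = state.routingTable().index(shardRouting.index())
    .shard(shardRouting.id()).primaryShard();
// 1. a new active primary got assigned
assertTrue(promoted.active());
// 2. its node's version is on or after that of every active replica
Version primaryVer = state.nodes().getDataNodes().get(promoted.currentNodeId()).getVersion();
for (ShardRouting replica : state.getRoutingNodes().assignedShards(promoted.shardId())) {
    if (!replica.primary() && replica.active()) {
        Version replicaVer = state.nodes().getDataNodes().get(replica.currentNodeId()).getVersion();
        assertTrue(primaryVer.onOrAfter(replicaVer));
    }
}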

Member (author): I changed the test to verify candidates without using the activeReplicaWithHighestVersion method

if (replicaToBePromoted != null) {
Version replicaNodeVersion = state.nodes().getDataNodes()
.get(replicaToBePromoted.currentNodeId()).getVersion();
List<FailedShard> shardsToFail = new ArrayList<>();
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
logger.info("--> failing shard {}", shardRouting);
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
Contributor: you're testing only one failure at a time.
Instead, the test could select a subset of the primary shards at random (and also a few replica shards) and fail them in one go.
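One way that batched failing could look, with randomSubsetOf from the test base class (a sketch under that assumption, not the merged code):

// fail a random subset of started shards (primaries and replicas) in one go
List<FailedShard> batch = new ArrayList<>();
for (ShardRouting sr : randomSubsetOf(state.getRoutingNodes().shardsWithState(STARTED))) {
    batch.add(new FailedShard(sr, "batched test failure", new Exception()));
}
state = cluster.applyFailedShards(state, batch);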

state = cluster.applyFailedShards(state, shardsToFail);
Contributor: an alternative to explicit shard failing is to remove nodes where the shards are allocated (i.e. when a node disconnects from the cluster).
This would also test the scenario where DiscoveryNode is null in the RoutingNode.
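A sketch of that node-removal variant, using the ClusterStateChanges.deassociateDeadNodes helper mentioned earlier in this review (hypothetical, not the merged code):

// drop the node hosting the primary; deassociateDeadNodes fails its shards,
// and the now-missing DiscoveryNode exercises the nullsFirst branch in
// activeReplicaWithHighestVersion
DiscoveryNodes remaining = DiscoveryNodes.builder(state.nodes())
    .remove(shardRouting.currentNodeId()).build();
state = ClusterState.builder(state).nodes(remaining).build();
state = cluster.deassociateDeadNodes(state, true, "simulated node departure");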

ShardRouting newPrimary = state.routingTable().index(shardRouting.index())
.shard(shardRouting.id()).primaryShard();

assertThat(newPrimary.allocationId().getId(),
equalTo(replicaToBePromoted.allocationId().getId()));
}
}
state = cluster.reroute(state, new ClusterRerouteRequest());
}
}
}

/**
* This test ensures that when a node joins a brand new cluster (different cluster UUID),
* different from the cluster it was previously a part of, the in-memory index data structures
@@ -388,6 +473,16 @@ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
Version.CURRENT);
}

protected DiscoveryNode createRandomVersionNode(DiscoveryNode.Role... mustHaveRoles) {
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
roles.add(mustHaveRole);
}
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
}

private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) {
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build();
}