Skip to content

Commit

Permalink
Cancel replica recovery on another sync option copy found
Browse files Browse the repository at this point in the history
When a replica is initializing from the primary, and we find a better node that has full sync id match, it is better to cancel the existing replica allocation and allocate it to the new node with sync id match (eventually)
  • Loading branch information
kimchy committed Jul 24, 2015
1 parent f8d9de8 commit fc90c2a
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public enum Reason {
/**
* When a shard moves from started back to initializing, for example, during shadow replica
*/
REINITIALIZED;
REINITIALIZED,
/**
* A better replica location is identified and causes the existing replica allocation to be cancelled.
*/
REALLOCATED_REPLICA;
}

private final Reason reason;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected Settings getIndexSettings(String index) {
}); // sort for priority ordering

changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation);
return changed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
Expand All @@ -40,7 +40,6 @@
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

import java.util.Iterator;
import java.util.Map;

/**
Expand All @@ -51,6 +50,62 @@ public ReplicaShardAllocator(Settings settings) {
super(settings);
}

/**
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that has full sync id match compared to not having one in
* the previous recovery.
*/
public boolean processExistingRecoveries(RoutingAllocation allocation) {
boolean changed = false;
for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) {
nodes.next();
for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) {
ShardRouting shard = it.next();
if (shard.primary() == true) {
continue;
}
if (shard.initializing() == false) {
continue;
}
if (shard.relocatingNodeId() != null) {
continue;
}

AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
if (shardStores.hasData() == false) {
logger.trace("{}: fetching new stores for initializing shard", shard);
continue; // still fetching
}

ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard);
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
if (primaryStore == null || primaryStore.allocated() == false) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
continue;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
if (currentNode.equals(nodeWithHighestMatch) == false
&& matchingNodes.isNodeMatchBySyncID(currentNode) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch) == true) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]"));
changed = true;
}
}
}
}
return changed;
}

public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
Expand Down Expand Up @@ -236,7 +291,7 @@ public MatchingNodes(ObjectLongMap<DiscoveryNode> nodesToSize) {
highestMatchNode = cursor.key;
}
}
nodeWithHighestMatch = highestMatchNode;
this.nodeWithHighestMatch = highestMatchNode;
}

/**
Expand All @@ -248,6 +303,10 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

public boolean isNodeMatchBySyncID(DiscoveryNode node) {
return nodesToSize.get(node) == Long.MAX_VALUE;
}

/**
* Did we manage to find any data, regardless how well they matched or not.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void testReasonOrdinalOrder() {
UnassignedInfo.Reason.ALLOCATION_FAILED,
UnassignedInfo.Reason.NODE_LEFT,
UnassignedInfo.Reason.REROUTE_CANCELLED,
UnassignedInfo.Reason.REINITIALIZED};
UnassignedInfo.Reason.REINITIALIZED,
UnassignedInfo.Reason.REALLOCATED_REPLICA};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,39 @@ public void testDelayedAllocation() {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}

@Test
public void testCancelRecoveryBetterSyncId() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
}

@Test
public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, false, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}

@Test
public void testNotCancellingRecovery() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}

private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED);
}
Expand All @@ -242,6 +275,25 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide
return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null);
}

private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), false, ShardRoutingState.INITIALIZING, 10))
.build())
)
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null);
}

class TestAllocator extends ReplicaShardAllocator {

private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> data = null;
Expand Down

0 comments on commit fc90c2a

Please sign in to comment.