From fc90c2affdcf7d8303444bf06ad6c70a9a761c00 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 23 Jul 2015 16:13:06 +0200 Subject: [PATCH] Cancel replica recovery on another sync option copy found When a replica is initializing from the primary, and we find a better node that has full sync id match, it is better to cancel the existing replica allocation and allocate it to the new node with sync id match (eventually) --- .../cluster/routing/UnassignedInfo.java | 6 +- .../gateway/GatewayAllocator.java | 1 + .../gateway/ReplicaShardAllocator.java | 65 ++++++++++++++++++- .../cluster/routing/UnassignedInfoTests.java | 3 +- .../gateway/ReplicaShardAllocatorTests.java | 52 +++++++++++++++ 5 files changed, 122 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index c546e250bed25..a09b0349365bf 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -95,7 +95,11 @@ public enum Reason { /** * When a shard moves from started back to initializing, for example, during shadow replica */ - REINITIALIZED; + REINITIALIZED, + /** + * A better replica location is identified and causes the existing replica allocation to be cancelled. + */ + REALLOCATED_REPLICA; } private final Reason reason; diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 4357a81961bcf..7b6da4dae2720 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -126,6 +126,7 @@ protected Settings getIndexSettings(String index) { }); // sort for priority ordering changed |= primaryShardAllocator.allocateUnassigned(allocation); + changed |= replicaShardAllocator.processExistingRecoveries(allocation); changed |= replicaShardAllocator.allocateUnassigned(allocation); return changed; } diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2d0d38d0bd2e6..4bf6be893e3fb 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -25,11 +25,11 @@ import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; @@ -40,7 +40,6 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import java.util.Iterator; import java.util.Map; /** @@ -51,6 +50,62 @@ public ReplicaShardAllocator(Settings settings) { super(settings); } + /** + * Process existing recoveries of replicas and see if we need to cancel them if we find a better + * match. Today, a better match is one that has full sync id match compared to not having one in + * the previous recovery. + */ + public boolean processExistingRecoveries(RoutingAllocation allocation) { + boolean changed = false; + for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) { + nodes.next(); + for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) { + ShardRouting shard = it.next(); + if (shard.primary() == true) { + continue; + } + if (shard.initializing() == false) { + continue; + } + if (shard.relocatingNodeId() != null) { + continue; + } + + AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); + if (shardStores.hasData() == false) { + logger.trace("{}: fetching new stores for initializing shard", shard); + continue; // still fetching + } + + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard); + assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; + TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + if (primaryStore == null || primaryStore.allocated() == false) { + // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) + // just let the recovery find it out, no need to do anything about it for the initializing shard + logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + continue; + } + + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores); + if (matchingNodes.getNodeWithHighestMatch() != null) { + DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); + DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); + if (currentNode.equals(nodeWithHighestMatch) == false + && matchingNodes.isNodeMatchBySyncID(currentNode) == false + && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch) == true) { + // we found a better match that has a full sync id match, the existing allocation is not fully synced + // so we found a better one, cancel this one + it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]")); + changed = true; + } + } + } + } + return changed; + } + public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); @@ -236,7 +291,7 @@ public MatchingNodes(ObjectLongMap nodesToSize) { highestMatchNode = cursor.key; } } - nodeWithHighestMatch = highestMatchNode; + this.nodeWithHighestMatch = highestMatchNode; } /** @@ -248,6 +303,10 @@ public DiscoveryNode getNodeWithHighestMatch() { return this.nodeWithHighestMatch; } + public boolean isNodeMatchBySyncID(DiscoveryNode node) { + return nodesToSize.get(node) == Long.MAX_VALUE; + } + /** * Did we manage to find any data, regardless how well they matched or not. */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index f4617a9ee5241..29e46ff1b3641 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -60,7 +60,8 @@ public void testReasonOrdinalOrder() { UnassignedInfo.Reason.ALLOCATION_FAILED, UnassignedInfo.Reason.NODE_LEFT, UnassignedInfo.Reason.REROUTE_CANCELLED, - UnassignedInfo.Reason.REINITIALIZED}; + UnassignedInfo.Reason.REINITIALIZED, + UnassignedInfo.Reason.REALLOCATED_REPLICA}; for (int i = 0; i < order.length; i++) { assertThat(order[i].ordinal(), equalTo(i)); } diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index df78028750bc7..97c7ecdcfd9a9 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -219,6 +219,39 @@ public void testDelayedAllocation() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); } + @Test + public void testCancelRecoveryBetterSyncId() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node3, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + } + + @Test + public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node3, false, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + @Test + public void testNotCancellingRecovery() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED); } @@ -242,6 +275,25 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null); } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .add(IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId) + .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), false, ShardRoutingState.INITIALIZING, 10)) + .build()) + ) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null); + } + class TestAllocator extends ReplicaShardAllocator { private Map data = null;