Skip to content

Commit

Permalink
Remove recovery threadpools and throttle outgoing recoveries on the m…
Browse files Browse the repository at this point in the history
…aster

Today we throttle recoveries only for incoming recoveries. Nodes that have a lot
of primaries can get overloaded due to too many recoveries. To still keep that at bay
we limit the number of threads that are sending files to the target to overcome this problem.

The right solution here is to also throttle the outgoing recoveries that are today unbounded on
the master and don't start the recovery until we have enough resources on both source and target nodes.

The concurrency aspects of the recovery source also added a lot of complexity and additional threadpools
that are hard to configure. This commit removes the concurrent streamns notion completely and sends files
in the thread that drives the recovery simplifying the recovery code considerably.
Outgoing recoveries are not throttled on the master via a allocation decider.
  • Loading branch information
s1monw committed Dec 22, 2015
1 parent d353dcb commit f5e4cd4
Show file tree
Hide file tree
Showing 48 changed files with 531 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
Expand Down Expand Up @@ -129,7 +128,6 @@ public ClusterModule(Settings settings) {
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
}


private void registerBuiltinIndexSettings() {
registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_TYPE, Validator.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
}
this.allShardsStarted = allShardsStarted;

this.primary = primary;
if (primary != null) {
this.primaryAsList = Collections.singletonList(primary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private int relocatingShards = 0;

private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();
private final Map<String, Recoveries> recoveryiesPerNode = new HashMap<>();

public RoutingNodes(ClusterState clusterState) {
this(clusterState, true);
Expand All @@ -91,6 +92,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// also fill replicaSet information
for (ObjectCursor<IndexRoutingTable> indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable.value) {
assert indexShard.primary != null;
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified
Expand All @@ -107,16 +109,18 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
ShardRouting targetShardRouting = shard.buildTargetRelocatingShard();
addInitialRecovery(targetShardRouting);
if (readOnly) {
targetShardRouting.freeze();
}
entries.add(targetShardRouting);
assignedShardsAdd(targetShardRouting);
} else if (!shard.active()) { // shards that are initializing without being relocated
} else if (shard.active() == false) { // shards that are initializing without being relocated
if (shard.primary()) {
inactivePrimaryCount++;
}
inactiveShardCount++;
addInitialRecovery(shard);
}
} else {
final ShardRouting sr = getRouting(shard, readOnly);
Expand All @@ -132,6 +136,79 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
}
}

private void addRecovery(ShardRouting routing) {
addRecovery(routing, true, false);
}

private void removeRecovery(ShardRouting routing) {
addRecovery(routing, false, false);
}

public void addInitialRecovery(ShardRouting routing) {
addRecovery(routing,true, true);
}

private void addRecovery(final ShardRouting routing, final boolean increment, final boolean initializing) {
final int howMany = increment ? 1 : -1;
assert routing.initializing() : "routing must be initializing: " + routing;
Recoveries.getOrAdd(recoveryiesPerNode, routing.currentNodeId()).addIncoming(howMany);
final String sourceNodeId;
if (routing.relocatingNodeId() != null) { // this is a relocation-target
sourceNodeId = routing.relocatingNodeId();
if (routing.primary() && increment == false) { // primary is done relocating
int numRecoveringReplicas = 0;
for (ShardRouting assigned : assignedShards(routing)) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
numRecoveringReplicas++;
}
}
// we transfer the recoveries to the relocated primary
recoveryiesPerNode.get(sourceNodeId).addOutgoing(-numRecoveringReplicas);
recoveryiesPerNode.get(routing.currentNodeId()).addOutgoing(numRecoveringReplicas);
}
} else if (routing.primary() == false) { // primary without relocationID is initial recovery
ShardRouting primary = findPrimary(routing);
if (primary == null && initializing) {
primary = routingTable.index(routing.index()).shard(routing.shardId().id()).primary;
} else if (primary == null) {
throw new IllegalStateException("replica is initializing but primary is unassigned");
}
sourceNodeId = primary.currentNodeId();
} else {
sourceNodeId = null;
}
if (sourceNodeId != null) {
Recoveries.getOrAdd(recoveryiesPerNode, sourceNodeId).addOutgoing(howMany);
}
}

public int getIncomingRecoveries(String nodeId) {
return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming();
}

public int getOutgoingRecoveries(String nodeId) {
return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing();
}

private ShardRouting findPrimary(ShardRouting routing) {
List<ShardRouting> shardRoutings = assignedShards.get(routing.shardId());
ShardRouting primary = null;
if (shardRoutings != null) {
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.primary()) {
if (shardRouting.active()) {
return shardRouting;
} else if (primary == null) {
primary = shardRouting;
} else if (primary.relocatingNodeId() != null) {
primary = shardRouting;
}
}
}
}
return primary;
}

private static ShardRouting getRouting(ShardRouting src, boolean readOnly) {
if (readOnly) {
src.freeze(); // we just freeze and reuse this instance if we are read only
Expand Down Expand Up @@ -352,6 +429,7 @@ public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
if (shard.primary()) {
inactivePrimaryCount++;
}
addRecovery(shard);
assignedShardsAdd(shard);
}

Expand All @@ -367,6 +445,7 @@ public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedSha
ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
addRecovery(target);
return target;
}

Expand All @@ -383,9 +462,12 @@ public void started(ShardRouting shard) {
inactivePrimaryCount--;
}
}
removeRecovery(shard);
shard.moveToStarted();
}



/**
* Cancels a relocation of a shard that shard must relocating.
*/
Expand Down Expand Up @@ -440,6 +522,9 @@ private void remove(ShardRouting shard) {
cancelRelocation(shard);
}
assignedShardsRemove(shard);
if (shard.initializing()) {
removeRecovery(shard);
}
}

private void assignedShardsAdd(ShardRouting shard) {
Expand Down Expand Up @@ -749,6 +834,54 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
}
}

for (Map.Entry<String, Recoveries> recoveries : routingNodes.recoveryiesPerNode.entrySet()) {
String node = recoveries.getKey();
final Recoveries value = recoveries.getValue();
int incoming = 0;
int outgoing = 0;
RoutingNode routingNode = routingNodes.nodesToShards.get(node);
if (routingNode != null) { // node might have dropped out of the cluster
for (ShardRouting routing : routingNode) {
if (routing.initializing()) {
incoming++;
} else if (routing.relocating()) {
outgoing++;
}
if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation
List<ShardRouting> shardRoutings = routingNodes.assignedShards.get(routing.shardId());
for (ShardRouting assigned : shardRoutings) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
outgoing++;
}
}
}
}
}
// if (outgoing != value.outgoing) {
// incoming = 0;
// outgoing = 0;
// for (ShardRouting routing : routingNode) {
// if (routing.initializing()) {
// incoming++;
// } else if (routing.relocating()) {
// outgoing++;
// }
// if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation
// List<ShardRouting> shardRoutings = routingNodes.assignedShards.get(routing.shardId());
// for (ShardRouting assigned : shardRoutings) {
// if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
// outgoing++;
// }
// }
// }
// }
// }
assert incoming == value.incoming : incoming + " != " + value.incoming;
assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode;

}


assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() :
"Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().getNumPrimaries() + "]";
assert unassignedIgnoredPrimaryCount == routingNodes.unassignedShards.getNumIgnoredPrimaries() :
Expand Down Expand Up @@ -856,4 +989,41 @@ private void ensureMutable() {
throw new IllegalStateException("can't modify RoutingNodes - readonly");
}
}

private static final class Recoveries {
private static final Recoveries EMPTY = new Recoveries();
private int incoming = 0;
private int outgoing = 0;

int getTotal() {
return incoming + outgoing;
}

void addOutgoing(int howMany) {
assert outgoing + howMany >= 0 : outgoing + howMany+ " must be >= 0";
outgoing += howMany;
}

void addIncoming(int howMany) {
assert incoming + howMany >= 0 : incoming + howMany+ " must be >= 0";
incoming += howMany;
}

int getOutgoing() {
return outgoing;
}

int getIncoming() {
return incoming;
}

public static Recoveries getOrAdd(Map<String, Recoveries> map, String key) {
Recoveries recoveries = map.get(key);
if (recoveries == null) {
recoveries = new Recoveries();
map.put(key, recoveries);
}
return recoveries;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -364,35 +365,17 @@ private boolean moveShards(RoutingAllocation allocation) {

private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes routingNodes = allocation.routingNodes();
if (routingNodes.unassigned().getNumPrimaries() == 0) {
// move out if we don't have unassigned primaries
return changed;
}

// go over and remove dangling replicas that are initializing for primary shards
List<ShardRouting> shardsToFail = new ArrayList<>();
for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (ShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary() && routing.initializing()) {
shardsToFail.add(routing);
}
}

}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}

// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
// routingNodes.hasUnassignedPrimaries() will potentially be false

for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
// remove dangling replicas that are initializing for primary shards
changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);
ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
if (candidate != null) {
IndexMetaData index = allocation.metaData().index(candidate.index());
Expand Down Expand Up @@ -457,6 +440,22 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
return changed;
}

private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting primary) {
List<ShardRouting> replicas = new ArrayList<>();
for (ShardRouting routing : allocation.routingNodes().assignedShards(primary)) {
if (!routing.primary() && routing.initializing()) {
replicas.add(routing);
}
}
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}

private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends ShardRouting> startedShardEntries) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
Expand Down Expand Up @@ -523,7 +522,6 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}

RoutingNodes routingNodes = allocation.routingNodes();

RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
Expand All @@ -546,7 +544,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}

if (failedShard.primary()) {
// fail replicas first otherwise we move RoutingNodes into an inconsistent state
failReplicasForUnassignedPrimary(allocation, failedShard);
}
// replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications.
failedShard = new ShardRouting(matchedNode.current());

Expand Down
Loading

0 comments on commit f5e4cd4

Please sign in to comment.