Skip to content

Commit

Permalink
Limit the number of concurrent snapshot file restores per node during…
Browse files Browse the repository at this point in the history
… recoveries

Today we limit the max number of concurrent snapshot file restores
per recovery. This works well when the default
node_concurrent_recoveries is used (which is 2). When this limit is
increased, it is possible to exhaust the underlying repository
connection pool, affecting other workloads.

This commit adds a new setting
`indices.recovery.max_concurrent_snapshot_file_downloads_per_node` that
limits the maximum number of snapshot file downloads per node
during recoveries. When a recovery starts on the target node, it tries
to acquire a permit; if the permit is granted, the recovery is allowed
to download snapshot files. This is communicated to the source node in the
StartRecoveryRequest. This is a rather conservative approach, since it is
possible that a recovery that gets a permit to use snapshot files
doesn't recover any snapshot file, while a concurrent recovery
that doesn't get a permit could have taken advantage of recovering from a
snapshot.

Closes #79044
  • Loading branch information
fcofdez committed Oct 18, 2021
1 parent fceacfe commit 8a8b13d
Show file tree
Hide file tree
Showing 29 changed files with 587 additions and 86 deletions.
7 changes: 7 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of snapshot file download requests
executed in parallel on the target node for all recoveries. Defaults to `25`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
Expand Down Expand Up @@ -106,30 +105,4 @@ protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListe
client.execute(AutoPutMappingAction.INSTANCE, putMappingRequest,
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
}

/**
 * A {@link Semaphore} whose total number of permits can be changed after construction.
 */
static class AdjustableSemaphore extends Semaphore {

    // serializes resize operations so that maxPermits is read and written consistently
    private final Object maxPermitsMutex = new Object();
    private int maxPermits;

    /**
     * @param maxPermits the initial number of permits
     * @param fair       passed through to {@link Semaphore#Semaphore(int, boolean)}
     */
    AdjustableSemaphore(int maxPermits, boolean fair) {
        super(maxPermits, fair);
        this.maxPermits = maxPermits;
    }

    /**
     * Changes the total number of permits to {@code permits} by releasing or reducing
     * the difference with respect to the previously configured maximum.
     *
     * @param permits the new maximum number of permits
     */
    void setMaxPermits(int permits) {
        synchronized (maxPermitsMutex) {
            final int delta = Math.subtractExact(permits, maxPermits);
            if (delta < 0) {
                // shrink: take permits away without blocking
                reducePermits(Math.negateExact(delta));
            } else if (delta > 0) {
                // grow: hand out the extra permits
                release(delta);
            }

            maxPermits = permits;
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySnapshotFileDownloadsThrottler;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.fs.FsHealthService;
Expand Down Expand Up @@ -216,6 +217,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySnapshotFileDownloadsThrottler.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import java.util.concurrent.Semaphore;

/**
 * A {@link Semaphore} that allows its total number of permits to be resized at runtime.
 */
public class AdjustableSemaphore extends Semaphore {

    // lock under which maxPermits is compared against and replaced by a new value
    private final Object maxPermitsMutex = new Object();
    private int maxPermits;

    /**
     * @param maxPermits the initial number of permits
     * @param fair       passed through to {@link Semaphore#Semaphore(int, boolean)}
     */
    public AdjustableSemaphore(int maxPermits, boolean fair) {
        super(maxPermits, fair);
        this.maxPermits = maxPermits;
    }

    /**
     * Sets the total number of permits to {@code permits}. The difference with respect to the
     * previous maximum is either released (when growing) or reduced (when shrinking).
     *
     * @param permits the new maximum number of permits
     */
    public void setMaxPermits(int permits) {
        synchronized (maxPermitsMutex) {
            final int change = Math.subtractExact(permits, maxPermits);
            if (change > 0) {
                // more permits than before: make the extra ones available
                release(change);
            }
            if (change < 0) {
                // fewer permits than before: shrink without blocking
                reducePermits(Math.negateExact(change));
            }

            maxPermits = permits;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,21 @@ public static class Actions {
private final SnapshotFilesProvider snapshotFilesProvider;

private final RecoveriesCollection onGoingRecoveries;
private final RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler;

public PeerRecoveryTargetService(ThreadPool threadPool,
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
SnapshotFilesProvider snapshotFilesProvider) {
SnapshotFilesProvider snapshotFilesProvider,
RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler) {
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.snapshotFilesProvider = snapshotFilesProvider;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
this.recoverySnapshotFileDownloadsThrottler = recoverySnapshotFileDownloadsThrottler;

transportService.registerRequestHandler(Actions.FILES_INFO, ThreadPool.Names.GENERIC, RecoveryFilesInfoRequest::new,
new FilesInfoRequestHandler());
Expand Down Expand Up @@ -138,9 +141,17 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}

public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
final Releasable snapshotFileDownloadsPermit =
recoverySnapshotFileDownloadsThrottler.tryAcquire(recoverySettings.getMaxConcurrentSnapshotFileDownloads());
// create a new recovery status, and process...
final long recoveryId =
onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout());
final long recoveryId = onGoingRecoveries.startRecovery(
indexShard,
sourceNode,
snapshotFilesProvider,
listener,
recoverySettings.activityTimeout(),
snapshotFileDownloadsPermit
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
Expand Down Expand Up @@ -267,7 +278,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
startingSeqNo,
recoveryTarget.hasPermitToDownloadSnapshotFiles()
);
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
PeerRecoveryTargetService.RecoveryListener listener,
TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
TimeValue activityTimeout,
@Nullable Releasable snapshotFileDownloadsPermit) {
RecoveryTarget recoveryTarget =
new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class RecoverySettings {
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_8_0_0;

private static final Logger logger = LogManager.getLogger(RecoverySettings.class);

Expand Down Expand Up @@ -161,7 +162,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;
private volatile boolean useSnapshotsDuringRecovery;
private volatile int maxConcurrentSnapshotFileDownloads;
private volatile int getMaxConcurrentSnapshotFileDownloads;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

Expand All @@ -185,7 +186,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
this.getMaxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

Expand All @@ -201,7 +202,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
this::setMaxConcurrentSnapshotFileDownloads);
this::setGetMaxConcurrentSnapshotFileDownloads);
}

public RateLimiter rateLimiter() {
Expand Down Expand Up @@ -297,10 +298,10 @@ private void setUseSnapshotsDuringRecovery(boolean useSnapshotsDuringRecovery) {
}

public int getMaxConcurrentSnapshotFileDownloads() {
return maxConcurrentSnapshotFileDownloads;
return getMaxConcurrentSnapshotFileDownloads;
}

public void setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) {
this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads;
public void setGetMaxConcurrentSnapshotFileDownloads(int getMaxConcurrentSnapshotFileDownloads) {
this.getMaxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

/**
 * Hands out permits for snapshot file downloads during recoveries, backed by an
 * {@link AdjustableSemaphore} that is sized from
 * {@link #INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE} and resized
 * whenever that setting is updated.
 */
public class RecoverySnapshotFileDownloadsThrottler {
    public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = Setting.intSetting(
        "indices.recovery.max_concurrent_snapshot_file_downloads_per_node",
        25,
        1,
        25,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    private final AdjustableSemaphore semaphore;

    /**
     * @param settings        settings from which the initial number of permits is read
     * @param clusterSettings used to register the consumer that resizes the semaphore on setting updates
     */
    public RecoverySnapshotFileDownloadsThrottler(Settings settings, ClusterSettings clusterSettings) {
        final int initialPermits = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
        this.semaphore = new AdjustableSemaphore(initialPermits, true);
        clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, this::updateMaxPermits);
    }

    /**
     * Tries to acquire {@code count} permits without waiting.
     *
     * @param count the number of permits to acquire
     * @return a {@link Releasable} that gives the {@code count} permits back when closed,
     *         or {@code null} if the permits could not be acquired
     */
    @Nullable
    public Releasable tryAcquire(int count) {
        if (semaphore.tryAcquire(count) == false) {
            return null;
        }

        return Releasables.releaseOnce(() -> semaphore.release(count));
    }

    // settings update consumer: applies the new per-node limit to the semaphore
    private void updateMaxPermits(int newMaxPermits) {
        semaphore.setMaxPermits(newMaxPermits);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,15 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
}
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
cancellableThreads.checkForCancel();
boolean canUseSnapshots = useSnapshots && request.hasPermitsToDownloadSnapshotFiles();
recoveryPlannerService.computeRecoveryPlan(shard.shardId(),
shardStateIdentifier,
recoverySourceMetadata,
request.metadataSnapshot(),
startingSeqNo,
translogOps.getAsInt(),
getRequest().targetNode().getVersion(),
useSnapshots,
canUseSnapshots,
ActionListener.wrap(plan ->
recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final SnapshotFilesProvider snapshotFilesProvider;
@Nullable
private final Releasable snapshotFileDownloadsPermit;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
Expand All @@ -89,11 +91,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param snapshotFileDownloadsPermit a permit that allows to download files from a snapshot,
* limiting the concurrent snapshot file downloads per node
* preventing the exhaustion of repository resources.
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
@Nullable Releasable snapshotFileDownloadsPermit,
PeerRecoveryTargetService.RecoveryListener listener) {
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
Expand All @@ -102,6 +108,7 @@ public RecoveryTarget(IndexShard indexShard,
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.snapshotFilesProvider = snapshotFilesProvider;
this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit;
this.shardId = indexShard.shardId();
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
Expand All @@ -118,7 +125,7 @@ public RecoveryTarget(IndexShard indexShard,
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
}

@Nullable
Expand Down Expand Up @@ -151,6 +158,10 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

/**
 * @return {@code true} if this recovery target holds a non-null snapshot file downloads permit,
 *         {@code false} otherwise
 */
public boolean hasPermitToDownloadSnapshotFiles() {
    final boolean permitPresent = snapshotFileDownloadsPermit != null;
    return permitPresent;
}

/** return the last time this RecoveryStatus was used (based on System.nanoTime()) */
public long lastAccessTime() {
if (recoveryMonitorEnabled) {
Expand Down Expand Up @@ -288,6 +299,13 @@ protected void closeInternal() {
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
releaseSnapshotFileDownloadsPermit();
}
}

/**
 * Closes the snapshot file downloads permit held by this target; does nothing when no permit was provided.
 * NOTE(review): retryCopy() passes this same permit instance to the new target, so closing it here
 * may leave the copy reporting a permit that has already been released -- confirm.
 */
private void releaseSnapshotFileDownloadsPermit() {
    if (snapshotFileDownloadsPermit == null) {
        return;
    }
    snapshotFileDownloadsPermit.close();
}

Expand Down Expand Up @@ -506,6 +524,8 @@ public void restoreFileFromSnapshot(String repository,
IndexId indexId,
BlobStoreIndexShardSnapshot.FileInfo fileInfo,
ActionListener<Void> listener) {
assert hasPermitToDownloadSnapshotFiles();

try (InputStream inputStream =
snapshotFilesProvider.getInputStreamForSnapshotFile(repository, indexId, shardId, fileInfo, this::registerThrottleTime)) {
StoreFileMetadata metadata = fileInfo.metadata();
Expand Down
Loading

0 comments on commit 8a8b13d

Please sign in to comment.