From 2b4fe8fc7b00a9e88fb8d389bbc2aa1bb3d5c412 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?=
Date: Mon, 18 Oct 2021 18:17:27 +0200
Subject: [PATCH] Limit concurrent snapshot file restores in recovery per node
 (#79316)

Today we limit the max number of concurrent snapshot file restores per
recovery. This works well when the default node_concurrent_recoveries
is used (which is 2). When this limit is increased, it is possible to
exhaust the underlying repository connection pool, affecting other
workloads.

This commit adds a new setting
`indices.recovery.max_concurrent_snapshot_file_downloads_per_node` that
allows limiting the max number of snapshot file downloads per node
during recoveries. When a recovery starts on the target node it tries
to acquire a permit; only when the permit is granted is the recovery
allowed to download snapshot files. This is communicated to the source
node in the StartRecoveryRequest.

This is a rather conservative approach, since it is possible that a
recovery that gets a permit to use snapshot files ends up not
recovering any snapshot file, while a concurrent recovery that doesn't
get a permit could have taken advantage of recovering from a snapshot.

Closes #79044
---
 .../modules/indices/recovery.asciidoc         |   7 +
 .../SnapshotBasedIndexRecoveryIT.java         | 272 +++++++++++++++++-
 .../action/index/MappingUpdatedAction.java    |  29 +-
 .../common/settings/ClusterSettings.java      |   1 +
 .../util/concurrent/AdjustableSemaphore.java  |  37 +++
 .../recovery/PeerRecoveryTargetService.java   |  15 +-
 .../recovery/RecoveriesCollection.java        |   8 +-
 .../indices/recovery/RecoverySettings.java    |  84 +++++-
 .../recovery/RecoverySourceHandler.java       |   3 +-
 .../indices/recovery/RecoveryTarget.java      |  22 +-
 .../recovery/StartRecoveryRequest.java        |  34 ++-
 .../index/MappingUpdatedActionTests.java      |   2 +-
 .../IndexLevelReplicationTests.java           |   4 +-
 .../RecoveryDuringReplicationTests.java       |   6 +-
 .../index/shard/IndexShardTests.java          |   6 +-
 .../PeerRecoverySourceServiceTests.java       |   2 +-
 .../PeerRecoveryTargetServiceTests.java       |  41 ++-
 .../recovery/RecoverySettingsTests.java       |  87 ++++++
 .../recovery/RecoverySourceHandlerTests.java  |   4 +-
 .../indices/recovery/RecoveryTests.java       |   4 +-
 .../recovery/StartRecoveryRequestTests.java   |   3 +-
 .../recovery/RecoveriesCollectionTests.java   |   9 +-
 .../ESIndexLevelReplicationTestCase.java      |   2 +-
 .../index/shard/IndexShardTestCase.java       |   2 +-
 .../ShardFollowTaskReplicationTests.java      |   2 +-
 25 files changed, 612 insertions(+), 74 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java
 create mode 100644 server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java

diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc
index 1993d3da5db37..970c1b56af987 100644
--- a/docs/reference/modules/indices/recovery.asciidoc
+++ b/docs/reference/modules/indices/recovery.asciidoc
@@ -100,3 +100,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`.
 +
 Do not increase this setting without carefully verifying that your cluster has
 the resources available to handle the extra load that will result.
+
+`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`::
+(<<dynamic-cluster-setting,Dynamic>>, Expert) Number of snapshot file download requests
+executed in parallel on the target node for all recoveries. Defaults to `25`.
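To make the interplay between the two limits concrete, here is a minimal standalone sketch of the permit scheme described in the commit message, not the actual RecoverySettings implementation (the class and method names below are invented for illustration). Each recovery that wants to use snapshot files reserves the whole per-recovery budget (indices.recovery.max_concurrent_snapshot_file_downloads, default 5) out of the node-wide budget (indices.recovery.max_concurrent_snapshot_file_downloads_per_node, default 25), so with the defaults at most 25 / 5 = 5 recoveries per node can download snapshot files concurrently; a recovery that cannot get the permits falls back to copying its files from the source node.

import java.util.concurrent.Semaphore;

public class SnapshotDownloadPermitsSketch {
    // Node-wide budget: indices.recovery.max_concurrent_snapshot_file_downloads_per_node (default 25).
    private final Semaphore perNodePermits = new Semaphore(25, true);
    // Per-recovery budget: indices.recovery.max_concurrent_snapshot_file_downloads (default 5).
    private final int perRecoveryDownloads = 5;

    /** Try to reserve the per-recovery budget; returns false if the node-wide budget is exhausted. */
    boolean tryStartSnapshotBasedRecovery() {
        return perNodePermits.tryAcquire(perRecoveryDownloads);
    }

    /** Return the reserved budget once the recovery finishes, fails, or is cancelled. */
    void finishSnapshotBasedRecovery() {
        perNodePermits.release(perRecoveryDownloads);
    }

    public static void main(String[] args) {
        SnapshotDownloadPermitsSketch node = new SnapshotDownloadPermitsSketch();
        int granted = 0;
        for (int recovery = 0; recovery < 7; recovery++) {
            // Recoveries that do not get the permits simply copy files from the source node instead.
            if (node.tryStartSnapshotBasedRecovery()) {
                granted++;
            }
        }
        System.out.println("snapshot-based recoveries granted: " + granted); // prints 5 (= 25 / 5)
    }
}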
++ +Do not increase this setting without carefully verifying that your cluster has +the resources available to handle the extra load that will result. diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index 561955271e57d..5dd6d8633d9d7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -31,11 +31,11 @@ import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.shard.IndexShard; @@ -56,11 +56,15 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.NamedXContentRegistry; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -71,10 +75,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -827,6 +833,266 @@ public void testSeqNoBasedRecoveryIsUsedAfterPrimaryFailOver() throws Exception } } + public void testRecoveryUsingSnapshotsIsThrottledPerNode() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + // Ensure that peer recoveries can make progress without restoring snapshot files + // while the permit is granted to 
a different recovery + String indexRecoveredFromPeer = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromPeer); + assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode); + + // let snapshot file restore to proceed + respondToRecoverSnapshotFile.run(); + + ensureGreen(indexRecoveredFromSnapshot1); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode); + + for (RecoverySnapshotFileRequest recoverySnapshotFileRequest : recoverySnapshotFileRequests) { + String indexName = recoverySnapshotFileRequest.getShardId().getIndexName(); + assertThat(indexName, is(equalTo(indexRecoveredFromSnapshot1))); + } + + targetMockTransportService.clearAllRules(); + + String indexRecoveredFromSnapshot2 = indices.get(2); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + + }); + } + + public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + targetMockTransportService.clearAllRules(); + + boolean cancelRecovery = randomBoolean(); + if (cancelRecovery) { + assertAcked(client().admin().indices().prepareDelete(indexRecoveredFromSnapshot1).get()); + + respondToRecoverSnapshotFile.run(); + + assertThat(indexExists(indexRecoveredFromSnapshot1), is(equalTo(false))); + } else { + // Recovery would fail and should release the granted permit and allow other + // recoveries to use snapshots + CountDownLatch cleanFilesRequestReceived = new CountDownLatch(1); + AtomicReference channelRef = new AtomicReference<>(); + targetMockTransportService.addRequestHandlingBehavior(PeerRecoveryTargetService.Actions.CLEAN_FILES, + (handler, request, channel, task) -> { + channelRef.compareAndExchange(null, channel); + cleanFilesRequestReceived.countDown(); + } + ); + + respondToRecoverSnapshotFile.run(); + cleanFilesRequestReceived.await(); + + targetMockTransportService.clearAllRules(); + channelRef.get().sendResponse(new IOException("unable to clean files")); + } + + String indexRecoveredFromSnapshot2 = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + }); + } + + public void testRecoveryReEstablishKeepsTheGrantedSnapshotFileDownloadPermit() throws Exception { + 
executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + AtomicReference startRecoveryConnection = new AtomicReference<>(); + CountDownLatch reestablishRecoverySent = new CountDownLatch(1); + targetMockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) { + startRecoveryConnection.compareAndExchange(null, connection); + } else if (action.equals(PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY)) { + reestablishRecoverySent.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + startRecoveryConnection.get().close(); + + reestablishRecoverySent.await(); + + String indexRecoveredFromPeer = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromPeer); + assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode); + + respondToRecoverSnapshotFile.run(); + + ensureGreen(indexRecoveredFromSnapshot1); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode); + + targetMockTransportService.clearAllRules(); + + final String indexRecoveredFromSnapshot2 = indices.get(2); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + }); + } + + private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecoveryThrottlingTestCase testCase) throws Exception { + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1"); + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1"); + + try { + List dataNodes = internalCluster().startDataOnlyNodes(2); + List indices = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put("index.routing.allocation.require._name", dataNodes.get(0)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + indices.add(indexName); + } + + String repoName = "repo"; + createRepo(repoName, "fs"); + + for (String indexName : indices) { + int numDocs = randomIntBetween(300, 1000); + indexDocs(indexName, numDocs, numDocs); + + createSnapshot(repoName, "snap-" + indexName, Collections.singletonList(indexName)); + } + + String sourceNode = dataNodes.get(0); + String targetNode = dataNodes.get(1); + MockTransportService targetMockTransportService = + 
(MockTransportService) internalCluster().getInstance(TransportService.class, targetNode); + + List recoverySnapshotFileRequests = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch recoverSnapshotFileRequestReceived = new CountDownLatch(1); + CountDownLatch respondToRecoverSnapshotFile = new CountDownLatch(1); + targetMockTransportService.addRequestHandlingBehavior(PeerRecoveryTargetService.Actions.RESTORE_FILE_FROM_SNAPSHOT, + (handler, request, channel, task) -> { + recoverySnapshotFileRequests.add((RecoverySnapshotFileRequest) request); + recoverSnapshotFileRequestReceived.countDown(); + respondToRecoverSnapshotFile.await(); + handler.messageReceived(request, channel, task); + } + ); + + testCase.execute(indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + recoverSnapshotFileRequestReceived::await, + respondToRecoverSnapshotFile::countDown + ); + } finally { + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), null); + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), null); + } + } + + interface SnapshotBasedRecoveryThrottlingTestCase { + void execute(List indices, + String sourceNode, + String targetNode, + MockTransportService targetMockTransportService, + List recoverySnapshotFileRequests, + CheckedRunnable awaitForRecoverSnapshotFileRequestReceived, + Runnable respondToRecoverSnapshotFile) throws Exception; + } + + private void assertPeerRecoveryUsedSnapshots(String indexName, String sourceNode, String targetNode) { + RecoveryState recoveryStateIndexRecoveredFromPeer = getLatestPeerRecoveryStateForShard(indexName, 0); + assertPeerRecoveryWasSuccessful(recoveryStateIndexRecoveredFromPeer, sourceNode, targetNode); + assertThat(recoveryStateIndexRecoveredFromPeer.getIndex().recoveredFromSnapshotBytes(), is(greaterThan(0L))); + } + + private void assertPeerRecoveryDidNotUseSnapshots(String indexName, String sourceNode, String targetNode) { + RecoveryState recoveryStateIndexRecoveredFromPeer = getLatestPeerRecoveryStateForShard(indexName, 0); + assertPeerRecoveryWasSuccessful(recoveryStateIndexRecoveredFromPeer, sourceNode, targetNode); + assertThat(recoveryStateIndexRecoveredFromPeer.getIndex().recoveredFromSnapshotBytes(), is(equalTo(0L))); + } + private Store.MetadataSnapshot getMetadataSnapshot(String nodeName, String indexName) throws IOException { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); @@ -926,7 +1192,7 @@ private void assertSearchResponseContainsAllIndexedDocs(SearchResponse searchRes } } - private void assertPeerRecoveryWasSuccessful(RecoveryState recoveryState, String sourceNode, String targetNode) throws Exception { + private void assertPeerRecoveryWasSuccessful(RecoveryState recoveryState, String sourceNode, String targetNode) { assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE)); assertThat(recoveryState.getRecoverySource(), equalTo(RecoverySource.PeerRecoverySource.INSTANCE)); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 06e345c1aa2ff..0b54eb9ef0af6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ 
-19,14 +19,13 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.Mapping; -import java.util.concurrent.Semaphore; - /** * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated * in the cluster state meta data (and broadcast to all members). @@ -106,30 +105,4 @@ protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListe client.execute(AutoPutMappingAction.INSTANCE, putMappingRequest, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)); } - - static class AdjustableSemaphore extends Semaphore { - - private final Object maxPermitsMutex = new Object(); - private int maxPermits; - - AdjustableSemaphore(int maxPermits, boolean fair) { - super(maxPermits, fair); - this.maxPermits = maxPermits; - } - - void setMaxPermits(int permits) { - synchronized (maxPermitsMutex) { - final int diff = Math.subtractExact(permits, maxPermits); - if (diff > 0) { - // add permits - release(diff); - } else if (diff < 0) { - // remove permits - reducePermits(Math.negateExact(diff)); - } - - maxPermits = permits; - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 50eccb7e1fdaa..a1f2606782f19 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -216,6 +216,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS, + RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java new file mode 100644 index 0000000000000..b89378cd0a6d9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.concurrent.Semaphore; + +public class AdjustableSemaphore extends Semaphore { + + private final Object maxPermitsMutex = new Object(); + private int maxPermits; + + public AdjustableSemaphore(int maxPermits, boolean fair) { + super(maxPermits, fair); + this.maxPermits = maxPermits; + } + + public void setMaxPermits(int permits) { + synchronized (maxPermitsMutex) { + final int diff = Math.subtractExact(permits, maxPermits); + if (diff > 0) { + // add permits + release(diff); + } else if (diff < 0) { + // remove permits + reducePermits(Math.negateExact(diff)); + } + + maxPermits = permits; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 9153021c61a2f..471e93451532d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -138,9 +138,16 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { + final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits(); // create a new recovery status, and process... - final long recoveryId = - onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout()); + final long recoveryId = onGoingRecoveries.startRecovery( + indexShard, + sourceNode, + snapshotFilesProvider, + listener, + recoverySettings.activityTimeout(), + snapshotFileDownloadsPermit + ); // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool. 
threadPool.generic().execute(new RecoveryRunner(recoveryId)); @@ -267,7 +274,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId(), - startingSeqNo); + startingSeqNo, + recoveryTarget.hasPermitToDownloadSnapshotFiles() + ); return request; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 0646949c08c0f..7b2eb74bbacbc 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -14,6 +14,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, SnapshotFilesProvider snapshotFilesProvider, PeerRecoveryTargetService.RecoveryListener listener, - TimeValue activityTimeout) { - RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener); + TimeValue activityTimeout, + @Nullable Releasable snapshotFileDownloadsPermit) { + RecoveryTarget recoveryTarget = + new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener); startRecoveryInternal(recoveryTarget, activityTimeout); return recoveryTarget.recoveryId(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 7d0c7669b0f8a..dfd94131cc2ab 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -20,17 +20,29 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.jdk.JavaVersion; import org.elasticsearch.monitor.os.OsProbe; import org.elasticsearch.node.NodeRoleSettings; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; +import static org.elasticsearch.common.settings.Setting.parseInt; + public class RecoverySettings { public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0; public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0; + public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_8_0_0; private static final Logger logger = LogManager.getLogger(RecoverySettings.class); @@ -134,7 +146,7 @@ public class RecoverySettings { /** * recoveries would try to use files from available 
snapshots instead of sending them from the source node. - * defaults to `false` + * defaults to `true` */ public static final Setting INDICES_RECOVERY_USE_SNAPSHOTS_SETTING = Setting.boolSetting("indices.recovery.use_snapshots", true, Property.Dynamic, Property.NodeScope); @@ -148,6 +160,43 @@ public class RecoverySettings { Property.NodeScope ); + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = new Setting<>( + "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", + "25", + (s) -> parseInt(s, 1, 25, "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", false), + new Setting.Validator<>() { + private final Collection> dependencies = + Collections.singletonList(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS); + @Override + public void validate(Integer value) { + // ignore + } + + @Override + public void validate(Integer maxConcurrentSnapshotFileDownloadsPerNode, Map, Object> settings) { + int maxConcurrentSnapshotFileDownloads = (int) settings.get(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS); + if (maxConcurrentSnapshotFileDownloadsPerNode < maxConcurrentSnapshotFileDownloads) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, + "[%s]=%d is less than [%s]=%d", + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), + maxConcurrentSnapshotFileDownloadsPerNode, + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), + maxConcurrentSnapshotFileDownloads + ) + ); + } + } + + @Override + public Iterator> settings() { + return dependencies.iterator(); + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile ByteSizeValue maxBytesPerSec; @@ -162,6 +211,9 @@ public class RecoverySettings { private volatile TimeValue internalActionLongTimeout; private volatile boolean useSnapshotsDuringRecovery; private volatile int maxConcurrentSnapshotFileDownloads; + private volatile int maxConcurrentSnapshotFileDownloadsPerNode; + + private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @@ -186,6 +238,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings); this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings); + this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings); + this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true); logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); @@ -202,6 +256,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS, this::setMaxConcurrentSnapshotFileDownloads); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, + this::setMaxConcurrentSnapshotFileDownloadsPerNode); } public RateLimiter rateLimiter() { @@ -303,4 +359,30 @@ public int getMaxConcurrentSnapshotFileDownloads() { public void 
setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) { this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads; } + + private void setMaxConcurrentSnapshotFileDownloadsPerNode(int maxConcurrentSnapshotFileDownloadsPerNode) { + this.maxConcurrentSnapshotFileDownloadsPerNode = maxConcurrentSnapshotFileDownloadsPerNode; + this.maxSnapshotFileDownloadsPerNodeSemaphore.setMaxPermits(maxConcurrentSnapshotFileDownloadsPerNode); + } + + @Nullable + Releasable tryAcquireSnapshotDownloadPermits() { + final int maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads(); + final boolean permitAcquired = maxSnapshotFileDownloadsPerNodeSemaphore.tryAcquire(maxConcurrentSnapshotFileDownloads); + if (getUseSnapshotsDuringRecovery() == false || permitAcquired == false) { + if (permitAcquired == false) { + logger.warn(String.format(Locale.ROOT, + "Unable to acquire permit to use snapshot files during recovery, " + + "this recovery will recover index files from the source node. " + + "Ensure snapshot files can be used during recovery by setting [%s] to be no greater than [%d]", + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), + this.maxConcurrentSnapshotFileDownloadsPerNode + ) + ); + } + return null; + } + + return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads)); + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 145d7a88e23d2..a37af5f6928be 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -486,6 +486,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); + final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); recoveryPlannerService.computeRecoveryPlan(shard.shardId(), shardStateIdentifier, recoverySourceMetadata, @@ -493,7 +494,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A startingSeqNo, translogOps.getAsInt(), getRequest().targetNode().getVersion(), - useSnapshots, + canUseSnapshots, ActionListener.wrap(plan -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure) ); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 212c221a4786e..1a8e29e48c2de 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -67,6 +67,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final IndexShard indexShard; private final DiscoveryNode sourceNode; private final SnapshotFilesProvider snapshotFilesProvider; + @Nullable // if we're not downloading files from snapshots in this recovery + private final Releasable snapshotFileDownloadsPermit; private final MultiFileWriter multiFileWriter; private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); private final Store store; @@ -89,11 +91,15 @@ public class RecoveryTarget extends AbstractRefCounted 
implements RecoveryTarget * * @param indexShard local shard where we want to recover to * @param sourceNode source node of the recovery where we recover from + * @param snapshotFileDownloadsPermit a permit that allows to download files from a snapshot, + * limiting the concurrent snapshot file downloads per node + * preventing the exhaustion of repository resources. * @param listener called when recovery is completed/failed */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, SnapshotFilesProvider snapshotFilesProvider, + @Nullable Releasable snapshotFileDownloadsPermit, PeerRecoveryTargetService.RecoveryListener listener) { this.cancellableThreads = new CancellableThreads(); this.recoveryId = idGenerator.incrementAndGet(); @@ -102,6 +108,7 @@ public RecoveryTarget(IndexShard indexShard, this.indexShard = indexShard; this.sourceNode = sourceNode; this.snapshotFilesProvider = snapshotFilesProvider; + this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit; this.shardId = indexShard.shardId(); final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger, @@ -118,7 +125,7 @@ public RecoveryTarget(IndexShard indexShard, * @return a copy of this recovery target */ public RecoveryTarget retryCopy() { - return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener); + return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener); } @Nullable @@ -151,6 +158,10 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } + public boolean hasPermitToDownloadSnapshotFiles() { + return snapshotFileDownloadsPermit != null; + } + /** return the last time this RecoveryStatus was used (based on System.nanoTime() */ public long lastAccessTime() { if (recoveryMonitorEnabled) { @@ -288,6 +299,13 @@ protected void closeInternal() { store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); + releaseSnapshotFileDownloadsPermit(); + } + } + + private void releaseSnapshotFileDownloadsPermit() { + if (snapshotFileDownloadsPermit != null) { + snapshotFileDownloadsPermit.close(); } } @@ -506,6 +524,8 @@ public void restoreFileFromSnapshot(String repository, IndexId indexId, BlobStoreIndexShardSnapshot.FileInfo fileInfo, ActionListener listener) { + assert hasPermitToDownloadSnapshotFiles(); + try (InputStream inputStream = snapshotFilesProvider.getInputStreamForSnapshotFile(repository, indexId, shardId, fileInfo, this::registerThrottleTime)) { StoreFileMetadata metadata = fileInfo.metadata(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index bb90af079b316..5731ab3987e2c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -31,6 +31,7 @@ public class StartRecoveryRequest extends TransportRequest { private Store.MetadataSnapshot metadataSnapshot; private boolean primaryRelocation; private long startingSeqNo; + private boolean canDownloadSnapshotFiles; public StartRecoveryRequest(StreamInput in) throws IOException { super(in); @@ -42,19 +43,25 @@ public StartRecoveryRequest(StreamInput in) throws IOException { metadataSnapshot = new Store.MetadataSnapshot(in); 
primaryRelocation = in.readBoolean(); startingSeqNo = in.readLong(); + if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) { + canDownloadSnapshotFiles = in.readBoolean(); + } else { + canDownloadSnapshotFiles = true; + } } /** * Construct a request for starting a peer recovery. * - * @param shardId the shard ID to recover - * @param targetAllocationId the allocation id of the target shard - * @param sourceNode the source node to remover from - * @param targetNode the target node to recover to - * @param metadataSnapshot the Lucene metadata - * @param primaryRelocation whether or not the recovery is a primary relocation - * @param recoveryId the recovery ID - * @param startingSeqNo the starting sequence number + * @param shardId the shard ID to recover + * @param targetAllocationId the allocation id of the target shard + * @param sourceNode the source node to remover from + * @param targetNode the target node to recover to + * @param metadataSnapshot the Lucene metadata + * @param primaryRelocation whether or not the recovery is a primary relocation + * @param recoveryId the recovery ID + * @param startingSeqNo the starting sequence number + * @param canDownloadSnapshotFiles flag that indicates if the snapshot files can be downloaded */ public StartRecoveryRequest(final ShardId shardId, final String targetAllocationId, @@ -63,7 +70,8 @@ public StartRecoveryRequest(final ShardId shardId, final Store.MetadataSnapshot metadataSnapshot, final boolean primaryRelocation, final long recoveryId, - final long startingSeqNo) { + final long startingSeqNo, + final boolean canDownloadSnapshotFiles) { this.recoveryId = recoveryId; this.shardId = shardId; this.targetAllocationId = targetAllocationId; @@ -72,6 +80,7 @@ public StartRecoveryRequest(final ShardId shardId, this.metadataSnapshot = metadataSnapshot; this.primaryRelocation = primaryRelocation; this.startingSeqNo = startingSeqNo; + this.canDownloadSnapshotFiles = canDownloadSnapshotFiles; assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null : "starting seq no is set but not history uuid"; } @@ -108,6 +117,10 @@ public long startingSeqNo() { return startingSeqNo; } + public boolean canDownloadSnapshotFiles() { + return canDownloadSnapshotFiles; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -119,5 +132,8 @@ public void writeTo(StreamOutput out) throws IOException { metadataSnapshot.writeTo(out); out.writeBoolean(primaryRelocation); out.writeLong(startingSeqNo); + if (out.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) { + out.writeBoolean(canDownloadSnapshotFiles); + } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java index ae229c6df6444..5f6c003bee155 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java @@ -16,7 +16,7 @@ import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; import 
org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f8c49e24a0530..1e68fa3434fd6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -116,7 +116,7 @@ public void run() { thread.start(); IndexShard replica = shards.addReplica(); Future future = shards.asyncRecoverReplica(replica, - (indexShard, node) -> new RecoveryTarget(indexShard, node, null, recoveryListener) { + (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener) { @@ -193,7 +193,7 @@ public IndexResult index(Index op) throws IOException { thread.start(); IndexShard replica = shards.addReplica(); Future fut = shards.asyncRecoverReplica(replica, - (shard, node) -> new RecoveryTarget(shard, node, null, recoveryListener) { + (shard, node) -> new RecoveryTarget(shard, node, null, null, recoveryListener) { @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { try { diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index c32db56d83a9e..4a9d5aca9fa2b 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -423,7 +423,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { AtomicBoolean recoveryDone = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); - return new RecoveryTarget(indexShard, node, null, recoveryListener) { + return new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener listener) { recoveryDone.set(true); @@ -478,7 +478,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { final IndexShard replica = shards.addReplica(); final Future recoveryFuture = shards.asyncRecoverReplica( replica, - (indexShard, node) -> new RecoveryTarget(indexShard, node, null, recoveryListener) { + (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -743,7 +743,7 @@ public static class BlockingTarget extends RecoveryTarget { public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) { - super(shard, sourceNode, null, listener); + super(shard, sourceNode, null, null, listener); this.recoveryBlocked = recoveryBlocked; this.releaseRecovery = releaseRecovery; this.stageToBlock = stageToBlock; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b100d61f1fbd4..e4f0bc9253460 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2536,7 +2536,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2643,7 +2643,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException { // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2702,7 +2702,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); assertListenerCalled.accept(replica); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { // we're only checking that listeners are called when the engine is open, before there is no point @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 1a91d467df12a..32272130bd617 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -40,7 +40,7 @@ public void testDuplicateRecoveries() throws IOException { mock(SnapshotsRecoveryPlannerService.class)); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.UNASSIGNED_SEQ_NO, true); peerRecoverySourceService.start(); RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index fe72340870728..bc98497f0bcda 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; 
import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.NoOpEngine; @@ -47,6 +47,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.xcontent.XContentType; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -64,6 +65,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -97,7 +99,7 @@ public void testWriteFileChunksConcurrently() throws Exception { final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); - final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null, null); final PlainActionFuture receiveFileInfoFuture = new PlainActionFuture<>(); recoveryTarget.receiveFileInfo( mdFiles.stream().map(StoreFileMetadata::name).collect(Collectors.toList()), @@ -297,7 +299,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception { shard.prepareForIndexRecovery(); long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint(); shard.store().markStoreCorrupted(new IOException("simulated")); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo); assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(request.metadataSnapshot().size(), equalTo(0)); @@ -324,7 +326,7 @@ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception { shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); shard.prepareForIndexRecovery(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode, recoveryTarget, randomNonNegativeLong()); assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); @@ -385,7 +387,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); PlainActionFuture writeSnapshotFileFuture = PlainActionFuture.newFuture(); 
recoveryTarget.restoreFileFromSnapshot(repositoryName, indexId, fileInfo, writeSnapshotFileFuture); @@ -457,7 +459,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String repositoryName = "repo"; IndexId indexId = new IndexId("index", "uuid"); @@ -565,7 +567,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { } }; - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String[] fileNamesBeforeRecoveringSnapshotFiles = directory.listAll(); @@ -631,7 +633,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String repository = "repo"; IndexId indexId = new IndexId("index", "uuid"); @@ -664,6 +666,29 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { closeShards(shard); } + public void testSnapshotFileDownloadPermitIsReleasedAfterClosingRecoveryTarget() throws Exception { + DiscoveryNode pNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + DiscoveryNode rNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + IndexShard shard = newShard(false); + shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); + shard.prepareForIndexRecovery(); + + AtomicBoolean snapshotFileDownloadsPermitFlag = new AtomicBoolean(); + Releasable snapshotFileDownloadsPermit = () -> { + assertThat(snapshotFileDownloadsPermitFlag.compareAndSet(false, true), is(equalTo(true))); + }; + RecoveryTarget recoveryTarget = + new RecoveryTarget(shard, null, null, snapshotFileDownloadsPermit, null); + + recoveryTarget.decRef(); + + assertThat(snapshotFileDownloadsPermitFlag.get(), is(equalTo(true))); + closeShards(shard); + } + private Tuple createStoreFileMetadataWithRandomContent(String fileName) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("test", "file", out, 1024)) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java new file mode 100644 index 0000000000000..559da2ad71815 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class RecoverySettingsTests extends ESTestCase { + public void testSnapshotDownloadPermitsAreNotGrantedWhenSnapshotsUseFlagIsFalse() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder() + .put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5) + .put(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.getKey(), false) + .build(), + clusterSettings + ); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + } + + public void testGrantsSnapshotDownloadPermitsUpToMaxPermits() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5).build(), + clusterSettings + ); + + Releasable permit = recoverySettings.tryAcquireSnapshotDownloadPermits(); + assertThat(permit, is(notNullValue())); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + + permit.close(); + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(notNullValue())); + } + + public void testSnapshotDownloadPermitCanBeDynamicallyUpdated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5).build(), + clusterSettings + ); + + Releasable permit = recoverySettings.tryAcquireSnapshotDownloadPermits(); + assertThat(permit, is(notNullValue())); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + clusterSettings.applySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 10).build() + ); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(notNullValue())); + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + permit.close(); + } + + public void testMaxConcurrentSnapshotFileDownloadsPerNodeIsValidated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings settings = Settings.builder() + 
.put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), 10) + .put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5) + .build(); + IllegalArgumentException exception = + expectThrows(IllegalArgumentException.class, () -> new RecoverySettings(settings, clusterSettings)); + assertThat(exception.getMessage(), + containsString("[indices.recovery.max_concurrent_snapshot_file_downloads_per_node]=5 " + + "is less than [indices.recovery.max_concurrent_snapshot_file_downloads]=10") + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index c5a30e4ef98ae..eea77a8cd121e 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -229,7 +229,9 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException { randomBoolean(), randomNonNegativeLong(), randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? - SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); + SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(), + true + ); } public void testSendSnapshotSendsOps() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 29f82bfc64d38..c88defab8222b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -257,7 +257,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { IndexShard replicaShard = newShard(primaryShard.shardId(), false); updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetadata()); recoverReplica(replicaShard, primaryShard, - (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener) { + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener) { @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { super.prepareForTranslogOperations(totalTranslogOps, listener); @@ -369,7 +369,7 @@ public long addDocument(Iterable doc) throws IOExcepti IndexShard replica = group.addReplica(); expectThrows(Exception.class, () -> group.recoverReplica(replica, (shard, sourceNode) -> { - return new RecoveryTarget(shard, sourceNode, null, new PeerRecoveryTargetService.RecoveryListener() { + return new RecoveryTarget(shard, sourceNode, null, null, new PeerRecoveryTargetService.RecoveryListener() { @Override public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { throw new AssertionError("recovery must fail"); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java index 21763e71753f8..95ecfd71dfcd7 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java @@ -44,7 +44,8 @@ public void testSerialization() throws Exception { randomBoolean(), randomNonNegativeLong(), randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? 
- SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); + SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(), + randomBoolean()); final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index dd6f13c8f2796..5cd177f658ede 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -150,6 +150,13 @@ long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, In final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId()); indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode)); indexShard.prepareForIndexRecovery(); - return collection.startRecovery(indexShard, sourceNode, null, listener, timeValue); + return collection.startRecovery( + indexShard, + sourceNode, + null, + listener, + timeValue, + null + ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 1c73e60d653e3..7248bcf10fa1b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -395,7 +395,7 @@ public synchronized boolean removeReplica(IndexShard replica) throws IOException public void recoverReplica(IndexShard replica) throws IOException { recoverReplica(replica, - (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener)); + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener)); } public void recoverReplica(IndexShard replica, BiFunction targetSupplier) diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 36a2f93e63188..cb0ae6021a717 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -582,7 +582,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { /** recovers a replica from the given primary **/ protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { recoverReplica(replica, primary, - (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener), + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener), true, startReplica); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index e41053e19e0eb..411a9b1aeb20f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -372,7 +372,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { 
// We need to recover the replica async to release the main thread for the following task to fill missing // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for. recoveryFuture = group.asyncRecoverReplica(newReplica, - (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, null, recoveryListener) {}); + (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, null, null, recoveryListener) {}); } } if (recoveryFuture != null) {
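Note on the permit lifecycle exercised by testSnapshotFileDownloadPermitIsReleasedAfterClosingRecoveryTarget above: the permit handed to the RecoveryTarget is a Releasable that must be given back exactly once when the target is closed (decRef), which the test checks with an AtomicBoolean compareAndSet. A minimal stand-alone sketch of that "release at most once" guarantee is below; the class and field names are illustrative assumptions, not code from this patch.

import java.util.concurrent.atomic.AtomicBoolean;

final class ReleaseOncePermit implements AutoCloseable {
    private final AtomicBoolean released = new AtomicBoolean();
    private final Runnable onRelease;

    ReleaseOncePermit(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    @Override
    public void close() {
        // Only the first close() actually returns the permit; later calls are no-ops,
        // so closing the recovery target twice cannot over-release the semaphore.
        if (released.compareAndSet(false, true)) {
            onRelease.run();
        }
    }
}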
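The RecoverySettingsTests added above pin down the per-node permit semantics: tryAcquireSnapshotDownloadPermits returns null once the limit (or the use-snapshots=false flag) forbids further downloads, releasing a permit makes it available again, and a dynamic settings update changes the number of available permits. The following stand-alone sketch reproduces that behaviour on top of java.util.concurrent.Semaphore, similar in spirit to the AdjustableSemaphore class this commit introduces; the method names here are assumptions for illustration only, not the actual RecoverySettings API.

import java.util.concurrent.Semaphore;

public final class SnapshotDownloadPermitsSketch {

    // Semaphore whose effective maximum can be adjusted at runtime: raising the max
    // releases the extra permits, lowering it reduces them without blocking.
    static final class AdjustableSemaphore extends Semaphore {
        private int maxPermits;

        AdjustableSemaphore(int maxPermits) {
            super(maxPermits);
            this.maxPermits = maxPermits;
        }

        synchronized void setMaxPermits(int newMaxPermits) {
            final int delta = newMaxPermits - maxPermits;
            if (delta > 0) {
                release(delta);
            } else if (delta < 0) {
                reducePermits(-delta);
            }
            maxPermits = newMaxPermits;
        }
    }

    private final AdjustableSemaphore permits;

    SnapshotDownloadPermitsSketch(int maxConcurrentSnapshotFileDownloadsPerNode) {
        this.permits = new AdjustableSemaphore(maxConcurrentSnapshotFileDownloadsPerNode);
    }

    // Returns a permit to download snapshot files, or null if none is available;
    // closing the returned handle gives the permit back (analogous to Releasable).
    AutoCloseable tryAcquireSnapshotDownloadPermit() {
        if (permits.tryAcquire() == false) {
            return null;
        }
        return permits::release;
    }

    // Called when the cluster setting is updated dynamically.
    void setMaxPermits(int newMax) {
        permits.setMaxPermits(newMax);
    }

    public static void main(String[] args) throws Exception {
        SnapshotDownloadPermitsSketch sketch = new SnapshotDownloadPermitsSketch(1);
        AutoCloseable permit = sketch.tryAcquireSnapshotDownloadPermit();
        System.out.println(permit != null);                                    // true: first permit granted
        System.out.println(sketch.tryAcquireSnapshotDownloadPermit() != null); // false: per-node limit reached
        sketch.setMaxPermits(2);                                               // dynamic settings update
        System.out.println(sketch.tryAcquireSnapshotDownloadPermit() != null); // true: extra permit available
        permit.close();                                                        // closing returns the permit
        System.out.println(sketch.tryAcquireSnapshotDownloadPermit() != null); // true again
    }
}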
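The StartRecoveryRequest test changes above add one extra boolean to the request so the target can tell the source whether it was granted a snapshot-file-download permit. A common pattern for adding such a field without breaking the wire format is to read and write it only when the remote node is on a version that knows about it, and fall back to a default otherwise. This sketch uses plain DataInput/DataOutput to show the idea; the field name and version check are assumptions for illustration, not the actual Elasticsearch serialization code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class StartRecoveryRequestSketch {
    final long recoveryId;
    final boolean canDownloadSnapshotFiles; // hypothetical name for the new flag

    StartRecoveryRequestSketch(long recoveryId, boolean canDownloadSnapshotFiles) {
        this.recoveryId = recoveryId;
        this.canDownloadSnapshotFiles = canDownloadSnapshotFiles;
    }

    void writeTo(DataOutputStream out, int remoteWireVersion) throws IOException {
        out.writeLong(recoveryId);
        if (remoteWireVersion >= 8) { // only nodes that know the field receive it
            out.writeBoolean(canDownloadSnapshotFiles);
        }
    }

    static StartRecoveryRequestSketch readFrom(DataInputStream in, int remoteWireVersion) throws IOException {
        long recoveryId = in.readLong();
        // Older senders never wrote the flag, so default it to false on the reader side.
        boolean canDownloadSnapshotFiles = remoteWireVersion >= 8 && in.readBoolean();
        return new StartRecoveryRequestSketch(recoveryId, canDownloadSnapshotFiles);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new StartRecoveryRequestSketch(42L, true).writeTo(new DataOutputStream(bytes), 8);
        StartRecoveryRequestSketch read =
            readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), 8);
        System.out.println(read.recoveryId + " " + read.canDownloadSnapshotFiles); // 42 true
    }
}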