diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc index 0ac99b41299ca..5f67a63f437e6 100644 --- a/docs/reference/modules/indices/recovery.asciidoc +++ b/docs/reference/modules/indices/recovery.asciidoc @@ -73,3 +73,18 @@ and may interfere with indexing, search, and other activities in your cluster. Do not increase this setting without carefully verifying that your cluster has the resources available to handle the extra load that will result. +`indices.recovery.use_snapshots`:: +(<<cluster-update-settings,Dynamic>>, Expert) Enables snapshot-based peer recoveries. ++ +{es} recovers replicas and relocates primary shards using the _peer recovery_ +process, which involves constructing a new copy of a shard on the target node. +When `indices.recovery.use_snapshots` is `false` {es} will construct this new +copy by transferring the index data from the current primary. When this setting +is `true` {es} will attempt to copy the index data from a recent snapshot +first, and will only copy data from the primary if it cannot identify a +suitable snapshot. ++ +Setting this option to `true` reduces your operating costs if your cluster runs +in an environment where the node-to-node data transfer costs are higher than +the costs of recovering data from a snapshot. It also reduces the amount of +work that the primary must do during a recovery. diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java new file mode 100644 index 0000000000000..16b6ff34a2b0f --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java @@ -0,0 +1,314 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery.plan; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.ShardSnapshotInfo; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotException; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.FileNotFoundException; +import 
java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class ShardSnapshotsServiceIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(FailingRepoPlugin.class); + } + + public static class FailingRepoPlugin extends Plugin implements RepositoryPlugin { + public static final String TYPE = "failingrepo"; + + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + BigArrays bigArrays, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + TYPE, + metadata -> new FailingRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) + ); + } + } + + public static class FailingRepo extends FsRepository { + static final String FAIL_GET_REPOSITORY_DATA_SETTING_KEY = "fail_get_repository_data"; + static final String FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY = "fail_load_shard_snapshot"; + static final String FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY = "fail_load_shard_snapshots"; + + private final boolean failGetRepositoryData; + private final boolean failLoadShardSnapshot; + private final boolean failLoadShardSnapshots; + + public FailingRepo(RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + BigArrays bigArrays, + RecoverySettings recoverySettings) { + super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings); + this.failGetRepositoryData = 
metadata.settings().getAsBoolean(FAIL_GET_REPOSITORY_DATA_SETTING_KEY, false); + this.failLoadShardSnapshot = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, false); + this.failLoadShardSnapshots = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY, false); + } + + @Override + public void getRepositoryData(ActionListener listener) { + if (failGetRepositoryData) { + listener.onFailure(new IOException("Failure getting repository data")); + return; + } + super.getRepositoryData(listener); + } + + @Override + public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { + if (failLoadShardSnapshot) { + throw new SnapshotException( + metadata.name(), + snapshotId, + "failed to read shard snapshot file for [" + shardContainer.path() + ']', + new FileNotFoundException("unable to find file") + ); + } + return super.loadShardSnapshot(shardContainer, snapshotId); + } + + @Override + public BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexId indexId, + ShardId shardId, + ShardGeneration shardGen) throws IOException { + if (failLoadShardSnapshots) { + throw new FileNotFoundException("Failed to get blob store index shard snapshots"); + } + return super.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen); + } + } + + public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Exception { + String indexName = "test"; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); + ShardId shardId = getShardIdForIndex(indexName); + + List shardSnapshotData = getShardSnapshotShard(shardId); + assertThat(shardSnapshotData, is(empty())); + } + + public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception { + final String indexName = "test"; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); + ShardId shardId = getShardIdForIndex(indexName); + + for 
(int i = 0; i < randomIntBetween(1, 50); i++) { + index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); + } + + String snapshotName = "snap"; + + int numberOfNonEnabledRepos = randomIntBetween(1, 3); + List nonEnabledRepos = new ArrayList<>(); + for (int i = 0; i < numberOfNonEnabledRepos; i++) { + String repositoryName = "non-enabled-repo-" + i; + Path repoPath = randomRepoPath(); + createRepository(repositoryName, "fs", repoPath, false); + createSnapshot(repositoryName, snapshotName, indexName); + nonEnabledRepos.add(repositoryName); + } + + int numberOfRecoveryEnabledRepositories = randomIntBetween(0, 4); + List recoveryEnabledRepos = new ArrayList<>(); + for (int i = 0; i < numberOfRecoveryEnabledRepositories; i++) { + String repositoryName = "repo-" + i; + createRepository(repositoryName, "fs", randomRepoPath(), true); + recoveryEnabledRepos.add(repositoryName); + createSnapshot(repositoryName, snapshotName, indexName); + } + + List shardSnapshotDataForShard = getShardSnapshotShard(shardId); + + assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories))); + for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { + assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); + assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); + + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); + assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId))); + assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName))); + assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true))); + assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false))); + } + } + + public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Exception { + final String indexName = "test"; + 
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); + ShardId shardId = getShardIdForIndex(indexName); + + for (int i = 0; i < randomIntBetween(1, 50); i++) { + index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); + } + + String snapshotName = "snap"; + + int numberOfFailingRepos = randomIntBetween(1, 3); + List> failingRepos = new ArrayList<>(); + for (int i = 0; i < numberOfFailingRepos; i++) { + String repositoryName = "failing-repo-" + i; + Path repoPath = randomRepoPath(); + createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true); + createSnapshot(repositoryName, snapshotName, indexName); + failingRepos.add(Tuple.tuple(repositoryName, repoPath)); + } + + int numberOfWorkingRepositories = randomIntBetween(0, 4); + List workingRepos = new ArrayList<>(); + for (int i = 0; i < numberOfWorkingRepositories; i++) { + String repositoryName = "repo-" + i; + createRepository(repositoryName, "fs", randomRepoPath(), true); + workingRepos.add(repositoryName); + createSnapshot(repositoryName, snapshotName, indexName); + } + + for (Tuple failingRepo : failingRepos) { + // Update repository settings to fail fetching the repository information at any stage + String repoFailureType = + randomFrom(FailingRepo.FAIL_GET_REPOSITORY_DATA_SETTING_KEY, + FailingRepo.FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, + FailingRepo.FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY + ); + + assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1()) + .setType(FailingRepoPlugin.TYPE) + .setVerify(false) + .setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath())) + ); + } + + List shardSnapshotDataForShard = getShardSnapshotShard(shardId); + + assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories))); + for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { + 
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); + assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); + + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); + assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId)); + assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName)); + } + } + + public void testFetchingInformationFromAnIncompatibleMasterNodeReturnsAnEmptyList() { + String indexName = "test"; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); + ShardId shardId = getShardIdForIndex(indexName); + + for (int i = 0; i < randomIntBetween(1, 50); i++) { + index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); + } + + String snapshotName = "snap"; + String repositoryName = "repo"; + createRepository(repositoryName, "fs", randomRepoPath(), true); + createSnapshot(repositoryName, snapshotName, indexName); + + RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class); + ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); + ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class); + ShardSnapshotsService shardSnapshotsService = new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService) { + @Override + protected boolean masterSupportsFetchingLatestSnapshots() { + return false; + } + }; + + PlainActionFuture> latestSnapshots = PlainActionFuture.newFuture(); + shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots); + assertThat(latestSnapshots.actionGet(), is(empty())); + } + + private List getShardSnapshotShard(ShardId shardId) throws Exception { + ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService(); + + PlainActionFuture> future = PlainActionFuture.newFuture(); + 
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future); + return future.get(); + } + + private ShardSnapshotsService getShardSnapshotsService() { + RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class); + ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); + ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class); + return new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService); + } + + private ShardId getShardIdForIndex(String indexName) { + ClusterState state = clusterAdmin().prepareState().get().getState(); + return state.routingTable().index(indexName).shard(0).shardId(); + } + + private void createRepository(String repositoryName, String type, Path location, boolean recoveryEnabledRepo) { + assertAcked(client().admin().cluster().preparePutRepository(repositoryName) + .setType(type) + .setVerify(false) + .setSettings(Settings.builder() + .put("location", location) + .put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), recoveryEnabledRepo) + ) + ); + } + + private void createSnapshot(String repoName, String snapshotName, String index) { + clusterAdmin() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .setIndices(index) + .get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 87859a0d881de..79206640c95c7 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -230,6 +230,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, 
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, + RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java b/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java index 2929196c6bfaa..48f24cf7ac815 100644 --- a/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java +++ b/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java @@ -358,4 +358,11 @@ public static List> eagerPartition(List list, int size) { return result; } + + public static List concatLists(List listA, List listB) { + List concatList = new ArrayList<>(listA.size() + listB.size()); + concatList.addAll(listA); + concatList.addAll(listB); + return concatList; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 663b79fc7de1b..6e8c3ff95ffbd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -20,17 +20,18 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.FutureUtils; 
+import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -61,14 +62,19 @@ public static class Actions { private final TransportService transportService; private final IndicesService indicesService; private final RecoverySettings recoverySettings; + private final RecoveryPlannerService recoveryPlannerService; final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); @Inject - public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, RecoverySettings recoverySettings) { + public PeerRecoverySourceService(TransportService transportService, + IndicesService indicesService, + RecoverySettings recoverySettings, + RecoveryPlannerService recoveryPlannerService) { this.transportService = transportService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; + this.recoveryPlannerService = recoveryPlannerService; // When the target node wants to start a peer recovery it sends a START_RECOVERY request to the source // node. Upon receiving START_RECOVERY, the source node will initiate the peer recovery. 
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new, @@ -318,7 +324,8 @@ private Tuple createRecovery handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks(), - recoverySettings.getMaxConcurrentOperations()); + recoverySettings.getMaxConcurrentOperations(), + recoveryPlannerService); return Tuple.tuple(handler, recoveryTarget); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 2f53423fe470b..1e2c98a99f9ed 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter.SimpleRateLimiter; -import org.elasticsearch.jdk.JavaVersion; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.jdk.JavaVersion; import org.elasticsearch.monitor.os.OsProbe; import org.elasticsearch.node.NodeRoleSettings; @@ -28,6 +29,7 @@ import java.util.stream.Collectors; public class RecoverySettings { + public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0; private static final Logger logger = LogManager.getLogger(RecoverySettings.class); @@ -129,6 +131,13 @@ public class RecoverySettings { INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING::get, 
TimeValue.timeValueSeconds(0), Property.Dynamic, Property.NodeScope); + /** + * Controls whether recoveries try to use files from available snapshots instead of sending them from the source node. + * Defaults to `false`. + */ + public static final Setting<Boolean> INDICES_RECOVERY_USE_SNAPSHOTS_SETTING = + Setting.boolSetting("indices.recovery.use_snapshots", false, Property.Dynamic, Property.NodeScope); + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile ByteSizeValue maxBytesPerSec; @@ -141,6 +150,7 @@ public class RecoverySettings { private volatile TimeValue internalActionTimeout; private volatile TimeValue internalActionRetryTimeout; private volatile TimeValue internalActionLongTimeout; + private volatile boolean useSnapshotsDuringRecovery; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @@ -163,7 +173,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } else { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); } - + this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings); logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); @@ -177,6 +187,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, this::setInternalActionLongTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery); } public RateLimiter rateLimiter() { @@ -216,7 +227,6 @@ public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests this.chunkSize = chunkSize; } - public void setRetryDelayStateSync(TimeValue retryDelayStateSync) { this.retryDelayStateSync = retryDelayStateSync; } @@ -263,4 +273,12 @@ public int getMaxConcurrentOperations() 
{ private void setMaxConcurrentOperations(int maxConcurrentOperations) { this.maxConcurrentOperations = maxConcurrentOperations; } + + public boolean getUseSnapshotsDuringRecovery() { + return useSnapshotsDuringRecovery; + } + + private void setUseSnapshotsDuringRecovery(boolean useSnapshotsDuringRecovery) { + this.useSnapshotsDuringRecovery = useSnapshotsDuringRecovery; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e885bbb61ec53..29b50d059e2ef 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,19 +29,19 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import 
org.elasticsearch.index.engine.RecoveryEngineException; @@ -54,9 +54,12 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; +import org.elasticsearch.indices.recovery.plan.ShardRecoveryPlan; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transports; @@ -103,16 +106,18 @@ public class RecoverySourceHandler { private final int maxConcurrentFileChunks; private final int maxConcurrentOperations; private final ThreadPool threadPool; + private final RecoveryPlannerService recoveryPlannerService; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks, - int maxConcurrentOperations) { + int maxConcurrentOperations, RecoveryPlannerService recoveryPlannerService) { this.shard = shard; this.recoveryTarget = recoveryTarget; this.threadPool = threadPool; + this.recoveryPlannerService = recoveryPlannerService; this.request = request; this.shardId = this.request.shardId().id(); this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); @@ -469,82 +474,17 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } } if 
(canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { - final List phase1FileNames = new ArrayList<>(); - final List phase1FileSizes = new ArrayList<>(); - final List phase1ExistingFileNames = new ArrayList<>(); - final List phase1ExistingFileSizes = new ArrayList<>(); - - // Total size of segment files that are recovered - long totalSizeInBytes = 0; - // Total size of segment files that were able to be re-used - long existingTotalSizeInBytes = 0; - - // Generate a "diff" of all the identical, different, and missing - // segment files on the target node, using the existing files on - // the source node - final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); - for (StoreFileMetadata md : diff.identical) { - phase1ExistingFileNames.add(md.name()); - phase1ExistingFileSizes.add(md.length()); - existingTotalSizeInBytes += md.length(); - if (logger.isTraceEnabled()) { - logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," + - " size [{}]", md.name(), md.checksum(), md.length()); - } - totalSizeInBytes += md.length(); - } - List phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size()); - phase1Files.addAll(diff.different); - phase1Files.addAll(diff.missing); - for (StoreFileMetadata md : phase1Files) { - if (request.metadataSnapshot().asMap().containsKey(md.name())) { - logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", - md.name(), request.metadataSnapshot().asMap().get(md.name()), md); - } else { - logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); - } - phase1FileNames.add(md.name()); - phase1FileSizes.add(md.length()); - totalSizeInBytes += md.length(); - } - - logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", - phase1FileNames.size(), new 
ByteSizeValue(totalSizeInBytes), - phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); - final StepListener sendFileInfoStep = new StepListener<>(); - final StepListener sendFilesStep = new StepListener<>(); - final StepListener createRetentionLeaseStep = new StepListener<>(); - final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.checkForCancel(); - recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, - phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep); - - sendFileInfoStep.whenComplete(r -> - sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), listener::onFailure); - - sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); - - createRetentionLeaseStep.whenComplete(retentionLease -> - { - final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); - assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint - : retentionLease + " vs " + lastKnownGlobalCheckpoint; - // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want - // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica - // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on - // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. 
- cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep); - }, - listener::onFailure); - - final long totalSize = totalSizeInBytes; - final long existingTotalSize = existingTotalSizeInBytes; - cleanFilesStep.whenComplete(r -> { - final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); - listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, - phase1ExistingFileSizes, existingTotalSize, took)); - }, listener::onFailure); + recoveryPlannerService.computeRecoveryPlan(shard.shardId(), + recoverySourceMetadata, + request.metadataSnapshot(), + startingSeqNo, + translogOps.getAsInt(), + getRequest().targetNode().getVersion(), + false, + ActionListener.wrap(plan -> + recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure) + ); } else { logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); @@ -564,6 +504,96 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } } + void recoverFilesFromSourceAndSnapshot(ShardRecoveryPlan shardRecoveryPlan, + Store store, + StopWatch stopWatch, + ActionListener listener) { + cancellableThreads.checkForCancel(); + + final List filesToRecoverNames = shardRecoveryPlan.getFilesToRecoverNames(); + final List filesToRecoverSizes = shardRecoveryPlan.getFilesToRecoverSizes(); + final List phase1ExistingFileNames = shardRecoveryPlan.getFilesPresentInTargetNames(); + final List phase1ExistingFileSizes = shardRecoveryPlan.getFilesPresentInTargetSizes(); + final long totalSize = shardRecoveryPlan.getTotalSize(); + final long existingTotalSize = shardRecoveryPlan.getExistingSize(); + + if (logger.isTraceEnabled()) { + for (StoreFileMetadata md : shardRecoveryPlan.getFilesPresentInTarget()) { + logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum 
[{}]," + + " size [{}]", md.name(), md.checksum(), md.length()); + } + + for (StoreFileMetadata md : shardRecoveryPlan.getSourceFilesToRecover()) { + if (request.metadataSnapshot().asMap().containsKey(md.name())) { + logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", + md.name(), request.metadataSnapshot().asMap().get(md.name()), md); + } else { + logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); + } + } + + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : shardRecoveryPlan.getSnapshotFilesToRecover()) { + final StoreFileMetadata md = fileInfo.metadata(); + if (request.metadataSnapshot().asMap().containsKey(md.name())) { + logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", + md.name(), request.metadataSnapshot().asMap().get(md.name()), md); + } else { + logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); + } + } + + logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", + filesToRecoverNames.size(), new ByteSizeValue(totalSize), + phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + } + + final StepListener sendFileInfoStep = new StepListener<>(); + final StepListener sendFilesStep = new StepListener<>(); + final StepListener createRetentionLeaseStep = new StepListener<>(); + final StepListener cleanFilesStep = new StepListener<>(); + + final int translogOps = shardRecoveryPlan.getTranslogOps(); + recoveryTarget.receiveFileInfo(filesToRecoverNames, + filesToRecoverSizes, + phase1ExistingFileNames, + phase1ExistingFileSizes, + translogOps, + sendFileInfoStep + ); + + final List sourceFiles = shardRecoveryPlan.getSourceFilesToRecover(); + + sendFileInfoStep.whenComplete(r -> + sendFiles(store, + sourceFiles.toArray(new StoreFileMetadata[0]), shardRecoveryPlan::getTranslogOps, 
sendFilesStep), listener::onFailure); + + final long startingSeqNo = shardRecoveryPlan.getStartingSeqNo(); + sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); + + final Store.MetadataSnapshot recoverySourceMetadata = shardRecoveryPlan.getSourceMetadataSnapshot(); + createRetentionLeaseStep.whenComplete(retentionLease -> + { + final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); + assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint + : retentionLease + " vs " + lastKnownGlobalCheckpoint; + // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want + // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica + // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on + // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. + cleanFiles(store, recoverySourceMetadata, () -> translogOps, lastKnownGlobalCheckpoint, cleanFilesStep); + }, + listener::onFailure); + + cleanFilesStep.whenComplete(r -> { + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + listener.onResponse( + new SendFileResult(filesToRecoverNames, filesToRecoverSizes, totalSize, + phase1ExistingFileNames, phase1ExistingFileSizes, existingTotalSize, took) + ); + }, listener::onFailure); + } + void createRetentionLease(final long startingSeqNo, ActionListener listener) { runUnderPrimaryPermit(() -> { // Clone the peer recovery retention lease belonging to the source shard. 
We are retaining history between the the local diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/RecoveryPlannerService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/RecoveryPlannerService.java new file mode 100644 index 0000000000000..7ab7fe0d185cf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/RecoveryPlannerService.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery.plan; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; + +public interface RecoveryPlannerService { + void computeRecoveryPlan(ShardId shardId, + Store.MetadataSnapshot sourceMetadata, + Store.MetadataSnapshot targetMetadata, + long startingSeqNo, + int translogOps, + Version targetVersion, + boolean useSnapshots, + ActionListener listener); +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardRecoveryPlan.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardRecoveryPlan.java new file mode 100644 index 0000000000000..2bcd8e0dda102 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardRecoveryPlan.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery.plan; + +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.repositories.IndexId; + +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyList; + +public class ShardRecoveryPlan { + private final Store.MetadataSnapshot sourceMetadataSnapshot; + private final SnapshotFilesToRecover snapshotFilesToRecover; + private final List sourceFilesToRecover; + private final List filesPresentInTarget; + + private final long startingSeqNo; + private final int translogOps; + + public ShardRecoveryPlan(SnapshotFilesToRecover snapshotFilesToRecover, + List sourceFilesToRecover, + List filesPresentInTarget, + long startingSeqNo, + int translogOps, + Store.MetadataSnapshot sourceMetadataSnapshot) { + this.snapshotFilesToRecover = snapshotFilesToRecover; + this.sourceFilesToRecover = sourceFilesToRecover; + this.filesPresentInTarget = filesPresentInTarget; + this.sourceMetadataSnapshot = sourceMetadataSnapshot; + + this.startingSeqNo = startingSeqNo; + this.translogOps = translogOps; + } + + public List getFilesPresentInTarget() { + return filesPresentInTarget; + } + + public List getFilesPresentInTargetNames() { + return filesPresentInTarget.stream().map(StoreFileMetadata::name).collect(Collectors.toList()); + } + + public List getFilesPresentInTargetSizes() { + return filesPresentInTarget.stream().map(StoreFileMetadata::length).collect(Collectors.toList()); + } + + public List getSourceFilesToRecover() { + return sourceFilesToRecover; + } + + 
public List getFilesToRecoverNames() { + return getFilesToRecoverStream().map(StoreFileMetadata::name) + .collect(Collectors.toList()); + } + + public List getFilesToRecoverSizes() { + return getFilesToRecoverStream().map(StoreFileMetadata::length) + .collect(Collectors.toList()); + } + + public SnapshotFilesToRecover getSnapshotFilesToRecover() { + return snapshotFilesToRecover; + } + + public Store.MetadataSnapshot getSourceMetadataSnapshot() { + return sourceMetadataSnapshot; + } + + public long getTotalSize() { + return Stream.concat(getFilesToRecoverStream(), filesPresentInTarget.stream()).mapToLong(StoreFileMetadata::length).sum(); + } + + public long getExistingSize() { + return filesPresentInTarget.stream().mapToLong(StoreFileMetadata::length).sum(); + } + + public long getStartingSeqNo() { + return startingSeqNo; + } + + public int getTranslogOps() { + return translogOps; + } + + private Stream getFilesToRecoverStream() { + return Stream.concat( + snapshotFilesToRecover.snapshotFiles.stream().map(BlobStoreIndexShardSnapshot.FileInfo::metadata), + sourceFilesToRecover.stream() + ); + } + + public static class SnapshotFilesToRecover implements Iterable { + public static final SnapshotFilesToRecover EMPTY = new SnapshotFilesToRecover(null, null, emptyList()); + + private final IndexId indexId; + private final String repository; + private final List snapshotFiles; + + public SnapshotFilesToRecover(IndexId indexId, String repository, List snapshotFiles) { + this.indexId = indexId; + this.repository = repository; + this.snapshotFiles = snapshotFiles; + } + + public IndexId getIndexId() { + return indexId; + } + + public String getRepository() { + return repository; + } + + public int size() { + return snapshotFiles.size(); + } + + public boolean isEmpty() { + return snapshotFiles.isEmpty(); + } + + @Override + public Iterator iterator() { + return snapshotFiles.iterator(); + } + } +} diff --git 
a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshot.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshot.java new file mode 100644 index 0000000000000..9f6df2d491a32 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshot.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery.plan; + +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ShardSnapshotInfo; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +class ShardSnapshot { + private final ShardSnapshotInfo shardSnapshotInfo; + // Segment file name -> file info + private final Map snapshotFiles; + private final Store.MetadataSnapshot metadataSnapshot; + + ShardSnapshot(ShardSnapshotInfo shardSnapshotInfo, List snapshotFiles) { + this.shardSnapshotInfo = shardSnapshotInfo; + this.snapshotFiles = snapshotFiles.stream() + .collect(Collectors.toMap(snapshotFile -> snapshotFile.metadata().name(), Function.identity())); + this.metadataSnapshot = convertToMetadataSnapshot(snapshotFiles); + } + + String getShardStateIdentifier() { + return shardSnapshotInfo.getShardStateIdentifier(); + } + + String getRepository() { + return shardSnapshotInfo.getRepository(); + } + + Store.MetadataSnapshot getMetadataSnapshot() { + return metadataSnapshot; + } + 
+ IndexId getIndexId() { + return shardSnapshotInfo.getIndexId(); + } + + long getStartedAt() { + return shardSnapshotInfo.getStartedAt(); + } + + ShardSnapshotInfo getShardSnapshotInfo() { + return shardSnapshotInfo; + } + + List getSnapshotFiles(List segmentFiles) { + return segmentFiles.stream() + .map(storeFileMetadata -> snapshotFiles.get(storeFileMetadata.name())) + .collect(Collectors.toList()); + } + + static Store.MetadataSnapshot convertToMetadataSnapshot(List snapshotFiles) { + return new Store.MetadataSnapshot( + snapshotFiles.stream() + .map(BlobStoreIndexShardSnapshot.FileInfo::metadata) + .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity())), + Collections.emptyMap(), + 0 + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java new file mode 100644 index 0000000000000..f0d51e7f65b04 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.indices.recovery.plan; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotResponse; +import org.elasticsearch.action.support.ThreadedActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.ShardSnapshotInfo; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.elasticsearch.indices.recovery.RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION; + +public class ShardSnapshotsService { + private final Logger logger = LogManager.getLogger(ShardSnapshotsService.class); + + private final Client client; + private final RepositoriesService repositoriesService; + private final ThreadPool threadPool; + private final ClusterService clusterService; + + public ShardSnapshotsService(Client client, + RepositoriesService repositoriesService, + 
ThreadPool threadPool, + ClusterService clusterService) { + this.client = client; + this.repositoriesService = repositoriesService; + this.threadPool = threadPool; + this.clusterService = clusterService; + } + + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + assert shardId != null : "SharId was null but a value was expected"; + + final RepositoriesMetadata currentReposMetadata = clusterService.state() + .metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + + List repositories = currentReposMetadata.repositories() + .stream() + .filter(repositoryMetadata -> BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.get(repositoryMetadata.settings())) + .map(RepositoryMetadata::name) + .collect(Collectors.toList()); + + if (repositories.isEmpty() || masterSupportsFetchingLatestSnapshots() == false) { + logger.debug("Unable to use snapshots during peer recovery use_for_peer_recovery_repositories=[{}]," + + " masterSupportsFetchingLatestSnapshots=[{}]", repositories, masterSupportsFetchingLatestSnapshots()); + listener.onResponse(Collections.emptyList()); + return; + } + + logger.debug("Searching for peer recovery compatible snapshots in [{}]", repositories); + + GetShardSnapshotRequest request = GetShardSnapshotRequest.latestSnapshotInRepositories(shardId, repositories); + client.execute(GetShardSnapshotAction.INSTANCE, + request, + new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(this::fetchSnapshotFiles), false) + ); + } + + private List fetchSnapshotFiles(GetShardSnapshotResponse shardSnapshotResponse) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + + if (shardSnapshotResponse.getRepositoryShardSnapshots().isEmpty()) { + return Collections.emptyList(); + } + + Collection shardSnapshots = shardSnapshotResponse.getRepositoryShardSnapshots().values(); + List shardSnapshotData = new ArrayList<>(shardSnapshots.size()); + for 
(ShardSnapshotInfo shardSnapshot : shardSnapshots) { + final List snapshotFiles = getSnapshotFileList(shardSnapshot); + if (snapshotFiles.isEmpty() == false) { + shardSnapshotData.add(new ShardSnapshot(shardSnapshot, snapshotFiles)); + } + } + return shardSnapshotData; + } + + private List getSnapshotFileList(ShardSnapshotInfo shardSnapshotInfo) { + try { + final Snapshot snapshot = shardSnapshotInfo.getSnapshot(); + + final Repository repository = repositoriesService.repository(snapshot.getRepository()); + if (repository instanceof BlobStoreRepository == false) { + return Collections.emptyList(); + } + + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + BlobContainer blobContainer = blobStoreRepository.shardContainer(shardSnapshotInfo.getIndexId(), + shardSnapshotInfo.getShardId().getId()); + BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot = + blobStoreRepository.loadShardSnapshot(blobContainer, snapshot.getSnapshotId()); + + return blobStoreIndexShardSnapshot.indexFiles(); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("Unable to fetch shard snapshot files for {}", shardSnapshotInfo), e); + return Collections.emptyList(); + } + } + + protected boolean masterSupportsFetchingLatestSnapshots() { + return clusterService.state().nodes().getMinNodeVersion().onOrAfter(SNAPSHOT_RECOVERIES_SUPPORTED_VERSION); + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java new file mode 100644 index 0000000000000..088c790d1fb41 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery.plan; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.indices.recovery.RecoverySettings; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.util.CollectionUtils.concatLists; + +public class SnapshotsRecoveryPlannerService implements RecoveryPlannerService { + private final Logger logger = LogManager.getLogger(SnapshotsRecoveryPlannerService.class); + + private final ShardSnapshotsService shardSnapshotsService; + + public SnapshotsRecoveryPlannerService(ShardSnapshotsService shardSnapshotsService) { + this.shardSnapshotsService = shardSnapshotsService; + } + + public void computeRecoveryPlan(ShardId shardId, + Store.MetadataSnapshot sourceMetadata, + Store.MetadataSnapshot targetMetadata, + long startingSeqNo, + int translogOps, + Version targetVersion, + boolean useSnapshots, + ActionListener listener) { + // Fallback to source only recovery if the target node is in an incompatible version + boolean canUseSnapshots = useSnapshots && targetVersion.onOrAfter(RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION); + + fetchAvailableSnapshotsIgnoringErrors(shardId, canUseSnapshots, availableSnapshots -> + 
ActionListener.completeWith(listener, () -> + computeRecoveryPlanWithSnapshots( + sourceMetadata, + targetMetadata, + startingSeqNo, + translogOps, + availableSnapshots + ) + ) + ); + } + + private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(Store.MetadataSnapshot sourceMetadata, + Store.MetadataSnapshot targetMetadata, + long startingSeqNo, + int translogOps, + List availableSnapshots) { + Store.RecoveryDiff sourceTargetDiff = sourceMetadata.recoveryDiff(targetMetadata); + List filesMissingInTarget = concatLists(sourceTargetDiff.missing, sourceTargetDiff.different); + + if (availableSnapshots.isEmpty()) { + // If we couldn't find any valid snapshots, fallback to the source + return new ShardRecoveryPlan(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY, + filesMissingInTarget, + sourceTargetDiff.identical, + startingSeqNo, + translogOps, + sourceMetadata + ); + } + + ShardSnapshot latestSnapshot = availableSnapshots.stream() + .max(Comparator.comparingLong(ShardSnapshot::getStartedAt)) + .get(); + + Store.MetadataSnapshot filesToRecoverFromSourceSnapshot = toMetadataSnapshot(filesMissingInTarget); + Store.RecoveryDiff snapshotDiff = filesToRecoverFromSourceSnapshot.recoveryDiff(latestSnapshot.getMetadataSnapshot()); + final ShardRecoveryPlan.SnapshotFilesToRecover snapshotFilesToRecover; + if (snapshotDiff.identical.isEmpty()) { + snapshotFilesToRecover = ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY; + } else { + snapshotFilesToRecover = new ShardRecoveryPlan.SnapshotFilesToRecover(latestSnapshot.getIndexId(), + latestSnapshot.getRepository(), + latestSnapshot.getSnapshotFiles(snapshotDiff.identical)); + } + + return new ShardRecoveryPlan(snapshotFilesToRecover, + concatLists(snapshotDiff.missing, snapshotDiff.different), + sourceTargetDiff.identical, + startingSeqNo, + translogOps, + sourceMetadata + ); + } + + private void fetchAvailableSnapshotsIgnoringErrors(ShardId shardId, boolean useSnapshots, Consumer> listener) { + if (useSnapshots == false) { + 
listener.accept(Collections.emptyList()); + return; + } + + ActionListener> listenerIgnoringErrors = new ActionListener>() { + @Override + public void onResponse(List shardSnapshotData) { + listener.accept(shardSnapshotData); + } + + @Override + public void onFailure(Exception e) { + logger.warn(new ParameterizedMessage("Unable to fetch available snapshots for shard {}", shardId), e); + listener.accept(Collections.emptyList()); + } + }; + + shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, listenerIgnoringErrors); + } + + private Store.MetadataSnapshot toMetadataSnapshot(List files) { + return new Store.MetadataSnapshot( + files + .stream() + .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity())), + emptyMap(), + 0 + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/SourceOnlyRecoveryPlannerService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SourceOnlyRecoveryPlannerService.java new file mode 100644 index 0000000000000..2eb2737998cda --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SourceOnlyRecoveryPlannerService.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.indices.recovery.plan; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; + +import java.util.List; + +import static org.elasticsearch.common.util.CollectionUtils.concatLists; + +public class SourceOnlyRecoveryPlannerService implements RecoveryPlannerService { + public static final RecoveryPlannerService INSTANCE = new SourceOnlyRecoveryPlannerService(); + @Override + public void computeRecoveryPlan(ShardId shardId, + Store.MetadataSnapshot sourceMetadata, + Store.MetadataSnapshot targetMetadata, + long startingSeqNo, + int translogOps, + Version targetVersion, + boolean useSnapshots, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Store.RecoveryDiff recoveryDiff = sourceMetadata.recoveryDiff(targetMetadata); + List filesMissingInTarget = concatLists(recoveryDiff.missing, recoveryDiff.different); + return new ShardRecoveryPlan(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY, + filesMissingInTarget, + recoveryDiff.identical, + startingSeqNo, + translogOps, + sourceMetadata + ); + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index ce9ebf54521e4..deec3eefe5562 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -62,7 +62,6 @@ import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.core.Releasables; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.HeaderWarning; @@ -79,10 +78,11 @@ import 
org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; @@ -116,6 +116,7 @@ import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.indices.recovery.plan.SourceOnlyRecoveryPlannerService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; @@ -173,7 +174,6 @@ import org.elasticsearch.usage.UsageService; import org.elasticsearch.watcher.ResourceWatcherService; -import javax.net.ssl.SNIHostName; import java.io.BufferedWriter; import java.io.Closeable; import java.io.IOException; @@ -198,6 +198,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.net.ssl.SNIHostName; import static java.util.stream.Collectors.toList; import static org.elasticsearch.core.Types.forciblyCast; @@ -750,7 +751,7 @@ protected Node(final Environment initialEnvironment, { processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService, - indicesService, recoverySettings)); + indicesService, recoverySettings, SourceOnlyRecoveryPlannerService.INSTANCE)); 
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); } diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java index 5f69a55d4c704..7cb247ab922cc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -166,7 +166,14 @@ private BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots() throws IO } private ShardSnapshotInfo createIndexShardSnapshotInfo(String indexMetadataId, SnapshotFiles snapshotFiles) { - return new ShardSnapshotInfo(indexId, shardId, snapshotInfo.snapshot(), indexMetadataId, snapshotFiles.shardStateIdentifier()); + return new ShardSnapshotInfo( + indexId, + shardId, + snapshotInfo.snapshot(), + indexMetadataId, + snapshotFiles.shardStateIdentifier(), + snapshotInfo.startTime() + ); } SnapshotInfo getSnapshotInfo() { diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java b/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java index bb0e82e5fe4b8..550befcfe0d18 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java @@ -25,19 +25,22 @@ public class ShardSnapshotInfo implements Writeable { private final String indexMetadataIdentifier; @Nullable private final String shardStateIdentifier; + private final long startedAt; public ShardSnapshotInfo( IndexId indexId, ShardId shardId, Snapshot snapshot, String indexMetadataIdentifier, - @Nullable String shardStateIdentifier + @Nullable String shardStateIdentifier, + long startedAt ) { this.indexId = indexId; this.shardId = shardId; this.snapshot = snapshot; this.indexMetadataIdentifier = 
indexMetadataIdentifier; this.shardStateIdentifier = shardStateIdentifier; + this.startedAt = startedAt; } public ShardSnapshotInfo(StreamInput in) throws IOException { @@ -46,6 +49,7 @@ public ShardSnapshotInfo(StreamInput in) throws IOException { this.shardId = new ShardId(in); this.indexMetadataIdentifier = in.readString(); this.shardStateIdentifier = in.readOptionalString(); + this.startedAt = in.readLong(); } @Override @@ -55,6 +59,7 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeString(indexMetadataIdentifier); out.writeOptionalString(shardStateIdentifier); + out.writeLong(startedAt); } @Nullable @@ -76,12 +81,25 @@ public String getRepository() { return snapshot.getRepository(); } + public IndexId getIndexId() { + return indexId; + } + + public ShardId getShardId() { + return shardId; + } + + public long getStartedAt() { + return startedAt; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ShardSnapshotInfo that = (ShardSnapshotInfo) o; - return Objects.equals(indexId, that.indexId) + return startedAt == that.startedAt + && Objects.equals(indexId, that.indexId) && Objects.equals(snapshot, that.snapshot) && Objects.equals(shardId, that.shardId) && Objects.equals(indexMetadataIdentifier, that.indexMetadataIdentifier) @@ -90,6 +108,26 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(indexId, snapshot, shardId, indexMetadataIdentifier, shardStateIdentifier); + return Objects.hash(indexId, snapshot, shardId, indexMetadataIdentifier, shardStateIdentifier, startedAt); + } + + @Override + public String toString() { + return "ShardSnapshotInfo{" + + "indexId=" + + indexId + + ", snapshot=" + + snapshot + + ", shardId=" + + shardId + + ", indexMetadataIdentifier='" + + indexMetadataIdentifier + + '\'' + + ", shardStateIdentifier='" + + shardStateIdentifier + + '\'' + + ", startedAt=" + + 
startedAt + + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 6847bc301845e..f4cf074f3126c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -250,6 +250,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + /** + * Setting that defines if the repository should be used to recover index files during peer recoveries. + */ + public static final Setting USE_FOR_PEER_RECOVERY_SETTING = Setting.boolSetting("use_for_peer_recovery", false); + protected final boolean supportURLRepo; private final boolean compress; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java index 18eff167d6403..674d0f31b6eac 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java @@ -79,7 +79,7 @@ private Tuple repositoryShardSnapshot() { String shardStateIdentifier = randomBoolean() ? 
randomString(30) : null; return Tuple.tuple( repositoryName, - new ShardSnapshotInfo(indexId, shardId, snapshot, indexMetadataIdentifier, shardStateIdentifier) + new ShardSnapshotInfo(indexId, shardId, snapshot, indexMetadataIdentifier, shardStateIdentifier, randomLongBetween(0, 2048)) ); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 14d1f451b41a4..1a91d467df12a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.plan.SnapshotsRecoveryPlannerService; import org.elasticsearch.test.NodeRoles; import org.elasticsearch.transport.TransportService; @@ -35,7 +36,8 @@ public void testDuplicateRecoveries() throws IOException { when(indicesService.clusterService()).thenReturn(clusterService); PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService( mock(TransportService.class), indicesService, - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + mock(SnapshotsRecoveryPlannerService.class)); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), SequenceNumbers.UNASSIGNED_SEQ_NO); diff --git 
a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 9405055e042f0..794e05c6643e3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -68,6 +69,9 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; +import org.elasticsearch.indices.recovery.plan.ShardRecoveryPlan; +import org.elasticsearch.indices.recovery.plan.SourceOnlyRecoveryPlannerService; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -100,6 +104,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -110,6 +115,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyObject; @@ -123,6 +130,7 
@@ public class RecoverySourceHandlerTests extends ESTestCase { Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()); private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final RecoveryPlannerService recoveryPlannerService = SourceOnlyRecoveryPlannerService.INSTANCE; private ThreadPool threadPool; private Executor recoveryExecutor; @@ -178,8 +186,14 @@ public void writeFileChunk(StoreFileMetadata md, long position, ReleasableBytesR }); } }; - RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), - threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5), between(1, 5)); + RecoverySourceHandler handler = new RecoverySourceHandler(null, + new AsyncRecoveryTarget(target, recoveryExecutor), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 5), + between(1, 5), + recoveryPlannerService); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); sendFilesFuture.actionGet(); @@ -242,7 +256,7 @@ public void indexTranslogOperations(List operations, int tot } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), - threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10)); + threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10), recoveryPlannerService); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future); @@ 
-284,7 +298,7 @@ public void indexTranslogOperations(List operations, int tot } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), - threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10)); + threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10), recoveryPlannerService); PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); @@ -338,7 +352,7 @@ public void indexTranslogOperations(List operations, int rec List skipOperations = randomSubsetOf(operations); Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations); RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(target, recoveryExecutor), - threadPool, getStartRecoveryRequest(), between(1, 10 * 1024), between(1, 5), between(1, 5)); + threadPool, getStartRecoveryRequest(), between(1, 10 * 1024), between(1, 5), between(1, 5), recoveryPlannerService); handler.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendFuture); RecoverySourceHandler.SendSnapshotResult sendSnapshotResult = sendFuture.actionGet(); @@ -414,7 +428,7 @@ public void writeFileChunk(StoreFileMetadata md, long position, ReleasableBytesR } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, - request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8)) { + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8), recoveryPlannerService) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -471,7 +485,7 @@ public void writeFileChunk(StoreFileMetadata md, long 
position, ReleasableBytesR } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, - request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4)) { + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4), recoveryPlannerService) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -526,7 +540,8 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - between(1, 8), between(1, 8)) { + between(1, 8), between(1, 8), + recoveryPlannerService) { @Override void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { @@ -603,7 +618,7 @@ public void writeFileChunk(StoreFileMetadata md, long position, ReleasableBytesR final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(), - chunkSize, maxConcurrentChunks, between(1, 10)); + chunkSize, maxConcurrentChunks, between(1, 10), recoveryPlannerService); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); @@ -661,7 +676,7 @@ public void writeFileChunk(StoreFileMetadata md, long position, ReleasableBytesR final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), - threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks, between(1, 5)); + threadPool, getStartRecoveryRequest(), 
chunkSize, maxConcurrentChunks, between(1, 5), recoveryPlannerService); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); @@ -742,7 +757,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada }; final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest(); final RecoverySourceHandler handler = new RecoverySourceHandler( - shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4), between(1, 4)) { + shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4), between(1, 4), recoveryPlannerService) { @Override void createRetentionLease(long startingSeqNo, ActionListener listener) { final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(startRecoveryRequest.targetNode().getId()); @@ -771,7 +786,14 @@ public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); RecoverySourceHandler handler = new RecoverySourceHandler( - shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4), between(1, 4)); + shard, + new TestRecoveryTargetHandler(), + threadPool, + getStartRecoveryRequest(), + between(1, 16), + between(1, 4), + between(1, 4), + recoveryPlannerService); String syncId = UUIDs.randomBase64UUID(); int numDocs = between(0, 1000); @@ -792,6 +814,96 @@ public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception { assertThat(error.getMessage(), containsString("try to recover [index][1] with sync id but seq_no stats are mismatched:")); } + public void testRecoveryPlannerServiceIsUsed() throws Exception { + try (Store store = newStore(createTempDir("source"), false)) { + IndexShard shard = 
mock(IndexShard.class); + when(shard.store()).thenReturn(store); + Directory dir = store.directory(); + RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); + int numDocs = randomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); + document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED)); + writer.addDocument(document); + } + writer.commit(); + writer.close(); + when(shard.state()).thenReturn(IndexShardState.STARTED); + + TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + @Override + public void receiveFileInfo(List phase1FileNames, + List phase1FileSizes, + List phase1ExistingFileNames, + List phase1ExistingFileSizes, + int totalTranslogOps, + ActionListener listener) { + listener.onResponse(null); + } + + @Override + public void cleanFiles(int totalTranslogOps, + long globalCheckpoint, + Store.MetadataSnapshot sourceMetadata, + ActionListener listener) { + listener.onResponse(null); + } + + @Override + public void writeFileChunk(StoreFileMetadata fileMetadata, + long position, + ReleasableBytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener) { + listener.onResponse(null); + } + }; + AtomicReference computedRecoveryPlanRef = new AtomicReference<>(); + RecoverySourceHandler handler = new RecoverySourceHandler( + shard, + recoveryTarget, + threadPool, + getStartRecoveryRequest(), + between(1, 16), + between(1, 4), + between(1, 4), + recoveryPlannerService + ) { + @Override + void createRetentionLease(long startingSeqNo, ActionListener listener) { + listener.onResponse(new RetentionLease("id", startingSeqNo, 0, "test")); + } + + @Override + void recoverFilesFromSourceAndSnapshot(ShardRecoveryPlan shardRecoveryPlan, + Store store, + StopWatch stopWatch, + ActionListener listener) { + 
assertThat(computedRecoveryPlanRef.compareAndSet(null, shardRecoveryPlan), equalTo(true)); + super.recoverFilesFromSourceAndSnapshot(shardRecoveryPlan, store, stopWatch, listener); + } + }; + PlainActionFuture phase1Listener = PlainActionFuture.newFuture(); + IndexCommit indexCommit = DirectoryReader.listCommits(dir).get(0); + handler.phase1(indexCommit, + 0, + () -> 0, + phase1Listener); + phase1Listener.get(); + + ShardRecoveryPlan computedRecoveryPlan = computedRecoveryPlanRef.get(); + assertThat(computedRecoveryPlan, is(notNullValue())); + + Set sourceFilesToRecover = computedRecoveryPlan.getSourceFilesToRecover() + .stream() + .map(StoreFileMetadata::name) + .collect(Collectors.toSet()); + assertThat(sourceFilesToRecover, equalTo(new HashSet<>(indexCommit.getFileNames()))); + } + } + private Store.MetadataSnapshot newMetadataSnapshot(String syncId, String localCheckpoint, String maxSeqNo, int numDocs) { Map userData = new HashMap<>(); userData.put(Engine.SYNC_COMMIT_ID, syncId); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java new file mode 100644 index 0000000000000..b1b9392b83657 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java @@ -0,0 +1,571 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.indices.recovery.plan; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NoMergeScheduler; +import org.apache.lucene.store.BaseDirectoryWrapper; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ShardSnapshotInfo; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList; +import static org.elasticsearch.index.engine.Engine.HISTORY_UUID_KEY; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class SnapshotsRecoveryPlannerServiceTests extends ESTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()); + private static final ByteSizeValue PART_SIZE = new ByteSizeValue(Long.MAX_VALUE); + private static final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); + + private String shardHistoryUUID; + private final AtomicLong clock = new AtomicLong(); + + @Before + public void setUpHistoryUUID() { + shardHistoryUUID = UUIDs.randomBase64UUID(); + } + + public void testOnlyUsesSourceFilesWhenUseSnapshotsFlagIsFalse() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetMetadataSnapshot = generateRandomTargetState(store); + + writeRandomDocs(store, randomIntBetween(10, 100)); + Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(1, 100); + + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + randomBoolean() ? 
randomAlphaOfLength(10) : null, + sourceMetadata, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + assert false: "Unexpected call"; + } + }, + false + ); + assertPlanIsValid(shardRecoveryPlan, sourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, sourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetMetadataSnapshot); + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover(), is(equalTo(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY))); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testFallbacksToRegularPlanIfThereAreNotAvailableSnapshotsOrThereIsAFailureDuringFetch() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetMetadataSnapshot = generateRandomTargetState(store); + + writeRandomDocs(store, randomIntBetween(10, 100)); + final Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(1, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + null, + sourceMetadata, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + if (randomBoolean()) { + listener.onResponse(Collections.emptyList()); + } else { + listener.onFailure(new IOException("Boom!")); + } + } + }, + true + ); + + assertPlanIsValid(shardRecoveryPlan, sourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, sourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetMetadataSnapshot); + 
assertThat(shardRecoveryPlan.getSnapshotFilesToRecover(), is(equalTo(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY))); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testLogicallyEquivalentSnapshotIsUsed() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetSourceMetadata = generateRandomTargetState(store); + + writeRandomDocs(store, randomIntBetween(10, 100)); + Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + + ShardSnapshot shardSnapshotData = createShardSnapshotThatSharesSegmentFiles(store, "repo"); + // The shardStateIdentifier is shared with the latest snapshot, + // meaning that the current shard and the snapshot are logically equivalent + String shardStateIdentifier = shardSnapshotData.getShardStateIdentifier(); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(1, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + shardStateIdentifier, + sourceMetadata, + targetSourceMetadata, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Collections.singletonList(shardSnapshotData)); + } + }, + true + ); + + assertPlanIsValid(shardRecoveryPlan, sourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, sourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetSourceMetadata); + assertUsesExpectedSnapshot(shardRecoveryPlan, shardSnapshotData); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testLogicallyEquivalentSnapshotIsSkippedIfUnderlyingFilesAreDifferent() throws Exception { + createStore(store -> { + 
Store.MetadataSnapshot targetSourceMetadata = generateRandomTargetState(store); + + writeRandomDocs(store, randomIntBetween(10, 100)); + Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + + // The snapshot shardStateIdentifier is the same as the source, but the files are different. + // This can happen after a primary fail-over. + ShardSnapshot shardSnapshotData = createShardSnapshotThatDoNotShareSegmentFiles("repo"); + String shardStateIdentifier = shardSnapshotData.getShardStateIdentifier(); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(1, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + shardStateIdentifier, + sourceMetadata, + targetSourceMetadata, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Collections.singletonList(shardSnapshotData)); + } + }, + true + ); + + assertPlanIsValid(shardRecoveryPlan, sourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, sourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetSourceMetadata); + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover(), is(equalTo(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY))); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testPlannerTriesToUseMostFilesFromSnapshots() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetMetadataSnapshot = generateRandomTargetState(store); + + List availableSnapshots = new ArrayList<>(); + + int numberOfStaleSnapshots = randomIntBetween(0, 5); + for (int i = 0; i < numberOfStaleSnapshots; i++) { + availableSnapshots.add(createShardSnapshotThatDoNotShareSegmentFiles("stale-repo-" + i)); + } + + int 
numberOfValidSnapshots = randomIntBetween(0, 10); + for (int i = 0; i < numberOfValidSnapshots; i++) { + writeRandomDocs(store, randomIntBetween(10, 100)); + availableSnapshots.add(createShardSnapshotThatSharesSegmentFiles(store, "repo-" + i)); + } + + // Write new segments + writeRandomDocs(store, randomIntBetween(20, 50)); + Store.MetadataSnapshot latestSourceMetadata = store.getMetadata(null); + String latestShardIdentifier = randomAlphaOfLength(10); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(0, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + latestShardIdentifier, + latestSourceMetadata, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(availableSnapshots); + } + }, + true + ); + + assertPlanIsValid(shardRecoveryPlan, latestSourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, latestSourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetMetadataSnapshot); + + if (numberOfValidSnapshots > 0) { + ShardSnapshot latestValidSnapshot = + availableSnapshots.get(availableSnapshots.size() - 1); + assertUsesExpectedSnapshot(shardRecoveryPlan, latestValidSnapshot); + } else { + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover(), is(equalTo(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY))); + } + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testSnapshotsWithADifferentHistoryUUIDAreUsedIfFilesAreShared() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetMetadataSnapshot = generateRandomTargetState(store); + + List availableSnapshots = new ArrayList<>(); + int numberOfValidSnapshots = randomIntBetween(1, 4); + 
for (int i = 0; i < numberOfValidSnapshots; i++) { + writeRandomDocs(store, randomIntBetween(10, 100)); + availableSnapshots.add(createShardSnapshotThatSharesSegmentFiles(store, "repo-" + i)); + } + + // Simulate a restore/stale primary allocation + shardHistoryUUID = UUIDs.randomBase64UUID(); + String latestShardIdentifier = randomAlphaOfLength(10); + // Write new segments + writeRandomDocs(store, randomIntBetween(20, 50)); + Store.MetadataSnapshot latestSourceMetadata = store.getMetadata(null); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = randomIntBetween(0, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + latestShardIdentifier, + latestSourceMetadata, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(availableSnapshots); + } + }, + true + ); + + assertPlanIsValid(shardRecoveryPlan, latestSourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, latestSourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetMetadataSnapshot); + assertUsesExpectedSnapshot(shardRecoveryPlan, availableSnapshots.get(availableSnapshots.size() - 1)); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + public void testFallbacksToSourceOnlyPlanIfTargetNodeIsInUnsupportedVersion() throws Exception { + createStore(store -> { + Store.MetadataSnapshot targetMetadataSnapshot = generateRandomTargetState(store); + + writeRandomDocs(store, randomIntBetween(10, 100)); + ShardSnapshot shardSnapshot = createShardSnapshotThatSharesSegmentFiles(store, "repo"); + + Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + + long startingSeqNo = randomNonNegativeLong(); + int translogOps = 
randomIntBetween(0, 100); + ShardRecoveryPlan shardRecoveryPlan = computeShardRecoveryPlan( + "shard-id", + sourceMetadata, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + new ShardSnapshotsService(null, null, null, null) { + @Override + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Collections.singletonList(shardSnapshot)); + } + }, + true, + Version.V_7_14_0 // Unsupported version + ); + + assertPlanIsValid(shardRecoveryPlan, sourceMetadata); + assertAllSourceFilesAreAvailableInSource(shardRecoveryPlan, sourceMetadata); + assertAllIdenticalFilesAreAvailableInTarget(shardRecoveryPlan, targetMetadataSnapshot); + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover(), is(equalTo(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY))); + + assertThat(shardRecoveryPlan.getStartingSeqNo(), equalTo(startingSeqNo)); + assertThat(shardRecoveryPlan.getTranslogOps(), equalTo(translogOps)); + }); + } + + private ShardRecoveryPlan computeShardRecoveryPlan(String shardIdentifier, + Store.MetadataSnapshot sourceMetadataSnapshot, + Store.MetadataSnapshot targetMetadataSnapshot, + long startingSeqNo, + int translogOps, + ShardSnapshotsService shardSnapshotsService, + boolean snapshotRecoveriesEnabled) throws Exception { + return computeShardRecoveryPlan(shardIdentifier, + sourceMetadataSnapshot, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + shardSnapshotsService, + snapshotRecoveriesEnabled, + Version.CURRENT + ); + } + + private ShardRecoveryPlan computeShardRecoveryPlan(String shardIdentifier, + Store.MetadataSnapshot sourceMetadataSnapshot, + Store.MetadataSnapshot targetMetadataSnapshot, + long startingSeqNo, + int translogOps, + ShardSnapshotsService shardSnapshotsService, + boolean snapshotRecoveriesEnabled, + Version version) throws Exception { + SnapshotsRecoveryPlannerService recoveryPlannerService = + new SnapshotsRecoveryPlannerService(shardSnapshotsService); + + PlainActionFuture 
planFuture = PlainActionFuture.newFuture(); + recoveryPlannerService.computeRecoveryPlan(shardId, + sourceMetadataSnapshot, + targetMetadataSnapshot, + startingSeqNo, + translogOps, + version, + snapshotRecoveriesEnabled, + planFuture + ); + final ShardRecoveryPlan shardRecoveryPlan = planFuture.get(); + assertThat(shardRecoveryPlan, notNullValue()); + return shardRecoveryPlan; + } + + /** Asserts that the plan accounts for exactly the files of the expected metadata snapshot. The files present in the target, the source files to recover and the snapshot files to recover are pooled and compared with the expected files via containsFile: nothing may be missing, nothing unexpected, and the counts must match. Also checks that the plan's own source metadata snapshot has the same size and history UUID as the expected one. */ private void assertPlanIsValid(ShardRecoveryPlan shardRecoveryPlan, + Store.MetadataSnapshot expectedMetadataSnapshot) { + List planFiles = new ArrayList<>(); + planFiles.addAll(shardRecoveryPlan.getFilesPresentInTarget()); + planFiles.addAll(shardRecoveryPlan.getSourceFilesToRecover()); + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : shardRecoveryPlan.getSnapshotFilesToRecover()) { + planFiles.add(fileInfo.metadata()); + } + + final ArrayList storeFileMetadata = iterableAsArrayList(expectedMetadataSnapshot); + List missingFiles = storeFileMetadata.stream() + .filter(f -> containsFile(planFiles, f) == false) + .collect(Collectors.toList()); + + List unexpectedFiles = planFiles.stream() + .filter(f -> containsFile(storeFileMetadata, f) == false) + .collect(Collectors.toList()); + + assertThat(missingFiles, is(empty())); + assertThat(unexpectedFiles, is(empty())); + assertThat(planFiles.size(), is(equalTo(storeFileMetadata.size()))); + Store.MetadataSnapshot sourceMetadataSnapshot = shardRecoveryPlan.getSourceMetadataSnapshot(); + assertThat(sourceMetadataSnapshot.size(), equalTo(expectedMetadataSnapshot.size())); + assertThat(sourceMetadataSnapshot.getHistoryUUID(), equalTo(expectedMetadataSnapshot.getHistoryUUID())); + } + + /** Asserts that every file the plan wants to recover from the source is found by name in the given source metadata snapshot and that the entry found there isSame as the planned file. */ private void assertAllSourceFilesAreAvailableInSource(ShardRecoveryPlan shardRecoveryPlan, + Store.MetadataSnapshot sourceMetadataSnapshot) { + for (StoreFileMetadata sourceFile : shardRecoveryPlan.getSourceFilesToRecover()) { + final StoreFileMetadata actual = sourceMetadataSnapshot.get(sourceFile.name()); + assertThat(actual, is(notNullValue())); + 
assertThat(actual.isSame(sourceFile), is(equalTo(true))); + } + } + + /** Asserts that every file the plan reports as already present in the target is found by name in the target metadata snapshot and that the entry found there isSame as the planned file. */ private void assertAllIdenticalFilesAreAvailableInTarget(ShardRecoveryPlan shardRecoveryPlan, + Store.MetadataSnapshot targetMetadataSnapshot) { + for (StoreFileMetadata identicalFile : shardRecoveryPlan.getFilesPresentInTarget()) { + final StoreFileMetadata targetFile = targetMetadataSnapshot.get(identicalFile.name()); + assertThat(targetFile, notNullValue()); + assertThat(targetFile.isSame(identicalFile), is(equalTo(true))); + } + } + + /** Asserts that the snapshot part of the plan refers to the expected snapshot: same IndexId, same repository, and every snapshot file to recover is found by name in the expected snapshot's metadata with an isSame match. */ private void assertUsesExpectedSnapshot(ShardRecoveryPlan shardRecoveryPlan, + ShardSnapshot expectedSnapshotToUse) { + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover().getIndexId(), equalTo(expectedSnapshotToUse.getIndexId())); + assertThat(shardRecoveryPlan.getSnapshotFilesToRecover().getRepository(), equalTo(expectedSnapshotToUse.getRepository())); + + final Store.MetadataSnapshot shardSnapshotMetadataSnapshot = expectedSnapshotToUse.getMetadataSnapshot(); + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : shardRecoveryPlan.getSnapshotFilesToRecover()) { + final StoreFileMetadata snapshotFile = shardSnapshotMetadataSnapshot.get(fileInfo.metadata().name()); + assertThat(snapshotFile, is(notNullValue())); + assertThat(snapshotFile.isSame(fileInfo.metadata()), is(equalTo(true))); + } + } + + // StoreFileMetadata doesn't implement #equals, we rely on StoreFileMetadata#isSame for equality checks + private boolean containsFile(List files, StoreFileMetadata fileMetadata) { + for (StoreFileMetadata file : files) { + if (file.isSame(fileMetadata)) { + return true; + } + } + return false; + } + + /** Runs the test body against a new Store for the test shard, created on a fresh FS directory under a temp dir with a DummyShardLock, and closes the store afterwards even when the body throws. */ private void createStore(CheckedConsumer testBody) throws Exception { + BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory(createTempDir()); + Store store = new Store(shardId, INDEX_SETTINGS, baseDirectoryWrapper, new DummyShardLock(shardId)); + try { + testBody.accept(store); + } finally { + IOUtils.close(store); + } + } + + /** Picks one of three random target states: the store's metadata after writing 20 to 50 random docs (so the target can share files with the source), Store.MetadataSnapshot.EMPTY, or a snapshot made of 1 to 20 random file entries that are meant not to match any source file. */ private Store.MetadataSnapshot 
generateRandomTargetState(Store store) throws IOException { + final Store.MetadataSnapshot targetMetadataSnapshot; + if (randomBoolean()) { + // The target can share some files with the source + writeRandomDocs(store, randomIntBetween(20, 50)); + targetMetadataSnapshot = store.getMetadata(null); + } else { + if (randomBoolean()) { + targetMetadataSnapshot = Store.MetadataSnapshot.EMPTY; + } else { + // None of the files in the target would match + final int filesInTargetCount = randomIntBetween(1, 20); + Map filesInTarget = IntStream.range(0, filesInTargetCount) + .mapToObj(i -> randomStoreFileMetadata()) + .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity())); + targetMetadataSnapshot = new Store.MetadataSnapshot(filesInTarget, Collections.emptyMap(), 0); + } + } + return targetMetadataSnapshot; + } + + /** Opens a new IndexWriter on the store's directory with merging disabled (NoMergePolicy and NoMergeScheduler), adds numDocs documents that each carry an "id" StringField and a random stored text "field", then commits with the current shardHistoryUUID stored under HISTORY_UUID_KEY in the commit user data and closes the writer. */ private void writeRandomDocs(Store store, int numDocs) throws IOException { + Directory dir = store.directory(); + + // Disable merges to control the files that are used in this tests + IndexWriterConfig indexWriterConfig = new IndexWriterConfig() + .setMergePolicy(NoMergePolicy.INSTANCE) + .setMergeScheduler(NoMergeScheduler.INSTANCE); + IndexWriter writer = new IndexWriter(dir, indexWriterConfig); + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); + document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED)); + writer.addDocument(document); + } + Map userData = new HashMap<>(); + userData.put(HISTORY_UUID_KEY, shardHistoryUUID); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + writer.close(); + } + + /** Builds a ShardSnapshot for repoName from a random list of FileInfo entries, each wrapping a freshly generated random StoreFileMetadata with a random blob name and PART_SIZE; no entry is derived from a store. */ private ShardSnapshot createShardSnapshotThatDoNotShareSegmentFiles(String repoName) { + List snapshotFiles = randomList(randomIntBetween(10, 20), () -> { + StoreFileMetadata storeFileMetadata = randomStoreFileMetadata(); + return new 
BlobStoreIndexShardSnapshot.FileInfo(randomAlphaOfLength(10), storeFileMetadata, PART_SIZE); + }); + + return createShardSnapshot(repoName, snapshotFiles); + } + + /** Builds a ShardSnapshot for the given repository that holds one FileInfo (random blob name, PART_SIZE) for every file in the store's current metadata. Asserts first that the store metadata holds more than one file. */ private ShardSnapshot createShardSnapshotThatSharesSegmentFiles(Store store, + String repository) throws Exception { + Store.MetadataSnapshot sourceMetadata = store.getMetadata(null); + assertThat(sourceMetadata.size(), is(greaterThan(1))); + + List snapshotFiles = new ArrayList<>(sourceMetadata.size()); + for (StoreFileMetadata storeFileMetadata : sourceMetadata) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = + new BlobStoreIndexShardSnapshot.FileInfo(randomAlphaOfLength(10), storeFileMetadata, PART_SIZE); + snapshotFiles.add(fileInfo); + } + return createShardSnapshot(repository, snapshotFiles); + } + + /** Wraps the given files in a ShardSnapshot. The ShardSnapshotInfo is built from a random IndexId, the test shardId, a Snapshot named "snap" with a random UUID in repoName, two random 10-character strings (the second one being the shard identifier) and clock.incrementAndGet() as the last constructor argument. */ private ShardSnapshot createShardSnapshot(String repoName, + List snapshotFiles) { + String shardIdentifier = randomAlphaOfLength(10); + + Snapshot snapshot = new Snapshot(repoName, new SnapshotId("snap", UUIDs.randomBase64UUID(random()))); + IndexId indexId = randomIndexId(); + ShardSnapshotInfo shardSnapshotInfo = + new ShardSnapshotInfo(indexId, shardId, snapshot, randomAlphaOfLength(10), shardIdentifier, clock.incrementAndGet()); + + return new ShardSnapshot(shardSnapshotInfo, snapshotFiles); + } + + /** Returns a StoreFileMetadata built from a random 10-character name, a random long between 1 and 100, another random 10-character string and Version.CURRENT.toString(). */ private StoreFileMetadata randomStoreFileMetadata() { + return new StoreFileMetadata(randomAlphaOfLength(10), randomLongBetween(1, 100), + randomAlphaOfLength(10), Version.CURRENT.toString()); + } + + /** Returns an IndexId made of the test shard's index name and a random 10-character second argument. */ private IndexId randomIndexId() { + return new IndexId(shardId.getIndexName(), randomAlphaOfLength(10)); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2db1bca894f44..9608d75f335e5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -156,6 
+156,7 @@ import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.indices.recovery.plan.SourceOnlyRecoveryPlannerService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.ResponseCollectorService; @@ -1798,7 +1799,13 @@ protected void assertSnapshotOrGenericThread() { ); nodeConnectionsService = new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService); final MetadataMappingService metadataMappingService = new MetadataMappingService(clusterService, indicesService); - peerRecoverySourceService = new PeerRecoverySourceService(transportService, indicesService, recoverySettings); + peerRecoverySourceService = new PeerRecoverySourceService( + transportService, + indicesService, + recoverySettings, + SourceOnlyRecoveryPlannerService.INSTANCE + ); + indicesClusterStateService = new IndicesClusterStateService( settings, indicesService, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 29f5dbf4263f0..e94701bbc0cd0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -24,8 +24,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.core.CheckedFunction; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; @@ -33,6 +31,8 @@ import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -67,11 +67,13 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.StartRecoveryRequest; +import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; +import org.elasticsearch.indices.recovery.plan.SourceOnlyRecoveryPlannerService; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGeneration; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.DummyShardLock; @@ -633,9 +635,10 @@ protected final void recoverUnstartedReplica(final IndexShard replica, logger, rNode, recoveryTarget, startingSeqNo); int fileChunkSizeInBytes = Math.toIntExact( randomBoolean() ? 
RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024)); + final RecoveryPlannerService recoveryPlannerService = SourceOnlyRecoveryPlannerService.INSTANCE; final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, - request, fileChunkSizeInBytes, between(1, 8), between(1, 8)); + request, fileChunkSizeInBytes, between(1, 8), between(1, 8), recoveryPlannerService); primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable); try {