Skip to content

Commit

Permalink
Add peer recovery planners that take into account available snapshots
Browse files Browse the repository at this point in the history
This commit adds a new set of classes that would compute a peer
recovery plan, based on source files + target files + available
snapshots. When possible it would try to maximize the number of
files used from a snapshot. It uses repositories with `use_for_peer_recovery`
setting set to true.

It adds a new recovery setting `indices.recovery.use_snapshots`

Relates elastic#73496
Backport of elastic#75840
  • Loading branch information
fcofdez committed Aug 9, 2021
1 parent f86929f commit e90bb68
Show file tree
Hide file tree
Showing 23 changed files with 1,803 additions and 114 deletions.
15 changes: 15 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ and may interfere with indexing, search, and other activities in your cluster.
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

`indices.recovery.use_snapshots`::
(<<cluster-update-settings,Dynamic>>, Expert) Enables snapshot-based peer recoveries.
+
{es} recovers replicas and relocates primary shards using the _peer recovery_
process, which involves constructing a new copy of a shard on the target node.
When `indices.recovery.use_snapshots` is `false` {es} will construct this new
copy by transferring the index data from the current primary. When this setting
is `true` {es} will attempt to copy the index data from a recent snapshot
first, and will only copy data from the primary if it cannot identify a
suitable snapshot.
+
Setting this option to `true` reduces your operating costs if your cluster runs
in an environment where the node-to-node data transfer costs are higher than
the costs of recovering data from a snapshot. It also reduces the amount of
work that the primary must do during a recovery.
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.recovery.plan;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardSnapshotInfo;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class ShardSnapshotsServiceIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(FailingRepoPlugin.class);
}

public static class FailingRepoPlugin extends Plugin implements RepositoryPlugin {
public static final String TYPE = "failingrepo";

@Override
public Map<String, Repository.Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
TYPE,
metadata -> new FailingRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
);
}
}

public static class FailingRepo extends FsRepository {
static final String FAIL_GET_REPOSITORY_DATA_SETTING_KEY = "fail_get_repository_data";
static final String FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY = "fail_load_shard_snapshot";
static final String FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY = "fail_load_shard_snapshots";

private final boolean failGetRepositoryData;
private final boolean failLoadShardSnapshot;
private final boolean failLoadShardSnapshots;

public FailingRepo(RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.failGetRepositoryData = metadata.settings().getAsBoolean(FAIL_GET_REPOSITORY_DATA_SETTING_KEY, false);
this.failLoadShardSnapshot = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, false);
this.failLoadShardSnapshots = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY, false);
}

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (failGetRepositoryData) {
listener.onFailure(new IOException("Failure getting repository data"));
return;
}
super.getRepositoryData(listener);
}

@Override
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
if (failLoadShardSnapshot) {
throw new SnapshotException(
metadata.name(),
snapshotId,
"failed to read shard snapshot file for [" + shardContainer.path() + ']',
new FileNotFoundException("unable to find file")
);
}
return super.loadShardSnapshot(shardContainer, snapshotId);
}

@Override
public BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexId indexId,
ShardId shardId,
ShardGeneration shardGen) throws IOException {
if (failLoadShardSnapshots) {
throw new FileNotFoundException("Failed to get blob store index shard snapshots");
}
return super.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen);
}
}

public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Exception {
String indexName = "test";
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId);
assertThat(shardSnapshotData, is(empty()));
}

public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
final String indexName = "test";
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

for (int i = 0; i < randomIntBetween(1, 50); i++) {
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar"));
}

String snapshotName = "snap";

int numberOfNonEnabledRepos = randomIntBetween(1, 3);
List<String> nonEnabledRepos = new ArrayList<>();
for (int i = 0; i < numberOfNonEnabledRepos; i++) {
String repositoryName = "non-enabled-repo-" + i;
Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath, false);
createSnapshot(repositoryName, snapshotName, indexName);
nonEnabledRepos.add(repositoryName);
}

int numberOfRecoveryEnabledRepositories = randomIntBetween(0, 4);
List<String> recoveryEnabledRepos = new ArrayList<>();
for (int i = 0; i < numberOfRecoveryEnabledRepositories; i++) {
String repositoryName = "repo-" + i;
createRepository(repositoryName, "fs", randomRepoPath(), true);
recoveryEnabledRepos.add(repositoryName);
createSnapshot(repositoryName, snapshotName, indexName);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));
}
}

public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Exception {
final String indexName = "test";
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

for (int i = 0; i < randomIntBetween(1, 50); i++) {
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar"));
}

String snapshotName = "snap";

int numberOfFailingRepos = randomIntBetween(1, 3);
List<Tuple<String, Path>> failingRepos = new ArrayList<>();
for (int i = 0; i < numberOfFailingRepos; i++) {
String repositoryName = "failing-repo-" + i;
Path repoPath = randomRepoPath();
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true);
createSnapshot(repositoryName, snapshotName, indexName);
failingRepos.add(Tuple.tuple(repositoryName, repoPath));
}

int numberOfWorkingRepositories = randomIntBetween(0, 4);
List<String> workingRepos = new ArrayList<>();
for (int i = 0; i < numberOfWorkingRepositories; i++) {
String repositoryName = "repo-" + i;
createRepository(repositoryName, "fs", randomRepoPath(), true);
workingRepos.add(repositoryName);
createSnapshot(repositoryName, snapshotName, indexName);
}

for (Tuple<String, Path> failingRepo : failingRepos) {
// Update repository settings to fail fetching the repository information at any stage
String repoFailureType =
randomFrom(FailingRepo.FAIL_GET_REPOSITORY_DATA_SETTING_KEY,
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY,
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY
);

assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1())
.setType(FailingRepoPlugin.TYPE)
.setVerify(false)
.setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath()))
);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
}
}

public void testFetchingInformationFromAnIncompatibleMasterNodeReturnsAnEmptyList() {
String indexName = "test";
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

for (int i = 0; i < randomIntBetween(1, 50); i++) {
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar"));
}

String snapshotName = "snap";
String repositoryName = "repo";
createRepository(repositoryName, "fs", randomRepoPath(), true);
createSnapshot(repositoryName, snapshotName, indexName);

RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class);
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class);
ShardSnapshotsService shardSnapshotsService = new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService) {
@Override
protected boolean masterSupportsFetchingLatestSnapshots() {
return false;
}
};

PlainActionFuture<List<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots);
assertThat(latestSnapshots.actionGet(), is(empty()));
}

private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception {
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();

PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future);
return future.get();
}

private ShardSnapshotsService getShardSnapshotsService() {
RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class);
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class);
return new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService);
}

private ShardId getShardIdForIndex(String indexName) {
ClusterState state = clusterAdmin().prepareState().get().getState();
return state.routingTable().index(indexName).shard(0).shardId();
}

private void createRepository(String repositoryName, String type, Path location, boolean recoveryEnabledRepo) {
assertAcked(client().admin().cluster().preparePutRepository(repositoryName)
.setType(type)
.setVerify(false)
.setSettings(Settings.builder()
.put("location", location)
.put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), recoveryEnabledRepo)
)
);
}

private void createSnapshot(String repoName, String snapshotName, String index) {
clusterAdmin()
.prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices(index)
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,11 @@ public static <E> List<List<E>> eagerPartition(List<E> list, int size) {

return result;
}

public static <E> List<E> concatLists(List<E> listA, List<E> listB) {
List<E> concatList = new ArrayList<>(listA.size() + listB.size());
concatList.addAll(listA);
concatList.addAll(listB);
return concatList;
}
}
Loading

0 comments on commit e90bb68

Please sign in to comment.