Throttle per-index snapshot deletes
Each per-index process during snapshot deletion takes some nonzero
amount of working memory to hold the relevant snapshot IDs, metadata
generations, and so on. We can keep that memory under tighter limits,
and release it sooner, if we limit the number of per-index processes
running concurrently. That's what this commit does.
DaveCTurner committed Oct 16, 2023
1 parent e26ad8f commit 9d5d363
Showing 4 changed files with 246 additions and 26 deletions.
server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
@@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.XContentParserUtils;
@@ -35,6 +36,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -326,25 +328,35 @@ public Map<String, IndexId> getIndices() {
}

/**
* Returns the list of {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by other
* snapshots) after removing the given snapshot from the repository.
* Returns an iterator over {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by
* other snapshots) after removing the given snapshot from the repository.
*
* @param snapshotIds SnapshotId to remove
* @return List of indices that are changed but not removed
* @return Iterator over indices that are changed but not removed
*/
public List<IndexId> indicesToUpdateAfterRemovingSnapshot(Collection<SnapshotId> snapshotIds) {
    return indexSnapshots.entrySet().stream().filter(entry -> {
        final Collection<SnapshotId> existingIds = entry.getValue();
        if (snapshotIds.containsAll(existingIds)) {
            return existingIds.size() > snapshotIds.size();
        }
        for (SnapshotId snapshotId : snapshotIds) {
            if (entry.getValue().contains(snapshotId)) {
                return true;
            }
        }
        return false;
    }).map(Map.Entry::getKey).toList();
}
public Iterator<IndexId> indicesToUpdateAfterRemovingSnapshot(Collection<SnapshotId> snapshotIds) {
    return Iterators.flatMap(indexSnapshots.entrySet().iterator(), entry -> {
        if (isIndexToUpdateAfterRemovingSnapshots(entry.getValue(), snapshotIds)) {
            return Iterators.single(entry.getKey());
        } else {
            return Collections.emptyIterator();
        }
    });
}

private static boolean isIndexToUpdateAfterRemovingSnapshots(
    Collection<SnapshotId> snapshotsContainingIndex,
    Collection<SnapshotId> snapshotsToDelete
) {
    if (snapshotsToDelete.containsAll(snapshotsContainingIndex)) {
        return snapshotsContainingIndex.size() > snapshotsToDelete.size();
    }
    for (SnapshotId snapshotId : snapshotsToDelete) {
        if (snapshotsContainingIndex.contains(snapshotId)) {
            return true;
        }
    }
    return false;
}
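
For illustration, the new helper's three possible outcomes can be exercised in isolation. This is a minimal standalone sketch, not code from the commit: the class and variable names are invented, and plain strings stand in for SnapshotId. An index needs its metadata rewritten only when the deletion removes some but not all of the snapshots referencing it.

import java.util.Set;

public class IndexUpdatePredicateExample {
    static boolean isIndexToUpdate(Set<String> snapshotsContainingIndex, Set<String> snapshotsToDelete) {
        if (snapshotsToDelete.containsAll(snapshotsContainingIndex)) {
            // every referencing snapshot is being deleted: the index is removed outright, not updated
            return snapshotsContainingIndex.size() > snapshotsToDelete.size();
        }
        for (String snapshotId : snapshotsToDelete) {
            if (snapshotsContainingIndex.contains(snapshotId)) {
                return true; // loses some references but keeps others: needs updating
            }
        }
        return false; // untouched by this deletion
    }

    public static void main(String[] args) {
        System.out.println(isIndexToUpdate(Set.of("s1"), Set.of("s1")));       // false: removed outright
        System.out.println(isIndexToUpdate(Set.of("s1", "s2"), Set.of("s1"))); // true: s2 still references it
        System.out.println(isIndexToUpdate(Set.of("s2"), Set.of("s1")));       // false: unaffected
    }
}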

/**
@@ -356,15 +368,16 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(Collection<SnapshotId>
* @return map of index to index metadata blob id to delete
*/
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
Collection<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
.stream()
.filter(e -> snapshotIds.contains(e.getKey()) == false)
.flatMap(e -> e.getValue().values().stream())
.map(indexMetaDataGenerations::getIndexMetaBlobId)
.collect(Collectors.toSet());
final Map<IndexId, Collection<String>> toRemove = new HashMap<>();
for (IndexId indexId : indicesForSnapshot) {
while (indicesForSnapshot.hasNext()) {
final var indexId = indicesForSnapshot.next();
for (SnapshotId snapshotId : snapshotIds) {
final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId);
if (allRemainingIdentifiers.contains(identifier) == false) {
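The rule implemented here is reachability: an index-metadata blob may be deleted only when no surviving snapshot still references it, which matters because snapshots whose index metadata is identical share a single blob. The following is a minimal standalone sketch of that rule (snapshot and blob names invented, plain strings as identifiers), not code from the commit:

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class IndexMetaBlobReachabilityExample {
    public static void main(String[] args) {
        // snapshot -> index-metadata blob it references for one particular index
        Map<String, String> blobBySnapshot = Map.of(
            "snap-1", "meta-A", // snap-1 and snap-2 share blob meta-A
            "snap-2", "meta-A",
            "snap-3", "meta-B"
        );
        Set<String> snapshotsToDelete = Set.of("snap-1", "snap-3");

        // gather every blob still referenced by a surviving snapshot
        Set<String> stillReferenced = blobBySnapshot.entrySet()
            .stream()
            .filter(e -> snapshotsToDelete.contains(e.getKey()) == false)
            .map(Map.Entry::getValue)
            .collect(Collectors.toSet());

        // a deleted snapshot's blob is removable only if nothing else reaches it
        for (String snapshotId : snapshotsToDelete) {
            String blob = blobBySnapshot.get(snapshotId);
            if (stillReferenced.contains(blob) == false) {
                System.out.println("delete " + blob); // prints "delete meta-B"; meta-A is kept for snap-2
            }
        }
    }
}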
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -72,6 +72,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -1140,11 +1141,23 @@ void runCleanup(ActionListener<DeleteResult> listener) {
// Updating the shard-level metadata and accumulating results

private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) {
try (var listeners = new RefCountingListener(listener)) {
for (IndexId indexId : originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds)) {
new IndexSnapshotsDeletion(indexId).run(listeners.acquire());
}
}
// noinspection resource -- closed safely at the end of the iteration
final var listeners = new RefCountingListener(listener);

// Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations
// etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices.
// Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to
// keep the threadpool fully utilized.
ThrottledIterator.run(
originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds),
(ref, indexId) -> ActionListener.run(
ActionListener.releaseAfter(listeners.acquire(), ref),
l -> new IndexSnapshotsDeletion(indexId).run(l)
),
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
() -> {},
listeners::close
);
}

private class IndexSnapshotsDeletion {
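
ThrottledIterator.run drives the per-index work with bounded concurrency: each index holds a ref while it is being processed, and releasing that ref pulls the next index from the iterator. The real utility is org.elasticsearch.common.util.concurrent.ThrottledIterator; what follows is only a simplified sketch of the pattern, with an invented class name and a plain Runnable in place of ref-counting and exception handling:

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

final class SimpleThrottledIterator {
    // Run at most maxConcurrency items at once; completing an item frees its slot to
    // pull the next one, and onCompletion fires once the iterator is drained and every
    // in-flight item has finished.
    static <T> void run(Iterator<T> iterator, BiConsumer<Runnable, T> itemConsumer, int maxConcurrency, Runnable onCompletion) {
        final AtomicInteger activeSlots = new AtomicInteger(maxConcurrency);
        final Runnable retireSlot = () -> {
            if (activeSlots.decrementAndGet() == 0) {
                onCompletion.run();
            }
        };
        for (int i = 0; i < maxConcurrency; i++) {
            startNext(iterator, itemConsumer, retireSlot);
        }
    }

    private static <T> void startNext(Iterator<T> iterator, BiConsumer<Runnable, T> itemConsumer, Runnable retireSlot) {
        final T item;
        synchronized (iterator) { // plain iterators are not thread-safe
            item = iterator.hasNext() ? iterator.next() : null;
        }
        if (item == null) {
            retireSlot.run(); // iterator drained: this slot is done
        } else {
            // itemConsumer must invoke the callback exactly once when the item completes;
            // note that synchronous completion recurses here, so the real implementation
            // takes care to avoid unbounded stack depth
            itemConsumer.accept(() -> startNext(iterator, itemConsumer, retireSlot), item);
        }
    }
}

Passing threadPool.info(SNAPSHOT).getMax() as the limit means there is always one index in flight per snapshot thread, so the throttling bounds working memory without leaving the pool idle, as the comment in the method above explains.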
server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
@@ -62,10 +62,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() {
final List<SnapshotId> snapshotIds = repositoryData.getSnapshots(index);
return snapshotIds.contains(randomSnapshot) && snapshotIds.size() > 1;
}).toArray(IndexId[]::new);
assertThat(
repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(randomSnapshot)),
containsInAnyOrder(indicesToUpdate)
);
assertThat(getIndicesToUpdateAfterRemovingSnapshot(repositoryData, randomSnapshot), containsInAnyOrder(indicesToUpdate));
}

public void testXContent() throws IOException {
@@ -347,7 +344,7 @@ public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() {
final RepositoryData repositoryData = generateRandomRepoData();
final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds());
final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations();
final Collection<IndexId> indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId));
final Collection<IndexId> indicesToUpdate = getIndicesToUpdateAfterRemovingSnapshot(repositoryData, snapshotId);
final Map<IndexId, Collection<String>> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId)
.entrySet()
.stream()
@@ -485,4 +482,10 @@ private static Map<IndexId, List<SnapshotId>> randomIndices(final Map<String, Sn
}
return indices;
}

private static Collection<IndexId> getIndicesToUpdateAfterRemovingSnapshot(RepositoryData repositoryData, SnapshotId snapshotToDelete) {
final var result = new ArrayList<IndexId>();
repositoryData.indicesToUpdateAfterRemovingSnapshot(List.of(snapshotToDelete)).forEachRemaining(result::add);
return result;
}
}
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java
@@ -0,0 +1,191 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCase {

// This test ensures that we appropriately throttle the per-index activity when deleting a snapshot by marking an index as "active" when
// its index metadata is read, and then as "inactive" when it updates the shard-level metadata. Without throttling, we would pretty much
// read all the index metadata first, and then update all the shard-level metadata. With too much throttling, we would work one index at
// a time and would not fully utilize all snapshot threads. This test shows that we do neither of these things.

private static final String TEST_REPO_TYPE = "concurrency-limiting-fs";
private static final String TEST_REPO_NAME = "test-repo";
private static final int MAX_SNAPSHOT_THREADS = 3;

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("thread_pool.snapshot.max", MAX_SNAPSHOT_THREADS).build();
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(ConcurrencyLimitingFsRepositoryPlugin.class);
}

public static class ConcurrencyLimitingFsRepositoryPlugin extends Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
TEST_REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {
@Override
protected BlobStore createBlobStore() throws Exception {
return new ConcurrencyLimitingBlobStore(super.createBlobStore());
}
}
);
}
}

private static class ConcurrencyLimitingBlobStore implements BlobStore {
private final BlobStore delegate;
private final Set<String> activeIndices = ConcurrentCollections.newConcurrentSet();
private final CountDownLatch countDownLatch = new CountDownLatch(MAX_SNAPSHOT_THREADS);

private ConcurrencyLimitingBlobStore(BlobStore delegate) {
this.delegate = delegate;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new ConcurrencyLimitingBlobContainer(delegate.blobContainer(path), activeIndices, countDownLatch);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
delegate.deleteBlobsIgnoringIfNotExists(purpose, blobNames);
}

@Override
public void close() throws IOException {
delegate.close();
}
}

private static class ConcurrencyLimitingBlobContainer extends FilterBlobContainer {
private final Set<String> activeIndices;
private final CountDownLatch countDownLatch;

ConcurrencyLimitingBlobContainer(BlobContainer delegate, Set<String> activeIndices, CountDownLatch countDownLatch) {
super(delegate);
this.activeIndices = activeIndices;
this.countDownLatch = countDownLatch;
}

@Override
protected BlobContainer wrapChild(BlobContainer child) {
return new ConcurrencyLimitingBlobContainer(child, activeIndices, countDownLatch);
}

@Override
public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
final var pathParts = path().parts();
if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith("meta-")) {
// reading index metadata, so mark index as active
assertTrue(activeIndices.add(pathParts.get(1)));
assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS));
countDownLatch.countDown();
safeAwait(countDownLatch); // ensure that we do use all the threads
}
return super.readBlob(purpose, blobName);
}

@Override
public void writeMetadataBlob(
OperationPurpose purpose,
String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer
) throws IOException {
final var pathParts = path().parts();
if (pathParts.size() == 3
&& pathParts.get(0).equals("indices")
&& pathParts.get(2).equals("0")
&& blobName.startsWith("index-")) {
// writing shard-level BlobStoreIndexShardSnapshots, mark index as inactive again
assertTrue(activeIndices.remove(pathParts.get(1)));
}
super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
}
}

public void testDeleteThrottling() {
final var repoPath = ESIntegTestCase.randomRepoPath(node().settings());

assertAcked(
client().admin()
.cluster()
.preparePutRepository(TEST_REPO_NAME)
.setType(FsRepository.TYPE)
.setSettings(Settings.builder().put("location", repoPath))
);

for (int i = 0; i < 3 * MAX_SNAPSHOT_THREADS; i++) {
createIndex("index-" + i, indexSettings(1, 0).build());
}

client().admin().cluster().prepareCreateSnapshot(TEST_REPO_NAME, "snapshot-1").setWaitForCompletion(true).get();
client().admin().cluster().prepareCreateSnapshot(TEST_REPO_NAME, "snapshot-2").setWaitForCompletion(true).get();

assertAcked(client().admin().cluster().prepareDeleteRepository(TEST_REPO_NAME));

assertAcked(
client().admin()
.cluster()
.preparePutRepository(TEST_REPO_NAME)
.setType(TEST_REPO_TYPE)
.setSettings(Settings.builder().put("location", repoPath))
);

assertAcked(client().admin().cluster().prepareDeleteSnapshot(TEST_REPO_NAME, "snapshot-1").get());

assertAcked(client().admin().cluster().prepareDeleteRepository(TEST_REPO_NAME));
}
}
