diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreDynamicSettingsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreDynamicSettingsIT.java new file mode 100644 index 0000000000000..7416965d46639 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreDynamicSettingsIT.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.hamcrest.Matchers.instanceOf; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class BlobStoreDynamicSettingsIT extends AbstractSnapshotIntegTestCase { + + public void testUpdateRateLimitsDynamically() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + + final boolean largeSnapshotPool = randomBoolean(); + final String dataNode; + if (largeSnapshotPool) { + dataNode = startDataNodeWithLargeSnapshotPool(); + } else { + dataNode = internalCluster().startDataOnlyNode(); + } + + final String repoName = "test-repo"; + // use a 
small chunk size so the rate limiter does not overshoot too far and get blocked a very long time below + createRepository(repoName, "mock", randomRepositorySettings().put("chunk_size", "100b")); + + if (randomBoolean()) { + createFullSnapshot(repoName, "snapshot-1"); + } + + final String indexName = "test-idx"; + createIndexWithContent(indexName); + + final Repository repoOnMaster = getRepositoryOnNode(repoName, masterNode); + final Repository repoOnDataNode = getRepositoryOnNode(repoName, dataNode); + + final Settings currentSettings = repoOnMaster.getMetadata().settings(); + assertNull(currentSettings.get(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey())); + assertNull(currentSettings.get(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey())); + + createRepository( + repoName, + "mock", + Settings.builder().put(currentSettings).put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), "1b"), + randomBoolean() + ); + + assertSame(repoOnMaster, getRepositoryOnNode(repoName, masterNode)); + assertSame(repoOnDataNode, getRepositoryOnNode(repoName, dataNode)); + + final Settings updatedSettings = repoOnMaster.getMetadata().settings(); + assertEquals(ByteSizeValue.ofBytes(1L), + updatedSettings.getAsBytesSize(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO)); + assertNull(currentSettings.get(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey())); + + final ActionFuture snapshot1 = startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode); + + // we only run concurrent verification when we have a large SNAPSHOT pool on the data node because otherwise the verification would + // deadlock since the small pool is already blocked by the snapshot on the data node + createRepository( + repoName, + "mock", + Settings.builder().put(updatedSettings).put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), "1024b"), + largeSnapshotPool && randomBoolean() + ); + assertSame(repoOnMaster, getRepositoryOnNode(repoName, 
masterNode)); + assertSame(repoOnDataNode, getRepositoryOnNode(repoName, dataNode)); + + logger.info("--> verify that we can't update [location] dynamically"); + try { + // this setting update will fail so we can set the verification parameter randomly even if the SNAPSHOT pool is already blocked + // since we will never actually get to the verification step + createRepository( + repoName, + "mock", + Settings.builder().put(repoOnMaster.getMetadata().settings()).put("location", randomRepoPath()), + randomBoolean() + ); + } catch (Exception e) { + final Throwable ise = ExceptionsHelper.unwrap(e, IllegalStateException.class); + assertThat(ise, instanceOf(IllegalStateException.class)); + assertEquals(ise.getMessage(), "trying to modify or unregister repository that is currently used"); + } + + logger.info("--> verify that we can update [{}] dynamically", MockRepository.DUMMY_UPDATABLE_SETTING_NAME); + final String dummySettingValue = randomUnicodeOfCodepointLength(10); + // we only run concurrent verification when we have a large SNAPSHOT pool on the data node because otherwise the verification would + // deadlock since the small pool is already blocked by the snapshot on the data node + createRepository( + repoName, + "mock", + Settings.builder().put(repoOnMaster.getMetadata().settings()) + .put(MockRepository.DUMMY_UPDATABLE_SETTING_NAME, dummySettingValue), + largeSnapshotPool && randomBoolean() + ); + final Repository newRepoOnMaster = getRepositoryOnNode(repoName, masterNode); + assertSame(repoOnMaster, newRepoOnMaster); + assertSame(repoOnDataNode, getRepositoryOnNode(repoName, dataNode)); + assertEquals(dummySettingValue, newRepoOnMaster.getMetadata().settings().get(MockRepository.DUMMY_UPDATABLE_SETTING_NAME)); + + unblockNode(repoName, dataNode); + assertSuccessful(snapshot1); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 955adccd7a057..93cf8ae59ebe7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -1259,10 +1259,6 @@ public void testStartWithSuccessfulShardSnapshotPendingFinalization() throws Exc assertSuccessful(otherSnapshot); } - private static String startDataNodeWithLargeSnapshotPool() { - return internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); - } - private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) { final SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus(otherBlockedRepoName).get(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 2f005f3826433..6e0c2de3593ed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -196,4 +196,8 @@ public String toString() { public RepositoryMetadata withUuid(String uuid) { return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration); } + + public RepositoryMetadata withSettings(Settings settings) { + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration); + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 14d9b4f5122cd..dd31fd6da1677 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -17,6 +17,7 @@ import 
org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -26,6 +27,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -125,6 +127,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In return in.getShardSnapshotStatus(snapshotId, indexId, shardId); } + @Override + public boolean canUpdateInPlace(Settings updatedSettings, Set ignoredSettings) { + return in.canUpdateInPlace(updatedSettings, ignoredSettings); + } + @Override public void updateState(ClusterState state) { in.updateState(state); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 74eb993c19cf9..ee6ae89f32ebc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -173,7 +173,6 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryNotInUse(currentState, request.name()); Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); @@ -184,8 +183,22 @@ public ClusterState execute(ClusterState currentState) { // Previous version is the same as this one no update is needed. 
return currentState; } + Repository existing = RepositoriesService.this.repositories.get(request.name()); + if (existing == null) { + existing = RepositoriesService.this.internalRepositories.get(request.name()); + } + assert existing != null : "repository [" + newRepositoryMetadata.name() + "] must exist"; + assert existing.getMetadata() == repositoryMetadata; + final RepositoryMetadata updatedMetadata; + if (canUpdateInPlace(newRepositoryMetadata, existing)) { + // we're updating in place so the updated metadata must point at the same uuid and generations + updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings()); + } else { + ensureRepositoryNotInUse(currentState, request.name()); + updatedMetadata = newRepositoryMetadata; + } found = true; - repositoriesMetadata.add(newRepositoryMetadata); + repositoriesMetadata.add(updatedMetadata); } else { repositoriesMetadata.add(repositoryMetadata); } @@ -430,9 +443,7 @@ public void applyClusterState(ClusterChangedEvent event) { Repository repository = survivors.get(repositoryMetadata.name()); if (repository != null) { // Found previous version of this repository - RepositoryMetadata previousMetadata = repository.getMetadata(); - if (previousMetadata.type().equals(repositoryMetadata.type()) == false - || previousMetadata.settings().equals(repositoryMetadata.settings()) == false) { + if (canUpdateInPlace(repositoryMetadata, repository) == false) { // Previous version is different from the version in settings logger.debug("updating repository [{}]", repositoryMetadata.name()); closeRepository(repository); @@ -469,6 +480,12 @@ public void applyClusterState(ClusterChangedEvent event) { } } + private boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repository repository) { + assert updatedMetadata.name().equals(repository.getMetadata().name()); + return repository.getMetadata().type().equals(updatedMetadata.type()) + && repository.canUpdateInPlace(updatedMetadata.settings(), 
Collections.emptySet()); + } + + /** * Gets the {@link RepositoryData} for the given repository. * diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 22668e1d78095..cdd20706e9929 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -28,6 +29,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -214,6 +216,17 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s */ IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId); + /** + * Check if this instance's {@link Settings} can be changed to the provided updated settings without recreating the repository. + * + * @param updatedSettings new repository settings + * @param ignoredSettings setting names to ignore even if changed + * @return true if the repository can be updated in place + */ + default boolean canUpdateInPlace(Settings updatedSettings, Set ignoredSettings) { + return getMetadata().settings().equals(updatedSettings); + } + /** * Update the repository with the incoming cluster state. 
This method is invoked from {@link RepositoriesService#applyClusterState} and * thus the same semantics as with {@link org.elasticsearch.cluster.ClusterStateApplier#applyClusterState} apply for the diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 421af75c20605..38f01969619e6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -121,9 +121,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -234,9 +236,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final boolean cacheRepositoryData; - private final RateLimiter snapshotRateLimiter; + private volatile RateLimiter snapshotRateLimiter; - private final RateLimiter restoreRateLimiter; + private volatile RateLimiter restoreRateLimiter; private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); @@ -259,6 +261,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final ChecksumBlobStoreFormat INDEX_SHARD_SNAPSHOTS_FORMAT = new ChecksumBlobStoreFormat<>("snapshots", SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots::fromXContent); + public static final Setting MAX_SNAPSHOT_BYTES_PER_SEC = Setting.byteSizeSetting("max_snapshot_bytes_per_sec", + new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); + + public static final Setting MAX_RESTORE_BYTES_PER_SEC = Setting.byteSizeSetting("max_restore_bytes_per_sec", + 
ByteSizeValue.ZERO, Setting.Property.Dynamic, Setting.Property.NodeScope); + + /** + * Repository settings that can be updated dynamically without having to create a new repository. + */ + private static final Set DYNAMIC_SETTING_NAMES = + Set.of(MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), MAX_RESTORE_BYTES_PER_SEC.getKey()); + private final boolean readOnly; private final Object lock = new Object(); @@ -331,8 +345,8 @@ protected BlobStoreRepository( this.recoverySettings = recoverySettings; this.compress = COMPRESS_SETTING.get(metadata.settings()); this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); - snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); - restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); + snapshotRateLimiter = getRateLimiter(metadata.settings(), MAX_SNAPSHOT_BYTES_PER_SEC); + restoreRateLimiter = getRateLimiter(metadata.settings(), MAX_RESTORE_BYTES_PER_SEC); readOnly = metadata.settings().getAsBoolean(READONLY_SETTING_KEY, false); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); @@ -490,11 +504,32 @@ private static int getSegmentInfoFileCount(List fi.physicalName().endsWith(".si")).count())); } + @Override + public boolean canUpdateInPlace(Settings updatedSettings, Set ignoredSettings) { + final Settings current = metadata.settings(); + if (current.equals(updatedSettings)) { + return true; + } + final Set changedSettingNames = new HashSet<>(current.keySet()); + changedSettingNames.addAll(updatedSettings.keySet()); + changedSettingNames.removeAll(ignoredSettings); + changedSettingNames.removeIf(setting -> Objects.equals(current.get(setting), updatedSettings.get(setting))); + changedSettingNames.removeAll(DYNAMIC_SETTING_NAMES); + return changedSettingNames.isEmpty(); + } + // Inspects all cluster state 
elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override public void updateState(ClusterState state) { + final Settings previousSettings = metadata.settings(); metadata = getRepoMetadata(state); + final Settings updatedSettings = metadata.settings(); + if (updatedSettings.equals(previousSettings) == false) { + snapshotRateLimiter = getRateLimiter(metadata.settings(), MAX_SNAPSHOT_BYTES_PER_SEC); + restoreRateLimiter = getRateLimiter(metadata.settings(), MAX_RESTORE_BYTES_PER_SEC); + } + uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration(); final boolean wasBestEffortConsistency = bestEffortConsistency; bestEffortConsistency = uncleanStart || isReadOnly() || metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN; @@ -1257,11 +1292,10 @@ public BlobContainer shardContainer(IndexId indexId, int shardId) { * * @param repositorySettings repository settings * @param setting setting to use to configure rate limiter - * @param defaultRate default limiting rate * @return rate limiter or null of no throttling is needed */ - private RateLimiter getRateLimiter(Settings repositorySettings, String setting, ByteSizeValue defaultRate) { - ByteSizeValue maxSnapshotBytesPerSec = repositorySettings.getAsBytesSize(setting, defaultRate); + private static RateLimiter getRateLimiter(Settings repositorySettings, Setting setting) { + ByteSizeValue maxSnapshotBytesPerSec = setting.get(repositorySettings); if (maxSnapshotBytesPerSec.getBytes() <= 0) { return null; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index cddcae7af5b47..e173025d52ded 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -190,6 +190,10 @@ protected void stopNode(final String node) throws IOException { internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(node)); } + protected static String startDataNodeWithLargeSnapshotPool() { + return internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); + } + public void waitForBlock(String node, String repository) throws Exception { logger.info("--> waiting for [{}] to be blocked on node [{}]", repository, node); MockRepository mockRepository = getRepositoryOnNode(repository, node); @@ -277,10 +281,16 @@ public void unblockNode(final String repository, final String node) { AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, node).unblock(); } - protected void createRepository(String repoName, String type, Settings.Builder settings) { - logger.info("--> creating repository [{}] [{}]", repoName, type); + protected void createRepository(String repoName, String type, Settings.Builder settings, boolean verify) { + logger.info("--> creating or updating repository [{}] [{}]", repoName, type); assertAcked(clusterAdmin().preparePutRepository(repoName) - .setType(type).setSettings(settings)); + .setVerify(verify) + .setType(type) + .setSettings(settings)); + } + + protected void createRepository(String repoName, String type, Settings.Builder settings) { + createRepository(repoName, type, settings, true); } protected void createRepository(String repoName, String type, Path location) { @@ -292,11 +302,7 @@ protected void createRepository(String repoName, String type) { } protected void createRepositoryNoVerify(String repoName, String type) { - logger.info("--> creating repository [{}] [{}]", repoName, type); - assertAcked(clusterAdmin().preparePutRepository(repoName) - .setVerify(false) - .setType(type) - .setSettings(randomRepositorySettings())); + createRepository(repoName, type, randomRepositorySettings(), 
false); } protected Settings.Builder randomRepositorySettings() { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index b2ca6432ec7bb..f4b787b635ec8 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -51,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,6 +83,11 @@ public List> getSettings() { } } + /** + * Setting name for a setting that can be updated dynamically to test {@link #canUpdateInPlace(Settings, Set)}. 
+ */ + public static final String DUMMY_UPDATABLE_SETTING_NAME = "dummy_setting"; + private final AtomicLong failureCounter = new AtomicLong(); public long getFailureCount() { @@ -162,6 +169,12 @@ public RepositoryMetadata getMetadata() { return overrideSettings(super.getMetadata(), env); } + @Override + public boolean canUpdateInPlace(Settings updatedSettings, Set ignoredSettings) { + // allow updating dummy setting for test purposes + return super.canUpdateInPlace(updatedSettings, Sets.union(ignoredSettings, Set.of(DUMMY_UPDATABLE_SETTING_NAME))); + } + private static RepositoryMetadata overrideSettings(RepositoryMetadata metadata, Environment environment) { // TODO: use another method of testing not being able to read the test file written by the master... // this is super duper hacky