From 1668e103c7c04d1bfa7e1f016bc28484bb49c2ea Mon Sep 17 00:00:00 2001
From: Bansi Kasundra
Date: Thu, 22 Jun 2023 11:24:15 -0700
Subject: [PATCH] Addressed PR comments

Signed-off-by: Bansi Kasundra

---
 CHANGELOG.md                                  |   1 +
 .../repositories/s3/S3Repository.java         |   4 +-
 .../snapshots/DeleteSnapshotIT.java           |  84 ++++++
 .../TransportCleanupRepositoryAction.java     |  40 ++-
 .../repositories/FilterRepository.java        |   4 +-
 .../opensearch/repositories/Repository.java   |   2 +-
 .../blobstore/BlobStoreRepository.java        | 245 +++++++++++++++---
 .../snapshots/SnapshotsService.java           |  61 +++--
 .../DeleteDataStreamRequestTests.java         |   1 -
 .../MetadataDeleteIndexServiceTests.java      |   1 -
 .../MetadataIndexStateServiceTests.java       |   1 -
 .../RepositoriesServiceTests.java             |   2 -
 .../index/shard/RestoreOnlyRepository.java    |   2 -
 .../AbstractSnapshotIntegTestCase.java        |  43 +++
 14 files changed, 397 insertions(+), 94 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16c794135cb23..6931c79e083c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
 - [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
 - Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496))
+- [Snapshot Interop] Add Changes in Delete Snapshot Flow for remote store interoperability. ([#7497](https://github.com/opensearch-project/OpenSearch/pull/7497))
 
 ### Deprecated
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
index e9d3a02646f3d..954b79035429f 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -51,7 +51,6 @@
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.monitor.jvm.JvmInfo;
 import org.opensearch.repositories.RepositoryData;
@@ -308,10 +307,9 @@ public void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
-        super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
+        super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
     }
 
     @Override
diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java
new file mode 100644
index 0000000000000..b5cacb5371478
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java
@@ -0,0 +1,84 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.snapshots;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.test.FeatureFlagSetter;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.nio.file.Path;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {
+
+    public void testDeleteSnapshot() throws Exception {
+        disableRepoConsistencyCheck("Remote store repository is being used in the test");
+        FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
+        internalCluster().startClusterManagerOnlyNode();
+        internalCluster().startDataOnlyNode();
+
+        final String snapshotRepoName = "snapshot-repo-name";
+        final Path snapshotRepoPath = randomRepoPath();
+        createRepository(snapshotRepoName, "fs", snapshotRepoPath);
+
+        final Path remoteStoreRepoPath = randomRepoPath();
+        final String remoteStoreRepoName = "remote-store-repo-name";
+        createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+
+        final String remoteStoreEnabledIndexName = "remote-index-1";
+        final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
+        createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
+        indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));
+
+        final String snapshot = "snapshot";
+        createFullSnapshot(snapshotRepoName, snapshot);
+        assertEquals(0, getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length);
+        assertEquals(1, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
+
+        assertAcked(startDeleteSnapshot(snapshotRepoName, snapshot).get());
+        assertEquals(0, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
+    }
+
+    public void testDeleteShallowCopySnapshot() throws Exception {
+        disableRepoConsistencyCheck("Remote store repository is being used in the test");
+        FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
+        internalCluster().startClusterManagerOnlyNode();
+        internalCluster().startDataOnlyNode();
+
+        final String snapshotRepoName = "snapshot-repo-name";
+        createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());
+
+        final Path remoteStoreRepoPath = randomRepoPath();
+        final String remoteStoreRepoName = "remote-store-repo-name";
+        createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+
+        final String remoteStoreEnabledIndexName = "remote-index-1";
+        final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
+        createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
+        indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));
+
+        final String shallowSnapshot = "shallow-snapshot";
+        createFullSnapshot(snapshotRepoName, shallowSnapshot);
+        assertEquals(1, getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length);
+        assertEquals(1, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
+
+        assertAcked(startDeleteSnapshot(snapshotRepoName, shallowSnapshot).get());
+        assertEquals(0, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
+        assertEquals(0, getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
index 07b918e427784..fcd2994f3f98e 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
@@ -38,6 +38,7 @@
 import org.opensearch.action.ActionRunnable;
 import org.opensearch.action.StepListener;
 import org.opensearch.action.support.ActionFilters;
+import org.opensearch.action.support.GroupedActionListener;
 import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateUpdateTask;
@@ -50,8 +51,10 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.Nullable;
+import org.opensearch.common.blobstore.DeleteResult;
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.Repository;
 import org.opensearch.repositories.RepositoryCleanupResult;
@@ -93,6 +96,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterManagerNodeAction<CleanupRepositoryRequest, CleanupRepositoryResponse> {
 
     private final SnapshotsService snapshotsService;
 
+    private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;
+
     @Override
     protected String executor() {
         return ThreadPool.Names.SAME;
@@ -119,6 +124,7 @@ public TransportCleanupRepositoryAction(
         );
         this.repositoriesService = repositoriesService;
         this.snapshotsService = snapshotsService;
+        this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
         // We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
         // This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
         // operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@@ -260,17 +266,31 @@ public void onFailure(String source, Exception e) {
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 startedCleanup = true;
                 logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
-                threadPool.executor(ThreadPool.Names.SNAPSHOT)
-                    .execute(
-                        ActionRunnable.wrap(
-                            listener,
-                            l -> blobStoreRepository.cleanup(
-                                repositoryStateId,
-                                snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
-                                ActionListener.wrap(result -> after(null, result), e -> after(e, null))
-                            )
-                        )
+                threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+                    final GroupedActionListener<RepositoryCleanupResult> groupedListener = new GroupedActionListener<>(
+                        ActionListener.wrap(repositoryCleanupResults -> {
+                            long blobs = 0;
+                            long bytes = 0;
+                            for (RepositoryCleanupResult result : repositoryCleanupResults) {
+                                blobs += result.blobs();
+                                bytes += result.bytes();
+                            }
+                            RepositoryCleanupResult result = new RepositoryCleanupResult(new DeleteResult(blobs, bytes));
+                            after(null, result);
+                        }, e -> { after(e, null); }),
+                        2
+                    );
+                    blobStoreRepository.cleanupRemoteStoreLockFiles(
+                        repositoryStateId,
+                        remoteStoreLockManagerFactory,
+                        ActionListener.wrap(result -> groupedListener.onResponse(result), e -> groupedListener.onFailure(e))
+                    );
+                    blobStoreRepository.cleanup(
+                        repositoryStateId,
+                        snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
+                        ActionListener.wrap(result -> groupedListener.onResponse(result), e -> groupedListener.onFailure(e))
                     );
+                }));
             }
 
             private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
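Reviewer note, not part of the patch: the cleanup path above now fans two asynchronous sub-cleanups (the new remote-store lock-file cleanup and the existing blob cleanup) into one GroupedActionListener sized 2, then folds the two RepositoryCleanupResults into a single result. A minimal sketch of that aggregation pattern, compiled against the same OpenSearch APIs this file imports; names and values are illustrative:

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.GroupedActionListener;

import java.util.Collection;

public class GroupedListenerSketch {
    public static void main(String[] args) {
        // The wrapped listener fires exactly once, after groupSize (here 2) responses arrive;
        // any single onFailure fails the whole group.
        GroupedActionListener<Long> grouped = new GroupedActionListener<>(
            ActionListener.wrap(
                (Collection<Long> results) -> System.out.println("total = " + results.stream().mapToLong(Long::longValue).sum()),
                e -> System.err.println("one branch failed: " + e)
            ),
            2
        );
        grouped.onResponse(10L); // e.g. lock-file cleanup result
        grouped.onResponse(32L); // e.g. blob cleanup result
    }
}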
diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java
index 325d4ee265b0b..88e14a4dff3a0 100644
--- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java
@@ -46,7 +46,6 @@
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
 import org.opensearch.index.store.Store;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.snapshots.SnapshotId;
 import org.opensearch.snapshots.SnapshotInfo;
@@ -121,10 +120,9 @@ public void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
-        in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
+        in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
     }
 
     @Override
diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java
index 464f172e50607..791b8e43ee184 100644
--- a/server/src/main/java/org/opensearch/repositories/Repository.java
+++ b/server/src/main/java/org/opensearch/repositories/Repository.java
@@ -42,6 +42,7 @@
 import org.opensearch.cluster.metadata.RepositoryMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.common.Nullable;
+import org.opensearch.common.blobstore.DeleteResult;
 import org.opensearch.common.component.LifecycleComponent;
 import org.opensearch.index.mapper.MapperService;
 import org.opensearch.index.shard.ShardId;
@@ -165,7 +166,6 @@ void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     );
 
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 9a18c2d2d0b73..5a98c5f8afc3f 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -748,7 +748,6 @@ public void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
         if (isReadOnly()) {
@@ -769,7 +768,6 @@ protected void doRun() throws Exception {
                         rootBlobs,
                         repositoryData,
                         repositoryMetaVersion,
-                        remoteStoreLockManagerFactory,
                         listener
                     );
                 }
@@ -782,6 +780,178 @@ public void onFailure(Exception e) {
         }
     }
 
+    @Override
+    public void cleanupRemoteStoreStaleLockFiles(
+        Collection<SnapshotId> snapshotIds,
+        long repositoryStateId,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
+        ActionListener<DeleteResult> listener
+    ) {
+        if (isReadOnly()) {
+            throw new RepositoryException(metadata.name(), "Cannot release locks for a readonly repository");
+        } else {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
+                @Override
+                protected void doRun() throws Exception {
+                    final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
+                    final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
+                    final List<String> snapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).collect(Collectors.toList());
+                    final StepListener<
+                        List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> resourcesToReleaseLockListener =
+                            new StepListener<>();
+                    resolveResourcesToReleaseLock(snapshotUUIDs, repositoryData, resourcesToReleaseLockListener);
+
+                    resourcesToReleaseLockListener.whenComplete(resourcesToReleaseLock -> {
+                        if (resourcesToReleaseLock.isEmpty()) {
+                            logger.info("No lock files to release");
+                            listener.onResponse(DeleteResult.ZERO);
+                            return;
+                        }
+                        try {
+                            AtomicInteger counter = new AtomicInteger();
+                            Collection<List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> subList =
+                                resourcesToReleaseLock.stream()
+                                    .collect(
+                                        Collectors.groupingBy(
+                                            it -> counter.getAndIncrement() / MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(
+                                                getMetadata().settings()
+                                            )
+                                        )
+                                    )
+                                    .values();
+
+                            final BlockingQueue<
+                                List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> lockFilesToDeleteInBatch =
+                                    new LinkedBlockingQueue<>(subList);
+
+                            final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(
+                                ActionListener.wrap(deleteResults -> {
+                                    DeleteResult deleteResult = DeleteResult.ZERO;
+                                    for (DeleteResult result : deleteResults) {
+                                        deleteResult = deleteResult.add(result);
+                                    }
+                                    listener.onResponse(deleteResult);
+                                }, e -> { listener.onFailure(e); }),
+                                lockFilesToDeleteInBatch.size()
+                            );
+
+                            // Start as many workers as fit into the snapshot pool at once at the most
+                            final int workers = Math.min(
+                                threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+                                lockFilesToDeleteInBatch.size()
+                            );
+                            for (int i = 0; i < workers; ++i) {
+                                executeLockFilesDelete(lockFilesToDeleteInBatch, groupedListener, remoteStoreLockManagerFactory);
+                            }
+                        } catch (Exception e) {
+                            assert false : e;
+                            logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of lock files", snapshotIds), e);
+                            listener.onFailure(e);
+                        }
+                    }, ex -> listener.onFailure(ex));
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(new RepositoryException(metadata.name(), "failed to release locks " + snapshotIds, e));
+                }
+            });
+        }
+    }
+
+    private void resolveResourcesToReleaseLock(
+        Collection<String> snapshotUUIDs,
+        RepositoryData repositoryData,
+        StepListener<List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> listener
+    ) throws IOException {
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        final Set<String> allSnapshotIds = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
+        final Map<String, BlobContainer> indexBlobs = blobStore().blobContainer(indicesPath()).children();
+        final ActionListener<Collection<List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>>> allIndicesListener =
+            new GroupedActionListener<>(
+                ActionListener.map(
+                    listener,
+                    res -> res.stream().flatMap(Collection::stream).flatMap(Collection::stream).collect(Collectors.toList())
+                ),
+                indexBlobs.size()
+            );
+        for (Map.Entry<String, BlobContainer> indexBlob : indexBlobs.entrySet()) {
+            Map<String, BlobContainer> shardBlobs = indexBlob.getValue().children();
+            final ActionListener<List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> allShardsListener =
+                new GroupedActionListener<>(allIndicesListener, shardBlobs.size());
+            for (Map.Entry<String, BlobContainer> shardBlob : shardBlobs.entrySet()) {
+                executor.execute(new AbstractRunnable() {
+                    @Override
+                    protected void doRun() throws Exception {
+                        Map<String, BlobMetadata> shardLevelBlobs = shardBlob.getValue().listBlobs();
+                        List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>> resourcesToReleaseLock =
+                            new ArrayList<>();
+                        for (Map.Entry<String, BlobMetadata> shardLevelBlob : shardLevelBlobs.entrySet()) {
+                            String blob = shardLevelBlob.getKey();
+                            String snapshotUUID = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
+                            if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX)
+                                && blob.endsWith(".dat")
+                                && (!allSnapshotIds.contains(snapshotUUID) || snapshotUUIDs.contains(snapshotUUID))) {
+                                RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
+                                    REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
+                                        shardBlob.getValue(),
+                                        snapshotUUID,
+                                        namedXContentRegistry
+                                    );
+                                resourcesToReleaseLock.add(
+                                    Tuple.tuple(
+                                        snapshotUUID,
+                                        Tuple.tuple(Integer.valueOf(shardBlob.getKey()), remoteStoreShardShallowCopySnapshot)
+                                    )
+                                );
+                            }
+                        }
+                        allShardsListener.onResponse(resourcesToReleaseLock);
+                    }
+
+                    @Override
+                    public void onFailure(Exception ex) {
+                        // Just passing null here to count down the listener instead of failing it, the stale data left behind
+                        // here will be retried in the next delete or repository cleanup
+                        allShardsListener.onResponse(null);
+                    }
+                });
+            }
+        }
+    }
+
+    private void executeLockFilesDelete(
+        BlockingQueue<List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>>> lockFilesToDeleteInBatch,
+        GroupedActionListener<DeleteResult> listener,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
+    ) throws InterruptedException {
+        List<Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>>> lockFilesToDelete = lockFilesToDeleteInBatch.poll(
+            0L,
+            TimeUnit.MILLISECONDS
+        );
+        if (lockFilesToDelete != null) {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+                try {
+                    for (Tuple<String, Tuple<Integer, RemoteStoreShardShallowCopySnapshot>> lockFile : lockFilesToDelete) {
+                        String indexUUID = lockFile.v2().v2().getIndexUUID();
+                        String remoteStoreRepoForIndex = lockFile.v2().v2().getRemoteStoreRepository();
+                        RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
+                            remoteStoreRepoForIndex,
+                            indexUUID,
+                            String.valueOf(lockFile.v2().v1())
+                        );
+                        remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(lockFile.v1()).build());
+                    }
+                    l.onResponse(new DeleteResult(lockFilesToDelete.size(), 0));
+                } catch (Exception e) {
+                    assert false : e;
+                    logger.warn(new ParameterizedMessage("[{}] Exception during deletion of lock files", metadata.name()), e);
+                    l.onFailure(e);
+                }
+                executeLockFilesDelete(lockFilesToDeleteInBatch, listener, remoteStoreLockManagerFactory);
+            }));
+        }
+    }
+
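Reviewer note, not part of the patch: executeLockFilesDelete above is a self-re-queuing worker; up to snapshot-pool-max workers poll fixed-size batches off a BlockingQueue until poll returns null, and the batches themselves are built with the AtomicInteger/Collectors.groupingBy partitioning idiom seen in cleanupRemoteStoreStaleLockFiles. A runnable, JDK-only sketch of both idioms with an illustrative batch size of 3:

import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchingSketch {
    public static void main(String[] args) {
        List<Integer> items = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());

        // Partition into batches of 3 by grouping on runningIndex / batchSize.
        AtomicInteger counter = new AtomicInteger();
        Collection<List<Integer>> batches = items.stream()
            .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 3))
            .values();

        // A worker drains the queue; a null poll means all batches are taken.
        LinkedBlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>(batches);
        List<Integer> batch;
        while ((batch = queue.poll()) != null) {
            System.out.println("processing batch " + batch);
        }
    }
}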
     /**
      * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
      *
@@ -849,7 +1019,6 @@ private void doDeleteShardSnapshots(
         Map<String, BlobMetadata> rootBlobs,
         RepositoryData repositoryData,
         Version repoMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
         // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@@ -884,7 +1053,6 @@ private void doDeleteShardSnapshots(
             ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
             2
         );
-        releaseResourceLockFiles(snapshotIds, repositoryData, remoteStoreLockManagerFactory, afterCleanupsListener);
         cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
         asyncCleanupUnlinkedShardLevelBlobs(
             repositoryData,
@@ -1047,8 +1215,11 @@ protected void doRun() throws Exception {
             final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
             final long newGen;
 
-            // Index- file would be present if snapshots other than shallow snapshots are present for this shard
-            if (shardContainer.listBlobsByPrefix(SNAPSHOT_INDEX_PREFIX).size() > 0) {
+            // Index-N file would be present if snapshots other than shallow snapshots are present for this shard
+            if (blobs.stream()
+                .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
+                .collect(Collectors.toSet())
+                .size() > 0) {
                 if (useUUIDs) {
                     newGen = -1L;
                     blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
@@ -1103,42 +1274,6 @@ public void onFailure(Exception ex) {
         }
     }
 
-    private void releaseResourceLockFiles(
-        Collection<SnapshotId> snapshotIds,
-        RepositoryData repositoryData,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
-        ActionListener<Void> onAllShardsCompleted
-    ) {
-        for (SnapshotId snapshotId : snapshotIds) {
-            List<String> indices = this.getSnapshotInfo(snapshotId).indices();
-            List<IndexId> indexIds = repositoryData.resolveIndices(indices);
-            for (IndexId indexId : indexIds) {
-                try {
-                    IndexMetadata indexMetadata = this.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
-                    if (this.getSnapshotInfo(snapshotId).isRemoteStoreIndexShallowCopyEnabled()
-                        && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
-                        int numberOfShards = indexMetadata.getNumberOfShards();
-                        for (int shardId = 0; shardId < numberOfShards; shardId++) {
-                            final int finalShardId = shardId;
-                            String indexUUID = indexMetadata.getIndexUUID();
-                            String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
-                            RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
-                                remoteStoreRepoForIndex,
-                                indexUUID,
-                                String.valueOf(finalShardId)
-                            );
-                            remoteStoreMetadataLockManager.release(
-                                FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotId.getUUID()).build()
-                            );
-                        }
-                    }
-                } catch (IOException e) {
-                    onAllShardsCompleted.onFailure(e);
-                }
-            }
-        }
-    }
-
     private List<String> resolveFilesToDelete(
         RepositoryData oldRepositoryData,
         Collection<SnapshotId> snapshotIds,
@@ -1207,6 +1342,26 @@ private void cleanupStaleBlobs(
         }
     }
 
+    public void cleanupRemoteStoreLockFiles(
+        long repositoryStateId,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
+        ActionListener<RepositoryCleanupResult> listener
+    ) {
+        try {
+            if (isReadOnly()) {
+                throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
+            }
+            cleanupRemoteStoreStaleLockFiles(
+                Collections.emptyList(),
+                repositoryStateId,
+                remoteStoreLockManagerFactory,
+                ActionListener.map(listener, RepositoryCleanupResult::new)
+            );
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
     /**
      * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the
      * repository.
@@ -2960,6 +3115,7 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
             }
         } else {
             updatedSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
+            writtenGeneration = ShardGenerations.NEW_SHARD_GEN;
         }
         final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
         return new ShardSnapshotMetaDeleteResult(
@@ -3014,6 +3170,11 @@ private static List<String> unusedBlobs(
                     && survivingSnapshotUUIDs.contains(
                         blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
                     ) == false)
+                || (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX)
+                    && blob.endsWith(".dat")
+                    && survivingSnapshotUUIDs.contains(
+                        blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
+                    ) == false)
                 || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
                 || FsBlobContainer.isTempBlobName(blob)
             )
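Reviewer note, not part of the patch: the new unusedBlobs clause (like the shard scan in resolveResourcesToReleaseLock) leans on the shallow-snapshot blob naming scheme, SHALLOW_SNAPSHOT_PREFIX + snapshotUUID + ".dat", so the owning snapshot's UUID can be sliced back out of a blob name. A small runnable sketch of that predicate; the prefix literal below is an assumption for illustration, the real value comes from the BlobStoreRepository constant:

import java.util.Set;

public class ShallowBlobPredicateSketch {
    // Assumed value for illustration; the patch uses BlobStoreRepository.SHALLOW_SNAPSHOT_PREFIX.
    static final String SHALLOW_SNAPSHOT_PREFIX = "shallow-snap-";

    static boolean isUnusedShallowBlob(String blob, Set<String> survivingSnapshotUUIDs) {
        if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) == false || blob.endsWith(".dat") == false) {
            return false;
        }
        // Slice the snapshot UUID back out of "shallow-snap-<uuid>.dat".
        String uuid = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
        return survivingSnapshotUUIDs.contains(uuid) == false;
    }

    public static void main(String[] args) {
        System.out.println(isUnusedShallowBlob("shallow-snap-abc123.dat", Set.of("def456"))); // true: owner deleted
        System.out.println(isUnusedShallowBlob("shallow-snap-abc123.dat", Set.of("abc123"))); // false: still referenced
    }
}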
diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
index fd1ac09766832..e971e786e1bf7 100644
--- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
@@ -99,6 +99,7 @@
 import org.opensearch.repositories.RepositoryMissingException;
 import org.opensearch.repositories.RepositoryShardId;
 import org.opensearch.repositories.ShardGenerations;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.TransportService;
 
@@ -1542,12 +1543,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 if (deletionToRun == null) {
                     runNextQueuedOperation(repositoryData, repository, false);
                 } else {
-                    deleteSnapshotsFromRepository(
-                        deletionToRun,
-                        repositoryData,
-                        newState.nodes().getMinNodeVersion(),
-                        remoteStoreLockManagerFactory
-                    );
+                    deleteSnapshotsFromRepository(deletionToRun, repositoryData, newState.nodes().getMinNodeVersion());
                 }
             }
         });
@@ -2072,12 +2068,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
             }
             if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
                 if (tryEnterRepoLoop(repoName)) {
-                    deleteSnapshotsFromRepository(
-                        newDelete,
-                        repositoryData,
-                        newState.nodes().getMinNodeVersion(),
-                        remoteStoreLockManagerFactory
-                    );
+                    deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
                 } else {
                     logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
                 }
@@ -2154,7 +2145,7 @@ public void onResponse(RepositoryData repositoryData) {
                     + "] in cluster state and ["
                     + repositoryData.getGenId()
                     + "] in the repository";
-                deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion, remoteStoreLockManagerFactory);
+                deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion);
             }
 
             @Override
@@ -2176,24 +2167,43 @@ public void onFailure(Exception e) {
     private void deleteSnapshotsFromRepository(
         SnapshotDeletionsInProgress.Entry deleteEntry,
         RepositoryData repositoryData,
-        Version minNodeVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
+        Version minNodeVersion
     ) {
         if (repositoryOperations.startDeletion(deleteEntry.uuid())) {
             assert currentlyFinalizing.contains(deleteEntry.repository());
             final List<SnapshotId> snapshotIds = deleteEntry.getSnapshots();
             assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]";
-            repositoriesService.repository(deleteEntry.repository())
-                .deleteSnapshots(
+            StepListener<DeleteResult> cleanupRemoteStoreLockFilesStep = new StepListener<>();
+            boolean cleanupRemoteStoreLockFiles = repositoriesService.repository(deleteEntry.repository()) instanceof BlobStoreRepository;
+            if (cleanupRemoteStoreLockFiles) {
+                final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repositoriesService.repository(
+                    deleteEntry.repository()
+                );
+                blobStoreRepository.cleanupRemoteStoreStaleLockFiles(
                     snapshotIds,
                     repositoryData.getGenId(),
-                    minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
                     remoteStoreLockManagerFactory,
-                    ActionListener.wrap(updatedRepoData -> {
-                        logger.info("snapshots {} deleted", snapshotIds);
-                        removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
-                    }, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
+                    ActionListener.wrap(cleanupRemoteStoreLockFilesStep::onResponse, cleanupRemoteStoreLockFilesStep::onFailure)
                 );
+            } else {
+                cleanupRemoteStoreLockFilesStep.onResponse(null);
+            }
+            cleanupRemoteStoreLockFilesStep.whenComplete(x -> {
+                repositoriesService.repository(deleteEntry.repository())
+                    .deleteSnapshots(
+                        snapshotIds,
+                        repositoryData.getGenId(),
+                        minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
+                        ActionListener.wrap(updatedRepoData -> {
+                            logger.info("snapshots {} deleted", snapshotIds);
+                            removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
+                        }, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
+                    );
+            }, e -> {
+                // Ignoring the exception encountered while cleaning up lock files; the remaining lock files to be cleaned
+                // will be retried in the next delete or repository cleanup
+                logger.warn("Exception while cleaning up lock files. These will be retried in the next delete/cleanup operation", e);
+            });
         }
     }
 
@@ -2343,12 +2353,7 @@ public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                     leaveRepoLoop(deleteEntry.repository());
                 } else {
                     for (SnapshotDeletionsInProgress.Entry readyDeletion : readyDeletions) {
-                        deleteSnapshotsFromRepository(
-                            readyDeletion,
-                            repositoryData,
-                            newState.nodes().getMinNodeVersion(),
-                            remoteStoreLockManagerFactory
-                        );
+                        deleteSnapshotsFromRepository(readyDeletion, repositoryData, newState.nodes().getMinNodeVersion());
                     }
                 }
             } else {
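Reviewer note, not part of the patch: deleteSnapshotsFromRepository now sequences lock-file release ahead of the actual snapshot deletion with a StepListener, and deliberately continues (with a warning) when the cleanup step fails, leaving stragglers for the next delete or repository cleanup. A minimal sketch of the StepListener chaining pattern, against the same org.opensearch.action.StepListener API; names are illustrative:

import org.opensearch.action.StepListener;

public class StepListenerSketch {
    public static void main(String[] args) {
        StepListener<String> cleanupStep = new StepListener<>();

        // The continuation runs exactly once, when the step completes (possibly on another thread).
        cleanupStep.whenComplete(
            result -> System.out.println("next step runs with: " + result),
            e -> System.err.println("proceed or log, depending on policy: " + e)
        );

        // Completing the step triggers the registered continuation.
        cleanupStep.onResponse("cleanup done");
    }
}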
diff --git a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java
index 21a111e579a47..008046aa83dfe 100644
--- a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java
@@ -162,7 +162,6 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo
         return new SnapshotsInProgress.Entry(
             new Snapshot(repo, new SnapshotId("", "")),
             false,
-            false,
             partial,
             SnapshotsInProgress.State.SUCCESS,
             Collections.emptyList(),
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
index 8c03766857eb4..b3772edcbe27c 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
@@ -103,7 +103,6 @@ public void testDeleteSnapshotting() {
             snapshot,
             true,
             false,
-            false,
             SnapshotsInProgress.State.INIT,
             singletonList(new IndexId(index, "doesn't matter")),
             Collections.emptyList(),
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java
index c1cbf937936cc..5cffa1931dd39 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java
@@ -431,7 +431,6 @@ private static ClusterState addSnapshotIndex(final String index, final int numShards
             snapshot,
             randomBoolean(),
             false,
-            false,
             SnapshotsInProgress.State.INIT,
             Collections.singletonList(new IndexId(index, index)),
             Collections.emptyList(),
diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java
index ad2ef3d5058ef..43d371bf5a187 100644
--- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java
@@ -59,7 +59,6 @@
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
 import org.opensearch.index.store.Store;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
@@ -265,7 +264,6 @@ public void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
         listener.onResponse(null);
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java
index 7bfff70466860..2a85fffa8699a 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java
@@ -44,7 +44,6 @@
 import org.opensearch.index.mapper.MapperService;
 import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
 import org.opensearch.index.store.Store;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.repositories.IndexMetaDataGenerations;
 import org.opensearch.repositories.Repository;
@@ -134,7 +133,6 @@ public void deleteSnapshots(
         Collection<SnapshotId> snapshotIds,
         long repositoryStateId,
         Version repositoryMetaVersion,
-        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
         ActionListener<RepositoryData> listener
     ) {
         listener.onResponse(null);
diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
index 9933297aa1c96..7a0c6dbc79428 100644
--- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -51,16 +51,22 @@
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.Strings;
 import org.opensearch.common.UUIDs;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.compress.CompressorType;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.core.xcontent.DeprecationHandler;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.store.RemoteBufferedOutputDirectory;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.repositories.RepositoriesService;
@@ -121,6 +127,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
             // Rebalancing is causing some checks after restore to randomly fail
             // due to https://github.com/elastic/elasticsearch/issues/9421
             .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
+            .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
+            .put(FeatureFlags.REMOTE_STORE, "true")
             .build();
     }
 
@@ -388,6 +396,13 @@ protected Settings.Builder randomRepositorySettings() {
         return settings;
     }
 
+    protected Settings.Builder snapshotRepoSettingsForShallowCopy() {
+        final Settings.Builder settings = Settings.builder();
+        settings.put("location", randomRepoPath());
+        settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
+        return settings;
+    }
+
     protected static Settings.Builder indexSettingsNoReplicas(int shards) {
         return Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shards).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
     }
@@ -479,6 +494,19 @@ protected void indexRandomDocs(String index, int numdocs) throws InterruptedException {
         assertDocCount(index, numdocs);
     }
 
+    protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) {
+        return Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
+            .put("index.refresh_interval", "300s")
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
+            .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey())
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo)
+            .build();
+    }
+
     protected long getCountForIndex(String indexName) {
         return client().search(
             new SearchRequest(new SearchRequest(indexName).source(new SearchSourceBuilder().size(0).trackTotalHits(true)))
@@ -489,6 +517,21 @@ protected void assertDocCount(String index, long count) {
         assertEquals(getCountForIndex(index), count);
     }
 
+    protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName) throws IOException {
+        String indexUUID = client().admin()
+            .indices()
+            .prepareGetSettings(remoteStoreIndex)
+            .get()
+            .getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID);
+        final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
+        final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName);
+        BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files");
+        BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
+        try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
+            return lockDirectory.listAll();
+        }
+    }
+
     /**
      * Adds a snapshot in state {@link SnapshotState#FAILED} to the given repository.
      *
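Reviewer note, not part of the patch: getLockFilesInRemoteStore above lists lock files at a fixed shard-level path inside the remote store repository. A sketch of how that path is assembled with BlobPath; the index UUID is a made-up value and "0" matches the shard the helper hardcodes:

import org.opensearch.common.blobstore.BlobPath;

public class LockFilePathSketch {
    public static void main(String[] args) {
        // Mirrors the helper: <repo base>/<indexUUID>/<shardId>/segments/lock_files
        String indexUUID = "example-index-uuid"; // illustrative
        BlobPath path = BlobPath.cleanPath().add(indexUUID).add("0").add("segments").add("lock_files");
        System.out.println(path.buildAsString()); // example-index-uuid/0/segments/lock_files/
    }
}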