From 94b8f260992bc950d2d32fa0662789afaac85883 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Tue, 9 May 2023 09:38:05 -0700 Subject: [PATCH] Add Changes in Snapshot Delete Flow for remote store interoperability. Signed-off-by: Bansi Kasundra --- .../repositories/s3/S3Repository.java | 4 +- .../repositories/FilterRepository.java | 4 +- .../opensearch/repositories/Repository.java | 2 + .../blobstore/BlobStoreRepository.java | 90 +++++++++++++++---- .../snapshots/SnapshotsService.java | 3 + .../DeleteDataStreamRequestTests.java | 1 + .../MetadataDeleteIndexServiceTests.java | 1 + .../MetadataIndexStateServiceTests.java | 1 + .../index/shard/RestoreOnlyRepository.java | 2 + 9 files changed, 89 insertions(+), 19 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 8094817035845..e69959b375abf 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -51,6 +51,7 @@ import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.repositories.RepositoryData; @@ -313,9 +314,10 @@ public void deleteSnapshots( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener); + super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index a6a649fa2cd44..7eae581a3d2e0 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -46,6 +46,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -120,9 +121,10 @@ public void deleteSnapshots( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener); + in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 1826fe1aa51da..da8e73829bb7f 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -47,6 +47,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -164,6 +165,7 @@ void deleteSnapshots( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index d90c4f8c964d0..ae21ea2c8bf32 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -111,6 +111,9 @@ import org.opensearch.index.snapshots.blobstore.SnapshotFiles; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.index.store.lockmanager.ShardLockInfo; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; @@ -718,6 +721,7 @@ public void deleteSnapshots( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { if (isReadOnly()) { @@ -738,6 +742,7 @@ protected void doRun() throws Exception { rootBlobs, repositoryData, repositoryMetaVersion, + remoteStoreLockManagerFactory, listener ); } @@ -817,6 +822,7 @@ private void doDeleteShardSnapshots( Map rootBlobs, RepositoryData repositoryData, Version repoMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets @@ -851,6 +857,12 @@ private void doDeleteShardSnapshots( ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), 2 ); + releaseResourceLockFiles( + snapshotIds, + repositoryData, + remoteStoreLockManagerFactory, + afterCleanupsListener + ); cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); asyncCleanupUnlinkedShardLevelBlobs( repositoryData, @@ -1012,17 +1024,23 @@ protected void doRun() throws Exception { final Set blobs = shardContainer.listBlobs().keySet(); final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; final long newGen; - if (useUUIDs) { + if (shardContainer.listBlobsByPrefix(SNAPSHOT_INDEX_PREFIX).size() > 0) { + if (useUUIDs) { + newGen = -1L; + blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( + blobs, + shardContainer, + oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId) + ).v1(); + } else { + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); + newGen = tuple.v2() + 1; + blobStoreIndexShardSnapshots = tuple.v1(); + } + } + else { newGen = -1L; - blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( - blobs, - shardContainer, - oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId) - ).v1(); - } else { - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); - newGen = tuple.v2() + 1; - blobStoreIndexShardSnapshots = tuple.v1(); + blobStoreIndexShardSnapshots = BlobStoreIndexShardSnapshots.EMPTY; } allShardsListener.onResponse( deleteFromShardSnapshotMeta( @@ -1059,6 +1077,42 @@ public void onFailure(Exception ex) { } } + private void releaseResourceLockFiles ( + Collection snapshotIds, + RepositoryData repositoryData, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener onAllShardsCompleted + ){ + for (SnapshotId snapshotId: snapshotIds) { + List indices = this.getSnapshotInfo(snapshotId).indices(); + List indexIds = repositoryData.resolveIndices(indices); + for (IndexId indexId: indexIds) { + try { + IndexMetadata indexMetadata = this.getSnapshotIndexMetaData(repositoryData, + snapshotId, indexId); + if (this.getSnapshotInfo(snapshotId).isRemoteStoreInteropEnabled() + && indexMetadata.getSettings().getAsBoolean( + IndexMetadata.SETTING_REMOTE_STORE_ENABLED, + false)) { + int numberOfShards = indexMetadata.getNumberOfShards(); + for (int shardId = 0; shardId < numberOfShards; shardId++) { + final int finalShardId = shardId; + String indexUUID = indexMetadata.getIndexUUID(); + String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory + .newLockManager(remoteStoreRepoForIndex, indexUUID, String.valueOf(finalShardId)); + remoteStoreMetadataLockManager.release( + ShardLockInfo.getLockInfoBuilder().withResourceId(snapshotId.getUUID()).build() + ); + } + } + } catch (IOException e) { + onAllShardsCompleted.onFailure(e); + } + } + } + } + private List resolveFilesToDelete( RepositoryData oldRepositoryData, Collection snapshotIds, @@ -2783,16 +2837,18 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( } String writtenGeneration = null; try { - if (newSnapshotsList.isEmpty()) { + if (survivingSnapshots.isEmpty()) { return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs); } else { final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); - if (indexGeneration < 0L) { - writtenGeneration = UUIDs.randomBase64UUID(); - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress); - } else { - writtenGeneration = String.valueOf(indexGeneration); - writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); + if (snapshots.snapshots().size() > 0) { + if (indexGeneration < 0L) { + writtenGeneration = UUIDs.randomBase64UUID(); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress); + } else { + writtenGeneration = String.valueOf(indexGeneration); + writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); + } } final Set survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); return new ShardSnapshotMetaDeleteResult( diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 51ca07adddd5a..e0640f556dd18 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -90,6 +90,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -2162,11 +2163,13 @@ private void deleteSnapshotsFromRepository( assert currentlyFinalizing.contains(deleteEntry.repository()); final List snapshotIds = deleteEntry.getSnapshots(); assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]"; + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); repositoriesService.repository(deleteEntry.repository()) .deleteSnapshots( snapshotIds, repositoryData.getGenId(), minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds), + remoteStoreLockManagerFactory, ActionListener.wrap(updatedRepoData -> { logger.info("snapshots {} deleted", snapshotIds); removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index c4fd5c553f069..3f52499571623 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -162,6 +162,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo return new SnapshotsInProgress.Entry( new Snapshot(repo, new SnapshotId("", "")), false, + false, partial, SnapshotsInProgress.State.SUCCESS, Collections.emptyList(), diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index c309b90f1a777..fc2a7696cfcb2 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -103,6 +103,7 @@ public void testDeleteSnapshotting() { snapshot, true, false, + false, SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")), Collections.emptyList(), diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java index ffda58adcd019..e69403b788cdc 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java @@ -431,6 +431,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh snapshot, randomBoolean(), false, + false, SnapshotsInProgress.State.INIT, Collections.singletonList(new IndexId(index, index)), Collections.emptyList(), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 2a85fffa8699a..7bfff70466860 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -44,6 +44,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.IndexMetaDataGenerations; import org.opensearch.repositories.Repository; @@ -133,6 +134,7 @@ public void deleteSnapshots( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { listener.onResponse(null);