Skip to content

Commit

Permalink
Add Changes in Snapshot Delete Flow for remote store interoperability.
Browse files Browse the repository at this point in the history
Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
  • Loading branch information
kasundra07 committed May 9, 2023
1 parent 4956db3 commit 94b8f26
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.RepositoryData;
Expand Down Expand Up @@ -313,9 +314,10 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -120,9 +121,10 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -164,6 +165,7 @@ void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.lockmanager.ShardLockInfo;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -718,6 +721,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
if (isReadOnly()) {
Expand All @@ -738,6 +742,7 @@ protected void doRun() throws Exception {
rootBlobs,
repositoryData,
repositoryMetaVersion,
remoteStoreLockManagerFactory,
listener
);
}
Expand Down Expand Up @@ -817,6 +822,7 @@ private void doDeleteShardSnapshots(
Map<String, BlobMetadata> rootBlobs,
RepositoryData repositoryData,
Version repoMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
Expand Down Expand Up @@ -851,6 +857,12 @@ private void doDeleteShardSnapshots(
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
2
);
releaseResourceLockFiles(
snapshotIds,
repositoryData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
Expand Down Expand Up @@ -1012,17 +1024,23 @@ protected void doRun() throws Exception {
final Set<String> blobs = shardContainer.listBlobs().keySet();
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
final long newGen;
if (useUUIDs) {
if (shardContainer.listBlobsByPrefix(SNAPSHOT_INDEX_PREFIX).size() > 0) {
if (useUUIDs) {
newGen = -1L;
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
blobs,
shardContainer,
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
).v1();
} else {
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
newGen = tuple.v2() + 1;
blobStoreIndexShardSnapshots = tuple.v1();
}
}
else {
newGen = -1L;
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
blobs,
shardContainer,
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
).v1();
} else {
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
newGen = tuple.v2() + 1;
blobStoreIndexShardSnapshots = tuple.v1();
blobStoreIndexShardSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
}
allShardsListener.onResponse(
deleteFromShardSnapshotMeta(
Expand Down Expand Up @@ -1059,6 +1077,42 @@ public void onFailure(Exception ex) {
}
}

/**
 * Releases the remote store metadata locks that were acquired on behalf of the snapshots being deleted.
 *
 * <p>For every snapshot that reports {@code isRemoteStoreInteropEnabled()}, each index of that snapshot whose
 * settings have {@link IndexMetadata#SETTING_REMOTE_STORE_ENABLED} set to {@code true} gets one lock release
 * per shard. The lock manager for a shard is obtained from {@code remoteStoreLockManagerFactory} using the
 * index's {@link IndexMetadata#SETTING_REMOTE_STORE_REPOSITORY} setting, the index UUID and the shard number;
 * the lock to release is identified by the snapshot UUID.
 *
 * <p>Release is best-effort across indices: an {@link IOException} raised while handling one index does not
 * prevent the remaining indices and snapshots from being processed. All such failures are gathered into a
 * single exception (the first one, with later ones attached as suppressed) and reported to the listener
 * exactly once after the loops finish, rather than once per failing index.
 *
 * <p>NOTE(review): this method never calls {@code onResponse} on the listener; only failures are reported.
 * The call site appears to share this listener with the other cleanup steps, so whether a success signal
 * is expected here needs to be confirmed against the caller before adding one.
 *
 * @param snapshotIds                   snapshots that are being deleted
 * @param repositoryData                repository data used to resolve index names to {@link IndexId}s and to
 *                                      look up per-snapshot index metadata
 * @param remoteStoreLockManagerFactory factory used to create the per-shard lock manager
 * @param onAllShardsCompleted          listener notified (at most once) if releasing any lock failed with an
 *                                      {@link IOException}
 */
private void releaseResourceLockFiles(
    Collection<SnapshotId> snapshotIds,
    RepositoryData repositoryData,
    RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
    ActionListener<Void> onAllShardsCompleted
) {
    IOException failure = null;
    for (SnapshotId snapshotId : snapshotIds) {
        // The interop flag is a property of the snapshot, not of an index: evaluate it once per snapshot and
        // skip the per-index metadata lookups entirely for snapshots that never took remote store locks.
        if (this.getSnapshotInfo(snapshotId).isRemoteStoreInteropEnabled() == false) {
            continue;
        }
        final List<String> indices = this.getSnapshotInfo(snapshotId).indices();
        final String lockResourceId = snapshotId.getUUID();
        for (IndexId indexId : repositoryData.resolveIndices(indices)) {
            try {
                final IndexMetadata indexMetadata = this.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
                if (indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false) {
                    continue;
                }
                // These values are identical for every shard of the index, so read them once.
                final String indexUUID = indexMetadata.getIndexUUID();
                final String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
                final int numberOfShards = indexMetadata.getNumberOfShards();
                for (int shardId = 0; shardId < numberOfShards; shardId++) {
                    final RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
                        remoteStoreRepoForIndex,
                        indexUUID,
                        String.valueOf(shardId)
                    );
                    remoteStoreMetadataLockManager.release(
                        ShardLockInfo.getLockInfoBuilder().withResourceId(lockResourceId).build()
                    );
                }
            } catch (IOException e) {
                // Keep going so the remaining locks still get released; remember every failure.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
    }
    if (failure != null) {
        // Notify once: a listener must not be completed multiple times.
        onAllShardsCompleted.onFailure(failure);
    }
}

private List<String> resolveFilesToDelete(
RepositoryData oldRepositoryData,
Collection<SnapshotId> snapshotIds,
Expand Down Expand Up @@ -2783,16 +2837,18 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
}
String writtenGeneration = null;
try {
if (newSnapshotsList.isEmpty()) {
if (survivingSnapshots.isEmpty()) {
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
} else {
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
if (indexGeneration < 0L) {
writtenGeneration = UUIDs.randomBase64UUID();
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
} else {
writtenGeneration = String.valueOf(indexGeneration);
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
if (snapshots.snapshots().size() > 0) {
if (indexGeneration < 0L) {
writtenGeneration = UUIDs.randomBase64UUID();
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
} else {
writtenGeneration = String.valueOf(indexGeneration);
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
}
}
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
return new ShardSnapshotMetaDeleteResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -2162,11 +2163,13 @@ private void deleteSnapshotsFromRepository(
assert currentlyFinalizing.contains(deleteEntry.repository());
final List<SnapshotId> snapshotIds = deleteEntry.getSnapshots();
assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]";
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
repositoriesService.repository(deleteEntry.repository())
.deleteSnapshots(
snapshotIds,
repositoryData.getGenId(),
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
remoteStoreLockManagerFactory,
ActionListener.wrap(updatedRepoData -> {
logger.info("snapshots {} deleted", snapshotIds);
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo
return new SnapshotsInProgress.Entry(
new Snapshot(repo, new SnapshotId("", "")),
false,
false,
partial,
SnapshotsInProgress.State.SUCCESS,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void testDeleteSnapshotting() {
snapshot,
true,
false,
false,
SnapshotsInProgress.State.INIT,
singletonList(new IndexId(index, "doesn't matter")),
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh
snapshot,
randomBoolean(),
false,
false,
SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)),
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -133,6 +134,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
listener.onResponse(null);
Expand Down

0 comments on commit 94b8f26

Please sign in to comment.