Skip to content

Commit

Permalink
Add Changes in Snapshot Delete Flow for remote store interoperability. (
Browse files Browse the repository at this point in the history
opensearch-project#7497)

Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
  • Loading branch information
kasundra07 authored and baba-devv committed Jul 29, 2023
1 parent 5561832 commit 411f4e9
Show file tree
Hide file tree
Showing 5 changed files with 569 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.snapshots;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.client.Client;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {

public void testDeleteSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "fs", snapshotRepoPath);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1);

assertAcked(startDeleteSnapshot(snapshotRepoName, snapshot).get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
}

public void testDeleteShallowCopySnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String shallowSnapshot = "shallow-snapshot";
createFullSnapshot(snapshotRepoName, shallowSnapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1);

assertAcked(startDeleteSnapshot(snapshotRepoName, shallowSnapshot).get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
}

// Deleting multiple shallow copy snapshots as part of single delete call with repo having only shallow copy snapshots.
public void testDeleteMultipleShallowCopySnapshotsCase1() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

// Creating some shallow copy snapshots
int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
List<String> snapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
int tobeDeletedSnapshotsCount = snapshotsToBeDeleted.size();
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalShallowCopySnapshotsCount);
// Deleting subset of shallow copy snapshots
assertAcked(
clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotsToBeDeleted.toArray(new String[0]))
.get()
);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalShallowCopySnapshotsCount - tobeDeletedSnapshotsCount);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount
- tobeDeletedSnapshotsCount);
}

// Deleting multiple shallow copy snapshots as part of single delete call with both partial and full copy snapshot present in the repo
// And then deleting multiple full copy snapshots as part of single delete call with both partial and shallow copy snapshots present in
// the repo
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8610")
public void testDeleteMultipleShallowCopySnapshotsCase2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

internalCluster().startClusterManagerOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(2);
final String clusterManagerNode = internalCluster().getClusterManagerName();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

// Creating a partial shallow copy snapshot
final String snapshot = "snapshot";
blockNodeWithIndex(snapshotRepoName, testIndex);
blockDataNode(snapshotRepoName, dataNode);

final Client clusterManagerClient = internalCluster().clusterManagerClient();
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterManagerClient.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshot)
.setWaitForCompletion(true)
.execute();

awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, snapshotRepoName, TimeValue.timeValueSeconds(30L));
internalCluster().restartNode(dataNode);
assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));

unblockAllDataNodes(snapshotRepoName);

ensureStableCluster(2, clusterManagerNode);

// Creating some shallow copy snapshots
int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
List<String> shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size();
totalShallowCopySnapshotsCount += 1; // Adding partial shallow snapshot here
// Updating the snapshot repository flag to disable shallow snapshots
createRepository(snapshotRepoName, "mock", snapshotRepoPath);
// Creating some full copy snapshots
int totalFullCopySnapshotsCount = randomIntBetween(4, 10);
List<String> fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount);
List<String> fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount));
int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size();

int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount;

assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount);
// Deleting subset of shallow copy snapshots
assertAcked(
clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshotsToBeDeleted.toArray(new String[0]))
.get()
);
totalSnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);

// Deleting subset of full copy snapshots
assertAcked(
clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, fullCopySnapshotsToBeDeleted.toArray(new String[0]))
.get()
);
totalSnapshotsCount -= tobeDeletedFullCopySnapshotsCount;
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);
}

// Deleting subset of shallow and full copy snapshots as part of single delete call and then deleting all snapshots in the repo.
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8610")
public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

// Creating some shallow copy snapshots
int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
List<String> shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size();
// Updating the snapshot repository flag to disable shallow snapshots
createRepository(snapshotRepoName, "mock", snapshotRepoPath);
// Creating some full copy snapshots
int totalFullCopySnapshotsCount = randomIntBetween(4, 10);
List<String> fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount);
List<String> fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount));
int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size();

int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount;

assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount);
// Deleting subset of shallow copy snapshots and full copy snapshots
assertAcked(
clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(
snapshotRepoName,
Stream.concat(shallowCopySnapshotsToBeDeleted.stream(), fullCopySnapshotsToBeDeleted.stream()).toArray(String[]::new)
)
.get()
);
totalSnapshotsCount -= (tobeDeletedShallowCopySnapshotsCount + tobeDeletedFullCopySnapshotsCount);
totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount);

// Deleting all the remaining snapshots
assertAcked(clusterManagerClient.admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "*").get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
for (int i = 0; i < count; i++) {
final String name = prefix + i;
createFullSnapshot(repoName, name);
snapshotNames.add(name);
}
logger.info("--> created {} in [{}]", snapshotNames, repoName);
return snapshotNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -93,6 +94,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana

private final SnapshotsService snapshotsService;

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand All @@ -119,6 +122,7 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down Expand Up @@ -267,6 +271,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
l -> blobStoreRepository.cleanup(
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
remoteStoreLockManagerFactory,
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
Expand Down
Loading

0 comments on commit 411f4e9

Please sign in to comment.