Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Changes in Snapshot Delete Flow for remote store interoperability. #7497

Merged
merged 7 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.snapshots;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.client.Client;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {

/**
 * Creates one snapshot in an "fs" snapshot repository registered with a plain path (no shallow-copy
 * settings), covering a regular index and a remote-store backed index, then deletes that snapshot.
 *
 * Checks that the snapshot creation leaves no lock files in the remote store for the remote-store
 * backed index, and that the repository holds no snapshot ids after the delete is acknowledged.
 *
 * @throws Exception if cluster setup, snapshot creation or snapshot deletion fails
 */
public void testDeleteSnapshot() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
    internalCluster().startClusterManagerOnlyNode();
    internalCluster().startDataOnlyNode();

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "fs", snapshotRepoPath);

    final Path remoteStoreRepoPath = randomRepoPath();
    final String remoteStoreRepoName = "remote-store-repo-name";
    createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

    final String indexName = "index-1";
    createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    final String snapshot = "snapshot";
    createFullSnapshot(snapshotRepoName, snapshot);
    // JUnit assertions are used instead of the `assert` keyword so the checks run even when the JVM
    // is started without -ea, and so a failure reports the expected and actual counts.
    assertEquals(
        "unexpected lock file count in remote store after snapshot creation",
        0,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
    assertEquals("unexpected snapshot count after snapshot creation", 1, getRepositoryData(snapshotRepoName).getSnapshotIds().size());

    assertAcked(startDeleteSnapshot(snapshotRepoName, snapshot).get());
    assertEquals("unexpected snapshot count after snapshot deletion", 0, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
}

public void testDeleteShallowCopySnapshot() throws Exception {
kasundra07 marked this conversation as resolved.
Show resolved Hide resolved
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String shallowSnapshot = "shallow-snapshot";
createFullSnapshot(snapshotRepoName, shallowSnapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1);

assertAcked(startDeleteSnapshot(snapshotRepoName, shallowSnapshot).get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
}

/**
 * Deletes multiple shallow copy snapshots in a single delete call while the repository contains
 * only shallow copy snapshots.
 *
 * Creates a random number (4 to 10) of snapshots in a "mock" repository registered with the
 * shallow-copy settings, deletes a random-length prefix of them (at least two) in one request, and
 * checks that both the snapshot id count and the remote store lock file count drop by exactly the
 * number of deleted snapshots.
 *
 * @throws Exception if cluster setup, snapshot creation or snapshot deletion fails
 */
public void testDeleteMultipleShallowCopySnapshotsCase1() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

    internalCluster().startClusterManagerOnlyNode();
    internalCluster().startDataOnlyNode();
    final Client clusterManagerClient = internalCluster().clusterManagerClient();
    ensureStableCluster(2);

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
    final String testIndex = "index-test";
    createIndexWithContent(testIndex);

    final Path remoteStoreRepoPath = randomRepoPath();
    final String remoteStoreRepoName = "remote-store-repo-name";
    createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    // Creating some shallow copy snapshots
    int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
    List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
    List<String> snapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
    int tobeDeletedSnapshotsCount = snapshotsToBeDeleted.size();
    // JUnit assertions are used instead of the `assert` keyword so the checks run even when the JVM
    // is started without -ea, and so a failure reports the expected and actual counts.
    assertEquals(
        "unexpected lock file count in remote store before deletion",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
    assertEquals(
        "unexpected snapshot count before deletion",
        totalShallowCopySnapshotsCount,
        getRepositoryData(snapshotRepoName).getSnapshotIds().size()
    );
    // Deleting subset of shallow copy snapshots
    assertAcked(
        clusterManagerClient.admin()
            .cluster()
            .prepareDeleteSnapshot(snapshotRepoName, snapshotsToBeDeleted.toArray(new String[0]))
            .get()
    );
    assertEquals(
        "unexpected snapshot count after deleting shallow copy snapshots",
        totalShallowCopySnapshotsCount - tobeDeletedSnapshotsCount,
        getRepositoryData(snapshotRepoName).getSnapshotIds().size()
    );
    assertEquals(
        "unexpected lock file count in remote store after deleting shallow copy snapshots",
        totalShallowCopySnapshotsCount - tobeDeletedSnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
}

/**
 * Deletes multiple shallow copy snapshots in a single delete call while partial and full copy
 * snapshots are also present in the repository, then deletes multiple full copy snapshots in a
 * single delete call while partial and shallow copy snapshots are still present.
 *
 * Sequence exercised:
 * 1. A snapshot is started against a repository registered with the shallow-copy settings while the
 *    data node is blocked; the data node is restarted and the snapshot is asserted to end PARTIAL.
 * 2. A random number of further snapshots is created with the shallow-copy settings.
 * 3. The repository is re-registered with a plain path and a random number of snapshots is created.
 * 4. A subset of the step-2 snapshots is deleted; snapshot and lock file counts must both drop.
 * 5. A subset of the step-3 snapshots is deleted; only the snapshot count must drop, the lock file
 *    count must stay unchanged.
 *
 * @throws Exception if cluster setup, snapshot creation or snapshot deletion fails
 */
public void testDeleteMultipleShallowCopySnapshotsCase2() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

    internalCluster().startClusterManagerOnlyNode();
    final String dataNode = internalCluster().startDataOnlyNode();
    ensureStableCluster(2);
    final String clusterManagerNode = internalCluster().getClusterManagerName();

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
    final String testIndex = "index-test";
    createIndexWithContent(testIndex);

    final Path remoteStoreRepoPath = randomRepoPath();
    final String remoteStoreRepoName = "remote-store-repo-name";
    createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    // Creating a partial shallow copy snapshot
    final String snapshot = "snapshot";
    blockNodeWithIndex(snapshotRepoName, testIndex);
    blockDataNode(snapshotRepoName, dataNode);

    final Client clusterManagerClient = internalCluster().clusterManagerClient();
    final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterManagerClient.admin()
        .cluster()
        .prepareCreateSnapshot(snapshotRepoName, snapshot)
        .setWaitForCompletion(true)
        .execute();

    awaitNumberOfSnapshotsInProgress(1);
    waitForBlock(dataNode, snapshotRepoName, TimeValue.timeValueSeconds(30L));
    // Restarting the blocked data node while the snapshot is in progress; the snapshot is then
    // expected to complete in PARTIAL state.
    internalCluster().restartNode(dataNode);
    assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));

    unblockAllDataNodes(snapshotRepoName);

    ensureStableCluster(2, clusterManagerNode);

    // Creating some shallow copy snapshots
    int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
    List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
    List<String> shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
    int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size();
    totalShallowCopySnapshotsCount += 1; // Adding partial shallow snapshot here
    // Updating the snapshot repository flag to disable shallow snapshots
    createRepository(snapshotRepoName, "mock", snapshotRepoPath);
    // Creating some full copy snapshots
    int totalFullCopySnapshotsCount = randomIntBetween(4, 10);
    List<String> fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount);
    List<String> fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount));
    int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size();

    int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount;

    // JUnit assertions are used instead of the `assert` keyword so the checks run even when the JVM
    // is started without -ea, and so a failure reports the expected and actual counts.
    assertEquals(
        "unexpected lock file count in remote store before deletion",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
    assertEquals("unexpected snapshot count before deletion", totalSnapshotsCount, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
    // Deleting subset of shallow copy snapshots
    assertAcked(
        clusterManagerClient.admin()
            .cluster()
            .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshotsToBeDeleted.toArray(new String[0]))
            .get()
    );
    totalSnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
    totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
    assertEquals(
        "unexpected snapshot count after deleting shallow copy snapshots",
        totalSnapshotsCount,
        getRepositoryData(snapshotRepoName).getSnapshotIds().size()
    );
    assertEquals(
        "unexpected lock file count in remote store after deleting shallow copy snapshots",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );

    // Deleting subset of full copy snapshots
    assertAcked(
        clusterManagerClient.admin()
            .cluster()
            .prepareDeleteSnapshot(snapshotRepoName, fullCopySnapshotsToBeDeleted.toArray(new String[0]))
            .get()
    );
    totalSnapshotsCount -= tobeDeletedFullCopySnapshotsCount;
    assertEquals(
        "unexpected snapshot count after deleting full copy snapshots",
        totalSnapshotsCount,
        getRepositoryData(snapshotRepoName).getSnapshotIds().size()
    );
    // The lock file count is expected to be unaffected by the deletion of full copy snapshots.
    assertEquals(
        "unexpected lock file count in remote store after deleting full copy snapshots",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
}

/**
 * Deletes a subset of shallow and full copy snapshots in a single delete call, then deletes all
 * remaining snapshots in the repository.
 *
 * Snapshots are first created with the repository registered with the shallow-copy settings, then
 * the repository is re-registered with a plain path and more snapshots are created. After the mixed
 * delete, the snapshot count must drop by the number of deleted snapshots and the lock file count
 * by the number of deleted shallow copy snapshots. After the wildcard delete both counts must be 0.
 *
 * @throws Exception if cluster setup, snapshot creation or snapshot deletion fails
 */
public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

    internalCluster().startClusterManagerOnlyNode();
    internalCluster().startDataOnlyNode();
    final Client clusterManagerClient = internalCluster().clusterManagerClient();
    ensureStableCluster(2);

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
    final String testIndex = "index-test";
    createIndexWithContent(testIndex);

    final Path remoteStoreRepoPath = randomRepoPath();
    final String remoteStoreRepoName = "remote-store-repo-name";
    createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    // Creating some shallow copy snapshots
    int totalShallowCopySnapshotsCount = randomIntBetween(4, 10);
    List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount);
    List<String> shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount));
    int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size();
    // Updating the snapshot repository flag to disable shallow snapshots
    createRepository(snapshotRepoName, "mock", snapshotRepoPath);
    // Creating some full copy snapshots
    int totalFullCopySnapshotsCount = randomIntBetween(4, 10);
    List<String> fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount);
    List<String> fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount));
    int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size();

    int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount;

    // JUnit assertions are used instead of the `assert` keyword so the checks run even when the JVM
    // is started without -ea, and so a failure reports the expected and actual counts.
    assertEquals(
        "unexpected lock file count in remote store before deletion",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
    assertEquals("unexpected snapshot count before deletion", totalSnapshotsCount, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
    // Deleting subset of shallow copy snapshots and full copy snapshots
    assertAcked(
        clusterManagerClient.admin()
            .cluster()
            .prepareDeleteSnapshot(
                snapshotRepoName,
                Stream.concat(shallowCopySnapshotsToBeDeleted.stream(), fullCopySnapshotsToBeDeleted.stream()).toArray(String[]::new)
            )
            .get()
    );
    totalSnapshotsCount -= (tobeDeletedShallowCopySnapshotsCount + tobeDeletedFullCopySnapshotsCount);
    totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount;
    assertEquals(
        "unexpected snapshot count after deleting a mix of shallow and full copy snapshots",
        totalSnapshotsCount,
        getRepositoryData(snapshotRepoName).getSnapshotIds().size()
    );
    assertEquals(
        "unexpected lock file count in remote store after deleting a mix of shallow and full copy snapshots",
        totalShallowCopySnapshotsCount,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );

    // Deleting all the remaining snapshots
    assertAcked(clusterManagerClient.admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "*").get());
    assertEquals("unexpected snapshot count after deleting all snapshots", 0, getRepositoryData(snapshotRepoName).getSnapshotIds().size());
    assertEquals(
        "unexpected lock file count in remote store after deleting all snapshots",
        0,
        getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length
    );
}

/**
 * Creates {@code count} snapshots in the given repository, one after another, and returns their names.
 *
 * All names share one prefix of the form {@code snap-<lower-cased random base64 uuid>-} and are
 * suffixed with a running index starting at 0, so the returned list is in creation order.
 *
 * @param repoName name of the snapshot repository to create the snapshots in
 * @param count    number of snapshots to create
 * @return the names of the created snapshots, in creation order
 */
private List<String> createNSnapshots(String repoName, int count) {
    final List<String> created = new ArrayList<>(count);
    final String randomId = UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT);
    final String namePrefix = "snap-" + randomId + "-";
    int index = 0;
    while (index < count) {
        final String snapshotName = namePrefix + index;
        createFullSnapshot(repoName, snapshotName);
        created.add(snapshotName);
        index++;
    }
    logger.info("--> created {} in [{}]", created, repoName);
    return created;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -93,6 +94,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana

private final SnapshotsService snapshotsService;

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand All @@ -119,6 +122,7 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down Expand Up @@ -267,6 +271,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
l -> blobStoreRepository.cleanup(
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
remoteStoreLockManagerFactory,
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
Expand Down
Loading
Loading