Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
  • Loading branch information
kasundra07 committed Jun 22, 2023
1 parent 4be4ea0 commit 1668e10
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
- Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496))
- [Snapshot Interop] Add Changes in Delete Snapshot Flow for remote store interoperability. ([#7497](https://github.com/opensearch-project/OpenSearch/pull/7497))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.RepositoryData;
Expand Down Expand Up @@ -308,10 +307,9 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.snapshots;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {

public void testDeleteSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "fs", snapshotRepoPath);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1);

assertAcked(startDeleteSnapshot(snapshotRepoName, snapshot).get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
}

public void testDeleteShallowCopySnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String shallowSnapshot = "shallow-snapshot";
createFullSnapshot(snapshotRepoName, shallowSnapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1);

assertAcked(startDeleteSnapshot(snapshotRepoName, shallowSnapshot).get());
assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
Expand All @@ -50,8 +51,10 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -93,6 +96,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana

private final SnapshotsService snapshotsService;

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand All @@ -119,6 +124,7 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down Expand Up @@ -260,17 +266,31 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
startedCleanup = true;
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT)
.execute(
ActionRunnable.wrap(
listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
final GroupedActionListener<RepositoryCleanupResult> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(repositoryCleanupResults -> {
long blobs = 0;
long bytes = 0;
for (RepositoryCleanupResult result : repositoryCleanupResults) {
blobs += result.blobs();
bytes += result.bytes();
}
RepositoryCleanupResult result = new RepositoryCleanupResult(new DeleteResult(blobs, bytes));
after(null, result);
}, e -> { after(e, null); }),
2
);
blobStoreRepository.cleanupRemoteStoreLockFiles(
repositoryStateId,
remoteStoreLockManagerFactory,
ActionListener.wrap(result -> groupedListener.onResponse(result), e -> groupedListener.onFailure(e))
);
blobStoreRepository.cleanup(
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
ActionListener.wrap(result -> groupedListener.onResponse(result), e -> groupedListener.onFailure(e))
);
}));
}

private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -121,10 +120,9 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
) {
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardId;
Expand Down Expand Up @@ -165,7 +166,6 @@ void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<RepositoryData> listener
);

Expand Down
Loading

0 comments on commit 1668e10

Please sign in to comment.