Skip to content

Commit

Permalink
Initial Commit to support centralize snapshot creation and implicit l…
Browse files Browse the repository at this point in the history
…ocking mechanism

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Aug 6, 2024
1 parent 47feca7 commit 9142a25
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -746,4 +746,58 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCentralizedCreateAndRestoreShallowCopy() throws Exception {

Settings snapshotSettings = Settings.builder().put("snapshot.centralized_create_operation", true).build();
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(snapshotSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put(snapshotSettings).build());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

internalCluster().startDataOnlyNode(Settings.builder().put(snapshotSettings).build());
logger.info("--> snapshot");

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1, indexName2)));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

// delete indices
DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.get();

assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED);
ensureYellowAndNoInitializingShards(restoredIndexName1);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void clusterManagerOperation(
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
snapshotsService.startCreateSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING,
SnapshotsService.SHALLOW_SNAPSHOT_V2,
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
Expand Down
61 changes: 51 additions & 10 deletions server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";

private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy";

private static final String PINNED_TIMESTAMP = "pinned_timestamp";
private static final String USER_METADATA = "metadata";

private static final Comparator<SnapshotInfo> COMPARATOR = Comparator.comparing(SnapshotInfo::startTime)
.thenComparing(SnapshotInfo::snapshotId);
private XContentBuilder builder;
private XContentBuilder build;

/**
* Builds snapshot information
Expand All @@ -121,6 +125,7 @@ public static final class SnapshotInfoBuilder {
private Boolean includeGlobalState = null;

private Boolean remoteStoreIndexShallowCopy = null;
private long pinnedTimestamp;
private Map<String, Object> userMetadata = null;
private int version = -1;
private List<SnapshotShardFailure> shardFailures = null;
Expand Down Expand Up @@ -177,6 +182,10 @@ private void setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy)
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
}

private void setPinnedTimestamp(long pinnedTimestamp) {
this.pinnedTimestamp = pinnedTimestamp;
}

private void setShardFailures(List<SnapshotShardFailure> shardFailures) {
this.shardFailures = shardFailures;
}
Expand Down Expand Up @@ -216,7 +225,8 @@ public SnapshotInfo build() {
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}
}
Expand Down Expand Up @@ -271,6 +281,7 @@ int getSuccessfulShards() {
SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy,
new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY)
);
SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP));
SNAPSHOT_INFO_PARSER.declareObjectArray(
SnapshotInfoBuilder::setShardFailures,
SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER,
Expand Down Expand Up @@ -307,6 +318,7 @@ int getSuccessfulShards() {
@Nullable
private Boolean remoteStoreIndexShallowCopy;

private long pinnedTimestamp;
@Nullable
private final Map<String, Object> userMetadata;

Expand All @@ -316,11 +328,11 @@ int getSuccessfulShards() {
private final List<SnapshotShardFailure> shardFailures;

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state) {
this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null);
this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0);
}

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, Version version) {
this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null);
this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0);
}

public SnapshotInfo(SnapshotsInProgress.Entry entry) {
Expand All @@ -338,7 +350,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) {
Collections.emptyList(),
entry.includeGlobalState(),
entry.userMetadata(),
entry.remoteStoreIndexShallowCopy()
entry.remoteStoreIndexShallowCopy(),
0
);
}

Expand All @@ -353,7 +366,8 @@ public SnapshotInfo(
List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState,
Map<String, Object> userMetadata,
Boolean remoteStoreIndexShallowCopy
Boolean remoteStoreIndexShallowCopy,
long pinnedTimestamp
) {
this(
snapshotId,
Expand All @@ -369,7 +383,8 @@ public SnapshotInfo(
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}

Expand All @@ -387,7 +402,8 @@ public SnapshotInfo(
List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState,
Map<String, Object> userMetadata,
Boolean remoteStoreIndexShallowCopy
Boolean remoteStoreIndexShallowCopy,
long pinnedTimestamp
) {
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices));
Expand All @@ -403,6 +419,7 @@ public SnapshotInfo(
this.includeGlobalState = includeGlobalState;
this.userMetadata = userMetadata;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.pinnedTimestamp = pinnedTimestamp;
}

/**
Expand All @@ -425,6 +442,9 @@ public SnapshotInfo(final StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
remoteStoreIndexShallowCopy = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
pinnedTimestamp = in.readLong();
}
}

/**
Expand Down Expand Up @@ -539,6 +559,10 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() {
return remoteStoreIndexShallowCopy;
}

public long getPinnedTimestamp() {
return pinnedTimestamp;
}

/**
* Returns shard failures; an empty list will be returned if there were no shard
* failures, or if {@link #state()} returns {@code null}.
Expand Down Expand Up @@ -606,6 +630,8 @@ public String toString() {
+ shardFailures
+ ", isRemoteStoreInteropEnabled="
+ remoteStoreIndexShallowCopy
+ ", pinnedTimestamp="
+ pinnedTimestamp
+ '}';
}

Expand Down Expand Up @@ -641,6 +667,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (remoteStoreIndexShallowCopy != null) {
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
}
if (pinnedTimestamp != 0) {
builder.field(PINNED_TIMESTAMP);
}
builder.startArray(INDICES);
for (String index : indices) {
builder.value(index);
Expand Down Expand Up @@ -699,6 +728,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final
if (remoteStoreIndexShallowCopy != null) {
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
}
if (pinnedTimestamp != 0) {
builder.field(PINNED_TIMESTAMP, pinnedTimestamp);
}
builder.startArray(INDICES);
for (String index : indices) {
builder.value(index);
Expand Down Expand Up @@ -747,6 +779,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
long endTime = 0;
int totalShards = 0;
int successfulShards = 0;
long pinnedTimestamp = 0;
Boolean includeGlobalState = null;
Boolean remoteStoreIndexShallowCopy = null;
Map<String, Object> userMetadata = null;
Expand Down Expand Up @@ -788,6 +821,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
includeGlobalState = parser.booleanValue();
} else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) {
remoteStoreIndexShallowCopy = parser.booleanValue();
} else if (PINNED_TIMESTAMP.equals(currentFieldName)) {
pinnedTimestamp = parser.longValue();
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (DATA_STREAMS.equals(currentFieldName)) {
Expand Down Expand Up @@ -840,7 +875,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}

Expand Down Expand Up @@ -872,6 +908,9 @@ public void writeTo(final StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalBoolean(remoteStoreIndexShallowCopy);
}
if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
out.writeVLong(pinnedTimestamp);
}
}

private static SnapshotState snapshotState(final String reason, final List<SnapshotShardFailure> shardFailures) {
Expand Down Expand Up @@ -904,7 +943,8 @@ public boolean equals(Object o) {
&& Objects.equals(version, that.version)
&& Objects.equals(shardFailures, that.shardFailures)
&& Objects.equals(userMetadata, that.userMetadata)
&& Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy);
&& Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy)
&& Objects.equals(pinnedTimestamp, that.pinnedTimestamp);
}

@Override
Expand All @@ -924,7 +964,8 @@ public int hashCode() {
version,
shardFailures,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}
}
Loading

0 comments on commit 9142a25

Please sign in to comment.