Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SnapshotV2] Support centralize snapshot creation #15124

Merged
merged 35 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9142a25
Initial Commit to support centralize snapshot creation and implicit l…
Aug 6, 2024
d9bbc65
Fix deserilization error
Aug 6, 2024
f72702f
Fix gradle spotless check
Aug 6, 2024
8e85231
Fix listener
Aug 6, 2024
8a507ad
Merge branch 'main' into snapshot-pinned-timestamp
Aug 7, 2024
2d404e8
Fix test
Aug 7, 2024
90c860c
Fix snapshot generation
Aug 8, 2024
193da65
Modify cluster setting name
Aug 14, 2024
fe2aaaf
Add more tests
Aug 14, 2024
14c08ae
Merge branch 'main' into snapshot-pinned-timestamp
Aug 15, 2024
ec17028
Merge branch 'main' into snapshot-pinned-timestamp
Aug 20, 2024
6504169
Uncomment pin timestamp code
Aug 20, 2024
626c2fa
Modify log messages
Aug 21, 2024
be65f6d
Add spotless check failure fix
Aug 21, 2024
62452ee
Fix completion listener for snapshot v2
Aug 21, 2024
00031ec
Elevate cluster state update priority for repository metadata update …
Aug 21, 2024
0c636ef
Add more integ tests
Aug 22, 2024
623f994
Add priority as IMMEDIATE for cluster state repo update task only for…
Aug 23, 2024
2e4795b
Fix build error
Aug 23, 2024
a6090a7
Fix spotless error
Aug 23, 2024
b5d012f
Add repository setting for snapshot v2
Aug 23, 2024
1394d8c
Merge branch 'main' into snapshot-pinned-timestamp
Aug 23, 2024
80bf6cc
Address review comments
Aug 26, 2024
b0cbc08
Add integ test to verify snapshot creation if shallow copy repo setti…
Aug 26, 2024
38af0f6
Fix spotless vilation error
Aug 26, 2024
73376a8
Address review comment
Aug 26, 2024
39b57e3
Address review comments
Aug 26, 2024
e1eecbd
Add min version check for backward compatibility
Aug 26, 2024
983d2b5
address review comments
Aug 27, 2024
a423475
add integ test for master failover scenario
Aug 27, 2024
207b03d
Add more integ tests
Aug 27, 2024
872e136
refactor code
Aug 27, 2024
1ef061a
add changelog
Aug 27, 2024
0289d67
Merge branch 'main' into snapshot-pinned-timestamp
anshu1106 Aug 28, 2024
fb48a2d
Add pinned timestamp setting in integ tests
Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -746,4 +747,58 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCreateShallowCopyV2() throws Exception {

Settings snapshotSettings = Settings.builder().put("snapshot.shallow_snapshot_v2", true).build();
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(snapshotSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put(snapshotSettings).build());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

internalCluster().startDataOnlyNode(Settings.builder().put(snapshotSettings).build());
logger.info("--> snapshot");

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

// // delete indices
// DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
// assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
// RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
// .cluster()
// .prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
// .setWaitForCompletion(false)
// .setIndices(indexName1)
// .setRenamePattern(indexName1)
// .setRenameReplacement(restoredIndexName1)
// .get();
//
// assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED);
// ensureYellowAndNoInitializingShards(restoredIndexName1);
// ensureGreen(restoredIndexName1);
// assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void clusterManagerOperation(
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
snapshotsService.startCreateSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING,
SnapshotsService.SHALLOW_SNAPSHOT_V2,
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
Expand Down
60 changes: 50 additions & 10 deletions server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";

private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy";

private static final String PINNED_TIMESTAMP = "pinned_timestamp";
private static final String USER_METADATA = "metadata";

private static final Comparator<SnapshotInfo> COMPARATOR = Comparator.comparing(SnapshotInfo::startTime)
.thenComparing(SnapshotInfo::snapshotId);
private XContentBuilder builder;
private XContentBuilder build;

/**
* Builds snapshot information
Expand All @@ -121,6 +125,7 @@ public static final class SnapshotInfoBuilder {
private Boolean includeGlobalState = null;

private Boolean remoteStoreIndexShallowCopy = null;
private long pinnedTimestamp = 0L;
private Map<String, Object> userMetadata = null;
private int version = -1;
private List<SnapshotShardFailure> shardFailures = null;
Expand Down Expand Up @@ -177,6 +182,10 @@ private void setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy)
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
}

private void setPinnedTimestamp(long pinnedTimestamp) {
this.pinnedTimestamp = pinnedTimestamp;
}

private void setShardFailures(List<SnapshotShardFailure> shardFailures) {
this.shardFailures = shardFailures;
}
Expand Down Expand Up @@ -216,7 +225,8 @@ public SnapshotInfo build() {
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}
}
Expand Down Expand Up @@ -271,6 +281,7 @@ int getSuccessfulShards() {
SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy,
new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY)
);
SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP));
SNAPSHOT_INFO_PARSER.declareObjectArray(
SnapshotInfoBuilder::setShardFailures,
SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER,
Expand Down Expand Up @@ -307,6 +318,7 @@ int getSuccessfulShards() {
@Nullable
private Boolean remoteStoreIndexShallowCopy;

private long pinnedTimestamp;
@Nullable
private final Map<String, Object> userMetadata;

Expand All @@ -316,11 +328,11 @@ int getSuccessfulShards() {
private final List<SnapshotShardFailure> shardFailures;

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state) {
this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null);
this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0);
}

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, Version version) {
this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null);
this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0);
}

public SnapshotInfo(SnapshotsInProgress.Entry entry) {
Expand All @@ -338,7 +350,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) {
Collections.emptyList(),
entry.includeGlobalState(),
entry.userMetadata(),
entry.remoteStoreIndexShallowCopy()
entry.remoteStoreIndexShallowCopy(),
0L
);
}

Expand All @@ -353,7 +366,8 @@ public SnapshotInfo(
List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState,
Map<String, Object> userMetadata,
Boolean remoteStoreIndexShallowCopy
Boolean remoteStoreIndexShallowCopy,
long pinnedTimestamp
) {
this(
snapshotId,
Expand All @@ -369,7 +383,8 @@ public SnapshotInfo(
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}

Expand All @@ -387,7 +402,8 @@ public SnapshotInfo(
List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState,
Map<String, Object> userMetadata,
Boolean remoteStoreIndexShallowCopy
Boolean remoteStoreIndexShallowCopy,
long pinnedTimestamp
) {
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices));
Expand All @@ -403,6 +419,7 @@ public SnapshotInfo(
this.includeGlobalState = includeGlobalState;
this.userMetadata = userMetadata;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.pinnedTimestamp = pinnedTimestamp;
}

/**
Expand All @@ -425,6 +442,9 @@ public SnapshotInfo(final StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
remoteStoreIndexShallowCopy = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
pinnedTimestamp = in.readVLong();
}
}

/**
Expand Down Expand Up @@ -539,6 +559,10 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() {
return remoteStoreIndexShallowCopy;
}

public long getPinnedTimestamp() {
return pinnedTimestamp;
}

/**
* Returns shard failures; an empty list will be returned if there were no shard
* failures, or if {@link #state()} returns {@code null}.
Expand Down Expand Up @@ -606,6 +630,8 @@ public String toString() {
+ shardFailures
+ ", isRemoteStoreInteropEnabled="
+ remoteStoreIndexShallowCopy
+ ", pinnedTimestamp="
+ pinnedTimestamp
+ '}';
}

Expand Down Expand Up @@ -641,6 +667,8 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (remoteStoreIndexShallowCopy != null) {
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
}
builder.field(PINNED_TIMESTAMP);

builder.startArray(INDICES);
for (String index : indices) {
builder.value(index);
Expand Down Expand Up @@ -699,6 +727,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final
if (remoteStoreIndexShallowCopy != null) {
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
}
if (pinnedTimestamp != 0) {
builder.field(PINNED_TIMESTAMP, pinnedTimestamp);
}
builder.startArray(INDICES);
for (String index : indices) {
builder.value(index);
Expand Down Expand Up @@ -747,6 +778,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
long endTime = 0;
int totalShards = 0;
int successfulShards = 0;
long pinnedTimestamp = 0;
Boolean includeGlobalState = null;
Boolean remoteStoreIndexShallowCopy = null;
Map<String, Object> userMetadata = null;
Expand Down Expand Up @@ -788,6 +820,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
includeGlobalState = parser.booleanValue();
} else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) {
remoteStoreIndexShallowCopy = parser.booleanValue();
} else if (PINNED_TIMESTAMP.equals(currentFieldName)) {
pinnedTimestamp = parser.longValue();
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (DATA_STREAMS.equals(currentFieldName)) {
Expand Down Expand Up @@ -840,7 +874,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr
shardFailures,
includeGlobalState,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}

Expand Down Expand Up @@ -872,6 +907,9 @@ public void writeTo(final StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalBoolean(remoteStoreIndexShallowCopy);
}
if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
out.writeVLong(pinnedTimestamp);
}
}

private static SnapshotState snapshotState(final String reason, final List<SnapshotShardFailure> shardFailures) {
Expand Down Expand Up @@ -904,7 +942,8 @@ public boolean equals(Object o) {
&& Objects.equals(version, that.version)
&& Objects.equals(shardFailures, that.shardFailures)
&& Objects.equals(userMetadata, that.userMetadata)
&& Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy);
&& Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy)
&& Objects.equals(pinnedTimestamp, that.pinnedTimestamp);
}

@Override
Expand All @@ -924,7 +963,8 @@ public int hashCode() {
version,
shardFailures,
userMetadata,
remoteStoreIndexShallowCopy
remoteStoreIndexShallowCopy,
pinnedTimestamp
);
}
}
Loading
Loading