Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of data recovered from snapshots in RecoveryState #76499

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ The API returns the following response:
"reused_in_bytes" : 0,
"recovered" : "65.7mb",
"recovered_in_bytes" : 68891939,
"recovered_from_snapshot" : "0b",
"recovered_from_snapshot_in_bytes" : 0,
"percent" : "87.1%"
},
"files" : {
Expand Down Expand Up @@ -380,6 +382,8 @@ The API returns the following response:
"reused_in_bytes" : 26001617,
"recovered" : "0b",
"recovered_in_bytes" : 0,
"recovered_from_snapshot" : "0b",
"recovered_from_snapshot_in_bytes" : 0,
"percent" : "100.0%"
},
"files" : {
Expand All @@ -394,11 +398,13 @@ The API returns the following response:
}, {
"name" : "_0.cfs",
"length" : 135306,
"recovered" : 135306
"recovered" : 135306,
"recovered_from_snapshot": 0
}, {
"name" : "segments_2",
"length" : 251,
"recovered" : 251
"recovered" : 251,
"recovered_from_snapshot": 0
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.recovery;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -24,10 +25,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase {
Expand Down Expand Up @@ -57,9 +61,9 @@ public void testRecoveryUsingSnapshots() throws Exception {
);
ensureGreen(indexName);

final int numDocs = randomIntBetween(1, 500);
final int numDocs = randomIntBetween(500, 1000);
indexDocs(indexName, numDocs);

waitUntilGlobalCheckpointIsStable(indexName);
forceMerge(indexName, randomBoolean(), randomBoolean());

deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);
Expand All @@ -68,28 +72,88 @@ public void testRecoveryUsingSnapshots() throws Exception {
// Add a new replica
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(indexName);
assertSnapshotIsUsed(indexName);

assertMatchAllReturnsAllDocuments(indexName, numDocs);
assertMatchQueryReturnsAllDocuments(indexName, numDocs);

for (int i = 0; i < 4; i++) {
assertSearchResultsAreCorrect(indexName, numDocs);
}
deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false);
}

private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
if (randomBoolean()) {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
private void waitUntilGlobalCheckpointIsStable(String index) throws Exception {
assertBusy(() -> {
Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_stats?level=shards");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> responseAsMap = responseAsMap(response);
Map<String, Object> indices = extractValue(responseAsMap, "indices");
Map<String, Object> indexShardsStats = extractValue(extractValue(indices, index), "shards");
List<Map<String, Object>> shardStats = extractValue(indexShardsStats, "0");
for (Map<String, Object> shardStat : shardStats) {
final boolean isPrimary = extractValue(shardStat, "routing.primary");
if (isPrimary == false) {
continue;
}
Map<Object, Integer> seqNos = extractValue(shardStat, "seq_no");
assertThat(seqNos.toString(), seqNos.get("max_seq_no"), is(equalTo(seqNos.get("global_checkpoint"))));
}
}, 60, TimeUnit.SECONDS);
}

private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
}
}

private void assertSnapshotIsUsed(String index) throws Exception {
Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_recovery?detailed=true");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> responseAsMap = responseAsMap(response);
List<Map<String, Object>> shardRecoveries = extractValue(responseAsMap, index + ".shards");
long totalRecoveredFromSnapshot = 0;
for (Map<String, Object> shardRecoveryState : shardRecoveries) {
String recoveryType = extractValue(shardRecoveryState, "type");
if (recoveryType.equals("PEER") == false) {
continue;
}
String stage = extractValue(shardRecoveryState, "stage");
assertThat(stage, is(equalTo("DONE")));

List<Map<String, Object>> fileDetails = extractValue(shardRecoveryState, "index.files.details");
for (Map<String, Object> fileDetail : fileDetails) {
int recoveredFromSnapshot = extractValue(fileDetail, "recovered_from_snapshot_in_bytes");
assertThat(recoveredFromSnapshot, is(greaterThan(0)));
totalRecoveredFromSnapshot += recoveredFromSnapshot;
}
} else {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}
long snapshotSize = getSnapshotSizeForIndex(index);
assertThat(totalRecoveredFromSnapshot, is(greaterThan(0L)));
assertThat(totalRecoveredFromSnapshot, is(equalTo(snapshotSize)));
}

private int getSnapshotSizeForIndex(String indexName) throws Exception {
Request request = new Request(HttpGet.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + SNAPSHOT_NAME);
request.addParameter("index_details", "true");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> snapshotsResponse = responseAsMap(response);
List<Map<String, Object>> snapshots = extractValue(snapshotsResponse, "snapshots");
assertThat(snapshots.size(), is(equalTo(1)));
Map<String, Object> snapshot = snapshots.get(0);
return extractValue(snapshot, "index_details." + indexName + ".size_in_bytes");
}

private void assertMatchQueryReturnsAllDocuments(String indexName, int numDocs) throws IOException {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}

private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
---
"Indices recovery test":
- skip:
# todo: change after backport
version: " - 7.99.99"
reason: recovery from snapshot bytes not available until 8.0

- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green

- do:
indices.recovery:
index: [test_1]
human: true

- match: { test_1.shards.0.type: "EMPTY_STORE" }
- match: { test_1.shards.0.stage: "DONE" }
- match: { test_1.shards.0.primary: true }
- match: { test_1.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_1.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_1.shards.0.index.files.total: 0 }
- gte: { test_1.shards.0.index.files.reused: 0 }
- gte: { test_1.shards.0.index.files.recovered: 0 }
- match: { test_1.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.recovered_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.recovered_from_snapshot_in_bytes: 0 }
- match: { test_1.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_1.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_1.shards.0.translog.recovered: 0 }
- gte: { test_1.shards.0.translog.total: -1 }
- gte: { test_1.shards.0.translog.total_on_start: 0 }
- gte: { test_1.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.total_time_in_millis: 0 }

---
"Indices recovery test without recovery from snapshot":

- do:
indices.create:
Expand Down Expand Up @@ -71,27 +118,27 @@
index: [test_2]
human: true

- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test index name not matching":

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {

// segments_N and .si files are recovered from the file metadata directly
long expectedRecoveredBytesFromRepo = 0;
long totalBytesRecoveredFromSnapshot = 0;
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
totalBytesRecoveredFromSnapshot += fileDetail.recoveredFromSnapshot();
if (fileDetail.name().startsWith("segments") || fileDetail.name().endsWith(".si")) {
continue;
}
Expand All @@ -264,6 +266,7 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {
long snapshotSizeForIndex = getSnapshotSizeForIndex(repoName, snapshot, indexName);
assertThat(repository.totalBytesRead.get(), is(greaterThan(0L)));
assertThat(repository.totalBytesRead.get(), is(lessThanOrEqualTo(snapshotSizeForIndex)));
assertThat(totalBytesRecoveredFromSnapshot, is(equalTo(snapshotSizeForIndex)));

assertDocumentsAreEqual(indexName, numDocs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe
long bytesWritten = 0;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, length);
indexState.addRecoveredBytesToFile(fileName, length);
indexState.addRecoveredFromSnapshotBytesToFile(fileName, length);
bytesWritten += length;
}

Expand Down
Loading