Skip to content

Commit

Permalink
Keep track of data recovered from snapshots in RecoveryState
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Aug 13, 2021
1 parent 59d75f2 commit f5bfa52
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.recovery;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -24,10 +25,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase {
Expand Down Expand Up @@ -57,9 +61,9 @@ public void testRecoveryUsingSnapshots() throws Exception {
);
ensureGreen(indexName);

final int numDocs = randomIntBetween(1, 500);
final int numDocs = randomIntBetween(500, 1000);
indexDocs(indexName, numDocs);

waitUntilGlobalCheckpointIsStable(indexName);
forceMerge(indexName, randomBoolean(), randomBoolean());

deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);
Expand All @@ -68,28 +72,88 @@ public void testRecoveryUsingSnapshots() throws Exception {
// Add a new replica
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(indexName);
assertSnapshotIsUsed(indexName);

assertMatchAllReturnsAllDocuments(indexName, numDocs);
assertMatchQueryReturnsAllDocuments(indexName, numDocs);

for (int i = 0; i < 4; i++) {
assertSearchResultsAreCorrect(indexName, numDocs);
}
deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false);
}

private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
if (randomBoolean()) {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
/**
 * Waits until the primary copy of shard {@code 0} of the given index reports a
 * {@code max_seq_no} equal to its {@code global_checkpoint}.
 *
 * <p>The shard-level stats of the index are fetched on every attempt; the check is retried
 * through {@code assertBusy}, which is given 60 seconds.
 *
 * @param index name of the index whose shard stats are inspected
 * @throws Exception whatever {@code assertBusy} propagates
 */
private void waitUntilGlobalCheckpointIsStable(String index) throws Exception {
    final String statsEndpoint = '/' + index + "/_stats?level=shards";
    assertBusy(() -> {
        final Response statsResponse = client().performRequest(new Request(HttpGet.METHOD_NAME, statsEndpoint));
        assertOK(statsResponse);

        final Map<String, Object> stats = responseAsMap(statsResponse);
        final Map<String, Object> statsByIndex = extractValue(stats, "indices");
        final Map<String, Object> statsOfIndex = extractValue(statsByIndex, index);
        final Map<String, Object> statsByShardId = extractValue(statsOfIndex, "shards");
        // Only shard "0" is looked at; each list entry is one copy of that shard.
        final List<Map<String, Object>> shardCopies = extractValue(statsByShardId, "0");

        for (Map<String, Object> shardCopy : shardCopies) {
            final boolean primary = extractValue(shardCopy, "routing.primary");
            if (primary) {
                // Replica copies are ignored; only the primary's sequence numbers are compared.
                final Map<Object, Integer> seqNoStats = extractValue(shardCopy, "seq_no");
                assertThat(
                    seqNoStats.toString(),
                    seqNoStats.get("max_seq_no"),
                    is(equalTo(seqNoStats.get("global_checkpoint")))
                );
            }
        }
    }, 60, TimeUnit.SECONDS);
}

/**
 * Searches the index with a {@code matchAllQuery} and verifies the result.
 *
 * <p>The reported total hit count must equal {@code numDocs}. For every hit contained in the
 * response, the {@code _id} must parse to an integer in {@code [0, numDocs)}, {@code _source.field}
 * must equal that integer and {@code _source.text} must equal {@code "Some text " + _id}.
 *
 * @param indexName index to search
 * @param numDocs   expected total number of hits
 * @throws IOException declared for the underlying search call
 */
private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException {
    final Map<String, Object> response = search(indexName, QueryBuilders.matchAllQuery());
    assertThat(extractValue(response, "hits.total.value"), equalTo(numDocs));

    final List<Map<String, Object>> returnedHits = extractValue(response, "hits.hits");
    for (Map<String, Object> returnedHit : returnedHits) {
        final String id = extractValue(returnedHit, "_id");
        // Parsed once and reused for both the range check and the field comparison.
        final int numericId = Integer.parseInt(id);
        assertThat(numericId, allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
        assertThat(extractValue(returnedHit, "_source.field"), equalTo(numericId));
        assertThat(extractValue(returnedHit, "_source.text"), equalTo("Some text " + id));
    }
}

/**
 * Verifies, via the detailed {@code _recovery} API of {@code index}, that PEER recoveries
 * took their file data from the snapshot.
 *
 * <p>For every shard recovery entry whose {@code type} is {@code "PEER"} the {@code stage} must be
 * {@code "DONE"} and each file detail must report {@code recovered_from_snapshot_in_bytes > 0}.
 * The per-file values are summed, and the sum must be positive and equal to the value returned by
 * {@code getSnapshotSizeForIndex(index)}.
 *
 * @param index name of the index whose recoveries are inspected
 * @throws Exception declared for the REST calls made here and in {@code getSnapshotSizeForIndex}
 */
private void assertSnapshotIsUsed(String index) throws Exception {
Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_recovery?detailed=true");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> responseAsMap = responseAsMap(response);
List<Map<String, Object>> shardRecoveries = extractValue(responseAsMap, index + ".shards");
// Accumulates recovered_from_snapshot_in_bytes over all files of all PEER recoveries.
long totalRecoveredFromSnapshot = 0;
for (Map<String, Object> shardRecoveryState : shardRecoveries) {
String recoveryType = extractValue(shardRecoveryState, "type");
// Entries of any other recovery type are skipped.
if (recoveryType.equals("PEER") == false) {
continue;
}
String stage = extractValue(shardRecoveryState, "stage");
assertThat(stage, is(equalTo("DONE")));

List<Map<String, Object>> fileDetails = extractValue(shardRecoveryState, "index.files.details");
for (Map<String, Object> fileDetail : fileDetails) {
// Every listed file is expected to have a non-zero snapshot contribution.
int recoveredFromSnapshot = extractValue(fileDetail, "recovered_from_snapshot_in_bytes");
assertThat(recoveredFromSnapshot, is(greaterThan(0)));
totalRecoveredFromSnapshot += recoveredFromSnapshot;
}
// NOTE(review): this `else` has no matching `if` in this method, and `indexName` / `numDocs` are not
// declared here. It looks like removed-side lines of the former assertSearchResultsAreCorrect left in
// by the diff view; confirm against the real file.
} else {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}
long snapshotSize = getSnapshotSizeForIndex(index);
assertThat(totalRecoveredFromSnapshot, is(greaterThan(0L)));
assertThat(totalRecoveredFromSnapshot, is(equalTo(snapshotSize)));
}

/**
 * Reads the {@code size_in_bytes} that the snapshot {@code SNAPSHOT_NAME} in repository
 * {@code REPOSITORY_NAME} reports for the given index.
 *
 * <p>The snapshot is requested with {@code index_details=true}, and the response must contain
 * exactly one snapshot.
 *
 * @param indexName index whose entry under {@code index_details} is read
 * @return the value found at {@code index_details.<indexName>.size_in_bytes}
 * @throws Exception declared for the REST call
 */
private int getSnapshotSizeForIndex(String indexName) throws Exception {
    final Request getSnapshot = new Request(HttpGet.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + SNAPSHOT_NAME);
    getSnapshot.addParameter("index_details", "true");
    final Response getSnapshotResponse = client().performRequest(getSnapshot);
    assertOK(getSnapshotResponse);

    final Map<String, Object> body = responseAsMap(getSnapshotResponse);
    final List<Map<String, Object>> snapshotInfos = extractValue(body, "snapshots");
    assertThat(snapshotInfos.size(), is(equalTo(1)));

    final String sizePath = "index_details." + indexName + ".size_in_bytes";
    final int sizeInBytes = extractValue(snapshotInfos.get(0), sizePath);
    return sizeInBytes;
}

/**
 * Searches the index with a match query for {@code "some"} on the {@code text} field and
 * verifies that the reported total hit count equals {@code numDocs}.
 *
 * @param indexName index to search
 * @param numDocs   expected total number of hits
 * @throws IOException declared for the underlying search call
 */
private void assertMatchQueryReturnsAllDocuments(String indexName, int numDocs) throws IOException {
    final Map<String, Object> matchQueryResponse = search(indexName, QueryBuilders.matchQuery("text", "some"));
    assertThat(
        extractValue(matchQueryResponse, "hits.total.value"),
        equalTo(numDocs)
    );
}

private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,29 @@
index: [test_2]
human: true

- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_from_snapshot_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_from_source_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test index name not matching":

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {

// segments_N and .si files are recovered from the file metadata directly
long expectedRecoveredBytesFromRepo = 0;
long totalBytesRecoveredFromSnapshot = 0;
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
totalBytesRecoveredFromSnapshot += fileDetail.recoveredFromSnapshot();
if (fileDetail.name().startsWith("segments") || fileDetail.name().endsWith(".si")) {
continue;
}
Expand All @@ -264,6 +266,7 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {
long snapshotSizeForIndex = getSnapshotSizeForIndex(repoName, snapshot, indexName);
assertThat(repository.totalBytesRead.get(), is(greaterThan(0L)));
assertThat(repository.totalBytesRead.get(), is(lessThanOrEqualTo(snapshotSizeForIndex)));
assertThat(totalBytesRecoveredFromSnapshot, is(equalTo(snapshotSizeForIndex)));

assertDocumentsAreEqual(indexName, numDocs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe
long bytesWritten = 0;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, length);
indexState.addRecoveredBytesToFile(fileName, length);
indexState.addRecoveredFromSnapshotBytesToFile(fileName, length);
bytesWritten += length;
}

Expand All @@ -103,10 +103,10 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe
assert Arrays.asList(store.directory().listAll()).contains(tempFileName) :
"expected: [" + tempFileName + "] in " + Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(tempFileName));
indexState.setFullyRecoveredFromSnapshot(fileName);
} catch (Exception e) {
tempFileNames.remove(tempFileName);
store.deleteQuiet(tempFileName);
indexState.resetRecoveredBytesOfFile(fileName);
throw e;
} finally {
decRef();
Expand Down
Loading

0 comments on commit f5bfa52

Please sign in to comment.