From f5bfa52a2f84f236b3b9e4ef888e3bf82e61e14b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Fri, 13 Aug 2021 10:42:50 +0200 Subject: [PATCH] Keep track of data recovered from snapshots in RecoveryState Relates #73496 --- ...ractSnapshotBasedRecoveryRestTestCase.java | 100 ++++++++++++++---- .../test/indices.recovery/10_basic.yml | 44 ++++---- .../SnapshotBasedIndexRecoveryIT.java | 3 + .../indices/recovery/MultiFileWriter.java | 4 +- .../indices/recovery/RecoveryState.java | 99 ++++++++++++++--- .../PeerRecoveryTargetServiceTests.java | 7 ++ .../indices/recovery/RecoveryTargetTests.java | 38 +++++++ 7 files changed, 237 insertions(+), 58 deletions(-) diff --git a/qa/snapshot-based-recoveries/src/test/java/org/elasticsearch/recovery/AbstractSnapshotBasedRecoveryRestTestCase.java b/qa/snapshot-based-recoveries/src/test/java/org/elasticsearch/recovery/AbstractSnapshotBasedRecoveryRestTestCase.java index dac2eba1ba02f..88a821b521b00 100644 --- a/qa/snapshot-based-recoveries/src/test/java/org/elasticsearch/recovery/AbstractSnapshotBasedRecoveryRestTestCase.java +++ b/qa/snapshot-based-recoveries/src/test/java/org/elasticsearch/recovery/AbstractSnapshotBasedRecoveryRestTestCase.java @@ -8,6 +8,7 @@ package org.elasticsearch.recovery; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -24,10 +25,13 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase { @@ -57,9 +61,9 @@ public void testRecoveryUsingSnapshots() throws Exception { ); ensureGreen(indexName); - final int numDocs = randomIntBetween(1, 500); + final int numDocs = randomIntBetween(500, 1000); indexDocs(indexName, numDocs); - + waitUntilGlobalCheckpointIsStable(indexName); forceMerge(indexName, randomBoolean(), randomBoolean()); deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true); @@ -68,28 +72,88 @@ public void testRecoveryUsingSnapshots() throws Exception { // Add a new replica updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)); ensureGreen(indexName); + assertSnapshotIsUsed(indexName); + + assertMatchAllReturnsAllDocuments(indexName, numDocs); + assertMatchQueryReturnsAllDocuments(indexName, numDocs); - for (int i = 0; i < 4; i++) { - assertSearchResultsAreCorrect(indexName, numDocs); - } deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false); } - private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException { - if (randomBoolean()) { - Map searchResults = search(indexName, QueryBuilders.matchAllQuery()); - assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); - List> hits = extractValue(searchResults, "hits.hits"); - for (Map hit : hits) { - String docId = extractValue(hit, "_id"); - assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs))); - assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId))); - assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId)); + private void waitUntilGlobalCheckpointIsStable(String index) throws Exception { + assertBusy(() -> { + Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_stats?level=shards"); + Response response = client().performRequest(request); + assertOK(response); + Map responseAsMap = responseAsMap(response); + Map indices = extractValue(responseAsMap, "indices"); + Map indexShardsStats = extractValue(extractValue(indices, index), "shards"); + List> shardStats = extractValue(indexShardsStats, "0"); + for (Map shardStat : shardStats) { + final boolean isPrimary = extractValue(shardStat, "routing.primary"); + if (isPrimary == false) { + continue; + } + Map seqNos = extractValue(shardStat, "seq_no"); + assertThat(seqNos.toString(), seqNos.get("max_seq_no"), is(equalTo(seqNos.get("global_checkpoint")))); + } + }, 60, TimeUnit.SECONDS); + } + + private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException { + Map searchResults = search(indexName, QueryBuilders.matchAllQuery()); + assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); + List> hits = extractValue(searchResults, "hits.hits"); + for (Map hit : hits) { + String docId = extractValue(hit, "_id"); + assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs))); + assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId))); + assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId)); + } + } + + private void assertSnapshotIsUsed(String index) throws Exception { + Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_recovery?detailed=true"); + Response response = client().performRequest(request); + assertOK(response); + Map responseAsMap = responseAsMap(response); + List> shardRecoveries = extractValue(responseAsMap, index + ".shards"); + long totalRecoveredFromSnapshot = 0; + for (Map shardRecoveryState : shardRecoveries) { + String recoveryType = extractValue(shardRecoveryState, "type"); + if (recoveryType.equals("PEER") == false) { + continue; + } + String stage = extractValue(shardRecoveryState, "stage"); + assertThat(stage, is(equalTo("DONE"))); + + List> fileDetails = extractValue(shardRecoveryState, "index.files.details"); + for (Map fileDetail : fileDetails) { + int recoveredFromSnapshot = extractValue(fileDetail, "recovered_from_snapshot_in_bytes"); + assertThat(recoveredFromSnapshot, is(greaterThan(0))); + totalRecoveredFromSnapshot += recoveredFromSnapshot; } - } else { - Map searchResults = search(indexName, QueryBuilders.matchQuery("text", "some")); - assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); } + long snapshotSize = getSnapshotSizeForIndex(index); + assertThat(totalRecoveredFromSnapshot, is(greaterThan(0L))); + assertThat(totalRecoveredFromSnapshot, is(equalTo(snapshotSize))); + } + + private int getSnapshotSizeForIndex(String indexName) throws Exception { + Request request = new Request(HttpGet.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + SNAPSHOT_NAME); + request.addParameter("index_details", "true"); + Response response = client().performRequest(request); + assertOK(response); + Map snapshotsResponse = responseAsMap(response); + List> snapshots = extractValue(snapshotsResponse, "snapshots"); + assertThat(snapshots.size(), is(equalTo(1))); + Map snapshot = snapshots.get(0); + return extractValue(snapshot, "index_details." + indexName + ".size_in_bytes"); + } + + private void assertMatchQueryReturnsAllDocuments(String indexName, int numDocs) throws IOException { + Map searchResults = search(indexName, QueryBuilders.matchQuery("text", "some")); + assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); } private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException { diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/10_basic.yml index 08b1a7ad8d690..1dfeeba6cefd4 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/10_basic.yml @@ -71,27 +71,29 @@ index: [test_2] human: true - - match: { test_2.shards.0.type: "EXISTING_STORE" } - - match: { test_2.shards.0.stage: "DONE" } - - match: { test_2.shards.0.primary: true } - - match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ } - - match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ } - - gte: { test_2.shards.0.index.files.total: 0 } - - gte: { test_2.shards.0.index.files.reused: 0 } - - gte: { test_2.shards.0.index.files.recovered: 0 } - - match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ } - - gte: { test_2.shards.0.index.size.total_in_bytes: 0 } - - gte: { test_2.shards.0.index.size.reused_in_bytes: 0 } - - gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 } - - match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ } - - gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 } - - gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 } - - gte: { test_2.shards.0.translog.recovered: 0 } - - gte: { test_2.shards.0.translog.total: 0 } - - gte: { test_2.shards.0.translog.total_on_start: 0 } - - gte: { test_2.shards.0.translog.total_time_in_millis: 0 } - - gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 } - - gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 } + - match: { test_2.shards.0.type: "EXISTING_STORE" } + - match: { test_2.shards.0.stage: "DONE" } + - match: { test_2.shards.0.primary: true } + - match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ } + - match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ } + - gte: { test_2.shards.0.index.files.total: 0 } + - gte: { test_2.shards.0.index.files.reused: 0 } + - gte: { test_2.shards.0.index.files.recovered: 0 } + - match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ } + - gte: { test_2.shards.0.index.size.total_in_bytes: 0 } + - gte: { test_2.shards.0.index.size.reused_in_bytes: 0 } + - gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 } + - gte: { test_2.shards.0.index.size.recovered_from_snapshot_in_bytes: 0 } + - gte: { test_2.shards.0.index.size.recovered_from_source_in_bytes: 0 } + - match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ } + - gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 } + - gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 } + - gte: { test_2.shards.0.translog.recovered: 0 } + - gte: { test_2.shards.0.translog.total: 0 } + - gte: { test_2.shards.0.translog.total_on_start: 0 } + - gte: { test_2.shards.0.translog.total_time_in_millis: 0 } + - gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 } + - gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 } --- "Indices recovery test index name not matching": diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index ea0a0558a6d27..053c3d32a6b23 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -252,7 +252,9 @@ public void testPeerRecoveryUsesSnapshots() throws Exception { // segments_N and .si files are recovered from the file metadata directly long expectedRecoveredBytesFromRepo = 0; + long totalBytesRecoveredFromSnapshot = 0; for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) { + totalBytesRecoveredFromSnapshot += fileDetail.recoveredFromSnapshot(); if (fileDetail.name().startsWith("segments") || fileDetail.name().endsWith(".si")) { continue; } @@ -264,6 +266,7 @@ public void testPeerRecoveryUsesSnapshots() throws Exception { long snapshotSizeForIndex = getSnapshotSizeForIndex(repoName, snapshot, indexName); assertThat(repository.totalBytesRead.get(), is(greaterThan(0L))); assertThat(repository.totalBytesRead.get(), is(lessThanOrEqualTo(snapshotSizeForIndex))); + assertThat(totalBytesRecoveredFromSnapshot, is(equalTo(snapshotSizeForIndex))); assertDocumentsAreEqual(indexName, numDocs); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java index 8c045600f37a2..b3f774b7d5726 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java @@ -90,7 +90,7 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe long bytesWritten = 0; while ((length = stream.read(buffer)) > 0) { indexOutput.writeBytes(buffer, length); - indexState.addRecoveredBytesToFile(fileName, length); + indexState.addRecoveredFromSnapshotBytesToFile(fileName, length); bytesWritten += length; } @@ -103,10 +103,10 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe assert Arrays.asList(store.directory().listAll()).contains(tempFileName) : "expected: [" + tempFileName + "] in " + Arrays.toString(store.directory().listAll()); store.directory().sync(Collections.singleton(tempFileName)); + indexState.setFullyRecoveredFromSnapshot(fileName); } catch (Exception e) { tempFileNames.remove(tempFileName); store.deleteQuiet(tempFileName); - indexState.resetRecoveredBytesOfFile(fileName); throw e; } finally { decRef(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 0b9671e12166f..05f66379b296f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -343,6 +343,10 @@ static final class Fields { static final String VERIFY_INDEX = "verify_index"; static final String RECOVERED = "recovered"; static final String RECOVERED_IN_BYTES = "recovered_in_bytes"; + static final String RECOVERED_FROM_SNAPSHOT = "recovered_from_snapshot"; + static final String RECOVERED_FROM_SNAPSHOT_IN_BYTES = "recovered_from_snapshot_in_bytes"; + static final String RECOVERED_FROM_SOURCE = "recovered_from_source"; + static final String RECOVERED_FROM_SOURCE_IN_BYTES = "recovered_from_source_in_bytes"; static final String CHECK_INDEX_TIME = "check_index_time"; static final String CHECK_INDEX_TIME_IN_MILLIS = "check_index_time_in_millis"; static final String LENGTH = "length"; @@ -602,8 +606,10 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p public static class FileDetail implements ToXContentObject, Writeable { private String name; private long length; - private long recovered; + private long recoveredFromSource; private boolean reused; + private long recoveredFromSnapshot; + private boolean fullyRecoveredFromSnapshot; public FileDetail(String name, long length, boolean reused) { assert name != null; @@ -615,28 +621,49 @@ public FileDetail(String name, long length, boolean reused) { public FileDetail(StreamInput in) throws IOException { name = in.readString(); length = in.readVLong(); - recovered = in.readVLong(); + recoveredFromSource = in.readVLong(); reused = in.readBoolean(); + if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION)) { + recoveredFromSnapshot = in.readLong(); + fullyRecoveredFromSnapshot = in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { + boolean snapshotBasedRecoveriesSupported = out.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION); out.writeString(name); out.writeVLong(length); - out.writeVLong(recovered); + // If the node receiving this information doesn't support this feature yet we should serialize + // the computed recovered bytes based on fullyRecoveredSnapshot + recoveredFromSnapshot + out.writeVLong(snapshotBasedRecoveriesSupported ? recoveredFromSource : recovered()); out.writeBoolean(reused); + if (snapshotBasedRecoveriesSupported) { + out.writeLong(recoveredFromSnapshot); + out.writeBoolean(fullyRecoveredFromSnapshot); + } } void addRecoveredBytes(long bytes) { assert reused == false : "file is marked as reused, can't update recovered bytes"; assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; - recovered += bytes; + assert fullyRecoveredFromSnapshot == false : "File is marked as fully recovered from snapshot"; + recoveredFromSource += bytes; } - void resetRecoveredBytes() { + void addRecoveredFromSnapshotBytes(long bytes) { assert reused == false : "file is marked as reused, can't update recovered bytes"; - // TODO: change this once we keep track of recovered data broke down by snapshot/primary - recovered = 0; + assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; + recoveredFromSnapshot += bytes; + } + + void setFullyRecoveredFromSnapshot() { + assert recoveredFromSnapshot == length; + assert recoveredFromSource == 0; + // Even though we could compute this based on length == recoveredFromSnapshot, + // it's possible that we got a corrupted file and failed to verify the checksum + // marking this as a fully recovered file when that's not the case + fullyRecoveredFromSnapshot = true; } /** @@ -657,7 +684,21 @@ public long length() { * number of bytes recovered for this file (so far). 0 if the file is reused * */ public long recovered() { - return recovered; + return fullyRecoveredFromSnapshot ? recoveredFromSnapshot : recoveredFromSource; + } + + /** + * number of bytes recovered from this file (so far) from a snapshot. + */ + public long recoveredFromSnapshot() { + return recoveredFromSnapshot; + } + + /** + * number of bytes recovered from this file (so far) from the source node. + */ + public long recoveredFromSource() { + return recoveredFromSource; } /** @@ -668,7 +709,7 @@ public boolean reused() { } boolean fullyRecovered() { - return reused == false && length == recovered; + return reused == false && length == recovered(); } @Override @@ -677,7 +718,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.NAME, name); builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length)); builder.field(Fields.REUSED, reused); - builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered)); + builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered())); + builder.humanReadableField( + Fields.RECOVERED_FROM_SOURCE_IN_BYTES, Fields.RECOVERED_FROM_SOURCE, new ByteSizeValue(recoveredFromSource) + ); + builder.humanReadableField( + Fields.RECOVERED_FROM_SNAPSHOT_IN_BYTES, Fields.RECOVERED_FROM_SNAPSHOT, new ByteSizeValue(recoveredFromSnapshot) + ); builder.endObject(); return builder; } @@ -686,7 +733,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public boolean equals(Object obj) { if (obj instanceof FileDetail) { FileDetail other = (FileDetail) obj; - return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered(); + return name.equals(other.name) && + length == other.length() && + reused == other.reused() && + recoveredFromSource == other.recoveredFromSource && + recoveredFromSnapshot == other.recoveredFromSnapshot && + fullyRecoveredFromSnapshot == other.fullyRecoveredFromSnapshot; } return false; } @@ -695,14 +747,17 @@ public boolean equals(Object obj) { public int hashCode() { int result = name.hashCode(); result = 31 * result + Long.hashCode(length); - result = 31 * result + Long.hashCode(recovered); + result = 31 * result + Long.hashCode(recoveredFromSource); + result = 31 * result + Long.hashCode(recoveredFromSnapshot); + result = 31 * result + (fullyRecoveredFromSnapshot ? 1 : 0); result = 31 * result + (reused ? 1 : 0); return result; } @Override public String toString() { - return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])"; + return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], " + + "recovered [" + recoveredFromSource + "], recovered from snapshot [" + recoveredFromSnapshot + "]) "; } } @@ -767,10 +822,16 @@ public void addRecoveredBytesToFile(String name, long bytes) { file.addRecoveredBytes(bytes); } - public void resetRecoveredBytesOfFile(String name) { + public void addRecoveredFromSnapshotBytesToFile(String name, long bytes) { + FileDetail file = fileDetails.get(name); + assert file != null : "file [" + name + "] hasn't been reported"; + file.addRecoveredFromSnapshotBytes(bytes); + } + + public void setFullyRecoveredFromSnapshot(String name) { FileDetail file = fileDetails.get(name); assert file != null : "file [" + name + "] hasn't been reported"; - file.resetRecoveredBytes(); + file.setFullyRecoveredFromSnapshot(); } public FileDetail get(String name) { @@ -857,8 +918,12 @@ public synchronized void addRecoveredBytesToFile(String name, long bytes) { fileDetails.addRecoveredBytesToFile(name, bytes); } - public synchronized void resetRecoveredBytesOfFile(String name) { - fileDetails.resetRecoveredBytesOfFile(name); + public synchronized void addRecoveredFromSnapshotBytesToFile(String name, long bytes) { + fileDetails.addRecoveredFromSnapshotBytesToFile(name, bytes); + } + + public synchronized void setFullyRecoveredFromSnapshot(String name) { + fileDetails.setFullyRecoveredFromSnapshot(name); } public synchronized void addSourceThrottling(long timeInNanos) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 9553959d6f73d..19b4449f57e8d 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -69,6 +69,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; @@ -400,6 +401,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { RecoveryState.FileDetail fileDetails = recoveryStateIndex.getFileDetails(storeFileMetadata.name()); assertThat(fileDetails.recovered(), equalTo(storeFileMetadata.length())); + assertThat(fileDetails.recoveredFromSnapshot(), equalTo(storeFileMetadata.length())); recoveryTarget.decRef(); closeShards(shard); @@ -509,6 +511,11 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { RecoveryState.FileDetail fileDetails = recoveryStateIndex.getFileDetails(storeFileMetadata.name()); assertThat(fileDetails.recovered(), equalTo(0L)); + if (downloadFileErrorType == DownloadFileErrorType.FETCH_ERROR) { + assertThat(fileDetails.recoveredFromSnapshot(), equalTo(0L)); + } else { + assertThat(fileDetails.recoveredFromSnapshot(), greaterThan(0L)); + } // Subsequent writes on the same file can proceed without issues PlainActionFuture writeChunkFuture = PlainActionFuture.newFuture(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index 10fcf28625b49..f79fe166907dc 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; @@ -259,6 +261,42 @@ Index createObj(StreamInput in) throws IOException { long recoveredBytes = 0; long sourceThrottling = Index.UNKNOWN; long targetThrottling = Index.UNKNOWN; + + List filesToRecoverFromSnapshot = randomSubsetOf(filesToRecover); + for (FileDetail fileDetail : filesToRecoverFromSnapshot) { + if (bytesToRecover <= 0) { + break; + } + + final long throttledOnTarget = rarely() ? randomIntBetween(10, 200) : 0; + if (targetThrottling == Index.UNKNOWN) { + targetThrottling = throttledOnTarget; + } else { + targetThrottling += throttledOnTarget; + } + index.addTargetThrottling(throttledOnTarget); + + if (fileDetail.length() <= bytesToRecover && randomBoolean()) { + index.addRecoveredFromSnapshotBytesToFile(fileDetail.name(), fileDetail.length()); + index.setFullyRecoveredFromSnapshot(fileDetail.name()); + fileDetail.addRecoveredFromSnapshotBytes(fileDetail.length()); + fileDetail.setFullyRecoveredFromSnapshot(); + + assertThat(fileDetail.recovered(), is(equalTo(fileDetail.length()))); + assertThat(fileDetail.recoveredFromSnapshot(), is(equalTo(fileDetail.length()))); + assertThat(fileDetail.fullyRecovered(), is(equalTo(true))); + assertThat(fileDetail.recoveredFromSnapshot(), is(equalTo(true))); + + bytesToRecover -= fileDetail.length(); + recoveredBytes += fileDetail.length(); + filesToRecover.remove(fileDetail); + } else { + long bytesRecoveredFromSnapshot = randomLongBetween(0, fileDetail.length()); + index.addRecoveredFromSnapshotBytesToFile(fileDetail.name(), bytesRecoveredFromSnapshot); + fileDetail.addRecoveredFromSnapshotBytes(bytesRecoveredFromSnapshot); + } + } + while (bytesToRecover > 0) { FileDetail file = randomFrom(filesToRecover); final long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered())));