diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index aa2a4c0fa9700..c5bf0d1b1b43e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1054,7 +1054,8 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O return translogOpToEngineOpConverter.convertToEngineOp(operation, origin); } - private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { + // package-private for testing + int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); int opsRecovered = 0; diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 1314504e397ec..0a2f7526c1340 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -916,12 +916,16 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { } public Index(String type, String id, long seqNo, byte[] source) { + this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source); + } + + public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source) { this.type = type; this.id = id; this.source = new BytesArray(source); this.seqNo = seqNo; - version = Versions.MATCH_ANY; - versionType = VersionType.INTERNAL; + this.version = version; + this.versionType = versionType; routing = null; parent = null; autoGeneratedIdTimestamp = -1; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2f3f8f7ad8c3c..c25cbc5ce99c0 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -102,12 +102,14 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -1616,6 +1618,57 @@ public long indexTranslogOperations(List operations, int tot closeShards(primary, replica); } + public void testRecoverFromTranslog() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + List operations = new ArrayList<>(); + int numTotalEntries = randomIntBetween(0, 10); + int numCorruptEntries = 0; + for (int i = 0; i < numTotalEntries; i++) { + if (randomBoolean()) { + operations.add(new Translog.Index("test", "1", 0, 1, VersionType.INTERNAL, + "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")))); + } else { + // corrupt entry + operations.add(new Translog.Index("test", "2", 1, 1, VersionType.INTERNAL, + "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")))); + numCorruptEntries++; + } + } + + Iterator iterator = operations.iterator(); + Translog.Snapshot snapshot = new Translog.Snapshot() { + + @Override + public int totalOperations() { + return numTotalEntries; + } + + @Override + public Translog.Operation next() throws IOException { + return iterator.hasNext() ? iterator.next() : null; + } + }; + primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), + getFakeDiscoNode(primary.routingEntry().currentNodeId()), + null)); + primary.recoverFromStore(); + + primary.runTranslogRecovery(primary.getEngine(), snapshot); + assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries)); + assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries)); + assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries)); + + closeShards(primary); + } + public void testShardActiveDuringInternalRecovery() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "type", "0"); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 515e01c0409e3..a4d587b4835d7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -365,7 +365,7 @@ protected void recoveryEmptyReplica(IndexShard replica) throws IOException { } } - private DiscoveryNode getFakeDiscoNode(String id) { + protected DiscoveryNode getFakeDiscoNode(String id) { return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); }