Skip to content

Commit

Permalink
Add unit test for IndexShard.runTranslogRecovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jun 6, 2017
1 parent fab8d46 commit 658b889
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,8 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O
return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
int opsRecovered = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,12 +916,16 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) {
}

public Index(String type, String id, long seqNo, byte[] source) {
this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source);
}

public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
version = Versions.MATCH_ANY;
versionType = VersionType.INTERNAL;
this.version = version;
this.versionType = versionType;
routing = null;
parent = null;
autoGeneratedIdTimestamp = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1616,6 +1618,57 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
closeShards(primary, replica);
}

public void testRecoverFromTranslog() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
List<Translog.Operation> operations = new ArrayList<>();
int numTotalEntries = randomIntBetween(0, 10);
int numCorruptEntries = 0;
for (int i = 0; i < numTotalEntries; i++) {
if (randomBoolean()) {
operations.add(new Translog.Index("test", "1", 0, 1, VersionType.INTERNAL,
"{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8"))));
} else {
// corrupt entry
operations.add(new Translog.Index("test", "2", 1, 1, VersionType.INTERNAL,
"{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8"))));
numCorruptEntries++;
}
}

Iterator<Translog.Operation> iterator = operations.iterator();
Translog.Snapshot snapshot = new Translog.Snapshot() {

@Override
public int totalOperations() {
return numTotalEntries;
}

@Override
public Translog.Operation next() throws IOException {
return iterator.hasNext() ? iterator.next() : null;
}
};
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));
primary.recoverFromStore();

primary.runTranslogRecovery(primary.getEngine(), snapshot);
assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries));
assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries));
assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries));

closeShards(primary);
}

public void testShardActiveDuringInternalRecovery() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "type", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
}
}

private DiscoveryNode getFakeDiscoNode(String id) {
protected DiscoveryNode getFakeDiscoNode(String id) {
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT);
}
Expand Down

0 comments on commit 658b889

Please sign in to comment.