Skip to content

Commit

Permalink
InternalEngineTests.testConcurrentOutOfOrderDocsOnReplica should use …
Browse files Browse the repository at this point in the history
…two documents (#30121)

We were recently looking at bugs that can only occur if two different documents were indexed concurrently. For example, what happens if the local checkpoint advances above the sequence number of  a document that's being indexed. That can only happen if another concurrent operation caused the checkpoint to advance. It has to be another document to allow concurrency as we acquire a per uid lock.While our investigation proved that the suspected bug doesn't exists, we still discovered our unit testing coverage is not good enough to cover this case. 

This PR extend the test concurrent out of order replica processing to use two documents in its history.
  • Loading branch information
bleskes authored May 3, 2018
1 parent bdd43fa commit ccd791b
Showing 1 changed file with 83 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1385,18 +1386,13 @@ public void testVersioningCreateExistsException() throws IOException {
}

protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
long primaryTerm,
int minOpCount, int maxOpCount, String docId) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid("1");
final int startWithSeqNo;
if (partialOldPrimary) {
startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
} else {
startWithSeqNo = 0;
}
final String valuePrefix = forReplica ? "r_" : "p_";
final Term id = newUid(docId);
final int startWithSeqNo = 0;
final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
Expand All @@ -1418,7 +1414,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
Expand All @@ -1427,7 +1423,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
System.currentTimeMillis(), -1, false
);
} else {
op = new Engine.Delete("test", "1", id,
op = new Engine.Delete("test", docId, id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
Expand All @@ -1442,7 +1438,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve

public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), 2, 2, 20, "1");
assertOpsOnReplica(ops, replicaEngine, true);
}

Expand Down Expand Up @@ -1511,28 +1507,83 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
}
}

public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
lastFieldValue = index.docs().get(0).get("value");
public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, InterruptedException {
final List<Engine.Operation> opsDoc1 =
generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "1");
final Engine.Operation lastOpDoc1 = opsDoc1.get(opsDoc1.size() - 1);
final String lastFieldValueDoc1;
if (lastOpDoc1 instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOpDoc1;
lastFieldValueDoc1 = index.docs().get(0).get("value");
} else {
// delete
lastFieldValue = null;
lastFieldValueDoc1 = null;
}
final List<Engine.Operation> opsDoc2 =
generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "2");
final Engine.Operation lastOpDoc2 = opsDoc2.get(opsDoc2.size() - 1);
final String lastFieldValueDoc2;
if (lastOpDoc2 instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOpDoc2;
lastFieldValueDoc2 = index.docs().get(0).get("value");
} else {
// delete
lastFieldValueDoc2 = null;
}
shuffle(ops, random());
concurrentlyApplyOps(ops, engine);
// randomly interleave
final AtomicLong seqNoGenerator = new AtomicLong();
Function<Engine.Operation, Engine.Operation> seqNoUpdater = operation -> {
final long newSeqNo = seqNoGenerator.getAndIncrement();
if (operation instanceof Engine.Index) {
Engine.Index index = (Engine.Index) operation;
return new Engine.Index(index.uid(), index.parsedDoc(), newSeqNo, index.primaryTerm(), index.version(),
index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry());
} else {
Engine.Delete delete = (Engine.Delete) operation;
return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(),
delete.version(), delete.versionType(), delete.origin(), delete.startTime());
}
};
final List<Engine.Operation> allOps = new ArrayList<>();
Iterator<Engine.Operation> iter1 = opsDoc1.iterator();
Iterator<Engine.Operation> iter2 = opsDoc2.iterator();
while (iter1.hasNext() && iter2.hasNext()) {
final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next();
allOps.add(seqNoUpdater.apply(next));
}
iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o)));
iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o)));
// insert some duplicates
allOps.addAll(randomSubsetOf(allOps));

assertVisibleCount(engine, lastFieldValue == null ? 0 : 1);
if (lastFieldValue != null) {
shuffle(allOps, random());
concurrentlyApplyOps(allOps, engine);

engine.refresh("test");

if (lastFieldValueDoc1 != null) {
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc1)), collector);
assertThat(collector.getTotalHits(), equalTo(1));
}
}
if (lastFieldValueDoc2 != null) {
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc2)), collector);
assertThat(collector.getTotalHits(), equalTo(1));
}
}

int totalExpectedOps = 0;
if (lastFieldValueDoc1 != null) {
totalExpectedOps++;
}
if (lastFieldValueDoc2 != null) {
totalExpectedOps++;
}
assertVisibleCount(engine, totalExpectedOps);
}

private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Expand Down Expand Up @@ -1572,12 +1623,12 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
}

public void testInternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1");
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
}

public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 10, 100);
List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 10, 100, "1");
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean running = new AtomicBoolean(true);
Thread refreshThread = new Thread(() -> {
Expand Down Expand Up @@ -1697,7 +1748,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
nonInternalVersioning.remove(VersionType.INTERNAL);
final VersionType versionType = randomFrom(nonInternalVersioning);
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, 2, 2, 20, "1");
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down Expand Up @@ -1775,8 +1826,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
}

public void testVersioningPromotedReplica() throws IOException {
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, 1, 2, 20, "1");
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1");
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
Expand All @@ -1796,7 +1847,7 @@ public void testVersioningPromotedReplica() throws IOException {
}

public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, 2, 100, 300, "1");
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down

0 comments on commit ccd791b

Please sign in to comment.