diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e399c02d6cc84..979c44dd5fc8d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -133,6 +133,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -1385,18 +1386,13 @@ public void testVersioningCreateExistsException() throws IOException { } protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { + long primaryTerm, + int minOpCount, int maxOpCount, String docId) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List ops = new ArrayList<>(); - final Term id = newUid("1"); - final int startWithSeqNo; - if (partialOldPrimary) { - startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); - } else { - startWithSeqNo = 0; - } - final String valuePrefix = forReplica ? "r_" : "p_"; + final Term id = newUid(docId); + final int startWithSeqNo = 0; + final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; @@ -1418,7 +1414,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve throw new UnsupportedOperationException("unknown version type: " + versionType); } if (randomBoolean()) { - op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), + op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, @@ -1427,7 +1423,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve System.currentTimeMillis(), -1, false ); } else { - op = new Engine.Delete("test", "1", id, + op = new Engine.Delete("test", docId, id, forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, @@ -1442,7 +1438,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve public void testOutOfOrderDocsOnReplica() throws IOException { final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), 2, 2, 20, "1"); assertOpsOnReplica(ops, replicaEngine, true); } @@ -1511,28 +1507,83 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } } - public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); - final String lastFieldValue; - if (lastOp instanceof Engine.Index) { - Engine.Index index = (Engine.Index) lastOp; - lastFieldValue = index.docs().get(0).get("value"); + public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, InterruptedException { + final List opsDoc1 = + generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "1"); + final Engine.Operation lastOpDoc1 = opsDoc1.get(opsDoc1.size() - 1); + final String lastFieldValueDoc1; + if (lastOpDoc1 instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOpDoc1; + lastFieldValueDoc1 = index.docs().get(0).get("value"); } else { // delete - lastFieldValue = null; + lastFieldValueDoc1 = null; + } + final List opsDoc2 = + generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "2"); + final Engine.Operation lastOpDoc2 = opsDoc2.get(opsDoc2.size() - 1); + final String lastFieldValueDoc2; + if (lastOpDoc2 instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOpDoc2; + lastFieldValueDoc2 = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValueDoc2 = null; } - shuffle(ops, random()); - concurrentlyApplyOps(ops, engine); + // randomly interleave + final AtomicLong seqNoGenerator = new AtomicLong(); + Function seqNoUpdater = operation -> { + final long newSeqNo = seqNoGenerator.getAndIncrement(); + if (operation instanceof Engine.Index) { + Engine.Index index = (Engine.Index) operation; + return new Engine.Index(index.uid(), index.parsedDoc(), newSeqNo, index.primaryTerm(), index.version(), + index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()); + } else { + Engine.Delete delete = (Engine.Delete) operation; + return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(), + delete.version(), delete.versionType(), delete.origin(), delete.startTime()); + } + }; + final List allOps = new ArrayList<>(); + Iterator iter1 = opsDoc1.iterator(); + Iterator iter2 = opsDoc2.iterator(); + while (iter1.hasNext() && iter2.hasNext()) { + final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next(); + allOps.add(seqNoUpdater.apply(next)); + } + iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); + iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); + // insert some duplicates + allOps.addAll(randomSubsetOf(allOps)); - assertVisibleCount(engine, lastFieldValue == null ? 0 : 1); - if (lastFieldValue != null) { + shuffle(allOps, random()); + concurrentlyApplyOps(allOps, engine); + + engine.refresh("test"); + + if (lastFieldValueDoc1 != null) { try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc1)), collector); assertThat(collector.getTotalHits(), equalTo(1)); } } + if (lastFieldValueDoc2 != null) { + try (Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc2)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + + int totalExpectedOps = 0; + if (lastFieldValueDoc1 != null) { + totalExpectedOps++; + } + if (lastFieldValueDoc2 != null) { + totalExpectedOps++; + } + assertVisibleCount(engine, totalExpectedOps); } private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { @@ -1572,12 +1623,12 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception { - List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 10, 100); + List ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 10, 100, "1"); CountDownLatch latch = new CountDownLatch(1); AtomicBoolean running = new AtomicBoolean(true); Thread refreshThread = new Thread(() -> { @@ -1697,7 +1748,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException { final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); nonInternalVersioning.remove(VersionType.INTERNAL); final VersionType versionType = randomFrom(nonInternalVersioning); - final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, versionType, 2, 2, 20, "1"); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1775,8 +1826,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, 1, 2, 20, "1"); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); @@ -1796,7 +1847,7 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, 2, 100, 300, "1"); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) {