diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aed68d812a6bd..333dd769eaf68 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -441,8 +441,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { - return op.version() > versionValue.getVersion() ? - OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ? + OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER; } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8840059706144..2d3ba055df4aa 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1309,8 +1309,9 @@ public void testVersioningCreateExistsException() throws IOException { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - protected List generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary, - long primaryTerm, int minOpCount, int maxOpCount) { + protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, + boolean partialOldPrimary, long primaryTerm, + int minOpCount, int maxOpCount) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List ops = new ArrayList<>(); final Term id = newUid(Uid.createUid("test", "1")); @@ -1322,14 +1323,30 @@ protected List generateSingleDocHistory(boolean forReplica, bo } final String valuePrefix = forReplica ? "r_" : "p_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); - final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL; for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; + final long version; + switch (versionType) { + case INTERNAL: + version = forReplica ? i : Versions.MATCH_ANY; + break; + case EXTERNAL: + version = i; + break; + case EXTERNAL_GTE: + version = randomBoolean() ? Math.max(i - 1, 0) : i; + break; + case FORCE: + version = randomNonNegativeLong(); + break; + default: + throw new UnsupportedOperationException("unknown version type: " + versionType); + } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null), forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - forReplica || externalVersioning ? i : Versions.MATCH_ANY, + version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis(), -1, false @@ -1338,7 +1355,7 @@ protected List generateSingleDocHistory(boolean forReplica, bo op = new Engine.Delete("test", "1", id, forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - forReplica || externalVersioning ? i : Versions.MATCH_ANY, + version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); @@ -1349,10 +1366,20 @@ protected List generateSingleDocHistory(boolean forReplica, bo } public void testOutOfOrderDocsOnReplica() throws IOException { - final List ops = generateSingleDocHistory(true, true, false, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine); + final List ops = generateSingleDocHistory(true, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, true); } + public void testNonStandardVersioningOnReplica() throws IOException { + // TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order + // is detected using seq# + final List ops = generateSingleDocHistory(true, + randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, false); + } + + public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder() .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us @@ -1365,12 +1392,12 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { try (Store oldReplicaStore = createStore(); InternalEngine replicaEngine = createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) { - final List ops = generateSingleDocHistory(true, true, true, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine); + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, true); } } - private void assertOpsOnReplica(List ops, InternalEngine replicaEngine) throws IOException { + private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1380,13 +1407,15 @@ private void assertOpsOnReplica(List ops, InternalEngine repli // delete lastFieldValue = null; } - int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { - firstOpWithSeqNo++; + if (shuffleOps) { + int firstOpWithSeqNo = 0; + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + firstOpWithSeqNo++; + } + // shuffle ops but make sure legacy ops are first + shuffle(ops.subList(0, firstOpWithSeqNo), random()); + shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); } - // shuffle ops but make sure legacy ops are first - shuffle(ops.subList(0, firstOpWithSeqNo), random()); - shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); boolean firstOp = true; for (Engine.Operation op : ops) { logger.info("performing [{}], v [{}], seq# [{}], term [{}]", @@ -1432,7 +1461,7 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, true, false, 2, 100, 300); + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1492,7 +1521,7 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, false, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } @@ -1595,8 +1624,11 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion return opsPerformed; } - public void testExternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, true, false, 2, 2, 20); + public void testNonInternalVersioningOnPrimary() throws IOException { + final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); + nonInternalVersioning.remove(VersionType.INTERNAL); + final VersionType versionType = randomFrom(nonInternalVersioning); + final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1606,7 +1638,10 @@ public void testExternalVersioningOnPrimary() throws IOException { // delete lastFieldValue = null; } - shuffle(ops, random()); + // other version types don't support out of order processing. + if (versionType == VersionType.EXTERNAL) { + shuffle(ops, random()); + } long highestOpVersion = Versions.NOT_FOUND; long seqNo = -1; boolean docDeleted = true; @@ -1616,7 +1651,7 @@ public void testExternalVersioningOnPrimary() throws IOException { if (op instanceof Engine.Index) { final Engine.Index index = (Engine.Index) op; Engine.IndexResult result = engine.index(index); - if (op.version() > highestOpVersion) { + if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) { seqNo++; assertThat(result.getSeqNo(), equalTo(seqNo)); assertThat(result.isCreated(), equalTo(docDeleted)); @@ -1634,7 +1669,7 @@ public void testExternalVersioningOnPrimary() throws IOException { } else { final Engine.Delete delete = (Engine.Delete) op; Engine.DeleteResult result = engine.delete(delete); - if (op.version() > highestOpVersion) { + if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) { seqNo++; assertThat(result.getSeqNo(), equalTo(seqNo)); assertThat(result.isFound(), equalTo(docDeleted == false)); @@ -1660,6 +1695,7 @@ public void testExternalVersioningOnPrimary() throws IOException { assertVisibleCount(engine, docDeleted ? 0 : 1); if (docDeleted == false) { + logger.info("searching for [{}]", lastFieldValue); try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); @@ -1669,13 +1705,13 @@ public void testExternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20); + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); - assertOpsOnReplica(replicaOps, replicaEngine); + assertOpsOnReplica(replicaOps, replicaEngine, true); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1(); try (Searcher searcher = engine.acquireSearcher("test")) { @@ -1689,7 +1725,7 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, true, false, 2, 100, 300); + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) {