Skip to content

Commit

Permalink
Engine: version logic on replicas should not be hard coded (#23998)
Browse files Browse the repository at this point in the history
The refactoring in #23711 hardcoded version logic for replica to assume monotonic versions. Sadly that's wrong for `FORCE` and `VERSION_GTE`. Instead we should use the methods in VersionType to detect conflicts.

Note - once replicas use sequence numbers for out of order delivery, this logic goes away.
  • Loading branch information
bleskes authored Apr 9, 2017
1 parent f0df5e6 commit b636ca7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation
if (versionValue == null) {
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else {
return op.version() > versionValue.getVersion() ?
OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ?
OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,8 +1309,9 @@ public void testVersioningCreateExistsException() throws IOException {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}

protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary,
long primaryTerm, int minOpCount, int maxOpCount) {
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid(Uid.createUid("test", "1"));
Expand All @@ -1322,14 +1323,30 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
}
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL;
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
final long version;
switch (versionType) {
case INTERNAL:
version = forReplica ? i : Versions.MATCH_ANY;
break;
case EXTERNAL:
version = i;
break;
case EXTERNAL_GTE:
version = randomBoolean() ? Math.max(i - 1, 0) : i;
break;
case FORCE:
version = randomNonNegativeLong();
break;
default:
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
Expand All @@ -1338,7 +1355,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
Expand All @@ -1349,10 +1366,20 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
}

public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine);
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}

public void testNonStandardVersioningOnReplica() throws IOException {
// TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order
// is detected using seq#
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, false);
}


public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
Expand All @@ -1365,12 +1392,12 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
try (Store oldReplicaStore = createStore();
InternalEngine replicaEngine =
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine);
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}
}

private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException {
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand All @@ -1380,13 +1407,15 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
// delete
lastFieldValue = null;
}
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
if (shuffleOps) {
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
boolean firstOp = true;
for (Engine.Operation op : ops) {
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
Expand Down Expand Up @@ -1432,7 +1461,7 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
}

public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300);
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down Expand Up @@ -1492,7 +1521,7 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
}

public void testInternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
}

Expand Down Expand Up @@ -1595,8 +1624,11 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
return opsPerformed;
}

public void testExternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 2, 20);
public void testNonInternalVersioningOnPrimary() throws IOException {
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
nonInternalVersioning.remove(VersionType.INTERNAL);
final VersionType versionType = randomFrom(nonInternalVersioning);
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand All @@ -1606,7 +1638,10 @@ public void testExternalVersioningOnPrimary() throws IOException {
// delete
lastFieldValue = null;
}
shuffle(ops, random());
// other version types don't support out of order processing.
if (versionType == VersionType.EXTERNAL) {
shuffle(ops, random());
}
long highestOpVersion = Versions.NOT_FOUND;
long seqNo = -1;
boolean docDeleted = true;
Expand All @@ -1616,7 +1651,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
if (op instanceof Engine.Index) {
final Engine.Index index = (Engine.Index) op;
Engine.IndexResult result = engine.index(index);
if (op.version() > highestOpVersion) {
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isCreated(), equalTo(docDeleted));
Expand All @@ -1634,7 +1669,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
} else {
final Engine.Delete delete = (Engine.Delete) op;
Engine.DeleteResult result = engine.delete(delete);
if (op.version() > highestOpVersion) {
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isFound(), equalTo(docDeleted == false));
Expand All @@ -1660,6 +1695,7 @@ public void testExternalVersioningOnPrimary() throws IOException {

assertVisibleCount(engine, docDeleted ? 0 : 1);
if (docDeleted == false) {
logger.info("searching for [{}]", lastFieldValue);
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
Expand All @@ -1669,13 +1705,13 @@ public void testExternalVersioningOnPrimary() throws IOException {
}

public void testVersioningPromotedReplica() throws IOException {
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20);
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
assertOpsOnReplica(replicaOps, replicaEngine);
assertOpsOnReplica(replicaOps, replicaEngine, true);
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1();
try (Searcher searcher = engine.acquireSearcher("test")) {
Expand All @@ -1689,7 +1725,7 @@ public void testVersioningPromotedReplica() throws IOException {
}

public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down

0 comments on commit b636ca7

Please sign in to comment.