Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tighten sequence numbers recovery #22212

Merged
merged 7 commits into from
Dec 17, 2016

Conversation

jasontedor
Copy link
Member

@jasontedor jasontedor commented Dec 16, 2016

This commit touches addresses issues related to recovery and sequence numbers:

  • A sequence number can be assigned and a Lucene commit created with a
    maximum sequence number at least as large as that sequence number,
    yet the operation corresponding to that sequence number can be
    missing from both the Lucene commit and the translog. This means that
    upon recovery the local checkpoint will be stuck at or below this
    missing sequence number. To address this, we force the local
    checkpoint to the maximum sequence number in the Lucene commit when
    opening the engine. Note that there can still be gaps in the history
    in the translog but we do not address those here.
  • The global checkpoint is transferred to the target shard at the end
    of peer recovery.
  • Additionally, we reenable the relocation integration tests.

Lastly, this work uncovered some bugs in the assignment of sequence
numbers on replica operations:

  • setting the sequence number on replica write requests was missing,
    very likely introduced as a result of resolving merge conflicts
  • handling operations that arrive out of order on a replica and have a
    version conflict with a previous operation were never marked as
    processed

Relates #10708

@jasontedor
Copy link
Member Author

retest this please

1 similar comment
@jasontedor
Copy link
Member Author

retest this please

This commit touches addresses issues related to recovery and sequence numbers:
 - A sequence number can be assigned and a Lucene commit created with a
   maximum sequence number at least as large as that sequence number,
   yet the operation corresponding to that sequence number can be
   missing from both the Lucene commit and the translog. This means that
   upon recovery the local checkpoint will be stuck at or below this
   missing sequence number. To address this, we force the local
   checkpoint to the maximum sequence number in the Lucene commit when
   opening the engine. Note that there can still be gaps in the history
   in the translog but we do not address those here.
 - The global checkpoint is transferred to the target shard at the end
   of peer recovery.
 - Additionally, we reenable the relocation integration tests.

Lastly, this work uncovered some bugs in the assignment of sequence
numbers on replica operations:
 - setting the sequence number on replica write requests was missing,
   very likely introduced as a result of resolving merge conflicts
 - handling operations that arrive out of order on a replica and have a
   version conflict with a previous operation were never marked as
   processed
@jasontedor
Copy link
Member Author

retest this please

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great. I left some suggestion. I do think there is something wrong with the inner delete method on the engine

@@ -150,6 +150,7 @@ protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, I
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert at the end of this method that the seqNo is set on the replica request?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "are in the lucene commit" -> "are in the lucene commit or the translog generation associated with it"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

seqNo = delete.seqNo();
}

if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this twice now? I'm not sure this what you intended?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - it feels like this can simplified based on the origin. I do wonder if the code reuse in checkVersionConflict is worth the extra complexity it brings to reading the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I pushed 1c71393.

@@ -152,9 +152,6 @@ synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
if (this.globalCheckpoint <= globalCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment about when the current global checkpoint can be higher? here is what I wrote in #10708

Note that the global checkpoint is a local knowledge of that is update under the mandate of the primary. It may be that the primary information is lagging compared to a replica. This can happen when a replica is promoted to a primary (but still has stale info).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

@@ -391,7 +391,7 @@ public void finalizeRecovery() {
StopWatch stopWatch = new StopWatch().start();
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> {
recoveryTarget.finalizeRecovery();
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paranoia - can we flip this around and mark the target allocation as "in sync" before we give it the global checkpoint? it at least reads better as "we know you are in sync and therefore every global checkpoint advances will take you into account"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

* tombstone files.
**/
void finalizeRecovery();
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)

* tombstone files.
**/
void finalizeRecovery();
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it was already not correct. I pushed 1c71393.

@Override
public long generateSeqNo() {
if (rarely()) {
// force skipping a sequence number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you evil person :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should do the test based on real world scenarios we know can happen. I think it will make things clearer?

For example, using a replica engine :

  1. Index a doc, with seq#1
  2. Delete the same doc with seq#3
  3. Flush
  4. index doc with seq#2

To do so on a primary takes something more evil - instead of skiping translog ops, make sure this come "out of order". So make the seq number service put a seq# aside but then serve it on a follow up request.

I think this also serve as documentation of when this can happen.

Copy link
Member Author

@jasontedor jasontedor Dec 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a plan here, but it's a work-in-progress.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this is still in progress? (which is fine)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed f57eb99.

}
}

public void testOutOfOrderSequenceNumbersWithVersionConflict() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++. Maybe also add a sanity check that a get on the doc at the end gives us what we expect? (deleted or found)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

May the git push --force be with you if you require a more elaborative
commit message.
Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments

}
}

final boolean exists = operations.get(operations.size() - 1) instanceof Engine.Index;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... what happens if the last op is an index, but the previous op was a delete with a higher version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This represents whether or not the last operation before shuffling, and hence the operation with the highest version, is an index or delete operation. I think that this is correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. it's just before the shuffle. got confused. all good.

} else {
final long seqNo;
if (index.origin() == Operation.Origin.PRIMARY) {
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is a replica and we have a version conflict and throws an exception? I think we still end up not marking the seq no as completed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like this?

diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e75dc47..d16c11e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -86,6 +86,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 public class InternalEngine extends Engine {
 
@@ -444,11 +445,18 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private boolean checkVersionConflict(
+    /**
+     * checks for version conflicts and returns the right result object if conflict was detected. returns `null`
+     * if no conflicts was found and indexing should proceed as normal
+     */
+    private <T extends Result> T checkVersionConflict(
             final Operation op,
             final long currentVersion,
             final long expectedVersion,
-            final boolean deleted) {
+            final boolean deleted,
+            final Supplier<T> resultOnSuccess,
+            final Function<Exception, T> resultOnFailure) {
+        final T result;
         if (op.versionType() == VersionType.FORCE) {
             if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
                 // If index was created in 5.0 or later, 'force' is not allowed at all
@@ -461,15 +469,17 @@ public class InternalEngine extends Engine {
 
         if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
             if (op.origin().isRecovery()) {
-                // version conflict, but okay
-                return true;
+                // version conflict, but okay, mark as success
+                result = resultOnSuccess.get();
             } else {
                 // fatal version conflict
-                throw new VersionConflictEngineException(shardId, op.type(), op.id(),
-                        op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
+                result = resultOnFailure.apply(new VersionConflictEngineException(shardId, op.type(), op.id(),
+                    op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
             }
+        } else {
+            result = null;
         }
-        return false;
+        return result;
     }
 
     private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -584,6 +594,7 @@ public class InternalEngine extends Engine {
         final Translog.Location location;
         final long updatedVersion;
         IndexResult indexResult = null;
+        long seqNo = index.seqNo();
         try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
             /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -648,23 +659,18 @@ public class InternalEngine extends Engine {
                 }
             }
             final long expectedVersion = index.version();
-            final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
+            IndexResult result = checkVersionConflict(index, currentVersion, expectedVersion, deleted,
+                () -> new IndexResult(currentVersion, index.seqNo(), false),
+                exception -> new IndexResult(exception, currentVersion, index.seqNo()));
 
-            final long seqNo;
-            if (index.origin() == Operation.Origin.PRIMARY) {
-                if (!conflict) {
+            if (result == null) {
+
+                if (index.origin() == Operation.Origin.PRIMARY) {
                     seqNo = seqNoService.generateSeqNo();
-                } else {
-                    seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
                 }
-            } else {
-                seqNo = index.seqNo();
-            }
 
-            if (conflict) {
-                // skip index operation because of version conflict on recovery
-                indexResult = new IndexResult(expectedVersion, seqNo, false);
-            } else {
+                index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
+
                 updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
                 index.parsedDoc().version().setLongValue(updatedVersion);
 
@@ -686,8 +692,8 @@ public class InternalEngine extends Engine {
                 }
                 indexResult = new IndexResult(updatedVersion, seqNo, deleted);
                 location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
-                        ? translog.add(new Translog.Index(index, indexResult))
-                        : null;
+                    ? translog.add(new Translog.Index(index, indexResult))
+                    : null;
                 versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
                 indexResult.setTranslogLocation(location);
             }
@@ -695,7 +701,7 @@ public class InternalEngine extends Engine {
             indexResult.freeze();
             return indexResult;
         } finally {
-            if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+            if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
             }
         }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strengthened the test to include the replica case (it caught the issue), and incorporated your suggestion.

@jasontedor
Copy link
Member Author

Thanks @bleskes, I've responded to your feedback.

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! that test qualifies for the brain olympics :) left some minor comments that don't require another review

@@ -574,6 +611,7 @@ private IndexResult innerIndex(Index index) throws IOException {
final Translog.Location location;
final long updatedVersion;
IndexResult indexResult = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - this can be put in scope and next to where it's created and probably made final

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b9f68d4.

deleteResult.setTranslogLocation(location);
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
} finally {
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should use the seqNo variable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b9f68d4.

@Override
public long generateSeqNo() {
if (rarely()) {
// force skipping a sequence number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this is still in progress? (which is fine)

final Term uid = newUid("1");
final Document document = testDocumentWithTextField();
final AtomicLong sequenceNumber = new AtomicLong();
final Engine.Operation.Origin origin = randomFrom(PEER_RECOVERY, PRIMARY, PEER_RECOVERY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you meant one of this to be translog?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b9f68d4.

if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should use the seqNo variable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b9f68d4. Annoying I caught that in the innerIndex but not here 😞.

@jasontedor
Copy link
Member Author

Thanks @bleskes. I'll let CI have a go at this before merging.

@jasontedor
Copy link
Member Author

My CI passes, but the PR CI build here is struggling with the checkout problem that we've been seeing. I'm going to merge and will watch our regular CI.

@jasontedor jasontedor merged commit 58d73ba into elastic:master Dec 17, 2016
@jasontedor jasontedor deleted the enable-relocation-it branch December 17, 2016 14:20
@clintongormley clintongormley added the :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. label Feb 14, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement v6.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants