Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tighten sequence numbers recovery #22212

Merged
merged 7 commits into from
Dec 17, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert at the end of this method that the seqNo is set on the replica request?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
Expand All @@ -173,6 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
Expand Down Expand Up @@ -282,6 +284,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
Expand All @@ -292,6 +295,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(
primary.shardId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
indexWriter = writer;
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "are in the lucene commit" -> "are in the lucene commit or the translog generation associated with it"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

* might be operations greater than the local checkpoint that will not be replayed. Here we force the local checkpoint to
* the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService.getLocalCheckpoint() < seqNoService.getMaxSeqNo()) {
final long next = seqNoService.getLocalCheckpoint() + 1;
seqNoService.markSeqNoAsCompleted(next);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -638,16 +648,23 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
} else {
final long seqNo;
if (index.origin() == Operation.Origin.PRIMARY) {
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is a replica and we have a version conflict and throws an exception? I think we still end up not marking the seq no as completed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like this?

diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e75dc47..d16c11e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -86,6 +86,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 public class InternalEngine extends Engine {
 
@@ -444,11 +445,18 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private boolean checkVersionConflict(
+    /**
+     * checks for version conflicts and returns the right result object if conflict was detected. returns `null`
+     * if no conflicts was found and indexing should proceed as normal
+     */
+    private <T extends Result> T checkVersionConflict(
             final Operation op,
             final long currentVersion,
             final long expectedVersion,
-            final boolean deleted) {
+            final boolean deleted,
+            final Supplier<T> resultOnSuccess,
+            final Function<Exception, T> resultOnFailure) {
+        final T result;
         if (op.versionType() == VersionType.FORCE) {
             if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
                 // If index was created in 5.0 or later, 'force' is not allowed at all
@@ -461,15 +469,17 @@ public class InternalEngine extends Engine {
 
         if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
             if (op.origin().isRecovery()) {
-                // version conflict, but okay
-                return true;
+                // version conflict, but okay, mark as success
+                result = resultOnSuccess.get();
             } else {
                 // fatal version conflict
-                throw new VersionConflictEngineException(shardId, op.type(), op.id(),
-                        op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
+                result = resultOnFailure.apply(new VersionConflictEngineException(shardId, op.type(), op.id(),
+                    op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
             }
+        } else {
+            result = null;
         }
-        return false;
+        return result;
     }
 
     private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -584,6 +594,7 @@ public class InternalEngine extends Engine {
         final Translog.Location location;
         final long updatedVersion;
         IndexResult indexResult = null;
+        long seqNo = index.seqNo();
         try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
             /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -648,23 +659,18 @@ public class InternalEngine extends Engine {
                 }
             }
             final long expectedVersion = index.version();
-            final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
+            IndexResult result = checkVersionConflict(index, currentVersion, expectedVersion, deleted,
+                () -> new IndexResult(currentVersion, index.seqNo(), false),
+                exception -> new IndexResult(exception, currentVersion, index.seqNo()));
 
-            final long seqNo;
-            if (index.origin() == Operation.Origin.PRIMARY) {
-                if (!conflict) {
+            if (result == null) {
+
+                if (index.origin() == Operation.Origin.PRIMARY) {
                     seqNo = seqNoService.generateSeqNo();
-                } else {
-                    seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
                 }
-            } else {
-                seqNo = index.seqNo();
-            }
 
-            if (conflict) {
-                // skip index operation because of version conflict on recovery
-                indexResult = new IndexResult(expectedVersion, seqNo, false);
-            } else {
+                index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
+
                 updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
                 index.parsedDoc().version().setLongValue(updatedVersion);
 
@@ -686,8 +692,8 @@ public class InternalEngine extends Engine {
                 }
                 indexResult = new IndexResult(updatedVersion, seqNo, deleted);
                 location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
-                        ? translog.add(new Translog.Index(index, indexResult))
-                        : null;
+                    ? translog.add(new Translog.Index(index, indexResult))
+                    : null;
                 versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
                 indexResult.setTranslogLocation(location);
             }
@@ -695,7 +701,7 @@ public class InternalEngine extends Engine {
             indexResult.freeze();
             return indexResult;
         } finally {
-            if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+            if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
             }
         }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strengthened the test to include the replica case (it caught the issue), and incorporated your suggestion.


final long seqNo;
if (index.origin() == Operation.Origin.PRIMARY) {
if (!conflict) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = index.seqNo();
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
} else {
seqNo = index.seqNo();
}

if (conflict) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, seqNo, false);
} else {
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

Expand Down Expand Up @@ -764,16 +781,24 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true);
} else {
final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {

final boolean conflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);

final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {
if (!conflict) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = delete.seqNo();
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
} else {
seqNo = delete.seqNo();
}

if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this twice now? I'm not sure this what you intended?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - it feels like this can simplified based on the origin. I do wonder if the code reuse in checkVersionConflict is worth the extra complexity it brings to reading the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I pushed 1c71393.

// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, seqNo, true);
} else {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
if (this.globalCheckpoint <= globalCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment about when the current global checkpoint can be higher? here is what I wrote in #10708

Note that the global checkpoint is a local knowledge of that is update under the mandate of the primary. It may be that the primary information is lagging compared to a replica. This can happen when a replica is promoted to a primary (but still has stale info).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
} else {
throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" +
this.globalCheckpoint + "], got [" + globalCheckpoint + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery

@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()))
{
recoveryRef.status().finalizeRecovery();
try (RecoveriesCollection.RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.status().finalizeRecovery(request.globalCheckpoint());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

Expand All @@ -29,15 +31,16 @@
public class RecoveryFinalizeRecoveryRequest extends TransportRequest {

private long recoveryId;

private ShardId shardId;
private long globalCheckpoint;

public RecoveryFinalizeRecoveryRequest() {
}

RecoveryFinalizeRecoveryRequest(long recoveryId, ShardId shardId) {
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.globalCheckpoint = globalCheckpoint;
}

public long recoveryId() {
Expand All @@ -48,17 +51,30 @@ public ShardId shardId() {
return shardId;
}

public long globalCheckpoint() {
return globalCheckpoint;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(globalCheckpoint);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void finalizeRecovery() {
StopWatch stopWatch = new StopWatch().start();
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> {
recoveryTarget.finalizeRecovery();
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paranoia - can we flip this around and mark the target allocation as "in sync" before we give it the global checkpoint? it at least reads better as "we know you are in sync and therefore every global checkpoint advances will take you into account"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut
}

@Override
public void finalizeRecovery() {
public void finalizeRecovery(final long globalCheckpoint) {
indexShard().updateGlobalCheckpointOnReplica(globalCheckpoint);
final IndexShard indexShard = indexShard();
indexShard.finalizeRecovery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ public interface RecoveryTargetHandler {
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;

/**
* The finalize request clears unreferenced translog files, refreshes the engine now that
* new segments are available, and enables garbage collection of
* tombstone files.
**/
void finalizeRecovery();
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it was already not correct. I pushed 1c71393.

* collection of tombstone files, and updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
*/
void finalizeRecovery(long globalCheckpoint);

/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
Expand Down Expand Up @@ -82,4 +83,5 @@ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReferenc
* @return the allocation id of the target shard.
*/
String getTargetAllocationId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut
}

@Override
public void finalizeRecovery() {
public void finalizeRecovery(final long globalCheckpoint) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId),
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Loading