Skip to content

Commit

Permalink
This is not the commit message you're looking for
Browse files Browse the repository at this point in the history
May the git push --force be with you if you require a more elaborative
commit message.
  • Loading branch information
jasontedor committed Dec 16, 2016
1 parent 3164819 commit 1c71393
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -184,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}

// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
if (operationResult == null) { // in case of noop update operation
Expand Down Expand Up @@ -346,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
Expand All @@ -368,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there
* might be operations greater than the local checkpoint that will not be replayed. Here we force the local checkpoint to
* the maximum sequence number in the commit (at the potential expense of correctness).
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService.getLocalCheckpoint() < seqNoService.getMaxSeqNo()) {
final long next = seqNoService.getLocalCheckpoint() + 1;
Expand Down Expand Up @@ -795,7 +795,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
seqNo = delete.seqNo();
}

if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
if (conflict) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, seqNo, true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public synchronized long getCheckpoint() {
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
*/
if (this.globalCheckpoint <= globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ public void finalizeRecovery() {
StopWatch stopWatch = new StopWatch().start();
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> {
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});

if (request.isPrimaryRelocation()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public interface RecoveryTargetHandler {
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;

/**
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage
* collection of tombstone files, and updates the global checkpoint.
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() {
}
}

final boolean exists = operations.get(operations.size() - 1) instanceof Engine.Index;
Randomness.shuffle(operations);

for (final Engine.Operation operation : operations) {
Expand All @@ -3001,6 +3002,8 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() {
}

assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo((long) (numberOfOperations - 1)));
final Engine.GetResult result = engine.get(new Engine.Get(true, uid));
assertThat(result.exists(), equalTo(exists));
}

private Engine.Index indexOperation(final Term uid, final ParsedDocument doc, final int seqNo, final int version) {
Expand All @@ -3010,7 +3013,7 @@ private Engine.Index indexOperation(final Term uid, final ParsedDocument doc, fi
seqNo,
1,
version,
VersionType.INTERNAL,
VersionType.EXTERNAL,
Engine.Operation.Origin.PEER_RECOVERY,
System.nanoTime(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
Expand All @@ -3025,7 +3028,7 @@ private Engine.Delete deleteOperation(final String type, final String id, final
seqNo,
1,
version,
VersionType.INTERNAL,
VersionType.EXTERNAL,
Engine.Operation.Origin.PEER_RECOVERY,
System.nanoTime());
}
Expand Down

0 comments on commit 1c71393

Please sign in to comment.