Skip to content

Commit

Permalink
Fix race between replica reset and primary promotion (#32442)
Browse files Browse the repository at this point in the history
We've recently seen a number of test failures that tripped an assertion in IndexShard (see issues
linked below), leading to the discovery of a race between resetting a replica when it learns about a
higher term and when the same replica is promoted to primary. This commit fixes the race by
distinguishing between a cluster state primary term (called pendingPrimaryTerm) and a shard-level
operation term. The former is set during the cluster state update or when a replica learns about a
new primary. The latter is only incremented under the operation block, which can happen in a
delayed fashion. It also solves the issue where a replica that's still adjusting to the new term
receives a cluster state update that promotes it to primary, which can happen in the situation of
multiple nodes being shut down in short succession. In that case, the cluster state update thread
would call `asyncBlockOperations` in `updateShardState`, which in turn would throw an exception
as blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard.
This commit therefore extends the IndexShardOperationPermits to allow it to queue multiple blocks
(which will all take precedence over operations acquiring permits). Finally, it also moves the primary
activation of the replication tracker under the operation block, so that the actual transition to
primary only happens under the operation block.

Relates to #32431, #32304 and #32118
  • Loading branch information
ywelsch committed Aug 3, 2018
1 parent e0320fe commit 94b30b2
Show file tree
Hide file tree
Showing 28 changed files with 470 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
switch (indexResult.getResultType()) {
case SUCCESS:
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
Expand All @@ -159,7 +159,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
switch (deleteResult.getResultType()) {
case SUCCESS:
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
Expand Down Expand Up @@ -298,7 +298,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
assert result instanceof Engine.IndexResult : result.getClass();
final IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
indexResponse.getResult());
Expand All @@ -319,7 +319,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
final DeleteRequest updateDeleteRequest = translate.action();

final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());

updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
Expand Down Expand Up @@ -355,7 +355,7 @@ static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
return new BulkItemResultHolder(null, result, primaryItemRequest);
}

Expand Down Expand Up @@ -559,15 +559,15 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> new Engine.IndexResult(e, request.version()),
e -> primary.getFailedIndexResult(e, request.version()),
mappingUpdater);
}

private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
e -> new Engine.DeleteResult(e, request.version()),
e -> primary.getFailedDeleteResult(e, request.version()),
mappingUpdater);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPrimaryTerm();
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
Expand Down Expand Up @@ -1000,7 +1000,7 @@ class PrimaryShardReference extends ShardReference
}

public boolean isRelocated() {
return indexShard.isPrimaryMode() == false;
return indexShard.isRelocatedPrimary();
}

@Override
Expand Down
44 changes: 26 additions & 18 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,26 +304,29 @@ public abstract static class Result {
private final Operation.TYPE operationType;
private final Result.Type resultType;
private final long version;
private final long term;
private final long seqNo;
private final Exception failure;
private final SetOnce<Boolean> freeze = new SetOnce<>();
private final Mapping requiredMappingUpdate;
private Translog.Location translogLocation;
private long took;

protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
this.operationType = operationType;
this.failure = Objects.requireNonNull(failure);
this.version = version;
this.term = term;
this.seqNo = seqNo;
this.requiredMappingUpdate = null;
this.resultType = Type.FAILURE;
}

protected Result(Operation.TYPE operationType, long version, long seqNo) {
protected Result(Operation.TYPE operationType, long version, long term, long seqNo) {
this.operationType = operationType;
this.version = version;
this.seqNo = seqNo;
this.term = term;
this.failure = null;
this.requiredMappingUpdate = null;
this.resultType = Type.SUCCESS;
Expand All @@ -333,6 +336,7 @@ protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
this.operationType = operationType;
this.version = Versions.NOT_FOUND;
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
this.term = 0L;
this.failure = null;
this.requiredMappingUpdate = requiredMappingUpdate;
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
Expand All @@ -357,6 +361,10 @@ public long getSeqNo() {
return seqNo;
}

public long getTerm() {
return term;
}

/**
* If the operation was aborted due to missing mappings, this method will return the mappings
* that are required to complete the operation.
Expand Down Expand Up @@ -415,20 +423,20 @@ public static class IndexResult extends Result {

private final boolean created;

public IndexResult(long version, long seqNo, boolean created) {
super(Operation.TYPE.INDEX, version, seqNo);
public IndexResult(long version, long term, long seqNo, boolean created) {
super(Operation.TYPE.INDEX, version, term, seqNo);
this.created = created;
}

/**
* use in case of the index operation failed before getting to internal engine
**/
public IndexResult(Exception failure, long version) {
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
public IndexResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
}

public IndexResult(Exception failure, long version, long seqNo) {
super(Operation.TYPE.INDEX, failure, version, seqNo);
public IndexResult(Exception failure, long version, long term, long seqNo) {
super(Operation.TYPE.INDEX, failure, version, term, seqNo);
this.created = false;
}

Expand All @@ -447,20 +455,20 @@ public static class DeleteResult extends Result {

private final boolean found;

public DeleteResult(long version, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, version, seqNo);
public DeleteResult(long version, long term, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, version, term, seqNo);
this.found = found;
}

/**
* use in case of the delete operation failed before getting to internal engine
**/
public DeleteResult(Exception failure, long version) {
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
public DeleteResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
}

public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, failure, version, seqNo);
public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, failure, version, term, seqNo);
this.found = found;
}

Expand All @@ -477,12 +485,12 @@ public boolean isFound() {

public static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
NoOpResult(long term, long seqNo) {
super(Operation.TYPE.NO_OP, term, 0, seqNo);
}

NoOpResult(long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
NoOpResult(long term, long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, term, 0, seqNo);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,10 @@ protected long doGenerateSeqNoForOperation(final Operation operation) {
return localCheckpointTracker.generateSeqNo();
}

private long getPrimaryTerm() {
return engineConfig.getPrimaryTermSupplier().getAsLong();
}

@Override
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
Expand Down Expand Up @@ -842,7 +846,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
Expand Down Expand Up @@ -963,7 +967,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
generateSeqNoForOperation(index),
Expand Down Expand Up @@ -993,7 +997,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter);
}
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
Expand All @@ -1009,7 +1013,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
* we return a `MATCH_ANY` version to indicate no document was index. The value is
* not used anyway
*/
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
} else {
throw ex;
}
Expand Down Expand Up @@ -1082,8 +1086,8 @@ static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
}

static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
final IndexResult result = new IndexResult(e, currentVersion, term);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
}
Expand Down Expand Up @@ -1161,7 +1165,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
Expand Down Expand Up @@ -1250,7 +1254,7 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(
currentlyDeleted,
Expand All @@ -1273,12 +1277,12 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
return new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure
return new DeleteResult(
ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} else {
throw ex;
}
Expand Down Expand Up @@ -1309,9 +1313,9 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
}

static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
}

Expand Down Expand Up @@ -1340,7 +1344,7 @@ public NoOpResult noOp(final NoOp noOp) {
try (ReleasableLock ignored = readLock.acquire()) {
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(noOp.seqNo(), e);
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
}
return noOpResult;
}
Expand All @@ -1350,7 +1354,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
final NoOpResult noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
Expand Down
Loading

0 comments on commit 94b30b2

Please sign in to comment.