-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tighten sequence numbers recovery #22212
Changes from 1 commit
3164819
1c71393
3c37f4b
f57eb99
2a8d069
b9f68d4
3935af2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,8 +175,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
throw new IllegalArgumentException(openMode.toString()); | ||
} | ||
logger.trace("recovered [{}]", seqNoStats); | ||
indexWriter = writer; | ||
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); | ||
// norelease | ||
/* | ||
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: "are in the lucene commit" -> "are in the lucene commit or the translog generation associated with it" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 1c71393. |
||
* might be operations greater than the local checkpoint that will not be replayed. Here we force the local checkpoint to | ||
* the maximum sequence number in the commit (at the potential expense of correctness). | ||
*/ | ||
while (seqNoService.getLocalCheckpoint() < seqNoService.getMaxSeqNo()) { | ||
final long next = seqNoService.getLocalCheckpoint() + 1; | ||
seqNoService.markSeqNoAsCompleted(next); | ||
} | ||
indexWriter = writer; | ||
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); | ||
assert translog.getGeneration() != null; | ||
} catch (IOException | TranslogCorruptedException e) { | ||
|
@@ -638,16 +648,23 @@ private IndexResult innerIndex(Index index) throws IOException { | |
} | ||
} | ||
final long expectedVersion = index.version(); | ||
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { | ||
// skip index operation because of version conflict on recovery | ||
indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false); | ||
} else { | ||
final long seqNo; | ||
if (index.origin() == Operation.Origin.PRIMARY) { | ||
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if this is a replica and we have a version conflict and throws an exception? I think we still end up not marking the seq no as completed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about something like this?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I strengthened the test to include the replica case (it caught the issue), and incorporated your suggestion. |
||
|
||
final long seqNo; | ||
if (index.origin() == Operation.Origin.PRIMARY) { | ||
if (!conflict) { | ||
seqNo = seqNoService.generateSeqNo(); | ||
} else { | ||
seqNo = index.seqNo(); | ||
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
} | ||
} else { | ||
seqNo = index.seqNo(); | ||
} | ||
|
||
if (conflict) { | ||
// skip index operation because of version conflict on recovery | ||
indexResult = new IndexResult(expectedVersion, seqNo, false); | ||
} else { | ||
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); | ||
index.parsedDoc().version().setLongValue(updatedVersion); | ||
|
||
|
@@ -764,16 +781,24 @@ private DeleteResult innerDelete(Delete delete) throws IOException { | |
} | ||
|
||
final long expectedVersion = delete.version(); | ||
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { | ||
// skip executing delete because of version conflict on recovery | ||
deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true); | ||
} else { | ||
final long seqNo; | ||
if (delete.origin() == Operation.Origin.PRIMARY) { | ||
|
||
final boolean conflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted); | ||
|
||
final long seqNo; | ||
if (delete.origin() == Operation.Origin.PRIMARY) { | ||
if (!conflict) { | ||
seqNo = seqNoService.generateSeqNo(); | ||
} else { | ||
seqNo = delete.seqNo(); | ||
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
} | ||
} else { | ||
seqNo = delete.seqNo(); | ||
} | ||
|
||
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we do this twice now? I'm not sure this what you intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also - it feels like this can simplified based on the origin. I do wonder if the code reuse in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops. I pushed 1c71393. |
||
// skip executing delete because of version conflict on recovery | ||
deleteResult = new DeleteResult(expectedVersion, seqNo, true); | ||
} else { | ||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); | ||
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); | ||
deleteResult = new DeleteResult(updatedVersion, seqNo, found); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,9 +152,6 @@ synchronized void updateCheckpointOnReplica(long globalCheckpoint) { | |
if (this.globalCheckpoint <= globalCheckpoint) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a comment about when the current global checkpoint can be higher? here is what I wrote in #10708
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 1c71393. |
||
this.globalCheckpoint = globalCheckpoint; | ||
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); | ||
} else { | ||
throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" + | ||
this.globalCheckpoint + "], got [" + globalCheckpoint + "]"); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -391,7 +391,7 @@ public void finalizeRecovery() { | |
StopWatch stopWatch = new StopWatch().start(); | ||
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode()); | ||
cancellableThreads.execute(() -> { | ||
recoveryTarget.finalizeRecovery(); | ||
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); | ||
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. paranoia - can we flip this around and mark the target allocation as "in sync" before we give it the global checkpoint? it at least reads better as "we know you are in sync and therefore every global checkpoint advances will take you into account" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 1c71393. |
||
}); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,11 +39,12 @@ public interface RecoveryTargetHandler { | |
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException; | ||
|
||
/** | ||
* The finalize request clears unreferenced translog files, refreshes the engine now that | ||
* new segments are available, and enables garbage collection of | ||
* tombstone files. | ||
**/ | ||
void finalizeRecovery(); | ||
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, it was already not correct. I pushed 1c71393. |
||
* collection of tombstone files, and updates the global checkpoint. | ||
* | ||
* @param globalCheckpoint the global checkpoint on the recovery source | ||
*/ | ||
void finalizeRecovery(long globalCheckpoint); | ||
|
||
/** | ||
* Blockingly waits for cluster state with at least clusterStateVersion to be available | ||
|
@@ -82,4 +83,5 @@ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReferenc | |
* @return the allocation id of the target shard. | ||
*/ | ||
String getTargetAllocationId(); | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe assert at the end of this method that the seqNo is set on the replica request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.