-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce sequence-number-based recovery #22484
Changes from 6 commits
404b8dc
d360a23
6ec0ef6
b9200cf
91e1ff0
1c14260
8b0e501
dac513a
81a1e1c
8960522
d71aa16
b6e6cc3
ab18bde
cea70f4
34fbb37
1929c03
c0169c2
999ca91
7281b75
320301c
50d3191
2596dcd
cc2002c
1e59f84
cc30114
781f276
f64b26f
3a70bd7
88fc801
6a0b70e
4e35141
3571036
da94b28
e801c5b
a4cf33e
c16295a
32c6702
76f8807
adafa21
270a68a
2e67a0b
06a3785
137ffb4
eeaa4f9
62aabb0
97e0b20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -361,7 +361,7 @@ public Operation.TYPE getOperationType() { | |
|
||
void setTranslogLocation(Translog.Location translogLocation) { | ||
if (freeze.get() == null) { | ||
assert failure == null : "failure has to be null to set translog location"; | ||
assert failure == null || translogLocation == null: "failure has to be null to set translog location"; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. wondering - was this required for this PR or is it preparation for the future? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This change will no longer be necessary after #22626. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed 8b0e501. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I also pushed cea70f4. |
||
this.translogLocation = translogLocation; | ||
} else { | ||
throw new IllegalStateException("result is already frozen"); | ||
|
@@ -379,6 +379,7 @@ void setTook(long took) { | |
void freeze() { | ||
freeze.set(true); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here |
||
} | ||
|
||
public static class IndexResult extends Result { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,7 @@ | |
import org.elasticsearch.index.merge.MergeStats; | ||
import org.elasticsearch.index.merge.OnGoingMerge; | ||
import org.elasticsearch.index.seqno.SeqNoStats; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.seqno.SequenceNumbersService; | ||
import org.elasticsearch.index.shard.ElasticsearchMergePolicy; | ||
import org.elasticsearch.index.shard.ShardId; | ||
|
@@ -119,8 +120,6 @@ public class InternalEngine extends Engine { | |
private final IndexThrottle throttle; | ||
|
||
private final SequenceNumbersService seqNoService; | ||
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; | ||
static final String MAX_SEQ_NO = "max_seq_no"; | ||
|
||
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges | ||
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle | ||
|
@@ -365,7 +364,7 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) | |
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog( | ||
final TranslogConfig translogConfig, | ||
final IndexWriter indexWriter) throws IOException { | ||
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()); | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()); | ||
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter); | ||
} | ||
|
||
|
@@ -378,20 +377,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog( | |
* @return the sequence number stats | ||
*/ | ||
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) { | ||
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; | ||
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; | ||
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) { | ||
final String key = entry.getKey(); | ||
if (key.equals(LOCAL_CHECKPOINT_KEY)) { | ||
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED; | ||
localCheckpoint = Long.parseLong(entry.getValue()); | ||
} else if (key.equals(MAX_SEQ_NO)) { | ||
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint; | ||
maxSeqNo = Long.parseLong(entry.getValue()); | ||
} | ||
} | ||
|
||
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); | ||
return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, indexWriter.getLiveCommitData()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this all worth it? I wonder if we should just load from store in the constructor and save on this method:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree; I pushed 1929c03 (I think that this will also fit better with future developments). |
||
} | ||
|
||
private SearcherManager createSearcherManager() throws EngineException { | ||
|
@@ -684,13 +670,20 @@ private IndexResult innerIndex(Index index) throws IOException { | |
final IndexResult indexResult; | ||
if (checkVersionConflictResult.isPresent()) { | ||
indexResult = checkVersionConflictResult.get(); | ||
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication | ||
if (indexResult.hasFailure() || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We discussed another solution for this problem in another channel. The gist was to change the way we deal with version conflicts on replicas. The idea was to try to do it as another PR first and then base this PR on it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I opened #22626 for this. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
location = null; | ||
} else { | ||
final Translog.NoOp operation = new Translog.NoOp(seqNo, index.primaryTerm(), "version conflict during recovery"); | ||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null; | ||
} | ||
} else { | ||
// no version conflict | ||
if (index.origin() == Operation.Origin.PRIMARY) { | ||
seqNo = seqNoService().generateSeqNo(); | ||
} | ||
|
||
/** | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was a Javadoc-style comment inside a method where it has no impact on |
||
* Update the document's sequence number and primary term; the sequence number here is derived from either the sequence | ||
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The | ||
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. | ||
|
@@ -707,12 +700,11 @@ private IndexResult innerIndex(Index index) throws IOException { | |
update(index.uid(), index.docs(), indexWriter); | ||
} | ||
indexResult = new IndexResult(updatedVersion, seqNo, deleted); | ||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY | ||
? translog.add(new Translog.Index(index, indexResult)) | ||
: null; | ||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); | ||
indexResult.setTranslogLocation(location); | ||
final Translog.Index operation = new Translog.Index(index, indexResult); | ||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null; | ||
} | ||
indexResult.setTranslogLocation(location); | ||
indexResult.setTook(System.nanoTime() - index.startTime()); | ||
indexResult.freeze(); | ||
return indexResult; | ||
|
@@ -816,21 +808,26 @@ private DeleteResult innerDelete(Delete delete) throws IOException { | |
final DeleteResult deleteResult; | ||
if (result.isPresent()) { | ||
deleteResult = result.get(); | ||
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication | ||
if (deleteResult.hasFailure() || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
location = null; | ||
} else { | ||
final Translog.NoOp operation = new Translog.NoOp(seqNo, delete.primaryTerm(), "version conflict during recovery"); | ||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null; | ||
} | ||
} else { | ||
if (delete.origin() == Operation.Origin.PRIMARY) { | ||
seqNo = seqNoService().generateSeqNo(); | ||
} | ||
|
||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); | ||
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); | ||
deleteResult = new DeleteResult(updatedVersion, seqNo, found); | ||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY | ||
? translog.add(new Translog.Delete(delete, deleteResult)) | ||
: null; | ||
versionMap.putUnderLock(delete.uid().bytes(), | ||
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); | ||
deleteResult.setTranslogLocation(location); | ||
final Translog.Delete operation = new Translog.Delete(delete, deleteResult); | ||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null; | ||
} | ||
deleteResult.setTranslogLocation(location); | ||
deleteResult.setTook(System.nanoTime() - delete.startTime()); | ||
deleteResult.freeze(); | ||
return deleteResult; | ||
|
@@ -1552,11 +1549,11 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn | |
final Map<String, String> commitData = new HashMap<>(6); | ||
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen); | ||
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); | ||
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint); | ||
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint); | ||
if (syncId != null) { | ||
commitData.put(Engine.SYNC_COMMIT_ID, syncId); | ||
} | ||
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); | ||
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); | ||
if (logger.isTraceEnabled()) { | ||
logger.trace("committing writer with commit data [{}]", commitData); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a typo here