-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add global checkpoint to translog checkpoints #21254
Changes from 1 commit
8bea450
37a50bb
19d4db0
86ec4d0
6cbed98
d742a68
6c1338c
37af724
90f7b60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.logging.ESLoggerFactory; | ||
import org.elasticsearch.common.lucene.LoggerInfoStream; | ||
import org.elasticsearch.common.lucene.Lucene; | ||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; | ||
|
@@ -116,7 +117,6 @@ public class InternalEngine extends Engine { | |
|
||
private final SequenceNumbersService seqNoService; | ||
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; | ||
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; | ||
static final String MAX_SEQ_NO = "max_seq_no"; | ||
|
||
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges | ||
|
@@ -153,7 +153,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); | ||
try { | ||
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); | ||
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); | ||
final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer); | ||
if (logger.isTraceEnabled()) { | ||
logger.trace( | ||
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", | ||
|
@@ -167,7 +167,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
engineConfig.getIndexSettings(), | ||
seqNoStats.getMaxSeqNo(), | ||
seqNoStats.getLocalCheckpoint(), | ||
seqNoStats.getGlobalCheckpoint()); | ||
seqNoStats.getGlobalCheckpoint(), | ||
this::onGlobalCheckpointUpdate | ||
); | ||
indexWriter = writer; | ||
translog = openTranslog(engineConfig, writer); | ||
assert translog.getGeneration() != null; | ||
|
@@ -258,6 +260,8 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc | |
} | ||
|
||
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { | ||
assert openMode != null; | ||
assert seqNoService != null; | ||
final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); | ||
Translog.TranslogGeneration generation = null; | ||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { | ||
|
@@ -266,11 +270,11 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr | |
if (generation == null) { | ||
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); | ||
} | ||
if (generation != null && generation.translogUUID == null) { | ||
if (generation.translogUUID == null) { | ||
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); | ||
} | ||
} | ||
final Translog translog = new Translog(translogConfig, generation); | ||
final Translog translog = new Translog(translogConfig, generation, seqNoService::getGlobalCheckpoint); | ||
if (generation == null || generation.translogUUID == null) { | ||
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " | ||
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; | ||
|
@@ -322,24 +326,52 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) | |
return null; | ||
} | ||
|
||
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { | ||
/** | ||
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog | ||
* checkpoint (global checkpoint). | ||
* | ||
* @param engineConfig the engine configuration (for the open mode and the translog path) | ||
* @param writer the index writer (for the Lucene commit point) | ||
* @return the sequence number stats | ||
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint | ||
*/ | ||
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a little funky, as we read some stuff from the given parameters and some stuff from disk, but then only if we're not supposed to ignore the translog. How about never reading the global checkpoint from the translog on opening and making it part of the recovery from translog? Also, it means peer recovery will need to also pass the global checkpoint as part of the engine opening (but we can do this as a follow-up - add a no commit please if so). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed 19d4db0. |
||
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; | ||
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; | ||
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) { | ||
final String key = entry.getKey(); | ||
if (key.equals(LOCAL_CHECKPOINT_KEY)) { | ||
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED; | ||
localCheckpoint = Long.parseLong(entry.getValue()); | ||
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) { | ||
globalCheckpoint = Long.parseLong(entry.getValue()); | ||
} else if (key.equals(MAX_SEQ_NO)) { | ||
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint; | ||
maxSeqNo = Long.parseLong(entry.getValue()); | ||
} | ||
} | ||
|
||
final long globalCheckpoint; | ||
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { | ||
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); | ||
} else { | ||
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
} | ||
|
||
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); | ||
} | ||
|
||
/** | ||
* Sync the translog after the global checkpoint is updated. | ||
*/ | ||
void onGlobalCheckpointUpdate() { | ||
try (ReleasableLock ignored = readLock.acquire()) { | ||
ensureOpen(); | ||
translog.sync(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this all runs on an indexing thread, I'm concerned it will make us fsync twice. How about not fsyncing here but rather triggering a sync from the global checkpoint sync action? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed 86ec4d0. |
||
} catch (final IOException e) { | ||
maybeFailEngine("on global checkpoint update", e); | ||
throw new EngineException(shardId, "failed on global checkpoint update", e); | ||
} | ||
} | ||
|
||
private SearcherManager createSearcherManager() throws EngineException { | ||
boolean success = false; | ||
SearcherManager searcherManager = null; | ||
|
@@ -1312,25 +1344,21 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn | |
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration); | ||
final String translogUUID = translogGeneration.translogUUID; | ||
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint()); | ||
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint()); | ||
|
||
writer.setLiveCommitData(() -> { | ||
/** | ||
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated | ||
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values. | ||
* The maximum sequence number is different - we never want the maximum sequence number to be | ||
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk | ||
* of re-using a sequence number for two different documents when restoring from this commit | ||
* point and subsequently writing new documents to the index. Since we only know which Lucene | ||
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes | ||
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit | ||
* data iterator (which occurs after all documents have been flushed to Lucene). | ||
/* | ||
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes | ||
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want | ||
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the | ||
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently | ||
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the | ||
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation | ||
* of the commit data iterator (which occurs after all documents have been flushed to Lucene). | ||
*/ | ||
final Map<String, String> commitData = new HashMap<>(6); | ||
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen); | ||
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); | ||
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint); | ||
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint); | ||
if (syncId != null) { | ||
commitData.put(Engine.SYNC_COMMIT_ID, syncId); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,38 +38,32 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { | |
|
||
final LocalCheckpointService localCheckpointService; | ||
final GlobalCheckpointService globalCheckpointService; | ||
private final Runnable onGlobalCheckpointUpdate; | ||
|
||
/** | ||
* Initialize the sequence number service. The {@code maxSeqNo} | ||
* should be set to the last sequence number assigned by this | ||
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, | ||
* {@code localCheckpoint} should be set to the last known local | ||
* checkpoint for this shard, or | ||
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, and | ||
* {@code globalCheckpoint} should be set to the last known global | ||
* checkpoint for this shard, or | ||
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. | ||
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OMG what happened here :) |
||
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this | ||
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global | ||
* checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. | ||
* | ||
* @param shardId the shard this service is providing tracking | ||
* local checkpoints for | ||
* @param indexSettings the index settings | ||
* @param maxSeqNo the last sequence number assigned by this | ||
* shard, or | ||
* {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param localCheckpoint the last known local checkpoint for this shard, | ||
* or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param globalCheckpoint the last known global checkpoint for this shard, | ||
* or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} | ||
* @param shardId the shard this service is providing tracking local checkpoints for | ||
* @param indexSettings the index settings | ||
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} | ||
* @param onGlobalCheckpointUpdate invoked when the global checkpoint is updated | ||
*/ | ||
public SequenceNumbersService( | ||
final ShardId shardId, | ||
final IndexSettings indexSettings, | ||
final long maxSeqNo, | ||
final long localCheckpoint, | ||
final long globalCheckpoint) { | ||
final long globalCheckpoint, | ||
final Runnable onGlobalCheckpointUpdate) { | ||
super(shardId, indexSettings); | ||
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint); | ||
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint); | ||
this.onGlobalCheckpointUpdate = onGlobalCheckpointUpdate; | ||
} | ||
|
||
/** | ||
|
@@ -100,8 +94,7 @@ public void markSeqNoAsCompleted(long seqNo) { | |
* Gets sequence number related stats | ||
*/ | ||
public SeqNoStats stats() { | ||
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(), | ||
globalCheckpointService.getCheckpoint()); | ||
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint()); | ||
} | ||
|
||
/** | ||
|
@@ -135,6 +128,7 @@ public long getGlobalCheckpoint() { | |
*/ | ||
public void updateGlobalCheckpointOnReplica(long checkpoint) { | ||
globalCheckpointService.updateCheckpointOnReplica(checkpoint); | ||
onGlobalCheckpointUpdate.run(); | ||
} | ||
|
||
/** | ||
|
@@ -155,6 +149,10 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S | |
* of one of the active allocations is not known. | ||
*/ | ||
public boolean updateGlobalCheckpointOnPrimary() { | ||
return globalCheckpointService.updateCheckpointOnPrimary(); | ||
final boolean maybeUpdateGlobalCheckpoint = globalCheckpointService.updateCheckpointOnPrimary(); | ||
if (maybeUpdateGlobalCheckpoint) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure this is what we want? maybeUpdateGlobalCheckpoint may be true not because the global checkpoint was updated but because we are missing some local checkpoint. I think it's still OK to call this runnable, but we should document it if we keep it like this. Alternatively, maybe we should split this method in two - There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've reverted the changes to this method; they are not needed after 86ec4d0. |
||
onGlobalCheckpointUpdate.run(); | ||
} | ||
return maybeUpdateGlobalCheckpoint; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - shall we make this a parameter to the method, like the other things?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 6cbed98.