Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global checkpoint to translog checkpoints #21254

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
Expand Down Expand Up @@ -116,7 +117,6 @@ public class InternalEngine extends Engine {

private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
Expand Down Expand Up @@ -153,7 +153,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer);
final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer);
if (logger.isTraceEnabled()) {
logger.trace(
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
Expand All @@ -167,7 +167,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
seqNoStats.getGlobalCheckpoint(),
this::onGlobalCheckpointUpdate
);
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -258,6 +260,8 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
}

private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
assert openMode != null;
assert seqNoService != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
Translog.TranslogGeneration generation = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
Expand All @@ -266,11 +270,11 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
if (generation == null) {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation != null && generation.translogUUID == null) {
if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation);
final Translog translog = new Translog(translogConfig, generation, seqNoService::getGlobalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - shall we make this a parameter to the method, like the other things?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 6cbed98.

if (generation == null || generation.translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
Expand Down Expand Up @@ -322,24 +326,52 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog
* checkpoint (global checkpoint).
*
* @param engineConfig the engine configuration (for the open mode and the translog path)
* @param writer the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a little funky as we read some stuff from given parameters and some stuff from disk but then only if we're not supposed to ignore the translog. How about never reading the global checkpoint from the translog on opening and make it part of the recovery from translog? also it means peer recovery will need to also pass the global checkpoint as part the engine opening (but we can do this a follow up - add a no commit please if so)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 19d4db0.

long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue());
}
}

final long globalCheckpoint;
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

/**
* Sync the translog after the global checkpoint is updated.
*/
void onGlobalCheckpointUpdate() {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.sync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this all runs on an indexing thread, i'm concerned it will make us fsync twice. How about not fsync here but rather trigger a sync from the global checkpoint sync action?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 86ec4d0.

} catch (final IOException e) {
maybeFailEngine("on global checkpoint update", e);
throw new EngineException(shardId, "failed on global checkpoint update", e);
}
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down Expand Up @@ -1312,25 +1344,21 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());

writer.setLiveCommitData(() -> {
/**
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values.
* The maximum sequence number is different - we never want the maximum sequence number to be
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
* of re-using a sequence number for two different documents when restoring from this commit
* point and subsequently writing new documents to the index. Since we only know which Lucene
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -88,4 +89,5 @@ public String toString() {
", globalCheckpoint=" + globalCheckpoint +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,32 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {

final LocalCheckpointService localCheckpointService;
final GlobalCheckpointService globalCheckpointService;
private final Runnable onGlobalCheckpointUpdate;

/**
* Initialize the sequence number service. The {@code maxSeqNo}
* should be set to the last sequence number assigned by this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED},
* {@code localCheckpoint} should be set to the last known local
* checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, and
* {@code globalCheckpoint} should be set to the last known global
* checkpoint for this shard, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG what happened here :)

* {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global
* checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this
* shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard,
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard,
* or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param shardId the shard this service is providing tracking local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param onGlobalCheckpointUpdate invoked when the global checkpoint is updated
*/
public SequenceNumbersService(
final ShardId shardId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint) {
final long globalCheckpoint,
final Runnable onGlobalCheckpointUpdate) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
this.onGlobalCheckpointUpdate = onGlobalCheckpointUpdate;
}

/**
Expand Down Expand Up @@ -100,8 +94,7 @@ public void markSeqNoAsCompleted(long seqNo) {
* Gets sequence number related stats
*/
public SeqNoStats stats() {
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(),
globalCheckpointService.getCheckpoint());
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint());
}

/**
Expand Down Expand Up @@ -135,6 +128,7 @@ public long getGlobalCheckpoint() {
*/
public void updateGlobalCheckpointOnReplica(long checkpoint) {
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
onGlobalCheckpointUpdate.run();
}

/**
Expand All @@ -155,6 +149,10 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
final boolean maybeUpdateGlobalCheckpoint = globalCheckpointService.updateCheckpointOnPrimary();
if (maybeUpdateGlobalCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is what we want? maybeUpdateGlobalCheckpoint may be true because the global checkpoint isn't updated but we miss some local checkpoint. I think it's still ok to call this runnable, but we should document it if we keep it like this. Alternatively maybe we should split this method in two - updateGlobalCheckpointOnPrimary and needsLocalCheckpointSync and call those from IndexShard's updateGlobalCheckpointOnPrimary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted the changes to this method, they are not needed after 86ec4d0.

onGlobalCheckpointUpdate.run();
}
return maybeUpdateGlobalCheckpoint;
}
}
Loading