Add global checkpoint to translog checkpoints #21254

Merged
@@ -46,6 +46,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -83,6 +84,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;

public class InternalEngine extends Engine {

@@ -116,7 +118,6 @@ public class InternalEngine extends Engine {

private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
@@ -153,7 +154,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer);
final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer);
if (logger.isTraceEnabled()) {
logger.trace(
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
@@ -169,7 +170,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -257,7 +258,8 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
}
}

private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
Translog.TranslogGeneration generation = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
@@ -266,11 +268,11 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
if (generation == null) {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation != null && generation.translogUUID == null) {
if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version; consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation);
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
if (generation == null || generation.translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
@@ -322,21 +324,37 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog
* checkpoint (global checkpoint).
*
* @param engineConfig the engine configuration (for the open mode and the translog path)
* @param writer the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException {
Contributor (review comment): This is a little funky, as we read some stuff from the given parameters and some stuff from disk, but then only if we're not supposed to ignore the translog. How about never reading the global checkpoint from the translog on opening and making it part of the recovery from the translog? It also means peer recovery will need to pass the global checkpoint as part of opening the engine (but we can do this as a follow-up; add a nocommit if so).

Member (author): I pushed 19d4db0.

long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue());
}
}

// nocommit: reading this should be part of recovery from the translog
final long globalCheckpoint;
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
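The hunk above is the crux of the read side of this change: max_seq_no and the local checkpoint still come from the Lucene commit point, while the global checkpoint is now read from the translog checkpoint, and only when the engine actually opens the translog. A minimal, self-contained sketch of that split follows; it is not the Elasticsearch API, and the sentinel values and the readGlobalCheckpointFromTranslog supplier are assumed stand-ins for Translog.readGlobalCheckpoint.

import java.util.Map;
import java.util.function.LongSupplier;

// Illustrative sketch only: mirrors the split above between the Lucene commit user data
// (max_seq_no, local_checkpoint) and the translog checkpoint (global checkpoint).
final class SeqNoStatsSketch {

    static final long NO_OPS_PERFORMED = -1L;   // assumed sentinel: "no operations indexed yet"
    static final long UNASSIGNED_SEQ_NO = -2L;  // assumed sentinel: "global checkpoint not known yet"

    record Stats(long maxSeqNo, long localCheckpoint, long globalCheckpoint) {}

    static Stats load(
            final Map<String, String> commitUserData,          // e.g. from IndexWriter#getLiveCommitData
            final boolean recoverFromTranslog,                  // OPEN_INDEX_AND_TRANSLOG in the PR
            final LongSupplier readGlobalCheckpointFromTranslog) {
        long maxSeqNo = NO_OPS_PERFORMED;
        long localCheckpoint = NO_OPS_PERFORMED;
        if (commitUserData.containsKey("max_seq_no")) {
            maxSeqNo = Long.parseLong(commitUserData.get("max_seq_no"));
        }
        if (commitUserData.containsKey("local_checkpoint")) {
            localCheckpoint = Long.parseLong(commitUserData.get("local_checkpoint"));
        }
        // only consult the translog checkpoint when the engine will actually open the translog
        final long globalCheckpoint =
                recoverFromTranslog ? readGlobalCheckpointFromTranslog.getAsLong() : UNASSIGNED_SEQ_NO;
        return new Stats(maxSeqNo, localCheckpoint, globalCheckpoint);
    }
}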

@@ -1312,25 +1330,21 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());

writer.setLiveCommitData(() -> {
/**
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values.
* The maximum sequence number is different - we never want the maximum sequence number to be
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
* of re-using a sequence number for two different documents when restoring from this commit
* point and subsequently writing new documents to the index. Since we only know which Lucene
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
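On the write side, the commit no longer records the global checkpoint; the remaining user data is supplied lazily so that max_seq_no is evaluated only after IndexWriter#commit() has flushed all documents, as the comment above explains. A minimal sketch of that lazy-Iterable pattern against Lucene's IndexWriter follows; the key names and the maxSeqNoSupplier are illustrative stand-ins, not the engine's constants.

import org.apache.lucene.index.IndexWriter;

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Illustrative sketch only: the Iterable handed to setLiveCommitData is not iterated until
// IndexWriter#commit() runs, so max_seq_no is computed at commit time while the other values
// are captured up front. Key names and maxSeqNoSupplier are assumptions for the example.
final class LazyCommitDataSketch {

    static void prepareCommit(
            final IndexWriter writer,
            final String translogGeneration,
            final String translogUUID,
            final long localCheckpoint,
            final LongSupplier maxSeqNoSupplier) {
        writer.setLiveCommitData(() -> {
            // evaluated lazily, after all documents have been flushed to Lucene
            final Map<String, String> commitData = new HashMap<>(4);
            commitData.put("translog_generation", translogGeneration);
            commitData.put("translog_uuid", translogUUID);
            commitData.put("local_checkpoint", Long.toString(localCheckpoint));
            commitData.put("max_seq_no", Long.toString(maxSeqNoSupplier.getAsLong()));
            return commitData.entrySet().iterator();
        });
    }
}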
@@ -41,6 +41,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;

public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
@@ -64,10 +66,11 @@ protected ReplicationResponse newResponseInstance() {
}

@Override
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) {
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
long checkpoint = indexShard.getGlobalCheckpoint();
syncTranslog(indexShard);
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse());
}

@@ -76,9 +79,19 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
syncTranslog(indexShard);
return new ReplicaResult();
}

private void syncTranslog(final IndexShard indexShard) {
Contributor (review comment): Why do we need the conversion to an unchecked exception? shardOperationOnX allows throwing them?

Member (author): @bleskes This is correct for shardOperationOnPrimary (which declares a checked Exception) but not for shardOperationOnReplica (which does not declare any checked exceptions). I pushed d742a68, since this was missing from the signature for GlobalCheckpointSyncAction#shardOperationOnPrimary.

Member (author): In master, both shardOperationOnPrimary and shardOperationOnReplica declare the top-level checked exception. We can pick this up and remove the wrapping after integrating master into feature/seq_no. I pushed 6c1338c.

try {
indexShard.getTranslog().sync();
} catch (final IOException e) {
// nocommit: no need to wrap this exception after integrating master into feature/seq_no
throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e);
}
}

public void updateCheckpointForShard(ShardId shardId) {
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
@Override
@@ -135,4 +148,5 @@ public long getCheckpoint() {
return checkpoint;
}
}

}
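For context, the flow this action now implements is: the primary reads its global checkpoint and fsyncs its translog (so the value passed through the global checkpoint supplier is persisted in the translog checkpoint file), ships the value to each replica, and each replica applies it and fsyncs as well. A dependency-free sketch of that shape follows; the Shard interface is a hypothetical stand-in for IndexShard and its Translog, not the Elasticsearch API.

import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch only: both the primary and each replica fsync their translog after the
// global checkpoint changes, so the checkpoint file on every copy stays up to date.
final class GlobalCheckpointSyncSketch {

    interface Shard {
        long getGlobalCheckpoint();
        void updateGlobalCheckpointOnReplica(long checkpoint);
        void syncTranslog() throws IOException; // fsyncs the translog, persisting the checkpoint
    }

    long onPrimary(Shard primary) {
        final long checkpoint = primary.getGlobalCheckpoint();
        sync(primary);
        return checkpoint; // shipped to the replicas in the replica request
    }

    void onReplica(Shard replica, long checkpointFromPrimary) {
        replica.updateGlobalCheckpointOnReplica(checkpointFromPrimary);
        sync(replica);
    }

    private void sync(Shard shard) {
        try {
            shard.syncTranslog();
        } catch (IOException e) {
            // wrapping only needed while the replication handlers cannot declare checked exceptions
            throw new UncheckedIOException("failed to sync translog after a global checkpoint update", e);
        }
    }
}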
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.io.stream.StreamInput;
@@ -88,4 +89,5 @@ public String toString() {
", globalCheckpoint=" + globalCheckpoint +
'}';
}

}
@@ -40,26 +40,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final GlobalCheckpointService globalCheckpointService;

/**
* Initialize the sequence number service. The {@code maxSeqNo}
* should be set to the last sequence number assigned by this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED},
* {@code localCheckpoint} should be set to the last known local
* checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, and
* {@code globalCheckpoint} should be set to the last known global
* checkpoint for this shard, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
Contributor (review comment): OMG what happened here :)
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global
* checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this
* shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard,
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard,
* or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param shardId the shard this service is providing tracking local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
public SequenceNumbersService(
final ShardId shardId,
@@ -100,8 +90,7 @@ public void markSeqNoAsCompleted(long seqNo) {
* Gets sequence number related stats
*/
public SeqNoStats stats() {
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(),
globalCheckpointService.getCheckpoint());
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint());
}

/**
@@ -130,6 +119,16 @@ public long getGlobalCheckpoint() {
return globalCheckpointService.getCheckpoint();
}

/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}

/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
@@ -148,13 +147,4 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}

/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}
}
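The updateGlobalCheckpointOnPrimary method is moved, not changed, in this hunk. One common way to realize its contract is to let the global checkpoint advance only to the minimum local checkpoint among the active allocations, and not advance at all while any of those local checkpoints is unknown. A rough, dependency-free sketch under that assumption follows; the sentinel value and the method shape are assumptions, not the GlobalCheckpointService API.

import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Illustrative sketch only: compute a candidate global checkpoint as the minimum of the known
// local checkpoints of the active allocations, or bail out if any of them has not reported yet.
final class GlobalCheckpointComputationSketch {

    static final long UNKNOWN = -2L; // assumed sentinel for "local checkpoint not yet reported"

    static OptionalLong compute(Set<String> activeAllocationIds, Map<String, Long> localCheckpoints) {
        long min = Long.MAX_VALUE;
        for (String allocationId : activeAllocationIds) {
            final long local = localCheckpoints.getOrDefault(allocationId, UNKNOWN);
            if (local == UNKNOWN) {
                return OptionalLong.empty(); // cannot advance: a tracked copy has not reported yet
            }
            min = Math.min(min, local);
        }
        return activeAllocationIds.isEmpty() ? OptionalLong.empty() : OptionalLong.of(min);
    }
}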