Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move GlobalCheckpointTracker and remove SequenceNumbersService #27837

Merged
merged 14 commits into from
Dec 18, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -400,14 +399,14 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable
public interface ReplicaResponse {

/**
* The local checkpoint for the shard. See {@link SequenceNumbersService#getLocalCheckpoint()}.
* The local checkpoint for the shard.
*
* @return the local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard. See {@link SequenceNumbersService#getGlobalCheckpoint()}.
* The global checkpoint for the shard.
*
* @return the global checkpoint
**/
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -567,7 +567,7 @@ public CommitStats commitStats() {
*
* @return the sequence number service
*/
public abstract SequenceNumbersService seqNoService();
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Read the last segments info from the commit pointed to by the searcher manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

/*
* Holds all the configuration that is used to create an {@link Engine}.
Expand Down Expand Up @@ -78,6 +79,7 @@ public final class EngineConfig {
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -124,7 +126,8 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand Down Expand Up @@ -155,6 +158,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

/**
Expand Down Expand Up @@ -227,6 +231,13 @@ public Store getStore() {
return store;
}

/**
* Returns the global checkpoint tracker
*/
public LongSupplier getGlobalCheckpointSupplier() {
return globalCheckpointSupplier;
}

/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -67,9 +68,8 @@
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -123,7 +123,7 @@ public class InternalEngine extends Engine {

private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
private final LocalCheckpointTracker localCheckpointTracker;

private final String uidField;

Expand All @@ -150,12 +150,12 @@ public class InternalEngine extends Engine {
private final String historyUUID;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, InternalEngine::sequenceNumberService);
this(engineConfig, LocalCheckpointTracker::new);
}

InternalEngine(
final EngineConfig engineConfig,
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier) {
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
Expand All @@ -179,11 +179,9 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
Expand All @@ -195,7 +193,7 @@ public InternalEngine(EngineConfig engineConfig) {
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint());
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
Expand Down Expand Up @@ -237,6 +235,22 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
if (openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paranoia - can you use a switch statement with a fallback logic and an exception on default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in ea23a0d

maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
} else {
final Tuple<Long, Long> seqNoStats = store.loadSeqNoStats();
maxSeqNo = seqNoStats.v1();
localCheckpoint = seqNoStats.v2();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
}
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
Expand Down Expand Up @@ -310,12 +324,12 @@ protected int getRefCount(IndexSearcher reference) {
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
seqNoService.markSeqNoAsCompleted(operation.seqNo());
localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
Expand All @@ -326,17 +340,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqNo = seqNoService.getMaxSeqNo();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
int numNoOpsAdded = 0;
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
seqNo = localCheckpointTracker.getCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]";
assert seqNo <= localCheckpointTracker.getCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + localCheckpointTracker.getCheckpoint() + "]";

}
return numNoOpsAdded;
Expand All @@ -354,35 +368,6 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}

static SequenceNumbersService sequenceNumberService(
final EngineConfig engineConfig,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
default:
throw new IllegalArgumentException(openMode.toString());
}
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -733,7 +718,7 @@ private long generateSeqNoForOperation(final Operation operation) {
* @return the sequence number
*/
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoService.generateSeqNo();
return localCheckpointTracker.generateSeqNo();
}

@Override
Expand Down Expand Up @@ -804,7 +789,7 @@ public IndexResult index(Index index) throws IOException {
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand Down Expand Up @@ -835,7 +820,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1101,7 +1086,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -1127,7 +1112,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1274,7 +1259,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(seqNo);
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
}
}
Expand Down Expand Up @@ -2018,7 +2003,7 @@ private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
Expand All @@ -2041,7 +2026,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
logger.trace("committing writer with commit data [{}]", commitData);
Expand Down Expand Up @@ -2105,8 +2090,8 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

public final SequenceNumbersService seqNoService() {
return seqNoService;
public final LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public int hashCode() {
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
Expand Down Expand Up @@ -329,7 +329,7 @@ private static long inSyncCheckpointStates(
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(
public GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
Expand Down
Loading