Skip to content

Commit

Permalink
Use Lucene soft-deletes in peer recovery (#30522)
Browse files Browse the repository at this point in the history
This commit adds Lucene soft-deletes as another source for peer-recovery
besides translog.

Relates #29530
  • Loading branch information
dnhatn committed Jun 24, 2018
1 parent 07ecc03 commit 8de4659
Show file tree
Hide file tree
Showing 25 changed files with 652 additions and 189 deletions.
3 changes: 2 additions & 1 deletion docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ which returns something similar to:
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no": "0"
},
"num_docs" : 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.softDeletesPolicy = softDeletesPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}
Expand All @@ -80,7 +83,7 @@ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IO
deleteCommit(commits.get(i));
}
}
updateTranslogDeletionPolicy();
updateRetentionPolicy();
}

private void deleteCommit(IndexCommit commit) throws IOException {
Expand All @@ -90,7 +93,7 @@ private void deleteCommit(IndexCommit commit) throws IOException {
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
}

private void updateTranslogDeletionPolicy() throws IOException {
private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
Expand All @@ -101,6 +104,9 @@ private void updateTranslogDeletionPolicy() throws IOException {
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
}

/**
Expand Down
24 changes: 18 additions & 6 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";

protected final ShardId shardId;
protected final String allocationId;
Expand Down Expand Up @@ -578,19 +579,17 @@ public enum SearcherScope {

public abstract void syncTranslog() throws IOException;

public abstract Closeable acquireTranslogRetentionLock();
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLockForPeerRecovery();

/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
* The caller has to close the returned snapshot after finishing the reading.
*/
public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException;

/**
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
*/
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);

public abstract TranslogStats getTranslogStats();

/**
Expand All @@ -604,6 +603,19 @@ public enum SearcherScope {
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;

/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public abstract Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException;

public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -38,7 +37,6 @@
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
Expand Down Expand Up @@ -157,6 +155,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
private final boolean softDeleteEnabled;
private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

/**
Expand All @@ -182,7 +181,6 @@ public InternalEngine(EngineConfig engineConfig) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
Expand All @@ -204,8 +202,10 @@ public InternalEngine(EngineConfig engineConfig) {
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
Expand Down Expand Up @@ -262,6 +262,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().userData;
final long lastMinRetainedSeqNo;
if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO));
} else {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations());
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
Expand Down Expand Up @@ -481,18 +493,39 @@ public void syncTranslog() throws IOException {
}

@Override
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
}

/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE);
}
}

/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot =
newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
} catch (IOException ex) {
maybeFailEngine(source, ex);
throw ex;
}
} else {
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
}

@Override
Expand Down Expand Up @@ -2127,8 +2160,8 @@ private IndexWriterConfig getIndexWriterConfig() {
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy));
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
Expand All @@ -2141,20 +2174,6 @@ private IndexWriterConfig getIndexWriterConfig() {
return iwc;
}

/**
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/
private Query softDeletesRetentionQuery() {
ensureOpen();
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
// Prefer using the global checkpoint which is persisted on disk than an in-memory value.
// If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
// then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
Expand Down Expand Up @@ -2341,6 +2360,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
Expand Down Expand Up @@ -2396,6 +2418,8 @@ public void onSettingsChanged() {
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());

softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations());
}

public MergeStats getMergeStats() {
Expand Down Expand Up @@ -2509,6 +2533,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m
}
}

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
} else {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
return tracker.getCheckpoint() >= currentLocalCheckpoint;
}
}

/**
* Returns the minimum seqno that is retained in the Lucene index.
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
return softDeletesPolicy.getMinRetainedSeqNo();
}

@Override
public Closeable acquireRetentionLockForPeerRecovery() {
final Closeable translogLock = translog.acquireRetentionLock();
final Releasable softDeletesLock = softDeletesPolicy.acquireRetentionLock();
return () -> IOUtils.close(translogLock, softDeletesLock);
}

@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public int totalOperations() {
}

@Override
public int overriddenOperations() {
public int skippedOperations() {
return skippedOperations;
}

Expand Down
Loading

0 comments on commit 8de4659

Please sign in to comment.