Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Lucene soft-deletes in peer recovery #30522

Merged
merged 46 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b18eb06
Introduce soft-deletes retention policy for peer recovery
dnhatn May 8, 2018
4b2e385
Don’t check local_checkpoint in commit
dnhatn May 12, 2018
176b497
Prefer synchronized methods
dnhatn May 14, 2018
9ae627c
Remove force-merge
dnhatn May 14, 2018
ff2215c
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 14, 2018
188138d
Add +1 in 1 place
dnhatn May 15, 2018
6d901bf
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 15, 2018
8a78f65
acquireTranslogRetentionLock -> acquireRetentionLockForPeerRecovery
dnhatn May 15, 2018
6fe8847
Cut over the translog snapshot
dnhatn May 15, 2018
cc2b3f0
testRecoveryWithOutOfOrderDelete with Lucene history
dnhatn May 15, 2018
0612a05
Adapt PrimaryReplicaSyncerTests test
dnhatn May 15, 2018
a61d00b
Mute two more tests
dnhatn May 16, 2018
f3f1fa2
Add discuss
dnhatn May 16, 2018
65b8458
comment
dnhatn May 16, 2018
fc3d7d1
Update testRecoveryWithOutOfOrderDelete
dnhatn May 16, 2018
bd1b8ac
Minor feedback
dnhatn May 16, 2018
b1e73aa
Fix recovery tests
dnhatn May 16, 2018
f7ea71c
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 17, 2018
04112c6
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 17, 2018
e34154a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 23, 2018
1531024
use retention policy to decide seq# or file-based
dnhatn May 23, 2018
86c3eba
Fix an engine test
dnhatn May 24, 2018
b3d0d5f
fix test - wait for flush
dnhatn May 24, 2018
8320647
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 24, 2018
6b95e21
bootstrap from max_seqno from an empty commit
dnhatn May 24, 2018
3be0e30
harden bootstrap
dnhatn May 24, 2018
ca3f781
naming
dnhatn May 24, 2018
65ede0b
getMaxExposedSeqNoToMergePolicy -> getMaxExposedSeqNo
dnhatn May 24, 2018
a0e58b9
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 25, 2018
c1e03d1
decouple min_retained seqno from merge policy
dnhatn May 28, 2018
c5ba76f
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 28, 2018
33be718
simplify tests
dnhatn May 28, 2018
c717bf7
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
df132e1
minSeqNo -> startingSeqNo
dnhatn May 31, 2018
1bcd443
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
f232c8a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
591d521
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 6, 2018
76a035f
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 7, 2018
78c0d92
Adjust testForceMergeWithSoftDeletesRetentionAndRecoverySource
dnhatn Jun 7, 2018
c30de4a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 11, 2018
5ff18f9
overriddenOperations -> skippedOperations
dnhatn Jun 11, 2018
8a37126
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 15, 2018
88950b3
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 15, 2018
dbe0472
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 19, 2018
f9eeb90
Naming
dnhatn Jun 19, 2018
ccb6e80
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.softDeletesPolicy = softDeletesPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}
Expand All @@ -80,7 +83,7 @@ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IO
deleteCommit(commits.get(i));
}
}
updateTranslogDeletionPolicy();
updateRetentionPolicy();
}

private void deleteCommit(IndexCommit commit) throws IOException {
Expand All @@ -90,7 +93,7 @@ private void deleteCommit(IndexCommit commit) throws IOException {
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
}

private void updateTranslogDeletionPolicy() throws IOException {
private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
Expand All @@ -101,6 +104,8 @@ private void updateTranslogDeletionPolicy() throws IOException {
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setCheckpointOfSafeCommit(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
private final boolean softDeleteEnabled;
private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

/**
Expand All @@ -173,7 +174,6 @@ public InternalEngine(EngineConfig engineConfig) {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
Expand All @@ -195,8 +195,11 @@ public InternalEngine(EngineConfig engineConfig) {
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations());
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
Expand Down Expand Up @@ -2013,7 +2016,7 @@ private IndexWriterConfig getIndexWriterConfig() {
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, softDeletesPolicy::retentionQuery, mergePolicy);
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
Expand All @@ -2026,20 +2029,6 @@ private IndexWriterConfig getIndexWriterConfig() {
return iwc;
}

/**
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/
private Query softDeletesRetentionQuery() {
ensureOpen();
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
// Prefer using the global checkpoint which is persisted on disk than an in-memory value.
// If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
// then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
Expand Down Expand Up @@ -2281,6 +2270,8 @@ public void onSettingsChanged() {
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());

softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations());
}

public MergeStats getMergeStats() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

/**
* A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying changes purpose.
*/
final class SoftDeletesPolicy {
private final LongSupplier globalCheckpointSupplier;
private int retentionLockCount;
private long checkpointOfSafeCommit;
private long minRequiredSeqNoForRecovery;
private long retentionOperations;

SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long retentionOperations) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.checkpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call this localCheckpointOfSafeCommit? better be clear and not confuse potentially with the global checkpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.minRequiredSeqNoForRecovery = checkpointOfSafeCommit;
this.retentionLockCount = 0;
}

/**
* Updates the number of soft-deleted prior to the global checkpoint to be retained
* See {@link org.elasticsearch.index.IndexSettings#INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING}
*/
synchronized void setRetentionOperations(long retentionOperations) {
this.retentionOperations = retentionOperations;
}

/**
* Sets the local checkpoint of the current safe commit.
* All operations whose seqno are greater than this checkpoint will be retained until the new checkpoint is advanced.
*/
synchronized void setCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.checkpointOfSafeCommit) {
throw new IllegalArgumentException("Local checkpoint can't go backwards; " +
"new checkpoint [" + newCheckpoint + "]," + "current checkpoint [" + checkpointOfSafeCommit + "]");
}
this.checkpointOfSafeCommit = newCheckpoint;
updateMinRequiredSeqNoForRecovery();
}

private void updateMinRequiredSeqNoForRecovery() {
assert Thread.holdsLock(this) : Thread.currentThread().getName();
if (retentionLockCount == 0) {
this.minRequiredSeqNoForRecovery = checkpointOfSafeCommit;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name this storeRecovery? I think it will help clarify that this is only used for store recovery.

}
}

/**
* Acquires a lock on soft-deleted documents to prevent them from cleaning up in merge processes. This is necessary to
* make sure that all operations after the local checkpoint of the safe commit are retained until the lock is released.
* This is a analogy to the translog's retention lock; see {@link Translog#acquireRetentionLock()}
*/
synchronized Releasable acquireRetentionLock() {

This comment was marked as resolved.

This comment was marked as resolved.

assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
retentionLockCount++;
final AtomicBoolean released = new AtomicBoolean();
return () -> {
if (released.compareAndSet(false, true)) {
releaseRetentionLock();
}
};
}

private synchronized void releaseRetentionLock() {
assert retentionLockCount > 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
retentionLockCount--;
updateMinRequiredSeqNoForRecovery();
}

/**
* Returns a soft-deletes retention query that will be used in {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy}
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/
Query retentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinSeqNoToRetain(), Long.MAX_VALUE);
}

// Package-level for testing
synchronized long getMinSeqNoToRetain() {
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was doubting a bit if we should use the maxSeqNo for this as I thought it might be more intuitive. I got to the conclusion that I prefer the global checkpoint as the global checkpoint is the upper bound for CCR and the future changes API so people can set the retentionOperations and also expect to always get that much operations from the API. If you agree, do you mind adding a comment here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return Math.min(minRequiredSeqNoForRecovery, minSeqNoForQueryingChanges) + 1;
}
}
Loading