Skip to content

Commit

Permalink
Only revisit deletion policy when no pending snapshot
Browse files Browse the repository at this point in the history
A follow-up of elastic#28140

We currently revisit the index deletion policy whenever the global
checkpoint has advanced enough. However, we won't be able to clean up
the old commit points if they are being snapshotted. Here we prefer a
simple solution over an optimal solution as we should revisit if only
the last commit is being snapshotted.
  • Loading branch information
dnhatn committed Feb 12, 2018
1 parent e0bea70 commit 58f86ac
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile int pendingSnapshots;
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

Expand All @@ -61,6 +62,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.startingCommit = startingCommit;
this.snapshottedCommits = new ObjectIntHashMap<>();
this.pendingSnapshots = 0;
}

@Override
Expand Down Expand Up @@ -163,6 +165,7 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
pendingSnapshots++;
return new SnapshotIndexCommit(snapshotting);
}

Expand All @@ -178,6 +181,7 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
pendingSnapshots--;
}

/**
Expand Down Expand Up @@ -235,7 +239,7 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
*/
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (safeCommit != lastCommit && pendingSnapshots == 0) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ public void testCheckUnreferencedCommits() throws Exception {
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
final IndexCommit snapshot = indexPolicy.acquireIndexCommit(randomBoolean());
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Having snapshot -> false.
indexPolicy.releaseCommit(snapshot);
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
Expand Down

0 comments on commit 58f86ac

Please sign in to comment.