Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit deletion policy after release the last snapshot #28627

Merged
merged 11 commits into from
Feb 19, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {

/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the snapshotting commit can be clean up.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
Expand All @@ -178,6 +180,8 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
// The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit.
return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1712,13 +1712,21 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
logger.trace("finish flush for snapshot");
}
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
indexWriter.deleteUnusedFiles();
}
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,15 @@ public void testAcquireIndexCommit() throws Exception {
assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData()));
}
}
randomSubsetOf(snapshottingCommits).forEach(snapshot -> {
final List<IndexCommit> releasingSnapshots = randomSubsetOf(snapshottingCommits);
for (IndexCommit snapshot : releasingSnapshots) {
snapshottingCommits.remove(snapshot);
indexPolicy.releaseCommit(snapshot);
});
final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count();
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(indexPolicy.releaseCommit(snapshot),
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false));
}
// Snapshotting commits must not be deleted.
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
// We don't need to retain translog for snapshotting commits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2079,9 +2079,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
List<Engine.IndexCommitRef> commits = new ArrayList<>();
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final List<Engine.IndexCommitRef> commits = new ArrayList<>();

final int numIndexingThreads = scaledRandomIntBetween(2, 4);
final int numDocsPerThread = randomIntBetween(500, 1000);
Expand Down Expand Up @@ -2166,8 +2166,6 @@ public void testConcurrentWritesAndCommits() throws Exception {
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
} finally {
IOUtils.close(commits);
}
}

Expand Down Expand Up @@ -4456,6 +4454,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
}
}

public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
int numSnapshots = between(1, 10);
final List<Engine.IndexCommitRef> snapshots = new ArrayList<>();
for (int i = 0; i < numSnapshots; i++) {
snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
}
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.syncTranslog();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
for (int i = 0; i < numSnapshots - 1; i++) {
snapshots.get(i).close();
// pending snapshots - should not release any commit.
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
}
snapshots.get(numSnapshots - 1).close(); // release the last snapshot - delete all except the last commit
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1));
}
}

public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
int numDocs = between(10, 100);
Expand Down