Skip to content

Commit

Permalink
Trim translog when safe commit advanced (#32967)
Browse files Browse the repository at this point in the history
Since #28140 when the global checkpoint is advanced, we try to move the
safe commit forward, and clean up old index commits if possible. However,
we forget to trim unreferenced translog.

This change makes sure that we prune both old translog and index commits
when the safe commit advanced.

Relates #28140
Closes #32089
  • Loading branch information
dnhatn committed Aug 20, 2018
1 parent 46a8405 commit bbdcd5a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public Translog.Location getTranslogLastWriteLocation() {
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
translog.trimUnreferencedReaders();
}
}

Expand Down Expand Up @@ -1808,6 +1809,8 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
// Here we don't have to trim translog because snapshotting an index commit
// does not lock translog or prevents unreferenced files from trimming.
indexWriter.deleteUnusedFiles();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4365,13 +4365,18 @@ public void testAcquireIndexCommit() throws Exception {

public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",
Settings.builder().put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1).build());
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
try (Store store = createStore();
InternalEngine engine =
createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
if (rarely()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
Expand All @@ -4385,6 +4390,7 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
assertThat(engine.estimateTranslogOperationsFromMinSeq(0L), equalTo(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -74,7 +73,6 @@ public void testTranslogHistoryTransferred() throws Exception {
}
}

@TestLogging("_root:TRACE")
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
Expand Down

0 comments on commit bbdcd5a

Please sign in to comment.