From cccc9c6de680b0cac78ba5b38d33a814f83fbe87 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 11 Jun 2018 09:44:50 -0600 Subject: [PATCH] Encapsulate Translog in Engine (#31220) This removes the abstract `getTranslog` method in `Engine`, instead leaving it to the abstract implementations of the other methods that use the translog. This allows future Engines not to have a Translog, as instead they must implement the methods that use the translog pieces to return necessary values. --- .../elasticsearch/index/engine/Engine.java | 38 ++++------------ .../index/engine/InternalEngine.java | 44 ++++++++++++++++++- .../index/engine/InternalEngineTests.java | 16 +++---- .../index/engine/EngineTestCase.java | 4 +- 4 files changed, 62 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 2dc39f093a5ab..c8a80665b4df6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -565,18 +565,10 @@ public enum SearcherScope { EXTERNAL, INTERNAL } - /** - * Returns the translog associated with this engine. - * Prefer to keep the translog package-private, so that an engine can control all accesses to the translog. - */ - abstract Translog getTranslog(); - /** * Checks if the underlying storage sync is required. */ - public boolean isTranslogSyncNeeded() { - return getTranslog().syncNeeded(); - } + public abstract boolean isTranslogSyncNeeded(); /** * Ensures that all locations in the given stream have been written to the underlying storage. @@ -585,35 +577,25 @@ public boolean isTranslogSyncNeeded() { public abstract void syncTranslog() throws IOException; - public Closeable acquireTranslogRetentionLock() { - return getTranslog().acquireRetentionLock(); - } + public abstract Closeable acquireTranslogRetentionLock(); /** * Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#. * The caller has to close the returned snapshot after finishing the reading. */ - public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); - } + public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException; /** * Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#. */ - public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { - return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); - } + public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo); - public TranslogStats getTranslogStats() { - return getTranslog().stats(); - } + public abstract TranslogStats getTranslogStats(); /** * Returns the last location that the translog of this engine has written into. */ - public Translog.Location getTranslogLastWriteLocation() { - return getTranslog().getLastWriteLocation(); - } + public abstract Translog.Location getTranslogLastWriteLocation(); protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { @@ -661,9 +643,7 @@ public CommitStats commitStats() { /** * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) */ - public long getLastSyncedGlobalCheckpoint() { - return getTranslog().getLastSyncedGlobalCheckpoint(); - } + public abstract long getLastSyncedGlobalCheckpoint(); /** * Global stats on segments. @@ -935,9 +915,7 @@ public final boolean refreshNeeded() { * * @return {@code true} if the current generation should be rolled to a new generation */ - public boolean shouldRollTranslogGeneration() { - return getTranslog().shouldRollGeneration(); - } + public abstract boolean shouldRollTranslogGeneration(); /** * Rolls the translog generation and cleans unneeded. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9b2a109a0512b..bc2393415e644 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -75,8 +75,10 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.index.translog.TranslogDeletionPolicy; +import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -435,12 +437,17 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier()); } - @Override + // Package private for testing purposes only Translog getTranslog() { ensureOpen(); return translog; } + @Override + public boolean isTranslogSyncNeeded() { + return getTranslog().syncNeeded(); + } + @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { final boolean synced = translog.ensureSynced(locations); @@ -456,6 +463,31 @@ public void syncTranslog() throws IOException { revisitIndexDeletionPolicyOnTranslogSynced(); } + @Override + public Closeable acquireTranslogRetentionLock() { + return getTranslog().acquireRetentionLock(); + } + + @Override + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); + } + + @Override + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); + } + + @Override + public TranslogStats getTranslogStats() { + return getTranslog().stats(); + } + + @Override + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog().getLastWriteLocation(); + } + private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); @@ -1619,6 +1651,11 @@ public void trimUnreferencedTranslogFiles() throws EngineException { } } + @Override + public boolean shouldRollTranslogGeneration() { + return getTranslog().shouldRollGeneration(); + } + @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { @@ -2240,6 +2277,11 @@ LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } + @Override + public long getLastSyncedGlobalCheckpoint() { + return getTranslog().getLastSyncedGlobalCheckpoint(); + } + @Override public long getLocalCheckpoint() { return localCheckpointTracker.getCheckpoint(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8f1d80d22dde1..b5f0b5de60c7c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -731,7 +731,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }; - assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs)); + assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(committed.get()); } finally { @@ -758,7 +758,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); initialEngine.index(indexForDoc(doc)); if (rarely()) { - initialEngine.getTranslog().rollGeneration(); + getTranslog(initialEngine).rollGeneration(); } else if (rarely()) { initialEngine.flush(); } @@ -4007,14 +4007,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint()); trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); + assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated - try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) { assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); Translog.Operation operation; while ((operation = snapshot.next()) != null) { @@ -4029,7 +4029,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); if ((flushed = randomBoolean())) { globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); - recoveringEngine.getTranslog().sync(); + getTranslog(recoveringEngine).sync(); recoveringEngine.flush(true, true); } } @@ -4042,7 +4042,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { trimUnsafeCommits(replicaEngine.config()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -4245,7 +4245,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null))); if (frequently()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); - engine.getTranslog().sync(); + engine.syncTranslog(); } if (frequently()) { final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); @@ -4267,7 +4267,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } // Make sure we keep all translog operations after the local checkpoint of the safe commit. long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 0d5e693d62da6..a23e29b0bcd6b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -507,6 +507,8 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, * Exposes a translog associated with the given engine for testing purpose. */ public static Translog getTranslog(Engine engine) { - return engine.getTranslog(); + assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); + InternalEngine internalEngine = (InternalEngine) engine; + return internalEngine.getTranslog(); } }