diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 5de7062ab18ee..b0e2654e7f2fb 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1512,6 +1512,11 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog() throws IOException;
+ /**
+ * Do not replay translog operations, but make the engine ready.
+ */
+ public abstract void skipTranslogRecovery();
+
/**
* Returns true
iff this engine is currently recovering from translog.
*/
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 77b8275277079..1b7b891efd6ff 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -401,6 +401,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}
+ @Override
+ public void skipTranslogRecovery() {
+ if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+ throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
+ }
+ assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
+ pendingTranslogRecovery.set(false); // we are good - now we can commit
+ }
+
private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 4c6c6a17c234d..3832cd0ae2055 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1304,9 +1304,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
- public void openIndexAndTranslog() throws IOException {
+ public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
+ getEngine().recoverFromTranslog();
+ }
+
+ /**
+ * Opens the engine on top of the existing lucene engine and translog.
+ * The translog is kept but its operations won't be replayed.
+ */
+ public void openIndexAndSkipTranslogRecovery() throws IOException {
+ assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
+ innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
+ getEngine().skipTranslogRecovery();
}
private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
@@ -1339,13 +1350,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
"read from translog checkpoint");
}
- Engine newEngine = createNewEngine(config);
+ createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
- newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index 6bc1ce2882c92..81ffbea642c58 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
- indexShard.openIndexAndTranslog();
+ indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index ba5dc5c60f29f..88b0f23d72a99 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -21,6 +21,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +41,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,6 +63,7 @@
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
- transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
- .Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
+ transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
+ RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
- final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
+ final List existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
+ final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
+ final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
@@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
- recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
+ recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
index 61cd986a1aef4..ae8c7472f89b4 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
@@ -28,19 +28,33 @@
import java.io.IOException;
-public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
+class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
- private long recoveryId;
- private ShardId shardId;
- private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
+ private final long recoveryId;
+ private final ShardId shardId;
+ private final int totalTranslogOps;
+ private final boolean deleteLocalTranslog;
- public RecoveryPrepareForTranslogOperationsRequest() {
- }
-
- RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
+ RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
+ this.deleteLocalTranslog = deleteLocalTranslog;
+ }
+
+ RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
+ super.readFrom(in);
+ recoveryId = in.readLong();
+ shardId = ShardId.readShardId(in);
+ totalTranslogOps = in.readVInt();
+ if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
+ in.readLong(); // maxUnsafeAutoIdTimestamp
+ }
+ if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+ deleteLocalTranslog = in.readBoolean();
+ } else {
+ deleteLocalTranslog = true;
+ }
}
public long recoveryId() {
@@ -55,15 +69,11 @@ public int totalTranslogOps() {
return totalTranslogOps;
}
- @Override
- public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
- recoveryId = in.readLong();
- shardId = ShardId.readShardId(in);
- totalTranslogOps = in.readVInt();
- if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
- in.readLong(); // maxUnsafeAutoIdTimestamp
- }
+ /**
+ * Whether or not the recovery target should delete its local translog
+ */
+ boolean deleteLocalTranslog() {
+ return deleteLocalTranslog;
}
@Override
@@ -75,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
+ if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+ out.writeBoolean(deleteLocalTranslog);
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 7afe6c977da21..3ee9b953757c3 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
final long startingSeqNo;
final long requiredSeqNoRangeStart;
- final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
+ final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
- if (isSequenceNumberBasedRecoveryPossible) {
+ if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
@@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
try {
- prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
+ // For a sequence based recovery, the target can keep its local translog
+ prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO
}
}
- void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
+ void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
- cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
+ cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis() - startEngineStart;
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index d383891345818..1bbcb9efa9644 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -362,10 +362,14 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */
@Override
- public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
+ public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
- // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
- indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
+ if (createNewTranslog) {
+ // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
+ indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
+ } else {
+ indexShard().openIndexAndSkipTranslogRecovery();
+ }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index e7403986dc233..736d602044656 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {
/**
* Prepares the target to receive translog operations, after all file have been copied
- *
- * @param totalTranslogOps total translog operations expected to be sent
+ * @param createNewTranslog whether or not to delete the local translog on the target
+ * @param totalTranslogOps total translog operations expected to be sent
*/
- void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
+ void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;
/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 279bec186a433..4ea2be0e72659 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}
@Override
- public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
+ public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
- new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
+ new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 2bf7de6b94a82..881eb16d619d0 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -31,7 +31,9 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
@@ -226,7 +228,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard replica = shards.getReplicas().get(1);
- boolean expectSeqNoRecovery = true;
if (randomBoolean()) {
// simulate docs that were inflight when primary failed, these will be rolled back
final int rollbackDocs = randomIntBetween(1, 5);
@@ -239,7 +240,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
}
if (randomBoolean()) {
oldPrimary.flush(new FlushRequest(index.getName()));
- expectSeqNoRecovery = false;
}
}
@@ -252,9 +252,30 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
equalTo(totalDocs - 1L));
// index some more
- totalDocs += shards.indexDocs(randomIntBetween(0, 5));
+ int moreDocs = shards.indexDocs(randomIntBetween(0, 5));
+ totalDocs += moreDocs;
+
+ // As a replica keeps a safe commit, the file-based recovery only happens if the required translog
+ // for the sequence-based recovery is not fully retained and extra documents were added to the primary.
+ boolean expectSeqNoRecovery = (moreDocs == 0 || randomBoolean());
+ int uncommittedOpsOnPrimary = 0;
+ if (expectSeqNoRecovery == false) {
+ IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData());
+ builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings())
+ .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+ .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+ );
+ newPrimary.indexSettings().updateIndexMetaData(builder.build());
+ newPrimary.onSettingsChanged();
+ shards.syncGlobalCheckpoint();
+ newPrimary.flush(new FlushRequest());
+ uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
+ totalDocs += uncommittedOpsOnPrimary;
+ }
if (randomBoolean()) {
+ uncommittedOpsOnPrimary = 0;
+ shards.syncGlobalCheckpoint();
newPrimary.flush(new FlushRequest());
}
@@ -269,7 +290,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
} else {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
- assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs));
+ assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}
// roll back the extra ops in the replica
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 48887aa4c11c7..cd75c7a08fbc3 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -22,7 +22,6 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
@@ -2109,7 +2108,7 @@ public void testShardActiveDuringInternalRecovery() throws IOException {
shard.prepareForIndexRecovery();
// Shard is still inactive since we haven't started recovering yet
assertFalse(shard.isActive());
- shard.openIndexAndTranslog();
+ shard.openIndexAndRecoveryFromTranslog();
// Shard should now be active since we did recover:
assertTrue(shard.isActive());
closeShards(shard);
@@ -2137,8 +2136,8 @@ public void testShardActiveDuringPeerRecovery() throws IOException {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
- public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
- super.prepareForTranslogOperations(totalTranslogOps);
+ public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
+ super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps);
// Shard is still inactive since we haven't started recovering yet
assertFalse(replica.isActive());
@@ -2186,8 +2185,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
}) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
- public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
- super.prepareForTranslogOperations(totalTranslogOps);
+ public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
+ super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps);
assertListenerCalled.accept(replica);
}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index f691cfd0238d4..31521e33f21b6 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -19,103 +19,63 @@
package org.elasticsearch.indices.recovery;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.SourceToParse;
-import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.index.translog.TranslogConfig;
-import org.elasticsearch.index.translog.TranslogWriter;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
public void testGetStartingSeqNo() throws Exception {
- IndexShard replica = newShard(false);
- final AtomicReference translogLocation = new AtomicReference<>();
- RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null) {
- @Override
- Path translogLocation() {
- return translogLocation.get();
- }
- };
+ final IndexShard replica = newShard(false);
try {
- recoveryEmptyReplica(replica);
- int docs = randomIntBetween(1, 10);
- final String index = replica.shardId().getIndexName();
- long seqNo = 0;
- for (int i = 0; i < docs; i++) {
- replica.applyIndexOperationOnReplica(seqNo++, 1, VersionType.EXTERNAL,
- IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
- SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON),
- update -> {});
- if (rarely()) {
- // insert a gap
- seqNo++;
+ // Empty store
+ {
+ recoveryEmptyReplica(replica);
+ final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
+ recoveryTarget.decRef();
+ }
+ // Last commit is good - use it.
+ final long initDocs = scaledRandomIntBetween(1, 10);
+ {
+ for (int i = 0; i < initDocs; i++) {
+ indexDoc(replica, "doc", Integer.toString(i));
+ if (randomBoolean()) {
+ flushShard(replica);
+ }
}
+ flushShard(replica);
+ replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
+ replica.getTranslog().sync();
+ final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
+ recoveryTarget.decRef();
+ }
+ // Global checkpoint does not advance, last commit is not good - use the previous commit
+ final int moreDocs = randomIntBetween(1, 10);
+ {
+ for (int i = 0; i < moreDocs; i++) {
+ indexDoc(replica, "doc", Long.toString(i));
+ if (randomBoolean()) {
+ flushShard(replica);
+ }
+ }
+ flushShard(replica);
+ final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
+ recoveryTarget.decRef();
+ }
+ // Advances the global checkpoint, a safe commit also advances
+ {
+ replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
+ replica.getTranslog().sync();
+ final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs + moreDocs));
+ recoveryTarget.decRef();
}
-
- final long maxSeqNo = replica.seqNoStats().getMaxSeqNo();
- final long localCheckpoint = replica.getLocalCheckpoint();
-
- translogLocation.set(replica.getTranslog().location());
-
- final Translog translog = replica.getTranslog();
- final String translogUUID = translog.getTranslogUUID();
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
-
- translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1));
-
- // commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
-
- replica.flush(new FlushRequest());
-
- translogLocation.set(replica.getTranslog().location());
-
- // commit is not good, global checkpoint is below max
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
-
- translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo));
-
- // commit is good, global checkpoint is above max
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));
} finally {
closeShards(replica);
- recoveryTarget.decRef();
}
}
-
- private Path writeTranslog(
- final ShardId shardId,
- final String translogUUID,
- final long generation,
- final long globalCheckpoint
- ) throws IOException {
- final Path tempDir = createTempDir();
- final Path resolve = tempDir.resolve(Translog.getFilename(generation));
- Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME));
- try (TranslogWriter ignored = TranslogWriter.create(
- shardId,
- translogUUID,
- generation,
- resolve,
- FileChannel::open,
- TranslogConfig.DEFAULT_BUFFER_SIZE, generation, globalCheckpoint, () -> globalCheckpoint, () -> generation)) {}
- return tempDir;
- }
-
}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 4963c1b74a53f..7ab6925ce57b9 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -423,7 +423,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO
}
@Override
- void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
+ void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
prepareTargetForTranslogCalled.set(true);
}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 85dc3a5fc3906..2089c36d06bc0 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -41,6 +41,7 @@
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@@ -271,4 +272,38 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
closeShards(primaryShard, replicaShard);
}
+
+ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception {
+ try (ReplicationGroup shards = createGroup(1)) {
+ shards.startAll();
+ final IndexShard replica = shards.getReplicas().get(0);
+ final int initDocs = scaledRandomIntBetween(0, 20);
+ int uncommittedDocs = 0;
+ for (int i = 0; i < initDocs; i++) {
+ shards.indexDocs(1);
+ uncommittedDocs++;
+ if (randomBoolean()) {
+ shards.syncGlobalCheckpoint();
+ shards.flush();
+ uncommittedDocs = 0;
+ }
+ }
+ shards.removeReplica(replica);
+ final int moreDocs = shards.indexDocs(scaledRandomIntBetween(0, 20));
+ if (randomBoolean()) {
+ shards.flush();
+ }
+ replica.close("test", randomBoolean());
+ replica.store().close();
+ final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+ shards.recoverReplica(newReplica);
+
+ try (Translog.Snapshot snapshot = newReplica.getTranslog().newSnapshot()) {
+ assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(initDocs + moreDocs));
+ }
+ assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedDocs + moreDocs));
+ assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty());
+ }
+ }
+
}