diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index dde55698fa7d5..9ba424ec6b131 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -218,7 +218,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl } return tracker.getCheckpoint() >= endingSeqNo; } - return false; + return true; } /** @@ -417,7 +417,7 @@ void phase2(final Translog.Snapshot snapshot) throws IOException { logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode()); // send all the snapshot's translog operations to the target - final int totalOperations = sendSnapshot(snapshot); + final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot); stopWatch.stop(); logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime()); @@ -465,15 +465,17 @@ public void finalizeRecovery() { } /** - * Send the given snapshot's operations to this handler's target node. + * Send the given snapshot's operations with a sequence number greater than or equal to the specified starting sequence number to + * this handler's target node. *

* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * - * @param snapshot the translog snapshot to replay operations from + * @param startingSeqNo only operations with a sequence number greater than or equal to this value will be sent + * @param snapshot the translog snapshot to replay operations from * @return the total number of translog operations that were sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException { + protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { int ops = 0; long size = 0; int totalOperations = 0; @@ -490,6 +492,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); + if (operation.seqNo() < startingSeqNo) continue; operations.add(operation); ops++; size += operation.estimateSize(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index e8437b614e0b6..802576922f90e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -83,7 +83,7 @@ public RecoveryResponse recoverToTarget() throws IOException { } @Override - protected int sendSnapshot(final Translog.Snapshot snapshot) { + protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) { logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode()); return 0; } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java 
b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ee01aee64bd4a..2f565dbf56a9f 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase { @@ -68,6 +69,7 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { shards.flush(); shards.getPrimary().updateGlobalCheckpointOnPrimary(); final IndexShard originalReplica = shards.getReplicas().get(0); + long replicaCommittedLocalCheckpoint = docs - 1; boolean replicaHasDocsSinceLastFlushedCheckpoint = false; for (int i = 0; i < randomInt(2); i++) { final int indexedDocs = shards.indexDocs(randomInt(5)); @@ -79,14 +81,13 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { final boolean flush = randomBoolean(); if (flush) { originalReplica.flush(new FlushRequest()); + replicaHasDocsSinceLastFlushedCheckpoint = false; + replicaCommittedLocalCheckpoint = docs - 1; } final boolean sync = randomBoolean(); if (sync) { shards.getPrimary().updateGlobalCheckpointOnPrimary(); - if (flush) { - replicaHasDocsSinceLastFlushedCheckpoint = false; - } } } @@ -112,6 +113,9 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty())); } else { assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty()); + assertThat( + recoveredReplica.recoveryState().getTranslog().recoveredOperations(), + equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1)))); } docs += shards.indexDocs(randomInt(5));