From 5bec52268bc609178538ec4e02c785db35fa0ced Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 27 Aug 2018 11:39:56 -0400 Subject: [PATCH] File-based with soft-deletes should send ops after checkpoint Today a file-based recovery will replay all existing translog operations from the primary on a replica so that that replica can have a full history in translog as the primary. However, with soft-deletes enabled, we should not do it because: 1. All operations before the local checkpoint of the safe commit exist in the commit already. 2. The number of operations before the local checkpoint may be considerable and requires a significant amount of time to replay on a replica. Relates #30522 Relates #29530 --- .../indices/recovery/RecoverySourceHandler.java | 9 +++++---- .../RecoveryDuringReplicationTests.java | 16 +++++++--------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 3ed53f6c3e118..10f796e5e1551 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -162,12 +162,13 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // we set this to 0 to create a translog roughly according to the retention policy - // on the target. Note that it will still filter out legacy operations with no sequence numbers - startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled. - // but we must have everything above the local checkpoint in the commit + // We must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have + // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly + // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo. + startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ba2345af4f70b..28122665e9bb6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -219,8 +219,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE") public void testRecoveryAfterPrimaryPromotion() throws Exception { - Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); - try (ReplicationGroup shards = createGroup(2, settings)) { + try (ReplicationGroup shards = createGroup(2)) { shards.startAll(); int totalDocs = shards.indexDocs(randomInt(10)); int committedDocs = 0; @@ -232,7 +231,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { final IndexShard oldPrimary = shards.getPrimary(); final IndexShard newPrimary = shards.getReplicas().get(0); final IndexShard replica = shards.getReplicas().get(1); - boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled(); if (randomBoolean()) { // simulate docs that were inflight when primary failed, these will be rolled back final int rollbackDocs = randomIntBetween(1, 5); @@ -280,12 +278,13 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); }); newPrimary.flush(new FlushRequest().force(true)); - uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); - totalDocs += uncommittedOpsOnPrimary; - // we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based won't happen - if (softDeleteEnabled) { + if (replica.indexSettings().isSoftDeleteEnabled()) { + // We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen. + // The min_retained_seqno only advances when a merge asks for the retention query. newPrimary.flush(new FlushRequest().force(true)); } + uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); + totalDocs += uncommittedOpsOnPrimary; } if (randomBoolean()) { @@ -305,8 +304,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary; - assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps)); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } // roll back the extra ops in the replica