From 6181081bac5068405add82e21ccd00f676d11533 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 28 Aug 2018 12:32:09 -0400 Subject: [PATCH] Send only ops after checkpoint in file-based recovery with soft-deletes (#33190) Today a file-based recovery will replay all existing translog operations from the primary on a replica so that that replica can have a full history in translog as the primary. However, with soft-deletes enabled, we should not do it because: 1. All operations before the local checkpoint of the safe commit exist in the commit already. 2. The number of operations before the local checkpoint may be considerable and requires a significant amount of time to replay on a replica. Relates #30522 Relates #29530 --- .../recovery/RecoverySourceHandler.java | 9 +- .../gateway/RecoveryFromGatewayIT.java | 4 +- .../RecoveryDuringReplicationTests.java | 16 ++-- .../indices/recovery/RecoveryTests.java | 90 +++++++++++++++---- 4 files changed, 85 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 3ed53f6c3e118..10f796e5e1551 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -162,12 +162,13 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // we set this to 0 to create a translog roughly according to the retention policy - // on the target. Note that it will still filter out legacy operations with no sequence numbers - startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled. - // but we must have everything above the local checkpoint in the commit + // We must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have + // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly + // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo. + startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 861ec78d21f39..84cc1390e907f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -481,9 +481,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused)); assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered)); assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); - // both cases will be zero once we start sending only ops after local checkpoint of the safe commit - int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0; - assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps)); + assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0)); } } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 666e40978f367..6ca1b6bfb68af 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -220,8 +220,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE") public void testRecoveryAfterPrimaryPromotion() throws Exception { - Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); - try (ReplicationGroup shards = createGroup(2, settings)) { + try (ReplicationGroup shards = createGroup(2)) { shards.startAll(); int totalDocs = shards.indexDocs(randomInt(10)); int committedDocs = 0; @@ -233,7 +232,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { final IndexShard oldPrimary = shards.getPrimary(); final IndexShard newPrimary = shards.getReplicas().get(0); final IndexShard replica = shards.getReplicas().get(1); - boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled(); if (randomBoolean()) { // simulate docs that were inflight when primary failed, these will be rolled back final int rollbackDocs = randomIntBetween(1, 5); @@ -281,12 +279,13 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); }); newPrimary.flush(new FlushRequest().force(true)); - uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); - totalDocs += uncommittedOpsOnPrimary; - // we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based won't happen - if (softDeleteEnabled) { + if (replica.indexSettings().isSoftDeleteEnabled()) { + // We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen. + // The min_retained_seqno only advances when a merge asks for the retention query. newPrimary.flush(new FlushRequest().force(true)); } + uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); + totalDocs += uncommittedOpsOnPrimary; } if (randomBoolean()) { @@ -306,8 +305,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary; - assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps)); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } // roll back the extra ops in the replica diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 5a267e1f34895..9f7e50c03e380 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -64,13 +64,13 @@ public void testTranslogHistoryTransferred() throws Exception { int docs = shards.indexDocs(10); getTranslog(shards.getPrimary()).rollGeneration(); shards.flush(); - if (randomBoolean()) { - docs += shards.indexDocs(10); - } + int moreDocs = shards.indexDocs(randomInt(10)); shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - assertThat(getTranslog(replica).totalOperations(), equalTo(docs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs)); + shards.assertAllEqual(docs + moreDocs); } } @@ -107,7 +107,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { } } - public void testRecoveryWithOutOfOrderDelete() throws Exception { + public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception { /* * The flow of this test: * - delete #1 @@ -117,12 +117,9 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained) * - index #2 * - index #5 - * - If flush and the translog/lucene retention disabled, delete #1 will be removed while index #0 is still retained and replayed. + * - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed. */ - Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10) - // If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted - // index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0 - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build(); + Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build(); try (ReplicationGroup shards = createGroup(1, settings)) { shards.startAll(); // create out of order delete and index op on replica @@ -131,7 +128,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { // delete #1 orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL); - orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment + getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation // index #0 orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)); @@ -145,23 +142,22 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON)); orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); // index #5 -> force NoOp #4. - orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL,IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON)); final int translogOps; if (randomBoolean()) { if (randomBoolean()) { - logger.info("--> flushing shard (translog/soft-deletes will be trimmed)"); + logger.info("--> flushing shard (translog will be trimmed)"); IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData()); builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings()) .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)); + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")); orgReplica.indexSettings().updateIndexMetaData(builder.build()); orgReplica.onSettingsChanged(); translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed). } else { - logger.info("--> flushing shard (translog/soft-deletes will be retained)"); + logger.info("--> flushing shard (translog will be retained)"); translogOps = 6; // 5 ops + seqno gaps } flushShard(orgReplica); @@ -180,6 +176,62 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { } } + public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10) + // If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted + // index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0 + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build(); + try (ReplicationGroup shards = createGroup(1, settings)) { + shards.startAll(); + // create out of order delete and index op on replica + final IndexShard orgReplica = shards.getReplicas().get(0); + final String indexName = orgReplica.shardId().getIndexName(); + + // delete #1 + orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL); + orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment + // index #0 + orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)); + // index #3 + orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON)); + // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. + orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); + // index #2 + orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON)); + orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); + // index #5 -> force NoOp #4. + orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON)); + + if (randomBoolean()) { + if (randomBoolean()) { + logger.info("--> flushing shard (translog/soft-deletes will be trimmed)"); + IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData()); + builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)); + orgReplica.indexSettings().updateIndexMetaData(builder.build()); + orgReplica.onSettingsChanged(); + } + flushShard(orgReplica); + } + + final IndexShard orgPrimary = shards.getPrimary(); + shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed. + + IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); + shards.recoverReplica(newReplica); + shards.assertAllEqual(3); + try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) { + assertThat(snapshot, SnapshotMatchers.size(6)); + } + } + } + public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { try (ReplicationGroup shards = createGroup(1)) { shards.startAll(); @@ -228,7 +280,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -332,7 +385,8 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); shards.assertAllEqual(numDocs); } }