diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index d9b77f841ed09..222e3e13d65e1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th } // Retrieve the generation and UUID from the existing data - commitData = commits.get(commits.size() - 1).getUserData(); + commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData()); String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint; + // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit. + // We can only safely do it because we will generate a new history uuid this shard. + if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) { + globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)); + // Also advances the local checkpoint of the last commit to its max_seqno. + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint)); + } else { + globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + } if (translogGeneration == null || translogUUID == null) { throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", translogGeneration, translogUUID); @@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th // Write empty checkpoint and translog to empty files long gen = Long.parseLong(translogGeneration); int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); - writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen); + writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); terminal.println("Removing existing translog files"); IOUtils.rm(translogFiles.toArray(new Path[]{})); @@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th } /** Write a checkpoint file to the given location with the given generation */ - public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { + static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException { Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, - SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration); + globalCheckpoint, translogGeneration); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); // fsync with metadata here to make sure. diff --git a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index d98359cdd06a0..029ed50fb2851 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -48,6 +49,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -74,6 +76,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) @@ -214,6 +217,10 @@ public void testCorruptTranslogTruncation() throws Exception { final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. + final SeqNoStats seqNoStats = getSeqNoStats("test", 0); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } public void testCorruptTranslogTruncationOfReplica() throws Exception { @@ -316,6 +323,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. + final SeqNoStats seqNoStats = getSeqNoStats("test", 0); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } private Set getTranslogDirs(String indexName) throws IOException { @@ -360,4 +371,10 @@ private static void disableTranslogFlush(String index) { client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); } + private SeqNoStats getSeqNoStats(String index, int shardId) { + final ShardStats[] shardStats = client().admin().indices() + .prepareStats(index).get() + .getIndices().get(index).getShards(); + return shardStats[shardId].getSeqNoStats(); + } }