Truncate tlog cli should assign global checkpoint (#28192)
We aim to always have a safe index commit once recovery is done.
This invariant does not hold if the translog is manually truncated by
users, because the truncate translog CLI resets the global checkpoint
to unassigned. This commit assigns the global checkpoint to the
max_seqno of the last commit when truncating the translog. We can only
do this safely because the truncate translog command generates a new
history UUID for that shard. With a new history UUID, sequence-based
recovery between that shard and other copies that still carry the old
history is disabled.

Relates #28181
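
As background for the "safe" wording above: a commit is considered safe when every operation it contains is globally durable, i.e. its max_seq_no is at or below the global checkpoint, so assigning the global checkpoint to the max_seqno of the last commit makes that commit trivially safe. The sketch below is illustrative only; the class and helper are hypothetical, and the "max_seq_no" user-data key simply mirrors SequenceNumbers.MAX_SEQ_NO as used in the diff.

import java.util.Map;

// Hypothetical sketch of the safe-commit invariant referenced in the commit message.
// This class is illustrative and not part of Elasticsearch.
final class SafeCommitSketch {

    // A commit is "safe" when all operations it contains (up to its max_seq_no)
    // are already covered by the global checkpoint.
    static boolean isSafeCommit(Map<String, String> commitUserData, long globalCheckpoint) {
        final long maxSeqNo = Long.parseLong(commitUserData.get("max_seq_no"));
        return maxSeqNo <= globalCheckpoint;
    }

    public static void main(String[] args) {
        // After truncation the tool assigns globalCheckpoint = maxSeqNo of the last commit,
        // so the invariant holds for that commit.
        Map<String, String> lastCommitUserData = Map.of("max_seq_no", "42");
        System.out.println(isSafeCommit(lastCommitUserData, 42L)); // prints: true
    }
}
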
dnhatn authored Jan 13, 2018
1 parent a15ba75 commit f2db2a0
Showing 2 changed files with 31 additions and 4 deletions.
File 1 of 2 (the truncate translog command):
@@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
         }
 
         // Retrieve the generation and UUID from the existing data
-        commitData = commits.get(commits.size() - 1).getUserData();
+        commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
         String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
         String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
+        final long globalCheckpoint;
+        // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
+        // We can only safely do it because we will generate a new history uuid for this shard.
+        if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
+            globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
+            // Also advances the local checkpoint of the last commit to its max_seqno.
+            commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
+        } else {
+            globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        }
         if (translogGeneration == null || translogUUID == null) {
             throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
                 translogGeneration, translogUUID);
@@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
         // Write empty checkpoint and translog to empty files
         long gen = Long.parseLong(translogGeneration);
         int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
-        writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
+        writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
 
         terminal.println("Removing existing translog files");
         IOUtils.rm(translogFiles.toArray(new Path[]{}));
@@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
     }
 
     /** Write a checkpoint file to the given location with the given generation */
-    public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
+    static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
         Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
-            SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
+            globalCheckpoint, translogGeneration);
         Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
             StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
         // fsync with metadata here to make sure.
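
As an aside on the history UUID reasoning in the code comment above: operations-based (sequence-number) recovery only makes sense between copies that share the same history, so writing a fresh history UUID during truncation forces a full file-based copy. The sketch below is a hypothetical illustration of that decision; the class, method, and parameter names are assumptions, not Elasticsearch APIs.

// Hypothetical sketch (not Elasticsearch code): why a fresh history UUID rules out
// operations-based recovery between a truncated shard and its old copies.
final class RecoveryModeSketch {

    enum RecoveryMode { OPERATIONS_BASED, FILE_BASED }

    // Sequence-number recovery requires source and target to share the same history;
    // a new history UUID on the truncated shard breaks that link, forcing a file copy.
    static RecoveryMode chooseRecoveryMode(String sourceHistoryUUID, String targetHistoryUUID,
                                           boolean targetHasRequiredOperations) {
        if (sourceHistoryUUID.equals(targetHistoryUUID) && targetHasRequiredOperations) {
            return RecoveryMode.OPERATIONS_BASED;
        }
        return RecoveryMode.FILE_BASED;
    }

    public static void main(String[] args) {
        // The truncate command writes a brand-new history UUID, so the histories differ.
        System.out.println(chooseRecoveryMode("new-uuid", "old-uuid", true)); // FILE_BASED
    }
}
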
File 2 of 2 (the corrupted translog truncation integration test):
@@ -31,6 +31,7 @@
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -48,6 +49,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.MockEngineFactoryPlugin;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -74,6 +76,7 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@@ -214,6 +217,10 @@ public void testCorruptTranslogTruncation() throws Exception {
         final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
             .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
         assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
+        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
+        final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
+        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
     }
 
     public void testCorruptTranslogTruncationOfReplica() throws Exception {
@@ -316,6 +323,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
             .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
         // the replica translog was disabled so it doesn't know what the global checkpoint is and thus can't do ops-based recovery
         assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
+        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
+        final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
+        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
     }
 
     private Set<Path> getTranslogDirs(String indexName) throws IOException {
@@ -360,4 +371,10 @@ private static void disableTranslogFlush(String index) {
         client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
     }
 
+    private SeqNoStats getSeqNoStats(String index, int shardId) {
+        final ShardStats[] shardStats = client().admin().indices()
+            .prepareStats(index).get()
+            .getIndices().get(index).getShards();
+        return shardStats[shardId].getSeqNoStats();
+    }
 }
