Truncate translog CLI should assign global checkpoint
We aim to always have a safe index once recovery is done. This invariant
does not hold if the translog is manually truncated by users, because the
truncate translog CLI resets the global checkpoint to unassigned. This
commit assigns the max_seqno of the last commit to the global checkpoint
when truncating the translog.

Relates elastic#28181
dnhatn committed Jan 12, 2018
1 parent 99f88f1 commit 3c99ab3
Showing 2 changed files with 28 additions and 3 deletions.
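
To see why the commit message insists on a "safe" index, it helps to state the invariant the fix restores: a commit is only safe to recover from when everything it contains, up to its max_seqno, is covered by the global checkpoint. The sketch below is a standalone illustration, not Elasticsearch API; the method name and the -2 sentinel are stand-ins for the real SequenceNumbers.UNASSIGNED_SEQ_NO constant.

```java
// Standalone illustration of the safe-commit invariant this change restores.
final class SafeCommitSketch {

    /** A commit is safe to recover from when every operation it contains
     *  (everything up to its max_seqno) is covered by the global checkpoint. */
    static boolean isSafeCommit(long commitMaxSeqNo, long globalCheckpoint) {
        return commitMaxSeqNo <= globalCheckpoint;
    }

    public static void main(String[] args) {
        final long maxSeqNo = 1234L;  // max_seqno recorded in the last commit
        final long unassigned = -2L;  // stand-in for SequenceNumbers.UNASSIGNED_SEQ_NO

        // What the CLI used to write: no commit is safe, so the recovery invariant breaks.
        System.out.println(isSafeCommit(maxSeqNo, unassigned)); // false
        // What the CLI writes after this commit: the last commit becomes safe again.
        System.out.println(isSafeCommit(maxSeqNo, maxSeqNo));   // true
    }
}
```
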
@@ -135,6 +135,13 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
commitData = commits.get(commits.size() - 1).getUserData();
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint;
// To preserve the safe-commit invariant, we assign the max_seqno of the last commit to the global checkpoint.
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
@@ -153,7 +160,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);

terminal.println("Removing existing translog files");
IOUtils.rm(translogFiles.toArray(new Path[]{}));
@@ -190,9 +197,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
}

/** Write a checkpoint file to the given location with the given translog generation and global checkpoint */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure.
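
Condensed, the change to the truncate command is a single decision made before the empty checkpoint is written: take the global checkpoint from the last commit's max_seqno, or fall back to unassigned when the commit carries no sequence-number metadata (for example, an index created before sequence numbers were tracked). A minimal sketch, assuming SequenceNumbers lives in org.elasticsearch.index.seqno as the test imports below suggest; the helper class and method names are illustrative only, and the real command inlines this logic.

```java
import java.util.Map;

import org.elasticsearch.index.seqno.SequenceNumbers;

// Illustrative helper only; see the diff above for the real, inlined version.
final class GlobalCheckpointSketch {

    /** Derive the global checkpoint for the rewritten translog from the last commit's user data. */
    static long resolveGlobalCheckpoint(Map<String, String> lastCommitUserData) {
        if (lastCommitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
            // Reuse the commit's max_seqno so the truncated shard keeps a safe commit.
            return Long.parseLong(lastCommitUserData.get(SequenceNumbers.MAX_SEQ_NO));
        }
        // No sequence-number metadata in the commit: keep the previous (unassigned) behaviour.
        return SequenceNumbers.UNASSIGNED_SEQ_NO;
    }
}
```
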
@@ -31,6 +31,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -48,6 +49,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -73,7 +75,9 @@
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@@ -146,6 +150,7 @@ public void testCorruptTranslogTruncation() throws Exception {
replica.flush(new FlushRequest());
logger.info("--> performed extra flushing on replica");
}
final SeqNoStats oldSeqNoStats = getSeqNoStats("test", 0);

// shut down the replica node to be tested later
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
@@ -214,6 +219,9 @@ public void testCorruptTranslogTruncation() throws Exception {
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
// Ensure that the global checkpoint is restored from the max seqno of the last commit.
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo())));
}

public void testCorruptTranslogTruncationOfReplica() throws Exception {
@@ -261,6 +269,7 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
final ShardId shardId = new ShardId(resolveIndex("test"), 0);
Set<Path> translogDirs = getTranslogDirs(replicaNode, shardId);

final SeqNoStats oldSeqNoStats = getSeqNoStats("test", 0);
// stop the cluster nodes. we don't use full restart so the node start up order will be the same
// and shard roles will be maintained
internalCluster().stopRandomDataNode();
@@ -316,6 +325,9 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
// the replica translog was disabled so it doesn't know what the global checkpoint is and thus can't do ops-based recovery
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
// Ensure that the global checkpoint is restored from the max seqno of the last commit.
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo())));
}

private Set<Path> getTranslogDirs(String indexName) throws IOException {
@@ -360,4 +372,10 @@ private static void disableTranslogFlush(String index) {
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}

private SeqNoStats getSeqNoStats(String index, int shardId) {
final ShardStats[] shardStats = client().admin().indices()
.prepareStats(index).get()
.getIndices().get(index).getShards();
return shardStats[shardId].getSeqNoStats();
}
}
