Add support for peer recoveries using snapshots after primary failovers (#77420)

This commit adds support for peer recoveries using snapshots after
a primary failover, when the snapshot shares the same logical contents
as the source shard but the physical files differ. It uses the seq no
information stored in the snapshot to compare against the seq nos on
the current shard's source node and decide whether the snapshot can be
used to recover the shard. Since the underlying index files are
different from the source index files, error handling differs from the
shared-files case: if an error occurs while snapshot files are being
recovered, we have to cancel the ongoing downloads, wait until all
in-flight operations complete, remove the recovered files, and start
from scratch using a fallback recovery plan that copies the files from
the source node.

Relates #73496
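
A minimal sketch of the decision and fallback flow described above. Every class and method name here is a hypothetical illustration, and the seq-no comparison is one plausible formulation, not the actual code changed by this commit:

// Hypothetical sketch (editor's illustration); the real logic lives in the
// recovery planner and RecoveryTarget changes in the diff below.
final class SeqNoBasedRecoverySketch {

    // One plausible formulation of "same logical contents": the snapshot commit
    // has no seq-no gaps (max seq no == local checkpoint) and the source node
    // has already processed every operation contained in it.
    static boolean canUseSnapshot(long snapshotMaxSeqNo, long snapshotLocalCheckpoint, long sourceLocalCheckpoint) {
        return snapshotMaxSeqNo == snapshotLocalCheckpoint && snapshotMaxSeqNo <= sourceLocalCheckpoint;
    }

    static void recover(long snapshotMaxSeqNo, long snapshotLocalCheckpoint, long sourceLocalCheckpoint) {
        if (canUseSnapshot(snapshotMaxSeqNo, snapshotLocalCheckpoint, sourceLocalCheckpoint) == false) {
            recoverFromSourceNode();
            return;
        }
        try {
            downloadSnapshotFiles();
        } catch (Exception e) {
            // The snapshot files are physically different from the source files, so a
            // half-finished download cannot be completed from the source node: cancel
            // the ongoing downloads, wait for in-flight operations to finish, delete
            // what was recovered, then fall back to copying files from the source node.
            cancelDownloadsAndDeleteRecoveredFiles();
            recoverFromSourceNode();
        }
    }

    static void downloadSnapshotFiles() { /* placeholder */ }
    static void cancelDownloadsAndDeleteRecoveredFiles() { /* placeholder */ }
    static void recoverFromSourceNode() { /* placeholder */ }
}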
fcofdez authored Oct 14, 2021
1 parent f81fa5e commit b50b81e
Showing 28 changed files with 1,103 additions and 129 deletions.
@@ -8,6 +8,7 @@

package org.elasticsearch.indices.recovery;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -19,21 +20,27 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
@@ -409,7 +416,7 @@ public void testPeerRecoveryTriesToUseMostOfTheDataFromAnAvailableSnapshot() thr
assertDocumentsAreEqual(indexName, numDocs + docsIndexedAfterSnapshot);
}

public void testPeerRecoveryDoNotUseSnapshotsWhenSegmentsAreNotShared() throws Exception {
public void testPeerRecoveryDoNotUseSnapshotsWhenSegmentsAreNotSharedAndSeqNosAreDifferent() throws Exception {
String sourceNode = internalCluster().startDataOnlyNode();
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName,
@@ -455,26 +462,54 @@ public void testRecoveryIsCancelledAfterDeletingTheIndex() throws Exception {
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1");

try {
String sourceNode = internalCluster().startDataOnlyNode();
boolean seqNoRecovery = randomBoolean();
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put("index.routing.allocation.require._name", sourceNode)
.build()
);
final Settings.Builder indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s");

final List<String> dataNodes;
if (seqNoRecovery) {
dataNodes = internalCluster().startDataOnlyNodes(3);
indexSettings.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1);
} else {
dataNodes = internalCluster().startDataOnlyNodes(1);
indexSettings.put("index.routing.allocation.require._name", dataNodes.get(0))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
}
createIndex(indexName, indexSettings.build());

int numDocs = randomIntBetween(300, 1000);
indexDocs(indexName, numDocs, numDocs);
if (seqNoRecovery) {
// Flush to ensure that index_commit_seq_nos(replica) == index_commit_seq_nos(primary),
// since the primary flushes the index before taking the snapshot.
flush(indexName);
}

String repoName = "repo";
createRepo(repoName, "fs");
createSnapshot(repoName, "snap", Collections.singletonList(indexName));

String targetNode = internalCluster().startDataOnlyNode();
final String targetNode;
if (seqNoRecovery) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(indexName).shard(0);
String primaryNodeName = clusterState.nodes().resolveNode(shardRoutingTable.primaryShard().currentNodeId()).getName();
String replicaNodeName =
clusterState.nodes().resolveNode(shardRoutingTable.replicaShards().get(0).currentNodeId()).getName();

targetNode = dataNodes.stream()
.filter(nodeName -> nodeName.equals(primaryNodeName) == false && nodeName.equals(replicaNodeName) == false)
.findFirst()
.get();

} else {
targetNode = internalCluster().startDataOnlyNode();
}

MockTransportService targetMockTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);

@@ -490,11 +525,19 @@ public void testRecoveryIsCancelledAfterDeletingTheIndex() throws Exception {
}
);

assertAcked(
client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", targetNode)).get()
);
if (seqNoRecovery) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(indexName).shard(0);
String primaryNodeName = clusterState.nodes().resolveNode(shardRoutingTable.primaryShard().currentNodeId()).getName();

assertThat(internalCluster().stopNode(primaryNodeName), is(equalTo(true)));
} else {
assertAcked(
client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", targetNode)).get()
);
}

recoverSnapshotFileRequestReceived.await();

@@ -722,6 +765,78 @@ public void testRecoveryConcurrentlyWithIndexing() throws Exception {
assertDocumentsAreEqual(indexName, numDocs.get());
}

public void testSeqNoBasedRecoveryIsUsedAfterPrimaryFailOver() throws Exception {
List<String> dataNodes = internalCluster().startDataOnlyNodes(3);
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.build()
);

int numDocs = randomIntBetween(300, 1000);
indexDocs(indexName, 0, numDocs);
// Flush to ensure that index_commit_seq_nos(replica) == index_commit_seq_nos(primary),
// since the primary flushes the index before taking the snapshot.
flush(indexName);

String repoType = randomFrom(TestRepositoryPlugin.FAULTY_TYPE, TestRepositoryPlugin.INSTRUMENTED_TYPE, "fs");
String repoName = "repo";
createRepo(repoName, repoType);
createSnapshot(repoName, "snap", Collections.singletonList(indexName));

ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String primaryNodeId = clusterState.routingTable().index(indexName).shard(0).primaryShard().currentNodeId();
String primaryNodeName = clusterState.nodes().resolveNode(primaryNodeId).getName();

Store.MetadataSnapshot primaryMetadataSnapshot = getMetadataSnapshot(primaryNodeName, indexName);

assertThat(internalCluster().stopNode(primaryNodeName), is(equalTo(true)));

ensureGreen(indexName);

ClusterState clusterStateAfterPrimaryFailOver = client().admin().cluster().prepareState().get().getState();
IndexShardRoutingTable shardRoutingTableAfterFailOver =
clusterStateAfterPrimaryFailOver.routingTable().index(indexName).shard(0);

String primaryNodeIdAfterFailOver = shardRoutingTableAfterFailOver.primaryShard().currentNodeId();
String primaryNodeNameAfterFailOver = clusterStateAfterPrimaryFailOver.nodes().resolveNode(primaryNodeIdAfterFailOver).getName();

String replicaNodeIdAfterFailOver = shardRoutingTableAfterFailOver.replicaShards().get(0).currentNodeId();
String replicaNodeNameAfterFailOver = clusterStateAfterPrimaryFailOver.nodes().resolveNode(replicaNodeIdAfterFailOver).getName();

RecoveryState recoveryState = getLatestPeerRecoveryStateForShard(indexName, 0);
assertPeerRecoveryWasSuccessful(recoveryState, primaryNodeNameAfterFailOver, replicaNodeNameAfterFailOver);
assertDocumentsAreEqual(indexName, numDocs);

if (repoType.equals(TestRepositoryPlugin.FAULTY_TYPE) == false) {
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
assertThat(fileDetail.recoveredFromSnapshot(), is(equalTo(fileDetail.length())));
}

Store.MetadataSnapshot replicaAfterFailoverMetadataSnapshot =
getMetadataSnapshot(replicaNodeNameAfterFailOver, indexName);
Store.RecoveryDiff recoveryDiff = primaryMetadataSnapshot.recoveryDiff(replicaAfterFailoverMetadataSnapshot);
assertThat(recoveryDiff.identical, is(not(empty())));
}
}

private Store.MetadataSnapshot getMetadataSnapshot(String nodeName, String indexName) throws IOException {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
IndexService indexService = indicesService.indexService(clusterState.metadata().index(indexName).getIndex());
IndexShard shard = indexService.getShard(0);
try (Engine.IndexCommitRef indexCommitRef = shard.acquireSafeIndexCommit()) {
IndexCommit safeCommit = indexCommitRef.getIndexCommit();
assertThat(safeCommit, is(notNullValue()));
return shard.store().getMetadata(safeCommit);
}
}

private long getSnapshotSizeForIndex(String repository, String snapshot, String index) {
GetSnapshotsResponse getSnapshotsResponse =
client().admin().cluster().prepareGetSnapshots(repository).addSnapshots(snapshot).get();
@@ -8,6 +8,7 @@

package org.elasticsearch.indices.recovery.plan;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
@@ -53,6 +54,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class ShardSnapshotsServiceIT extends ESIntegTestCase {
@Override
@@ -184,6 +186,12 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
Version commitVersion = shardSnapshotData.getCommitVersion();
assertThat(commitVersion, is(notNullValue()));
assertThat(commitVersion, is(equalTo(Version.CURRENT)));
final org.apache.lucene.util.Version commitLuceneVersion = shardSnapshotData.getCommitLuceneVersion();
assertThat(commitLuceneVersion, is(notNullValue()));
assertThat(commitLuceneVersion, is(equalTo(Version.CURRENT.luceneVersion)));

assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
@@ -94,6 +94,9 @@ public abstract class Engine implements Closeable {
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
// Field name that stores the Elasticsearch version in Lucene commit user data, representing
// the version that was used to write the commit (and thus a max version for the underlying segments).
public static final String ES_VERSION = "es_version";
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
protected static final String DOC_STATS_SOURCE = "doc_stats";
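
For context on how this marker surfaces later: it is stored in the Lucene commit user data, so it can be read back from the latest commit point. A minimal standalone sketch, not part of this diff (the shard path argument and class name are illustrative):

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.Version;
import org.elasticsearch.index.engine.Engine;

// Reads the ES_VERSION entry written at commit time back out of the latest
// Lucene commit point of a shard's index directory.
final class ReadCommitEsVersionSketch {
    static Version readCommitEsVersion(String shardIndexPath) throws IOException {
        try (FSDirectory directory = FSDirectory.open(Paths.get(shardIndexPath))) {
            Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
            String esVersion = userData.get(Engine.ES_VERSION);
            return esVersion == null ? null : Version.fromString(esVersion);
        }
    }
}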
@@ -42,6 +42,7 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
@@ -2363,7 +2364,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(7);
final Map<String, String> commitData = new HashMap<>(8);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
@@ -2374,6 +2375,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
commitData.put(ES_VERSION, Version.CURRENT.toString());
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
@@ -141,6 +141,7 @@
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.plugins.IndexStorePlugin;
@@ -1718,7 +1719,8 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
final SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(store.directory());
final Map<String, String> userData = segmentCommitInfos.getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
@@ -1727,6 +1729,12 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
final org.apache.lucene.util.Version commitLuceneVersion = segmentCommitInfos.getCommitLuceneVersion();
// This relies on the previous minor release shipping a different Lucene version
assert commitLuceneVersion.onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION.luceneVersion) == false ||
userData.containsKey(Engine.ES_VERSION) && Version.fromString(userData.get(Engine.ES_VERSION)).onOrBefore(Version.CURRENT) :
"commit point has an invalid ES_VERSION value. commit point lucene version [" + commitLuceneVersion + "]," +
" ES_VERSION [" + userData.get(Engine.ES_VERSION) + "]";
return true;
}
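
The new assertion reads as an implication; the following paraphrase (an editor's sketch, not the diff's code) may make the intent clearer: any commit written by a Lucene version new enough to come from an ES release that supports seq-no based snapshot recoveries must carry a sane ES_VERSION entry.

import org.elasticsearch.Version;

// Paraphrase of the assertion above, written as (A implies B) == (!A || B).
final class CommitEsVersionInvariantSketch {
    static boolean holds(boolean commitLuceneVersionIsNewEnough, String esVersionUserData) {
        return commitLuceneVersionIsNewEnough == false
            || (esVersionUserData != null && Version.fromString(esVersionUserData).onOrBefore(Version.CURRENT));
    }
}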

@@ -20,6 +20,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
@@ -391,6 +392,10 @@ protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal,

// commit the new history id
userData.put(Engine.HISTORY_UUID_KEY, historyUUID);
final String commitESVersion = userData.get(Engine.ES_VERSION);
if (commitESVersion == null || Version.fromString(commitESVersion).onOrBefore(Version.CURRENT)) {
userData.put(Engine.ES_VERSION, Version.CURRENT.toString());
}

indexWriter.setLiveCommitData(userData.entrySet());
indexWriter.commit();
@@ -19,6 +19,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -170,6 +171,7 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
liveCommitData.put(Engine.ES_VERSION, Version.CURRENT.toString());
return liveCommitData.entrySet().iterator();
});
writer.commit();