
Introduce sequence-number-based recovery #22484

Merged
merged 46 commits on Jan 27, 2017
Changes from 6 commits
Commits
404b8dc
Introduce sequence number-based recovery
jasontedor Jan 3, 2017
d360a23
Simplify sequence number-based recovery
jasontedor Jan 6, 2017
6ec0ef6
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 8, 2017
b9200cf
Remove obsolete field from RecoverySourceHandler
jasontedor Jan 8, 2017
91e1ff0
Handle translog missing while preparing recovery
jasontedor Jan 8, 2017
1c14260
Skip adding operations without sequence number
jasontedor Jan 8, 2017
8b0e501
Revert adding no-ops on version confict in replica
jasontedor Jan 15, 2017
dac513a
Revert whitespace change in MultiSnapshot.java
jasontedor Jan 15, 2017
81a1e1c
Bubble up translog I/O exceptions during recovery
jasontedor Jan 15, 2017
8960522
Add assertion on number of recovered ops
jasontedor Jan 16, 2017
d71aa16
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 16, 2017
b6e6cc3
Restore logger specific to GlobalCheckpointTracker
jasontedor Jan 16, 2017
ab18bde
Use readLong/writeLong for serializing seq. no.
jasontedor Jan 16, 2017
cea70f4
Remove unneeded assertion disjunction in engine
jasontedor Jan 16, 2017
34fbb37
Explicitly prepare shard for peer recovery
jasontedor Jan 16, 2017
1929c03
Load sequence number statistics from the store
jasontedor Jan 16, 2017
c0169c2
Rename replication test case method
jasontedor Jan 16, 2017
999ca91
Iteration
jasontedor Jan 17, 2017
7281b75
Add in-flight ops recovery test
jasontedor Jan 18, 2017
320301c
Add assertion on recoveries targeting old replicas
jasontedor Jan 18, 2017
50d3191
Add defensive assertion on starting seq. no.
jasontedor Jan 18, 2017
2596dcd
Store#loadSeqNoStats iteration
jasontedor Jan 18, 2017
cc2002c
Simplify preparing start recovery request
jasontedor Jan 19, 2017
1e59f84
Only mark assigned sequence numbers as completed
jasontedor Jan 19, 2017
cc30114
Iteration
jasontedor Jan 19, 2017
781f276
Add comment regarding starting seq. no.
jasontedor Jan 19, 2017
f64b26f
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 20, 2017
3a70bd7
Add missing license header
jasontedor Jan 20, 2017
88fc801
Fix start recovery request serialization test
jasontedor Jan 20, 2017
6a0b70e
Revert start recovery request assertion
jasontedor Jan 20, 2017
4e35141
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 23, 2017
3571036
Add note about snapshot restore impact on recovery
jasontedor Jan 23, 2017
da94b28
Rename EvilRecoveryIT to EvilPeerRecoveryIT
jasontedor Jan 23, 2017
e801c5b
Use bulk thread pool size for number of docs
jasontedor Jan 23, 2017
a4cf33e
Remove extraneous blank line
jasontedor Jan 23, 2017
c16295a
Temporarily enable trace logging on test
jasontedor Jan 23, 2017
32c6702
More trace logging for test
jasontedor Jan 24, 2017
76f8807
Fix shard ID in logging statement
jasontedor Jan 24, 2017
adafa21
Fix RFGIT#testReusePeerRecovery test bug
jasontedor Jan 25, 2017
270a68a
Cleanup
jasontedor Jan 25, 2017
2e67a0b
Revert "Fix RFGIT#testReusePeerRecovery test bug"
jasontedor Jan 26, 2017
06a3785
Rewrite reuse peer recovery test
jasontedor Jan 26, 2017
137ffb4
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 26, 2017
eeaa4f9
Remove unused imports
jasontedor Jan 26, 2017
62aabb0
Cleanup test
jasontedor Jan 26, 2017
97e0b20
More cleanup
jasontedor Jan 26, 2017
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -424,7 +424,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]PeerRecoverySourceService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />
@@ -36,7 +36,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
@@ -113,7 +113,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndexSettings.MAX_SLICES_PER_SCROLL,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

Member:

Looks like a typo here

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;
@@ -361,7 +361,7 @@ public Operation.TYPE getOperationType()

void setTranslogLocation(Translog.Location translogLocation) {
if (freeze.get() == null) {
assert failure == null : "failure has to be null to set translog location";
assert failure == null || translogLocation == null: "failure has to be null to set translog location";
Contributor:

wondering - was this required for this PR or is it preparation for the future?

Member Author:

This change will no longer be necessary after #22626.

Member Author:

I pushed 8b0e501.

Member Author:

I integrated master into this branch after #22626 in d71aa16.

Member Author:

I also pushed cea70f4.

this.translogLocation = translogLocation;
} else {
throw new IllegalStateException("result is already frozen");
@@ -379,6 +379,7 @@ void setTook(long took) {
void freeze() {
freeze.set(true);
}

Member:

Same here

}

public static class IndexResult extends Result {
@@ -63,6 +63,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
@@ -119,8 +120,6 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -365,7 +364,7 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
final TranslogConfig translogConfig,
final IndexWriter indexWriter) throws IOException {
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter);
}

@@ -378,20 +377,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue());
}
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, indexWriter.getLiveCommitData());
Contributor:

Is this all worth it? I wonder if we should just load from store in the constructor and save on this method:

                switch (openMode) {
                    case OPEN_INDEX_AND_TRANSLOG:
                        seqNoStats = store.loadSeqNoStats(Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()));
                        writer = createWriter(false);
                        break;
                    case OPEN_INDEX_CREATE_TRANSLOG:
                        seqNoStats = store.loadSeqNoStats(SequenceNumbersService.UNASSIGNED_SEQ_NO);
                        writer = createWriter(false);
                        break;

Member Author:

I agree; I pushed 1929c03 (I think that this will also fit better with future developments).

}

private SearcherManager createSearcherManager() throws EngineException {
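The conversation above replaces an inline loop over the Lucene commit user data with a shared helper. A minimal standalone sketch of what such a helper does, reading the two seq-no keys out of the commit data and falling back to `NO_OPS_PERFORMED` when a key is absent (the class name, the `long[]` return, and the standalone shape are illustrative — the real helper returns a `SeqNoStats` object):

```java
import java.util.Map;

// Hypothetical sketch of the extracted helper: scan the Lucene commit
// user data for the local-checkpoint and max-seq-no keys, defaulting to
// NO_OPS_PERFORMED (-1) when a key is missing.
public class SeqNoStatsSketch {
    static final long NO_OPS_PERFORMED = -1;
    static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
    static final String MAX_SEQ_NO = "max_seq_no";

    // returns {maxSeqNo, localCheckpoint, globalCheckpoint}
    static long[] loadSeqNoStats(long globalCheckpoint, Iterable<Map.Entry<String, String>> commitData) {
        long maxSeqNo = NO_OPS_PERFORMED;
        long localCheckpoint = NO_OPS_PERFORMED;
        for (Map.Entry<String, String> entry : commitData) {
            if (LOCAL_CHECKPOINT_KEY.equals(entry.getKey())) {
                localCheckpoint = Long.parseLong(entry.getValue());
            } else if (MAX_SEQ_NO.equals(entry.getKey())) {
                maxSeqNo = Long.parseLong(entry.getValue());
            }
        }
        // the global checkpoint is not stored in the commit; it is passed in
        // after being read from the translog checkpoint file
        return new long[] {maxSeqNo, localCheckpoint, globalCheckpoint};
    }

    public static void main(String[] args) {
        Map<String, String> commitData = Map.of(LOCAL_CHECKPOINT_KEY, "41", MAX_SEQ_NO, "43");
        long[] stats = loadSeqNoStats(7, commitData.entrySet());
        System.out.println("maxSeqNo=" + stats[0] + " localCheckpoint=" + stats[1] + " globalCheckpoint=" + stats[2]);
    }
}
```

Note how the global checkpoint travels a different path from the other two statistics: it comes from the translog, not the Lucene commit.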
@@ -684,13 +670,20 @@ private IndexResult innerIndex(Index index) throws IOException {
final IndexResult indexResult;
if (checkVersionConflictResult.isPresent()) {
indexResult = checkVersionConflictResult.get();
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication
if (indexResult.hasFailure() || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Contributor:

We discussed another solution for this problem in another channel. The gist was to change the way we deal with version conflicts on replicas. The idea was to try to do that separately first and then base this PR on it.

Member Author:

I opened #22626 for this.

Member Author:

I pushed 8b0e501 to revert the change in preparation for merging master in after #22626 lands there.

location = null;
} else {
final Translog.NoOp operation = new Translog.NoOp(seqNo, index.primaryTerm(), "version conflict during recovery");
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
}
} else {
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}

/**
/*
Member:

?

Member Author:

This was a Javadoc-style comment inside a method body, where it has no effect on the generated Javadoc. While javac treats it as a block comment either way, my IDE formats it as Javadoc instead of as a plain block comment, and that annoys me.

* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@@ -707,12 +700,11 @@ private IndexResult innerIndex(Index index) throws IOException {
update(index.uid(), index.docs(), indexWriter);
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setTranslogLocation(location);
final Translog.Index operation = new Translog.Index(index, indexResult);
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
}
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
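The version-conflict branches in innerIndex and innerDelete above share one decision: nothing is written back while replaying the local translog; a conflict with an assigned sequence number is recorded as a translog no-op so the sequence number is still consumed; a failed or unassigned operation writes nothing. A toy decision table capturing that shape (the enum, helper name, and String return are stand-ins, not the real Engine classes):

```java
// Simplified sketch of the translog-write decision used by both
// innerIndex and innerDelete in this diff.
public class TranslogWriteDecision {
    enum Origin { PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY }

    static final long UNASSIGNED_SEQ_NO = -2;

    // Describes what gets written to the translog; null means nothing is written.
    static String operationFor(boolean versionConflict, boolean hasFailure, long seqNo, Origin origin) {
        if (origin == Origin.LOCAL_TRANSLOG_RECOVERY) {
            return null; // replaying the translog itself: never write back
        }
        if (versionConflict) {
            if (hasFailure || seqNo == UNASSIGNED_SEQ_NO) {
                return null; // nothing to record
            }
            // the conflicting operation still consumes its sequence number,
            // so the local checkpoint can advance past it
            return "no-op[" + seqNo + "]";
        }
        return "index-or-delete[" + seqNo + "]";
    }

    public static void main(String[] args) {
        System.out.println(operationFor(true, false, 5, Origin.REPLICA));                 // no-op[5]
        System.out.println(operationFor(true, false, UNASSIGNED_SEQ_NO, Origin.PRIMARY)); // null
    }
}
```

The no-op is what keeps the replica's local checkpoint from stalling on a sequence number whose operation was superseded by a version conflict.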
@@ -816,21 +808,26 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
final DeleteResult deleteResult;
if (result.isPresent()) {
deleteResult = result.get();
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication
if (deleteResult.hasFailure() || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
location = null;
} else {
final Translog.NoOp operation = new Translog.NoOp(seqNo, delete.primaryTerm(), "version conflict during recovery");
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
}
} else {
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}

updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
deleteResult.setTranslogLocation(location);
final Translog.Delete operation = new Translog.Delete(delete, deleteResult);
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
}
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
@@ -1552,11 +1549,11 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
@@ -22,25 +22,25 @@
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;

/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
* them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
* have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
public class GlobalCheckpointTracker {

/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
@@ -62,20 +62,22 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
private long globalCheckpoint;

private final Logger logger;

/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is tracking local checkpoints for
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param logger a component logger
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
GlobalCheckpointTracker(final IndexSettings indexSettings, final long globalCheckpoint, final Logger logger) {
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
this.logger = logger;
}

/**
@@ -127,8 +129,9 @@ synchronized boolean updateCheckpointOnPrimary() {
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
+ "] is lower than previous one [" + globalCheckpoint + "]");
final String message =
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
throw new IllegalStateException(message);
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);
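The updateCheckpointOnPrimary hunk above computes the global checkpoint as the minimum of the in-sync copies' local checkpoints, refusing to advance while any copy's checkpoint is still unknown and failing hard if it would ever move backwards. A toy model of that invariant (the class, the plain Map argument, and the free function are illustrative, not the tracker's real API):

```java
import java.util.Map;

// Toy model of the global-checkpoint update: min over in-sync local
// checkpoints, monotonically non-decreasing.
public class GlobalCheckpointSketch {
    static final long UNASSIGNED_SEQ_NO = -2;

    static long update(long current, Map<String, Long> inSyncLocalCheckpoints) {
        long min = Long.MAX_VALUE;
        for (long cp : inSyncLocalCheckpoints.values()) {
            if (cp == UNASSIGNED_SEQ_NO) {
                return current; // a copy has no known checkpoint yet: cannot advance
            }
            min = Math.min(min, cp);
        }
        if (min < current) {
            // mirrors the IllegalStateException in the diff above
            throw new IllegalStateException(
                "new global checkpoint [" + min + "] is lower than previous one [" + current + "]");
        }
        return min;
    }

    public static void main(String[] args) {
        // primary at 10, one replica at 7: the global checkpoint is 7
        System.out.println(update(3, Map.of("primary", 10L, "replica-0", 7L)));
    }
}
```

The monotonicity check matters because replicas persist the global checkpoint; a backwards move would invalidate the recovery guarantee this PR builds on.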