Replica start peer recovery with safe commit (#28181)
Today a replica starts peer recovery with its last commit. If that commit is not a safe commit, the replica immediately falls back to file-based sync, which is more expensive than sequence-number-based recovery. This commit modifies peer recovery on the replica to start from a safe commit. Moreover, the target can keep its existing translog if the recovery is sequence-number-based.

Relates #10708
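
The heart of the change is picking the safe commit: the newest index commit whose maximum sequence number is at most the persisted global checkpoint. Below is a minimal, self-contained sketch of that idea; it is a hypothetical helper rather than the actual CombinedDeletionPolicy.findSafeCommitPoint, and it assumes the engine stores the maximum sequence number in the commit user data under a "max_seq_no" key.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.List;

final class SafeCommitExample {
    // Assumed commit user-data key for the maximum sequence number contained in the commit.
    private static final String MAX_SEQ_NO_KEY = "max_seq_no";

    /** Returns the newest commit covered by the global checkpoint, or the oldest commit if none qualifies. */
    static IndexCommit findSafeCommit(Directory directory, long globalCheckpoint) throws IOException {
        final List<IndexCommit> commits = DirectoryReader.listCommits(directory); // sorted oldest to newest
        IndexCommit safeCommit = commits.get(0);
        for (IndexCommit commit : commits) {
            final String maxSeqNo = commit.getUserData().get(MAX_SEQ_NO_KEY);
            if (maxSeqNo != null && Long.parseLong(maxSeqNo) <= globalCheckpoint) {
                safeCommit = commit; // keep the newest qualifying commit
            }
        }
        return safeCommit;
    }
}

Starting the replica's recovery from such a commit means every operation it is missing has a sequence number above the global checkpoint, so the primary can re-send those operations instead of copying whole files.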
dnhatn authored Jan 13, 2018
1 parent f2db2a0 commit 095f31b
Showing 15 changed files with 196 additions and 133 deletions.
@@ -1512,6 +1512,11 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog() throws IOException;

/**
* Do not replay translog operations, but make the engine ready to accept new operations.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
@@ -401,6 +401,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

@Override
public void skipTranslogRecovery() {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
}
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1304,9 +1304,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndTranslog() throws IOException {
public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().recoverFromTranslog();
}

/**
* Opens the engine on top of the existing lucene engine and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openIndexAndSkipTranslogRecovery() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
@@ -1339,13 +1350,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
"read from translog checkpoint");
}
Engine newEngine = createNewEngine(config);
createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
@@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
indexShard.openIndexAndTranslog();
indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
@@ -21,6 +21,8 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +41,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,6 +63,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
@@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
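
Stepping back to getStartingSeqNo above: the decision it makes can be reduced to a few lines. The sketch below is an illustrative, hypothetical helper (the constant -2 mirrors what SequenceNumbers.UNASSIGNED_SEQ_NO is assumed to be), not the production code.

final class StartingSeqNoExample {
    /** Assumed to match SequenceNumbers.UNASSIGNED_SEQ_NO. */
    static final long UNASSIGNED_SEQ_NO = -2L;

    static long startingSeqNo(long safeCommitMaxSeqNo, long safeCommitLocalCheckpoint, long globalCheckpoint) {
        if (safeCommitMaxSeqNo <= globalCheckpoint) {
            // The safe commit is fully covered by the global checkpoint, so every missing
            // operation sits above its local checkpoint and can be re-sent as operations.
            return safeCommitLocalCheckpoint + 1;
        }
        // No usable starting point: the source will fall back to file-based recovery.
        return UNASSIGNED_SEQ_NO;
    }
}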
@@ -28,19 +28,33 @@

import java.io.IOException;

public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean deleteLocalTranslog;

public RecoveryPrepareForTranslogOperationsRequest() {
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.deleteLocalTranslog = deleteLocalTranslog;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
deleteLocalTranslog = in.readBoolean();
} else {
deleteLocalTranslog = true;
}
}

public long recoveryId() {
@@ -55,15 +69,11 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
/**
* Whether or not the recovery target should delete its local translog
*/
boolean deleteLocalTranslog() {
return deleteLocalTranslog;
}

@Override
@@ -75,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(deleteLocalTranslog);
}
}
}
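
Since the new deleteLocalTranslog flag is only understood by newer nodes, the wire format above is version-gated and the flag defaults to true (the pre-existing behaviour of discarding the target's translog) when it is absent. The snippet below is an illustrative round-trip over an old-version stream, assuming it lives in the org.elasticsearch.indices.recovery package so it can reach the package-private request class.

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;

public class PrepareForTranslogWireExample {
    public static void main(String[] args) throws Exception {
        RecoveryPrepareForTranslogOperationsRequest request =
            new RecoveryPrepareForTranslogOperationsRequest(1L, new ShardId("index", "_na_", 0), 10, false);
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setVersion(Version.V_6_0_0);  // pretend the channel speaks an older wire format
            request.writeTo(out);             // the boolean is not written for this version
            try (StreamInput in = out.bytes().streamInput()) {
                in.setVersion(Version.V_6_0_0);
                RecoveryPrepareForTranslogOperationsRequest read =
                    new RecoveryPrepareForTranslogOperationsRequest(in);
                // Older senders cannot opt into keeping the translog, so the flag reads back as true.
                System.out.println("deleteLocalTranslog = " + read.deleteLocalTranslog());
            }
        }
    }
}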
@@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
@@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

try {
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
}
}

void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
@@ -362,10 +362,14 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
if (createNewTranslog) {
// TODO: Assigns the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
} else {
indexShard().openIndexAndSkipTranslogRecovery();
}
}

@Override
@@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {

/**
* Prepares the target to receive translog operations, after all files have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
* @param createNewTranslog whether or not to delete the local translog on the target
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@@ -31,7 +31,9 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
@@ -226,7 +228,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard replica = shards.getReplicas().get(1);
boolean expectSeqNoRecovery = true;
if (randomBoolean()) {
// simulate docs that were inflight when primary failed, these will be rolled back
final int rollbackDocs = randomIntBetween(1, 5);
@@ -239,7 +240,6 @@
}
if (randomBoolean()) {
oldPrimary.flush(new FlushRequest(index.getName()));
expectSeqNoRecovery = false;
}
}

@@ -252,9 +252,30 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
equalTo(totalDocs - 1L));

// index some more
totalDocs += shards.indexDocs(randomIntBetween(0, 5));
int moreDocs = shards.indexDocs(randomIntBetween(0, 5));
totalDocs += moreDocs;

// As a replica keeps a safe commit, file-based recovery happens only if the translog operations
// required for a sequence-number-based recovery are no longer fully retained and extra documents were added to the primary.
boolean expectSeqNoRecovery = (moreDocs == 0 || randomBoolean());
int uncommittedOpsOnPrimary = 0;
if (expectSeqNoRecovery == false) {
IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
);
newPrimary.indexSettings().updateIndexMetaData(builder.build());
newPrimary.onSettingsChanged();
shards.syncGlobalCheckpoint();
newPrimary.flush(new FlushRequest());
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
}

if (randomBoolean()) {
uncommittedOpsOnPrimary = 0;
shards.syncGlobalCheckpoint();
newPrimary.flush(new FlushRequest());
}

@@ -269,7 +290,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
} else {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}

// roll back the extra ops in the replica
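
The test relies on two index settings to stop translog retention so that a flush can discard the operations a sequence-number-based recovery would need. The sketch below (illustration only, built from the same IndexSettings constants the test uses) isolates that settings delta.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

final class DisableTranslogRetentionExample {
    /** Returns a copy of the given settings with translog retention disabled by age and by size. */
    static Settings disableTranslogRetention(Settings existing) {
        return Settings.builder()
            .put(existing)
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")   // keep nothing by age
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")  // keep nothing by size
            .build();
    }
}

Once these settings are applied, the global checkpoint is synced and the primary is flushed, so only operations indexed after the flush remain uncommitted; that is why the file-based branch of the test asserts that recoveredOperations equals uncommittedOpsOnPrimary rather than totalDocs.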