Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica starts peer recovery with safe commit #28181

Merged
merged 10 commits into from
Jan 13, 2018
5 changes: 5 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,11 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog() throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

@Override
public void skipTranslogRecovery() {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
}
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1304,9 +1304,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndTranslog() throws IOException {
public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().recoverFromTranslog();
}

/**
* Opens the engine on top of the existing lucene engine and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openIndexAndSkipTranslogRecovery() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
Expand Down Expand Up @@ -1339,13 +1350,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
"read from translog checkpoint");
}
Engine newEngine = createNewEngine(config);
createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
indexShard.openIndexAndTranslog();
indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -60,6 +63,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
Expand Down Expand Up @@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
Expand Down Expand Up @@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,33 @@

import java.io.IOException;

public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean deleteLocalTranslog;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I came up with the name and I'm sorry for changing my mind. I would propose createNewTranslog. better no?


public RecoveryPrepareForTranslogOperationsRequest() {
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.deleteLocalTranslog = deleteLocalTranslog;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
deleteLocalTranslog = in.readBoolean();
} else {
deleteLocalTranslog = true;
}
}

public long recoveryId() {
Expand All @@ -55,15 +69,11 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
/**
* Whether or not the recover target should delete its local translog
*/
boolean deleteLocalTranslog() {
return deleteLocalTranslog;
}

@Override
Expand All @@ -75,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(deleteLocalTranslog);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
Expand Down Expand Up @@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

try {
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can roll back all these naming changes if we we keep the old message (and the boolean)

// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
Expand Down Expand Up @@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
}
}

void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,14 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the todo is still relevant no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed it back as a note as it's a valid TODO.

indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
if (createNewTranslog) {
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add when you expect it to be safe (what version)

indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
} else {
indexShard().openIndexAndSkipTranslogRecovery();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {

/**
* Prepares the target to receive translog operations, after all file have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
* @param createNewTranslog whether or not to delete the local translog on the target
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
Expand Down Expand Up @@ -226,7 +228,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard replica = shards.getReplicas().get(1);
boolean expectSeqNoRecovery = true;
if (randomBoolean()) {
// simulate docs that were inflight when primary failed, these will be rolled back
final int rollbackDocs = randomIntBetween(1, 5);
Expand All @@ -239,7 +240,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
}
if (randomBoolean()) {
oldPrimary.flush(new FlushRequest(index.getName()));
expectSeqNoRecovery = false;
}
}

Expand All @@ -252,9 +252,30 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
equalTo(totalDocs - 1L));

// index some more
totalDocs += shards.indexDocs(randomIntBetween(0, 5));
int moreDocs = shards.indexDocs(randomIntBetween(0, 5));
totalDocs += moreDocs;

// As a replica keeps a safe commit, the file-based recovery only happens if the required translog
// for the sequence based recovery are not fully retained and extra documents were added to the primary.
boolean expectSeqNoRecovery = (moreDocs == 0 || randomBoolean());
int uncommittedOpsOnPrimary = 0;
if (expectSeqNoRecovery == false) {
IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
);
newPrimary.indexSettings().updateIndexMetaData(builder.build());
newPrimary.onSettingsChanged();
shards.syncGlobalCheckpoint();
newPrimary.flush(new FlushRequest());
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
}

if (randomBoolean()) {
uncommittedOpsOnPrimary = 0;
shards.syncGlobalCheckpoint();
newPrimary.flush(new FlushRequest());
}

Expand All @@ -269,7 +290,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
} else {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}

// roll back the extra ops in the replica
Expand Down
Loading