Skip to content

Commit

Permalink
Propagate max_auto_id_timestamp in peer recovery (#33693)
Browse files Browse the repository at this point in the history
Today we don't store the auto-generated timestamp of append-only
operations in Lucene; and assign -1 to every index operations
constructed from LuceneChangesSnapshot. This looks innocent but it
generates duplicate documents on a replica if a retry append-only
arrives first via peer-recovery; then an original append-only arrives
via replication. Since the retry append-only (delivered via recovery)
does not have timestamp, the replica will happily optimizes the original
request while it should not.

This change transmits the max auto-generated timestamp from the primary
to replicas before translog phase in peer recovery. This timestamp will
prevent replicas from optimizing append-only requests if retry
counterparts have been processed.

Relates #33656 
Relates #33222
  • Loading branch information
dnhatn authored Sep 20, 2018
1 parent 6646bcb commit 5f7f793
Show file tree
Hide file tree
Showing 16 changed files with 237 additions and 38 deletions.
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -1761,6 +1762,21 @@ public boolean isRecovering() {
*/
public abstract void maybePruneDeletes();

/**
* Returns the maximum auto_id_timestamp of all append-only index requests have been processed by this engine
* or the auto_id_timestamp received from its primary shard via {@link #updateMaxUnsafeAutoIdTimestamp(long)}.
* Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp.
*/
public long getMaxSeenAutoIdTimestamp() {
return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}

/**
* Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp.
* The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}.
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
Expand All @@ -166,7 +167,7 @@ public InternalEngine(EngineConfig engineConfig) {
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
Expand Down Expand Up @@ -369,7 +370,7 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
assert maxUnsafeAutoIdTimestamp.get() == -1 :
"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
}
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
Expand Down Expand Up @@ -1009,11 +1010,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
final boolean mayHaveBeenIndexBefore;
if (index.isRetry()) {
mayHaveBeenIndexBefore = true;
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr));
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
// in this case we force
mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false);
}
return mayHaveBeenIndexBefore;
}
Expand Down Expand Up @@ -2287,7 +2289,7 @@ public void onSettingsChanged() {
// this is an anti-viral settings you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
Expand Down Expand Up @@ -2526,4 +2528,24 @@ void updateRefreshedCheckpoint(long checkpoint) {
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
}
}

@Override
public final long getMaxSeenAutoIdTimestamp() {
return maxSeenAutoIdTimestamp.get();
}

@Override
public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
updateAutoIdTimestamp(newTimestamp, true);
}

private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
if (unsafe) {
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
}
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,9 @@ public void maybePruneDeletes() {
public DocsStats docStats() {
return docsStats;
}

@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

}
}
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,29 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
}

/**
* Returns the maximum auto_id_timestamp of all append-only requests have been processed by this shard or the auto_id_timestamp received
* from the primary via {@link #updateMaxUnsafeAutoIdTimestamp(long)} at the beginning of a peer-recovery or a primary-replica resync.
*
* @see #updateMaxUnsafeAutoIdTimestamp(long)
*/
public long getMaxSeenAutoIdTimestamp() {
return getEngine().getMaxSeenAutoIdTimestamp();
}

/**
* Since operations stored in soft-deletes do not have max_auto_id_timestamp, the primary has to propagate its max_auto_id_timestamp
* (via {@link #getMaxSeenAutoIdTimestamp()} of all processed append-only requests to replicas at the beginning of a peer-recovery
* or a primary-replica resync to force a replica to disable optimization for all append-only requests which are replicated via
* replication while its retry variants are replicated via recovery without auto_id_timestamp.
* <p>
* Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives
* a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication.
*/
public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) {
getEngine().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary);
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
final long targetLocalCheckpoint;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
// We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee
// that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
Expand Down Expand Up @@ -447,9 +450,11 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
Expand All @@ -462,7 +467,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);

stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
Expand Down Expand Up @@ -530,10 +536,11 @@ static class SendSnapshotResult {
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
Expand All @@ -551,8 +558,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require
logger.trace("no translog operations to send");
}

final CancellableThreads.IOInterruptable sendBatch =
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
final CancellableThreads.IOInterruptable sendBatch = () ->
targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp));

// send operations in batches
Translog.Operation operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public interface RecoveryTargetHandler {

/**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
*
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
* @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard
* @return the local checkpoint on the target shard
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException;
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException;

/**
* Notifies the target of the files it is going to receive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
private ShardId shardId;
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;

public RecoveryTranslogOperationsRequest() {
}

RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) {
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations,
int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
}

public long recoveryId() {
Expand All @@ -61,13 +66,22 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

public long maxSeenAutoIdTimestampOnPrimary() {
return maxSeenAutoIdTimestampOnPrimary;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
}

@Override
Expand All @@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
Translog.writeOperations(out, operations);
out.writeVInt(totalTranslogOps);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps);
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3537,6 +3537,8 @@ public void run() {
}
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
assertThat(engine.getMaxSeenAutoIdTimestamp(),
equalTo(docs.stream().mapToLong(Engine.Index::getAutoGeneratedIdTimestamp).max().getAsLong()));
assertLuceneOperations(engine, numDocs, 0, 0);
}

Expand Down
Loading

0 comments on commit 5f7f793

Please sign in to comment.