Skip to content

Commit

Permalink
skippedOperations -> overriddenOperations
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Nov 8, 2017
1 parent def1d39 commit 5cc33bb
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class MultiSnapshot implements Translog.Snapshot {

private final TranslogSnapshot[] translogs;
private final int totalOperations;
private int skippedOperations;
private int overriddenOperations;
private final Closeable onClose;
private int index;
private final SeqNumSet seenSeqNo;
Expand All @@ -46,7 +46,7 @@ final class MultiSnapshot implements Translog.Snapshot {
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
this.translogs = translogs;
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.skippedOperations = 0;
this.overriddenOperations = 0;
this.onClose = onClose;
this.seenSeqNo = new SeqNumSet();
this.index = translogs.length - 1;
Expand All @@ -58,8 +58,8 @@ public int totalOperations() {
}

@Override
public int skippedOperations() {
return skippedOperations;
public int overriddenOperations() {
return overriddenOperations;
}

@Override
Expand All @@ -71,7 +71,7 @@ public Translog.Operation next() throws IOException {
if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
skippedOperations++;
overriddenOperations++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,10 +837,11 @@ public interface Snapshot extends Closeable {
int totalOperations();

/**
* The number of operations has been skipped in the snapshot so far.
* Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called.
* The number of operations has been overridden (eg. superseded) in the snapshot so far.
* If two operations have the same sequence number, the operation with a lower term will be overridden by the operation
* with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called.
*/
default int skippedOperations() {
default int overriddenOperations() {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -562,16 +563,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
}
}

logger.trace("Translog skipped [{}] operations", snapshot.skippedOperations());
skippedOps += snapshot.skippedOperations();

if (!operations.isEmpty() || totalSentOps == 0) {
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
cancellableThreads.executeIO(sendBatch);
}

assert expectedTotalOps == skippedOps + totalSentOps
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";
assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);

logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ public void testTranslogDedupOperations() throws Exception {
op1 = snapshot.next();
assertThat(op1, notNullValue());
assertThat(snapshot.next(), nullValue());
assertThat(snapshot.skippedOperations(), equalTo(0));
assertThat(snapshot.overriddenOperations(), equalTo(0));
}

// Make sure that replica2 receives translog from replica1 and overwrites its stale operation (op1).
// Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
logger.info("--> Promote replica1 as the primary");
shards.promoteReplicaToPrimary(replica1);
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
Expand All @@ -347,10 +347,10 @@ public void testTranslogDedupOperations() throws Exception {
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(1));
assertThat(snapshot.overriddenOperations(), greaterThanOrEqualTo(1));
}

// Make sure that peer-recovery transfers all but non-duplicated operations.
// Make sure that peer-recovery transfers all but non-overridden operations.
IndexShard replica3 = shards.addReplica();
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2);
Expand All @@ -360,7 +360,7 @@ public void testTranslogDedupOperations() throws Exception {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat(snapshot.skippedOperations(), equalTo(0));
assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2497,7 +2497,7 @@ public void testMinSeqNoBasedAPI() throws IOException {
assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
readFromSnapshot++;
}
readFromSnapshot += snapshot.skippedOperations();
readFromSnapshot += snapshot.overriddenOperations();
}
assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
final long seqNoLowerBound = seqNo;
Expand Down

0 comments on commit 5cc33bb

Please sign in to comment.