Skip to content

Commit

Permalink
Add assertion on number of recovered ops
Browse files Browse the repository at this point in the history
This commit adds an assertion to the number of ops recovered from the
translog in the recovery of disconnected replica test.
  • Loading branch information
jasontedor committed Jan 16, 2017
1 parent 81a1e1c commit 8960522
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl
}
return tracker.getCheckpoint() >= endingSeqNo;
}
return false;
return true;

This comment has been minimized.

Copy link
@bleskes

bleskes Jan 16, 2017

Contributor

why is this changed?

This comment has been minimized.

Copy link
@jasontedor

jasontedor Jan 17, 2017

Author Member

That was inadvertent. I'm glad that you noticed.

}

/**
Expand Down Expand Up @@ -417,7 +417,7 @@ void phase2(final Translog.Snapshot snapshot) throws IOException {
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());

// send all the snapshot's translog operations to the target
final int totalOperations = sendSnapshot(snapshot);
final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot);

stopWatch.stop();
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
Expand Down Expand Up @@ -465,15 +465,17 @@ public void finalizeRecovery() {
}

/**
* Send the given snapshot's operations to this handler's target node.
* Send the given snapshot's operations with a sequence number greater than the specified staring sequence number to this handler's
* target node.
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param snapshot the translog snapshot to replay operations from
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the total number of translog operations that were sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException {
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
int ops = 0;
long size = 0;
int totalOperations = 0;
Expand All @@ -490,6 +492,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
if (operation.seqNo() < startingSeqNo) continue;

This comment has been minimized.

Copy link
@bleskes

bleskes Jan 16, 2017

Contributor

how does that mix with the BWC aspect? what happens if we have no seq no? (not saying it goes wrong, just double checking)

This comment has been minimized.

Copy link
@jasontedor

jasontedor Jan 17, 2017

Author Member

You're right, I pushed a change in 999ca91.

operations.add(operation);
ops++;
size += operation.estimateSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
}

@Override
protected int sendSnapshot(final Translog.Snapshot snapshot) {
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Future;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
Expand Down Expand Up @@ -68,6 +69,7 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
shards.flush();
shards.getPrimary().updateGlobalCheckpointOnPrimary();
final IndexShard originalReplica = shards.getReplicas().get(0);
long replicaCommittedLocalCheckpoint = docs - 1;
boolean replicaHasDocsSinceLastFlushedCheckpoint = false;
for (int i = 0; i < randomInt(2); i++) {
final int indexedDocs = shards.indexDocs(randomInt(5));
Expand All @@ -79,14 +81,13 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
final boolean flush = randomBoolean();
if (flush) {
originalReplica.flush(new FlushRequest());
replicaHasDocsSinceLastFlushedCheckpoint = false;
replicaCommittedLocalCheckpoint = docs - 1;
}

final boolean sync = randomBoolean();
if (sync) {
shards.getPrimary().updateGlobalCheckpointOnPrimary();
if (flush) {
replicaHasDocsSinceLastFlushedCheckpoint = false;
}
}
}

Expand All @@ -112,6 +113,9 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
} else {
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
assertThat(
recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1))));
}

docs += shards.indexDocs(randomInt(5));
Expand Down

0 comments on commit 8960522

Please sign in to comment.