Skip to content

Commit

Permalink
Bubble up translog I/O exceptions during recovery
Browse files Browse the repository at this point in the history
When reading the translog on the source during peer recovery, if an I/O
exception occurs it is wrapped in an unchecked exception. This is
unnecessary as we can just let the I/O exception bubble all the way
up. This commit does that.
  • Loading branch information
jasontedor committed Jan 15, 2017
1 parent dac513a commit 81a1e1c
Showing 1 changed file with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ public RecoveryResponse recoverToTarget() throws IOException {
return response;
}

boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) {
/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
*
* @param translogView a view of the translog on the source
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException {
final long startingSeqNo = request.startingSeqNo();
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (startingSeqNo <= endingSeqNo) {
Expand All @@ -205,7 +213,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
final Translog.Snapshot snapshot = translogView.snapshot();
Translog.Operation operation;
while ((operation = getNextOperationFromSnapshot(snapshot)) != null) {
while ((operation = snapshot.next()) != null) {
tracker.markSeqNoAsCompleted(operation.seqNo());
}
return tracker.getCheckpoint() >= endingSeqNo;
Expand Down Expand Up @@ -398,7 +406,7 @@ void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAu
*
* @param snapshot a snapshot of the translog
*/
void phase2(final Translog.Snapshot snapshot) {
void phase2(final Translog.Snapshot snapshot) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
Expand Down Expand Up @@ -463,8 +471,9 @@ public void finalizeRecovery() {
*
* @param snapshot the translog snapshot to replay operations from
* @return the total number of translog operations that were sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected int sendSnapshot(final Translog.Snapshot snapshot) {
protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException {
int ops = 0;
long size = 0;
int totalOperations = 0;
Expand All @@ -476,7 +485,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) {

// send operations in batches
Translog.Operation operation;
while ((operation = getNextOperationFromSnapshot(snapshot)) != null) {
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
Expand Down Expand Up @@ -522,14 +531,6 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) {
return totalOperations;
}

private Translog.Operation getNextOperationFromSnapshot(final Translog.Snapshot snapshot) {
try {
return snapshot.next();
} catch (final IOException ex) {
throw new ElasticsearchException("failed to get next operation from translog", ex);
}
}

/**
* Cancels the recovery and interrupts all eligible threads.
*/
Expand Down

0 comments on commit 81a1e1c

Please sign in to comment.