From 81a1e1c79bdda961e66cadb7298d1d8e908581bb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 15 Jan 2017 15:45:05 -0500 Subject: [PATCH] Bubble up translog I/O exceptions during recovery When reading the translog on the source during peer recovery, if an I/O exception occurs it is wrapped in an unchecked exception. This is unnecessary as we can just let the I/O exception bubble all the way up. This commit does that. --- .../recovery/RecoverySourceHandler.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 67925f0e12916..dde55698fa7d5 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -189,7 +189,15 @@ public RecoveryResponse recoverToTarget() throws IOException { return response; } - boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) { + /** + * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source + * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. + * + * @param translogView a view of the translog on the source + * @return {@code true} if the source is ready for a sequence-number-based recovery + * @throws IOException if an I/O exception occurred reading the translog snapshot + */ + boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException { final long startingSeqNo = request.startingSeqNo(); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); if (startingSeqNo <= endingSeqNo) { @@ -205,7 +213,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); final Translog.Snapshot snapshot = translogView.snapshot(); Translog.Operation operation; - while ((operation = getNextOperationFromSnapshot(snapshot)) != null) { + while ((operation = snapshot.next()) != null) { tracker.markSeqNoAsCompleted(operation.seqNo()); } return tracker.getCheckpoint() >= endingSeqNo; @@ -398,7 +406,7 @@ void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAu * * @param snapshot a snapshot of the translog */ - void phase2(final Translog.Snapshot snapshot) { + void phase2(final Translog.Snapshot snapshot) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -463,8 +471,9 @@ public void finalizeRecovery() { * * @param snapshot the translog snapshot to replay operations from * @return the total number of translog operations that were sent + * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected int sendSnapshot(final Translog.Snapshot snapshot) { + protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException { int ops = 0; long size = 0; int totalOperations = 0; @@ -476,7 +485,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) { // send operations in batches Translog.Operation operation; - while ((operation = getNextOperationFromSnapshot(snapshot)) != null) { + while ((operation = snapshot.next()) != null) { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -522,14 +531,6 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) { return totalOperations; } - private Translog.Operation getNextOperationFromSnapshot(final Translog.Snapshot snapshot) { - try { - return snapshot.next(); - } catch (final IOException ex) { - throw new ElasticsearchException("failed to get next operation from translog", ex); - } - } - /** * Cancels the recovery and interrupts all eligible threads. */