[SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException

### What changes were proposed in this pull request?

This patch proposes to wrap `BufferReleasingInputStream.available`/`reset` under `tryOrFetchFailedException`, so that an `IOException` thrown during an `available`/`reset` call is rethrown as a `FetchFailedException`.

### Why are the changes needed?

We have encountered a shuffle data corruption issue:

```
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
```

Spark shuffle can detect corruption in a few stream operations such as `read` and `skip`: an `IOException` thrown there is rethrown as a `FetchFailedException`, which causes the failed shuffle task to be retried. But the stack trace above comes from `available`, which is not covered by this mechanism, so no retry happened and the Spark application simply failed.

Since the `available`/`reset` operations also involve data decompression and can throw `IOException`, we should guard them the same way `read` and `skip` are guarded.
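To make the mechanism concrete, below is a minimal, self-contained sketch of the guard pattern. This is not the actual Spark code (that is in the diff further down): `FetchFailedException` and the stream class here are simplified stand-ins.

```scala
import java.io.{IOException, InputStream}

object TryOrFetchFailedSketch {
  // Stand-in for Spark's FetchFailedException (simplified for this sketch).
  class FetchFailedException(cause: IOException)
    extends RuntimeException(cause.getMessage, cause)

  // Simplified stand-in for BufferReleasingInputStream: every delegate call that
  // may decompress data (and so may throw IOException) goes through one guard.
  class GuardedInputStream(delegate: InputStream) extends InputStream {
    override def read(): Int = tryOrFetchFailedException(delegate.read())
    override def skip(n: Long): Long = tryOrFetchFailedException(delegate.skip(n))
    // After this patch, available() and reset() are guarded too.
    override def available(): Int = tryOrFetchFailedException(delegate.available())
    override def reset(): Unit = tryOrFetchFailedException(delegate.reset())

    // Run `block`, rethrowing any IOException as a retryable FetchFailedException.
    private def tryOrFetchFailedException[T](block: => T): T =
      try block catch {
        case e: IOException => throw new FetchFailedException(e)
      }
  }

  def main(args: Array[String]): Unit = {
    // A stream whose available() fails, mimicking the corruption in the stack trace.
    val corrupt = new InputStream {
      override def read(): Int = -1
      override def available(): Int = throw new IOException("FAILED_TO_UNCOMPRESS(5)")
    }
    try new GuardedInputStream(corrupt).available()
    catch {
      case e: FetchFailedException => println(s"retryable fetch failure: ${e.getMessage}")
    }
  }
}
```

The point is simply that every delegate call which may surface a decompression failure funnels through one place where the exception can be translated into a retryable one.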

### Does this PR introduce _any_ user-facing change?

Yes. Data corruption detected during an `available`/`reset` call now raises a retryable `FetchFailedException`, as `read` and `skip` already do, instead of a plain `IOException`.

### How was this patch tested?

Added a unit test.
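For reference, the new test should be runnable on its own via Spark's usual SBT workflow, e.g. `build/sbt "core/testOnly *ShuffleBlockFetcherIteratorSuite -- -z SPARK-45678"` (this invocation is an assumption based on the standard developer setup, not part of the patch).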

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43543 from viirya/add_available.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
viirya authored and sunchao committed Oct 28, 2023
1 parent aa232f2 commit 57e73da
Showing 2 changed files with 68 additions and 4 deletions.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

@@ -1354,7 +1354,8 @@ private class BufferReleasingInputStream(
     }
   }
 
-  override def available(): Int = delegate.available()
+  override def available(): Int =
+    tryOrFetchFailedException(delegate.available())
 
   override def mark(readlimit: Int): Unit = delegate.mark(readlimit)
 
@@ -1369,12 +1370,13 @@ private class BufferReleasingInputStream(
   override def read(b: Array[Byte], off: Int, len: Int): Int =
     tryOrFetchFailedException(delegate.read(b, off, len))
 
-  override def reset(): Unit = delegate.reset()
+  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())
 
   /**
    * Execute a block of code that returns a value, close this stream quietly and re-throwing
    * IOException as FetchFailedException when detectCorruption is true. This method is only
-   * used by the `read` and `skip` methods inside `BufferReleasingInputStream` currently.
+   * used by the `available`, `read` and `skip` methods inside `BufferReleasingInputStream`
+   * currently.
    */
   private def tryOrFetchFailedException[T](block: => T): T = {
     try {
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -182,6 +182,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       taskContext: Option[TaskContext] = None,
       streamWrapperLimitSize: Option[Long] = None,
+      corruptAtAvailableReset: Boolean = false,
       blockManager: Option[BlockManager] = None,
       maxBytesInFlight: Long = Long.MaxValue,
       maxReqsInFlight: Int = Int.MaxValue,
@@ -201,7 +202,14 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager.getOrElse(createMockBlockManager()),
       mapOutputTracker,
       blocksByAddress.iterator,
-      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      (_, in) => {
+        val limited = streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
+        if (corruptAtAvailableReset) {
+          new CorruptAvailableResetStream(limited)
+        } else {
+          limited
+        }
+      },
       maxBytesInFlight,
       maxReqsInFlight,
       maxBlocksInFlightPerAddress,
@@ -712,6 +720,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     corruptBuffer
   }
 
+  private class CorruptAvailableResetStream(in: InputStream) extends InputStream {
+    override def read(): Int = in.read()
+
+    override def read(dest: Array[Byte], off: Int, len: Int): Int = in.read(dest, off, len)
+
+    override def available(): Int = throw new IOException("corrupt at available")
+
+    override def reset(): Unit = throw new IOException("corrupt at reset")
+  }
+
   private class CorruptStream(corruptAt: Long = 0L) extends InputStream {
     var pos = 0
     var closed = false
@@ -1879,4 +1897,48 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
     verifyLocalBlocksFromFallback(iterator)
   }
+
+  test("SPARK-45678: retry corrupt blocks on available() and reset()") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()
+    )
+
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
+
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, createMockManagedBuffer())
+        sem.release()
+      }
+    }
+
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+      streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, // Don't use `ChunkedByteBufferInputStream`.
+      corruptAtAvailableReset = true,
+      checksumEnabled = false
+    )
+
+    sem.acquire()
+
+    val (id1, stream) = iterator.next()
+    assert(id1 === ShuffleBlockId(0, 0, 0))
+
+    val err1 = intercept[FetchFailedException] {
+      stream.available()
+    }
+
+    assert(err1.getMessage.contains("corrupt at available"))
+
+    val err2 = intercept[FetchFailedException] {
+      stream.reset()
+    }
+
+    assert(err2.getMessage.contains("corrupt at reset"))
+  }
 }
