# [SPARK-46480][CORE][SQL] Fix NPE when a table cache task attempt fails
### What changes were proposed in this pull request?

This PR adds a check: a cached partition is marked as materialized only if the task has neither failed nor been interrupted. It also adds a new method `isFailed` to `TaskContext`.
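
Since `isFailed` joins the public `TaskContext` API, user code can apply the same guard for success-only bookkeeping. A minimal sketch (a hypothetical job; the accumulator and RDD here are illustrative, not part of the patch):

```scala
import org.apache.spark.{SparkContext, TaskContext}

val sc = SparkContext.getOrCreate()
val succeeded = sc.longAccumulator("succeededPartitions")

sc.parallelize(1 to 100, numSlices = 4).mapPartitions { iter =>
  // Completion listeners run on success, failure, and interruption alike,
  // so success-only side effects must be guarded explicitly.
  TaskContext.get().addTaskCompletionListener[Unit] { ctx =>
    if (!ctx.isFailed() && !ctx.isInterrupted()) {
      succeeded.add(1L)
    }
  }
  iter
}.count()
```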

### Why are the changes needed?

Before this PR, when building a table cache, a task failure could cause an NPE in other tasks:

```
java.lang.NullPointerException
	at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
```
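
For context (our reading of the trace, not spelled out in the PR): the failed attempt never finished producing its cached batches, but its completion listener still counted the partition as materialized. Other tasks then read a cache entry whose backing byte arrays were never populated, and `ByteBuffer.wrap(null)` in the generated columnar iterator raises the `NullPointerException` shown above.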

### Does this PR introduce _any_ user-facing change?

Yes, it is a bug fix.

### How was this patch tested?

Added a new unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44445 from ulysses-you/fix-cache.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
ulysses-you authored and yaooqinn committed Dec 22, 2023
Commit 43f7932 (1 parent: fc7d7bc)
Showing 6 changed files with 27 additions and 4 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] (
 
   override def isCompleted(): Boolean = taskContext.isCompleted()
 
+  override def isFailed(): Boolean = taskContext.isFailed()
+
   override def isInterrupted(): Boolean = taskContext.isInterrupted()
 
   override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
    */
  def isCompleted(): Boolean
 
+ /**
+  * Returns true if the task has failed.
+  */
+ def isFailed(): Boolean
+
  /**
   * Returns true if the task has been killed.
   */
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
   @GuardedBy("this")
   override def isCompleted(): Boolean = synchronized(completed)
 
+  override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
   override def isInterrupted(): Boolean = reasonIfKilled.isDefined
 
   override def getLocalProperty(key: String): String = localProperties.getProperty(key)
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -670,6 +670,16 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     assert(invocationOrder === Seq("C", "B", "A", "D"))
   }
 
+  test("SPARK-46480: Add isFailed in TaskContext") {
+    val context = TaskContext.empty()
+    var isFailed = false
+    context.addTaskCompletionListener[Unit] { context =>
+      isFailed = context.isFailed()
+    }
+    context.markTaskFailed(new RuntimeException())
+    context.markTaskCompleted(None)
+    assert(isFailed)
+  }
 }
 
 private object TaskContextSuite {
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
@@ -57,7 +57,9 @@ object MimaExcludes {
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"),
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"),
     // [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this"),
+    // [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed")
   )
 
   // Default exclude rules
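
The new exclusion is needed because adding an abstract `isFailed` method to `TaskContext` is a binary-incompatible change for any third-party implementation of the class; `ReversedMissingMethodProblem` is the MiMa check that flags exactly this, so the method is allow-listed here (our reading; the PR itself does not elaborate).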
8 changes: 5 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -285,9 +285,11 @@ case class CachedRDDBuilder(
         cachedPlan.conf)
     }
     val cached = cb.mapPartitionsInternal { it =>
-      TaskContext.get().addTaskCompletionListener[Unit](_ => {
-        materializedPartitions.add(1L)
-      })
+      TaskContext.get().addTaskCompletionListener[Unit] { context =>
+        if (!context.isFailed() && !context.isInterrupted()) {
+          materializedPartitions.add(1L)
+        }
+      }
       new Iterator[CachedBatch] {
         override def hasNext: Boolean = it.hasNext
         override def next(): CachedBatch = {
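
Note the shape of the fix: the listener is still registered unconditionally, since completion listeners fire regardless of how the attempt ends; the success check moves inside the listener body, so only clean attempts increment `materializedPartitions` (our inference from the diff).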
