[SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan
### What changes were proposed in this pull request?

Save the RDD created by doExecute, instead of creating a new one in execute each time.

Currently, many types of SparkPlans already save the RDD they create. For example, shuffle just saves `lazy val inputRDD: RDD[InternalRow] = child.execute()`. This creates inconsistencies when an action (e.g. a repeated `df.collect()`) is executed on a DataFrame twice:
* The SparkPlan will be reused, since the same `df.queryExecution.executedPlan` will be used.
* Any non-result stage will be reused, as the shuffle operators will just have their `inputRDD` reused.
* However, for the result stage, `execute()` will call `doExecute()` again, and the logic of generating the actual execution RDD will be re-executed.

This means that, for example, whole-stage codegen (WSCG) will generate and compile new code for the result stage and create a new RDD out of it. Generation of execution RDDs is also often influenced by config: staying with WSCG, configs like `spark.sql.codegen.hugeMethodLimit` or `spark.sql.codegen.methodSplitThreshold` affect the generated code. The fact that upon re-execution these are evaluated anew for the result stage, but not for earlier stages, creates inconsistencies in which config changes become visible.

By saving the result of `doExecute` and reusing the RDD in `execute`, we make sure that the work of creating that RDD is not duplicated, and behavior is more consistent: all RDDs of the plan are reused, just like the `executedPlan` itself.

Note that while the results of earlier shuffle stages are also reused, the result stage still gets executed again, as its results are not saved in the BlockManager and available for reuse.

We also add a `LazyTry` utility instead of using `lazy val`, to deal with shortcomings of Scala's `lazy val` (re-initialization after failure and coarse locking on the enclosing object). A rough sketch of the overall idea follows.
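
This is illustrative only, not the exact diff below; `SparkPlanSketch` and `executeRDD` are made-up names, while `LazyTry` is the utility added in this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.LazyTry

// Sketch only: the real change lives in SparkPlan; names here are illustrative.
abstract class SparkPlanSketch extends Serializable {
  protected def doExecute(): RDD[InternalRow]

  // Cache the RDD produced by doExecute, so repeated actions on the same
  // executedPlan reuse it instead of re-running codegen and re-reading configs.
  private val executeRDD: LazyTry[RDD[InternalRow]] = LazyTry { doExecute() }

  final def execute(): RDD[InternalRow] = executeRDD.get
}
```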

### Why are the changes needed?

Resolves subtle inconsistencies coming from reusing objects vs. recreating them from scratch.

### Does this PR introduce _any_ user-facing change?

Yes: subtle changes caused by the RDD being reused, e.g. the point at which a config change gets picked up. However, it makes behavior more consistent. Spark 4.0.0 might be a good candidate for making such a change.
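
For illustration only (a hypothetical session, not taken from the PR):

```scala
val df = spark.range(1000).selectExpr("id * 2 AS doubled")
df.collect()  // execution RDDs are built here; codegen-related configs are read at this point

spark.conf.set("spark.sql.codegen.hugeMethodLimit", "1000")

// Before this change: only the result stage re-ran doExecute and could see the new config,
// while earlier stages reused their saved inputRDD.
// After this change: all RDDs of the cached executedPlan are reused, so stages behave consistently.
df.collect()
```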

### How was this patch tested?

Existing SQL execution tests validate that the change in SparkPlan works.
Tests were added for the new `LazyTry` utility.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Github Copilot
(trivial code completion suggestions)

Closes #48037 from juliuszsompolski/SPARK-48195-rdd.

Lead-authored-by: Julek Sompolski <Juliusz Sompolski>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
3 people committed Sep 23, 2024
1 parent 3c81f07 commit d2e8c1c
Showing 8 changed files with 475 additions and 59 deletions.
70 changes: 70 additions & 0 deletions core/src/main/scala/org/apache/spark/util/LazyTry.scala
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import scala.util.Try

/**
* Wrapper utility for a lazy val, with two differences compared to Scala's behavior:
*
* 1. Non-retrying in case of failure. This wrapper stores the exception in a Try, and will re-throw
* it on the access to `get`.
* In Scala, when a `lazy val` field initialization throws an exception, the field remains
* uninitialized, and initialization will be re-attempted on the next access. This can lead to
* performance issues, needlessly recomputing something that will fail again, and can also lead
* to duplicated side effects.
*
* 2. Resolving locking issues.
* In Scala, when a `lazy val` field is initialized, it grabs the synchronized lock on the
* enclosing object instance. This can lead both to performance issues and deadlocks.
* For example:
* a) Thread 1 enters a synchronized method, grabbing a coarse lock on the parent object.
* b) Thread 2 gets spawned off, and tries to initialize a lazy value on the same parent object.
* This causes Scala to also try to grab a lock on the parent object.
* c) If thread 1 waits for thread 2 to join, a deadlock occurs.
* This wrapper will only grab a lock on the wrapper itself, and not the parent object.
*
* @param initialize The block of code to initialize the lazy value.
* @tparam T type of the lazy value.
*/
private[spark] class LazyTry[T](initialize: => T) extends Serializable {
private lazy val tryT: Try[T] = Utils.doTryWithCallerStacktrace { initialize }

/**
* Get the lazy value. If the initialization block threw an exception, it will be re-thrown here.
* The exception will be re-thrown with the current caller's stacktrace.
* An exception with stack trace from when the exception was first thrown can be accessed with
* ```
* ex.getSuppressed.find { e =>
* e.getMessage == org.apache.spark.util.Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE
* }
* ```
*/
def get: T = Utils.getTryWithCallerStacktrace(tryT)
}

private[spark] object LazyTry {
/**
* Create a new LazyTry instance.
*
* @param initialize The block of code to initialize the lazy value.
* @tparam T type of the lazy value.
* @return a new LazyTry instance.
*/
def apply[T](initialize: => T): LazyTry[T] = new LazyTry(initialize)
}
80 changes: 80 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1351,6 +1351,86 @@ private[spark] object Utils
}
}

val TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE =
"Full stacktrace of original doTryWithCallerStacktrace caller"

val TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE =
"Stacktrace under doTryWithCallerStacktrace"

/**
* Use Try with stacktrace substitution for the caller retrieving the error.
*
* Normally in case of failure, the exception would have the stacktrace of the caller that
* originally called doTryWithCallerStacktrace. However, we want to replace the part above
* this function with the stacktrace of the caller who calls getTryWithCallerStacktrace.
* So here we save the part of the stacktrace below doTryWithCallerStacktrace, and
* getTryWithCallerStacktrace will stitch it with the new stack trace of the caller.
* The full original stack trace is kept in ex.getSuppressed.
*
* @param f Code block to be wrapped in Try
* @return Try with Success or Failure of the code block. Use with getTryWithCallerStacktrace.
*/
def doTryWithCallerStacktrace[T](f: => T): Try[T] = {
val t = Try {
f
}
t match {
case Failure(ex) =>
// Note: we remove the common suffix instead of e.g. finding the call to this function, to
// account for recursive calls with multiple doTryWithCallerStacktrace on the stack trace.
val origStackTrace = ex.getStackTrace
val currentStackTrace = Thread.currentThread().getStackTrace
val commonSuffixLen = origStackTrace.reverse.zip(currentStackTrace.reverse).takeWhile {
case (exElem, currentElem) => exElem == currentElem
}.length
val belowEx = new Exception(TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE)
belowEx.setStackTrace(origStackTrace.dropRight(commonSuffixLen))
ex.addSuppressed(belowEx)

// keep the full original stack trace in a suppressed exception.
val fullEx = new Exception(TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE)
fullEx.setStackTrace(origStackTrace)
ex.addSuppressed(fullEx)
case Success(_) => // nothing
}
t
}

/**
* Retrieve the result of Try that was created by doTryWithCallerStacktrace.
*
* In case of failure, the resulting exception has a stack trace that combines the stack trace
* below the original doTryWithCallerStacktrace which triggered it, with the caller stack trace
* of the current caller of getTryWithCallerStacktrace.
*
* Full stack trace of the original doTryWithCallerStacktrace caller can be retrieved with
* ```
* ex.getSuppressed.find { e =>
* e.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE
* }
* ```
*
*
* @param t Try from doTryWithCallerStacktrace
* @return Result of the Try or rethrows the failure exception with modified stacktrace.
*/
def getTryWithCallerStacktrace[T](t: Try[T]): T = t match {
case Failure(ex) =>
val belowStacktrace = ex.getSuppressed.find { e =>
// added in doTryWithCallerStacktrace
e.getMessage == TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE
}.getOrElse {
// If we don't have the expected stacktrace information, just rethrow
throw ex
}.getStackTrace
// We are modifying and throwing the original exception. It would be better if we could
// return a copy, but we can't easily clone it while preserving its type and cause. If this is
// accessed from multiple threads that then look at the stack trace, this could break.
ex.setStackTrace(belowStacktrace ++ Thread.currentThread().getStackTrace.drop(1))
throw ex
case Success(s) => s
}

// A regular expression to match classes of the internal Spark API's
// that we want to skip when finding the call site of a method.
private val SPARK_CORE_CLASS_REGEX =
151 changes: 151 additions & 0 deletions core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import org.apache.spark.SparkFunSuite

class LazyTrySuite extends SparkFunSuite {
test("LazyTry should initialize only once") {
var count = 0
val lazyVal = LazyTry {
count += 1
count
}
assert(count == 0)
assert(lazyVal.get == 1)
assert(count == 1)
assert(lazyVal.get == 1)
assert(count == 1)
}

test("LazyTry should re-throw exceptions") {
val lazyVal = LazyTry {
throw new RuntimeException("test")
}
intercept[RuntimeException] {
lazyVal.get
}
intercept[RuntimeException] {
lazyVal.get
}
}

test("LazyTry should re-throw exceptions with current caller stack-trace") {
val fileName = Thread.currentThread().getStackTrace()(1).getFileName
val lineNo = Thread.currentThread().getStackTrace()(1).getLineNumber
val lazyVal = LazyTry {
throw new RuntimeException("test")
}

val e1 = intercept[RuntimeException] {
lazyVal.get // lineNo + 6
}
assert(e1.getStackTrace
.exists(elem => elem.getFileName == fileName && elem.getLineNumber == lineNo + 6))

val e2 = intercept[RuntimeException] {
lazyVal.get // lineNo + 12
}
assert(e2.getStackTrace
.exists(elem => elem.getFileName == fileName && elem.getLineNumber == lineNo + 12))
}

test("LazyTry does not lock containing object") {
class LazyContainer() {
@volatile var aSet = 0

val a: LazyTry[Int] = LazyTry {
aSet = 1
aSet
}

val b: LazyTry[Int] = LazyTry {
val t = new Thread(new Runnable {
override def run(): Unit = {
assert(a.get == 1)
}
})
t.start()
t.join()
aSet
}
}
val container = new LazyContainer()
// Nothing is lazy initialized yet
assert(container.aSet == 0)
// This will not deadlock, thread t will initialize a, and update aSet
assert(container.b.get == 1)
assert(container.aSet == 1)
}

// Scala lazy val tests are added to test for potential changes in the semantics of Scala's lazy val.

test("Scala lazy val initializing multiple times on error") {
class LazyValError() {
var counter = 0
lazy val a = {
counter += 1
throw new RuntimeException("test")
}
}
val lazyValError = new LazyValError()
intercept[RuntimeException] {
lazyValError.a
}
assert(lazyValError.counter == 1)
intercept[RuntimeException] {
lazyValError.a
}
assert(lazyValError.counter == 2)
}

test("Scala lazy val locking containing object and deadlocking") {
// Note: this will change in scala 3, with different lazy vals not deadlocking with each other.
// https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html
class LazyValContainer() {
@volatile var aSet = 0
@volatile var t: Thread = _

lazy val a = {
aSet = 1
aSet
}

lazy val b = {
t = new Thread(new Runnable {
override def run(): Unit = {
assert(a == 1)
}
})
t.start()
t.join(1000)
aSet
}
}
val container = new LazyValContainer()
// Nothing is lazy initialized yet
assert(container.aSet == 0)
// This will deadlock, because b will take the monitor on LazyValContainer, and then thread t
// will wait on that monitor, not able to initialize a.
// b will therefore see aSet == 0.
assert(container.b == 0)
// However, after b finishes initializing, the monitor will be released, and then thread t
// will finish initializing a, and set aSet to 1.
container.t.join()
assert(container.aSet == 1)
}
}