
Stop removing GpuCoalesceBatches from non-AQE queries when AQE is enabled #720

Merged on Sep 11, 2020 (4 commits)
Changes from 3 commits
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -57,7 +57,7 @@ Name | Description | Default Value
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.multiThreadedRead.enabled"></a>spark.rapids.sql.format.parquet.multiThreadedRead.enabled|When set to true, reads multiple small files within a partition more efficiently by reading each file in a separate thread in parallel on the CPU side before sending to the GPU. Limited by spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFileProcessed|true
<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel.|2147483647
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel.|20
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor hasstarted.|20
Reviewer comment: This indicates the branch needs to be upmerged/rebased on latest branch-0.2.

<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
@@ -48,18 +48,27 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
ShimLoader.getSparkShims.getGpuColumnarToRowTransition(plan, exportColumnRdd)
}

def optimizeAdaptiveTransitions(plan: SparkPlan): SparkPlan = plan match {
def optimizeAdaptiveTransitions(
plan: SparkPlan,
parent: Option[SparkPlan]): SparkPlan = plan match {
case HostColumnarToGpu(r2c: RowToColumnarExec, goal) =>
GpuRowToColumnarExec(optimizeAdaptiveTransitions(r2c.child), goal)
GpuRowToColumnarExec(optimizeAdaptiveTransitions(r2c.child, Some(r2c)), goal)

case GpuCoalesceBatches(e: GpuShuffleExchangeExecBase, _) =>
// we need to insert the coalesce batches step later, after the query stage has executed
optimizeAdaptiveTransitions(e)
case ColumnarToRowExec(GpuBringBackToHost(
GpuCoalesceBatches(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty =>
// If this coalesced exchange has no parent it means that this plan is being created
// as a query stage when AQE is on. Because we must return an operator that implements
// ShuffleExchangeLike we need to remove the GpuCoalesceBatches operator here and
// re-insert it around the GpuCustomShuffleReaderExec in the parent query stage when that
// is created. Another option would be to create a custom operator that combines
// GpuCoalesceBatches and GpuShuffleExchangeExec and return that here instead. See
// https://github.com/NVIDIA/spark-rapids/issues/719 for more information.
optimizeAdaptiveTransitions(e, Some(plan))

case e: GpuCustomShuffleReaderExec =>
// this is where we re-insert the GpuCoalesceBatches that we removed from the
// shuffle exchange
GpuCoalesceBatches(e.copy(child = optimizeAdaptiveTransitions(e.child)),
GpuCoalesceBatches(e.copy(child = optimizeAdaptiveTransitions(e.child, Some(e))),
TargetSize(Long.MaxValue))

// Query stages that have already executed on the GPU could be used by CPU operators
@@ -74,14 +83,14 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case HostColumnarToGpu(e: ShuffleQueryStageExec, _) => e

case ColumnarToRowExec(bb: GpuBringBackToHost) =>
optimizeAdaptiveTransitions(bb.child) match {
optimizeAdaptiveTransitions(bb.child, Some(bb)) match {
case e: GpuBroadcastExchangeExecBase => e
case e: GpuShuffleExchangeExecBase => e
case other => getColumnarToRowExec(other)
}

case p =>
p.withNewChildren(p.children.map(optimizeAdaptiveTransitions))
p.withNewChildren(p.children.map(c => optimizeAdaptiveTransitions(c, Some(p))))
}

def optimizeCoalesce(plan: SparkPlan): SparkPlan = plan match {
@@ -325,7 +334,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
var updatedPlan = insertHashOptimizeSorts(plan)
updatedPlan = insertCoalesce(insertColumnarFromGpu(updatedPlan))
updatedPlan = optimizeCoalesce(if (plan.conf.adaptiveExecutionEnabled) {
optimizeAdaptiveTransitions(updatedPlan)
optimizeAdaptiveTransitions(updatedPlan, None)
} else {
optimizeGpuPlanTransitions(updatedPlan)
})
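
The comments in the diff above describe the core idea of this change: the coalesce step cannot stay on top of a shuffle exchange that forms the root of an AQE query stage, because the stage must expose a ShuffleExchangeLike, so it is dropped there and re-added around the custom shuffle reader in the consuming stage. The following is a simplified, self-contained sketch of that parent-aware rewrite using toy node types, not the plugin's actual classes:

```scala
// Toy plan nodes standing in for GpuCoalesceBatches, GpuShuffleExchangeExec and
// GpuCustomShuffleReaderExec; this only illustrates the shape of the rewrite.
object CoalesceRewriteSketch extends App {
  sealed trait Node
  case class Leaf(name: String) extends Node
  case class Exchange(child: Node) extends Node
  case class Coalesce(child: Node) extends Node
  case class ShuffleReader(child: Node) extends Node

  def rewrite(plan: Node, parent: Option[Node]): Node = plan match {
    // A coalesce directly over an exchange at the root of a query stage (no parent):
    // drop the coalesce so the stage root is still the exchange itself.
    case Coalesce(e: Exchange) if parent.isEmpty =>
      rewrite(e, Some(plan))
    // The consuming stage's shuffle reader gets the coalesce re-inserted around it.
    case ShuffleReader(child) =>
      Coalesce(ShuffleReader(rewrite(child, Some(plan))))
    case Exchange(child) => Exchange(rewrite(child, Some(plan)))
    case Coalesce(child) => Coalesce(rewrite(child, Some(plan)))
    case leaf: Leaf => leaf
  }

  // Stage root: the coalesce is removed so the exchange stays on top.
  assert(rewrite(Coalesce(Exchange(Leaf("scan"))), None) == Exchange(Leaf("scan")))
  // Consuming stage: the reader is wrapped in a coalesce again.
  assert(rewrite(ShuffleReader(Leaf("stage")), Some(Leaf("parent"))) ==
    Coalesce(ShuffleReader(Leaf("stage"))))
  println("rewrite sketch OK")
}
```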
@@ -109,6 +109,55 @@ class AdaptiveQueryExecSuite
}
}

test("Join partitioned tables") {
assumeSpark301orLater

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") // force shuffle exchange

withGpuSparkSession(spark => {
import spark.implicits._

val path = new File(TEST_FILES_ROOT, "test.parquet").getAbsolutePath
(0 until 100)
.map(i => (i,i*5))
.toDF("a", "b")
.write
.mode(SaveMode.Overwrite)
.parquet(path)
spark.read.parquet(path).createOrReplaceTempView("testData")

spark.sql("DROP TABLE IF EXISTS t1").collect()
spark.sql("DROP TABLE IF EXISTS t2").collect()

spark.sql("CREATE TABLE t1 (a INT, b INT) USING parquet").collect()
spark.sql("CREATE TABLE t2 (a INT, b INT) USING parquet PARTITIONED BY (a)").collect()

spark.sql("INSERT INTO TABLE t1 SELECT a, b FROM testData").collect()
spark.sql("INSERT INTO TABLE t2 SELECT a, b FROM testData").collect()

val df = spark.sql(
"SELECT t1.a, t2.b " +
"FROM t1 " +
"JOIN t2 " +
"ON t1.a = t2.a " +
"WHERE t2.a = 5" // filter on partition key to force dynamic partition pruning
)
df.collect()

// assert that DPP did cause this to run as a non-AQE plan
assert(!df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])

// assert that both inputs to the SHJ are coalesced
val shj = TestUtils.findOperator(df.queryExecution.executedPlan,
_.isInstanceOf[GpuShuffledHashJoinBase]).get
assert(shj.children.length == 2)
assert(shj.children.forall(_.isInstanceOf[GpuCoalesceBatches]))

}, conf)
}

test("Plugin should translate child plan of GPU DataWritingCommandExec to GPU") {

val conf = new SparkConf()
@@ -193,14 +242,7 @@ class AdaptiveQueryExecSuite
}

def skewJoinTest(fun: SparkSession => Unit) {

// this test requires Spark 3.0.1 or later
val isValidTestForSparkVersion = ShimLoader.getSparkShims.getSparkShimVersion match {
case SparkShimVersion(3, 0, 0) => false
case DatabricksShimVersion(3, 0, 0) => false
case _ => true
}
assume(isValidTestForSparkVersion)
assumeSpark301orLater

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
@@ -239,6 +281,17 @@
}, conf)
}

/** Most of the AQE tests require Spark 3.0.1 or later */
private def assumeSpark301orLater = {
val sparkShimVersion = ShimLoader.getSparkShims.getSparkShimVersion
val isValidTestForSparkVersion = sparkShimVersion match {
case SparkShimVersion(3, 0, 0) => false
case DatabricksShimVersion(3, 0, 0) => false
case _ => true
}
assume(isValidTestForSparkVersion)
}

def checkSkewJoin(
joins: Seq[GpuShuffledHashJoinBase],
leftSkewNum: Int,