Add AQE unit tests #781

Merged: 8 commits, Sep 24, 2020
Changes from 5 commits

@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
-import org.apache.spark.sql.functions.when
+import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuShuffledHashJoinBase}

@@ -135,6 +135,7 @@ class AdaptiveQueryExecSuite
val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") // force shuffle exchange
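// presumably uncompressed so on-disk file sizes are stable across codec defaults and the
// size-based AQE behavior under test stays deterministic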
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
import spark.implicits._
@@ -183,6 +184,7 @@ class AdaptiveQueryExecSuite
val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
import spark.implicits._
@@ -225,6 +227,7 @@ class AdaptiveQueryExecSuite
// the read will still happen on GPU with a CPU write
.set(RapidsConf.TEST_ALLOWED_NONGPU.key, "DataWritingCommandExec")
.set("spark.rapids.sql.exec.DataWritingCommandExec", "false")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
import spark.implicits._
@@ -268,6 +271,7 @@ class AdaptiveQueryExecSuite
val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
setupTestData(spark)
@@ -293,6 +297,129 @@ class AdaptiveQueryExecSuite
}, conf)
}

test("Change merge join to broadcast join without local shuffle reader") {

assumeSpark301orLater

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key, "true")
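// a 400-byte threshold keeps SMJ in the initial plan, while runtime stats from the
// filtered side let AQE re-plan the join as a broadcast join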
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "400")
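// presumably enabled because the predicate t1.l = 1 compares a string column to an int
// literal, which involves a string-to-integer cast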
.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
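// a tiny advisory partition size stops AQE's coalescing from merging the post-shuffle
// partitions into one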
.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "50")
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
setupTestData(spark)
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(spark,
"""
|SELECT * FROM lowerCaseData t1 JOIN testData2 t2
|ON t1.n = t2.a JOIN testData3 t3 ON t2.a = t3.a
|WHERE t1.l = 1
""".stripMargin)

val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 2)
val bhj = findTopLevelGpuBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
// There is still an SMJ, and its two shuffles can't use local shuffle readers.
checkNumLocalShuffleReaders(adaptivePlan, 2)
}, conf)
}

test("multiple joins with aggregate") {

assumeSpark301orLater

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "500")
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
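// presumably needed because t2.n = '1' compares an int column to a string literal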
.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
setupTestData(spark)
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(spark,
"""
|WITH t4 AS (
|  SELECT * FROM lowercaseData t2 JOIN (
|    SELECT a, SUM(b) FROM testData3 GROUP BY a
|  ) t3 ON t2.n = t3.a WHERE t2.n = '1'
|)
|SELECT * FROM testData
|JOIN testData2 t2 ON key = t2.a
|JOIN t4 ON t2.b = t4.a
|WHERE value = 1
""".stripMargin)

val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 3)

val bhj = findTopLevelGpuBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)

// The shuffle added by the aggregate can't use a local shuffle reader.
checkNumLocalShuffleReaders(adaptivePlan, 1)

}, conf)
}

test("Verify the reader is LocalShuffleReaderExec") {

assumeSpark301orLater

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "400")
.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "50")
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
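// assumption: a small fixed shuffle partition count keeps this test's plans small and stable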
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(SQLConf.PARQUET_COMPRESSION.key, "uncompressed")

withGpuSparkSession(spark => {
setupTestData(spark)

val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(spark, "SELECT * FROM testData JOIN " +
"testData2 ON key = a WHERE value = '1'")

val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 1)

val bhj = findTopLevelGpuBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
val localReaders = collect(adaptivePlan) {
case reader: GpuCustomShuffleReaderExec if reader.isLocalReader => reader
}
// Verify the number of local shuffle readers
assert(localReaders.length == 2)
}, conf)
}

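/**
 * Counts the local shuffle readers in the given plan, asserts that every shuffle except
 * the expected `numShufflesWithoutLocalReader` is read through a local reader, and
 * returns the count.
 */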
private def checkNumLocalShuffleReaders(
plan: SparkPlan,
numShufflesWithoutLocalReader: Int = 0): Int = {
val numShuffles = collect(plan) {
case s: ShuffleQueryStageExec => s
}.length

val numLocalReaders = collect(plan) {
case reader: GpuCustomShuffleReaderExec if reader.isLocalReader => reader
}
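// a local shuffle reader consumes map output written on the same hosts, so every
// partition should report a preferred location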
numLocalReaders.foreach { r =>
val rdd = r.executeColumnar()
val parts = rdd.partitions
assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
}
assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader))
numLocalReaders.length
}

def skewJoinTest(fun: SparkSession => Unit) {
assumeSpark301orLater

@@ -369,22 +496,23 @@ class AdaptiveQueryExecSuite
testData(spark)
testData2(spark)
testData3(spark)
lowerCaseData(spark)
}

/** Ported from org.apache.spark.sql.test.SQLTestData */
private def testData(spark: SparkSession) {
import spark.implicits._
val data: Seq[(Int, String)] = (1 to 100).map(i => (i, i.toString))
val df = data.toDF("key", "value")
-.repartition(6)
+.repartition(col("key"))
registerAsParquetTable(spark, df, "testData")
}

/** Ported from org.apache.spark.sql.test.SQLTestData */
private def testData2(spark: SparkSession) {
import spark.implicits._
val df = Seq[(Int, Int)]((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
.toDF("a", "b")
-.repartition(2)
+.repartition(col("a"))
registerAsParquetTable(spark, df, "testData2")
}

@@ -393,10 +521,24 @@ class AdaptiveQueryExecSuite
import spark.implicits._
val df = Seq[(Int, Option[Int])]((1, None), (2, Some(2)))
.toDF("a", "b")
-.repartition(6)
+.repartition(col("a"))
registerAsParquetTable(spark, df, "testData3")
}

/** Ported from org.apache.spark.sql.test.SQLTestData */
private def lowerCaseData(spark: SparkSession) {
import spark.implicits._
// note that this differs from the original Spark test by generating a larger data set, so
// that the estimated stats in the logical plan are large enough to prevent BHJ up front;
// our queries then filter it down to a small enough data set that SMJ can be replaced
// with BHJ at execution time when AQE is enabled
val data: Seq[(Int, String)] = (0 to 10000).map(i => (i, if (i < 5) i.toString else "z"))
val df = data
.toDF("n", "l")
.repartition(col("n"))
registerAsParquetTable(spark, df, "lowercaseData")
}

private def registerAsParquetTable(spark: SparkSession, df: Dataset[Row], name: String) {
val path = new File(TEST_FILES_ROOT, s"$name.parquet").getAbsolutePath
df.write