Skip to content

Commit

Permalink
Avoid comparing window range canonicalized plans on Spark 3.0.x (#2984)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Jul 22, 2021
1 parent 8cdf2d9 commit ca51ed2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
df: SparkSession => DataFrame,
execsAllowedNonGpu: Seq[String],
conf: SparkConf = new SparkConf(),
sortBeforeRepart: Boolean = false)(fun: DataFrame => DataFrame): Unit = {
sortBeforeRepart: Boolean = false,
skipCanonicalizationCheck: Boolean = false)(fun: DataFrame => DataFrame): Unit = {
testSparkResultsAreEqual(testName, df,
conf=conf,
execsAllowedNonGpu=execsAllowedNonGpu,
sortBeforeRepart = sortBeforeRepart)(fun)
sortBeforeRepart = sortBeforeRepart,
skipCanonicalizationCheck = skipCanonicalizationCheck)(fun)
}

def ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType

class WindowFunctionSuite extends SparkQueryCompareTestSuite {
// The logical plan optimizer in Spark 3.0.x is non-deterministic when planning windows
// over the same range, so avoid trying to compare canonicalized plans on Spark 3.0.x
private val skipRangeCanon = cmpSparkVersion(3, 1, 1) < 0

def windowAggregationTester(windowSpec: WindowSpec): DataFrame => DataFrame =
(df : DataFrame) => df.select(
Expand Down Expand Up @@ -88,7 +91,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
https://github.com/NVIDIA/spark-rapids/issues/1039 is fixed then we can enable these tests again
testSparkResultsAreEqual("[Window] [ROWS/RANGE] [default] ", windowTestDfOrcNonNullable,
execsAllowedNonGpu=Seq("DeserializeToObjectExec", "CreateExternalRow",
"Invoke", "StaticInvoke")) {
"Invoke", "StaticInvoke"), skipCanonicalizationCheck = skipRangeCanon) {
val rowsWindow = Window.partitionBy("uid")
.orderBy("dateLong")
windowAggregationTester(rowsWindow)
Expand Down Expand Up @@ -222,7 +225,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
|""".stripMargin)
}

testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [-2 DAYS, 3 DAYS] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [-2 DAYS, 3 DAYS] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -234,7 +238,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}

testSparkResultsAreEqual("[Window] [RANGE] [DESC] [-2 DAYS, 3 DAYS] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [-2 DAYS, 3 DAYS] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -246,7 +251,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}

testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [-2 DAYS, CURRENT ROW] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [-2 DAYS, CURRENT ROW] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -258,7 +264,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}

testSparkResultsAreEqual("[Window] [RANGE] [DESC] [-2 DAYS, CURRENT ROW] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [-2 DAYS, CURRENT ROW] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -274,7 +281,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
Very similar functionality is covered by the python integration tests. When
https://github.com/NVIDIA/spark-rapids/issues/1039 is fixed then we can enable these tests again
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [-2 DAYS, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -287,7 +294,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [-2 DAYS, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -300,7 +307,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
*/

testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [CURRENT ROW, 3 DAYS] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [CURRENT ROW, 3 DAYS] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -312,7 +320,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}

testSparkResultsAreEqual("[Window] [RANGE] [DESC] [CURRENT ROW, 3 DAYS] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [CURRENT ROW, 3 DAYS] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -325,7 +334,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [CURRENT ROW, CURRENT ROW] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -338,7 +347,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [DESC] [CURRENT ROW, CURRENT ROW] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -351,7 +360,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ASC] [Integral Type]",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -364,7 +373,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ASC] [Short Type]", windowTestDfOrc,
new SparkConf().set("spark.rapids.sql.window.range.short.enabled", "true")) {
new SparkConf().set("spark.rapids.sql.window.range.short.enabled", "true"),
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -377,7 +387,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ASC] [Long Type]",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -390,7 +400,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ASC] [Byte Type]", windowTestDfOrc,
new SparkConf().set("spark.rapids.sql.window.range.byte.enabled", "true")) {
new SparkConf().set("spark.rapids.sql.window.range.byte.enabled", "true"),
skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -403,7 +414,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("[Window] [RANGE] [ASC] [Date Type]",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {

val windowClause =
"""
Expand All @@ -420,7 +431,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
Very similar functionality is covered by the python integration tests. When
https://github.com/NVIDIA/spark-rapids/issues/1039 is fixed then we can enable these tests again
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [CURRENT ROW, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -433,7 +444,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [CURRENT ROW, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -446,7 +457,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [UNBOUNDED PRECEDING, 3 DAYS] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -459,7 +470,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [UNBOUNDED PRECEDING, 3 DAYS] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -472,7 +483,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [UNBOUNDED PRECEDING, CURRENT ROW] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -485,7 +496,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [UNBOUNDED PRECEDING, CURRENT ROW] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -498,7 +509,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -511,7 +522,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -523,7 +534,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [Unspecified bounds] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [ ASC] [Unspecified bounds] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -534,7 +546,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
testAllWindowAggregations(windowClause)
}
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [Unspecified bounds] ", windowTestDfOrc) {
testSparkResultsAreEqual("[Window] [RANGE] [DESC] [Unspecified bounds] ", windowTestDfOrc,
skipCanonicalizationCheck = skipRangeCanon) {
val windowClause =
"""
Expand All @@ -547,7 +560,7 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
*/

IGNORE_ORDER_testSparkResultsAreEqual("[Window] [MIXED WINDOW SPECS] ",
windowTestDfOrc) {
windowTestDfOrc, skipCanonicalizationCheck = skipRangeCanon) {
(df : DataFrame) => {
df.createOrReplaceTempView("mytable")
// scalastyle:off line.size.limit
Expand Down Expand Up @@ -577,7 +590,8 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite {
"SpecifiedWindowFrame",
"WindowExec",
"WindowExpression",
"WindowSpecDefinition")) {
"WindowSpecDefinition"),
skipCanonicalizationCheck = skipRangeCanon) {
(df : DataFrame) => {
df.createOrReplaceTempView("mytable")
// scalastyle:off line.size.limit
Expand Down

0 comments on commit ca51ed2

Please sign in to comment.