From 9281caa53c6bd404cde286a46af87a4e31592bb2 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 4 Jan 2022 17:50:49 +0800 Subject: [PATCH] Fix 330 build error and add 322 shims layer [databricks] (#4447) * Add Shims 322 Signed-off-by: Chong Gao * Fix the 330 build errors Signed-off-by: Firestarman * Add Shims 322 Signed-off-by: Chong Gao * Update doc Signed-off-by: Chong Gao * Update comments Signed-off-by: Chong Gao * Fix the build errors related to partitioning Signed-off-by: Firestarman * Correct the import order Signed-off-by: Firestarman * Add 322 version in the jenkins version def file Signed-off-by: Chong Gao * Refactor Signed-off-by: Chong Gao Co-authored-by: Firestarman --- build/buildall | 1 + dist/pom.xml | 1 + .../rapids-shuffle.md | 1 + jenkins/spark-premerge-build.sh | 1 + jenkins/version-def.sh | 2 +- pom.xml | 56 + shims/pom.xml | 4 + .../rapids/shims/spark301/Spark301Shims.scala | 6 +- .../rapids/shims/spark302/Spark302Shims.scala | 6 +- .../rapids/shims/spark303/Spark303Shims.scala | 6 +- .../rapids/shims/spark304/Spark304Shims.scala | 10 +- .../rapids/shims/spark311/Spark311Shims.scala | 6 +- .../shims/spark311cdh/Spark311CDHShims.scala | 6 +- .../rapids/shims/spark312/Spark312Shims.scala | 6 +- .../shims/spark312db/Spark312dbShims.scala | 10 +- .../rapids/shims/spark313/Spark313Shims.scala | 10 +- .../rapids/shims/spark320/Spark320Shims.scala | 2 +- .../rapids/shims/spark321/Spark321Shims.scala | 2 +- shims/spark322/pom.xml | 96 ++ .../rapids/shims/spark322/Spark322Shims.scala | 40 + .../spark322/SparkShimServiceProvider.scala | 35 + .../spark322/RapidsShuffleManager.scala | 26 + .../RapidsShuffleInternalManager.scala | 60 + .../shims/spark322/Spark322ShimsSuite.scala | 41 + .../shims/v2/GpuShuffledHashJoinExec.scala | 15 +- .../rapids/shims/v2/Spark30XdbShims.scala | 10 +- .../shims/v2/GpuShuffledHashJoinExec.scala | 12 +- .../rapids/shims/v2/GpuHashPartitioning.scala | 41 + .../parquet/rapids/ParquetMaterializer.scala | 2 +- .../shims/v2/GpuShuffledHashJoinExec.scala | 100 ++ .../spark/rapids/shims/v2/RebaseShims.scala | 43 + .../shims/v2/Spark320until322Shims.scala} | 32 +- .../shims/v2/ShimVectorizedColumnReader.scala | 0 .../rapids/shims/v2/Spark322PlusShims.scala | 1103 +++++++++++++++++ .../shims/v2/ShimVectorizedColumnReader.scala | 67 + .../rapids/shims/v2/GpuHashPartitioning.scala | 36 + .../shims/v2/GpuShuffledHashJoinExec.scala | 14 +- .../spark/rapids/shims/v2/Spark33XShims.scala | 24 +- ...ng.scala => GpuHashPartitioningBase.scala} | 19 +- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../spark/rapids/GpuParquetScanBase.scala | 4 +- .../rapids/GpuShuffledHashJoinBase.scala | 8 +- .../com/nvidia/spark/rapids/SparkShims.scala | 6 +- .../GpuShuffleExchangeExecBase.scala | 2 +- 44 files changed, 1863 insertions(+), 111 deletions(-) create mode 100644 shims/spark322/pom.xml create mode 100644 shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/Spark322Shims.scala create mode 100644 shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/SparkShimServiceProvider.scala create mode 100644 shims/spark322/src/main/scala/com/nvidia/spark/rapids/spark322/RapidsShuffleManager.scala create mode 100644 shims/spark322/src/main/scala/org/apache/spark/sql/rapids/shims/spark322/RapidsShuffleInternalManager.scala create mode 100644 shims/spark322/src/test/scala/com/nvidia/spark/rapids/shims/spark322/Spark322ShimsSuite.scala create mode 100644 
sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala create mode 100644 sql-plugin/src/main/311until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala create mode 100644 sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/RebaseShims.scala rename sql-plugin/src/main/{320+/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala => 320until322/scala/com/nvidia/spark/rapids/shims/v2/Spark320until322Shims.scala} (97%) rename sql-plugin/src/main/{320+ => 320until322}/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala (100%) create mode 100644 sql-plugin/src/main/322+/scala/com/nvidia/spark/rapids/shims/v2/Spark322PlusShims.scala create mode 100644 sql-plugin/src/main/322+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala create mode 100644 sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala rename sql-plugin/src/main/{311+-all => 330+}/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala (86%) rename sql-plugin/src/main/scala/com/nvidia/spark/rapids/{GpuHashPartitioning.scala => GpuHashPartitioningBase.scala} (80%) diff --git a/build/buildall b/build/buildall index fbe08d9f3f6..2ab7ab30527 100755 --- a/build/buildall +++ b/build/buildall @@ -132,6 +132,7 @@ case $DIST_PROFILE in 313 320 321 + 322 330 ) ;; diff --git a/dist/pom.xml b/dist/pom.xml index 6fb8670a86f..e83dbe9b82b 100644 --- a/dist/pom.xml +++ b/dist/pom.xml @@ -64,6 +64,7 @@ 304, 313, 321, + 322, 330 diff --git a/docs/additional-functionality/rapids-shuffle.md b/docs/additional-functionality/rapids-shuffle.md index 03f08a09ff7..f2cd4715c64 100644 --- a/docs/additional-functionality/rapids-shuffle.md +++ b/docs/additional-functionality/rapids-shuffle.md @@ -297,6 +297,7 @@ In this section, we are using a docker container built using the sample dockerfi | 3.1.3 | com.nvidia.spark.rapids.spark313.RapidsShuffleManager | | 3.2.0 | com.nvidia.spark.rapids.spark320.RapidsShuffleManager | | 3.2.1 | com.nvidia.spark.rapids.spark321.RapidsShuffleManager | + | 3.2.2 | com.nvidia.spark.rapids.spark322.RapidsShuffleManager | | 3.3.0 | com.nvidia.spark.rapids.spark330.RapidsShuffleManager | | Databricks 7.3| com.nvidia.spark.rapids.spark301db.RapidsShuffleManager | | Databricks 9.1| com.nvidia.spark.rapids.spark312db.RapidsShuffleManager | diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index a82254ce240..767f7747eee 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -48,6 +48,7 @@ mvn_verify() { # don't skip tests env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=320 clean install -Drat.skip=true -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -Dpytest.TEST_TAGS='' -pl '!tools' env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=321 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am + env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=322 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=330 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true 
-Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am # Here run Python integration tests tagged with 'premerge_ci_1' only, that would help balance test duration and memory diff --git a/jenkins/version-def.sh b/jenkins/version-def.sh index bf9461f2e66..6edb82b05ba 100755 --- a/jenkins/version-def.sh +++ b/jenkins/version-def.sh @@ -49,7 +49,7 @@ echo "CUDF_VER: $CUDF_VER, CUDA_CLASSIFIER: $CUDA_CLASSIFIER, PROJECT_VER: $PROJ SPARK_VER: $SPARK_VER, SCALA_BINARY_VER: $SCALA_BINARY_VER" -SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"301 302 303 304 311 311cdh 312 313 320 321 330"} +SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"301 302 303 304 311 311cdh 312 313 320 321 322 330"} IFS=" " <<< $SPARK_SHIM_VERSIONS_STR read -r -a SPARK_SHIM_VERSIONS diff --git a/pom.xml b/pom.xml index d613adae3af..365b0096a77 100644 --- a/pom.xml +++ b/pom.xml @@ -292,6 +292,7 @@ ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala ${project.basedir}/src/main/311until320-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -405,6 +406,7 @@ ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311until320-noncdh/scala ${project.basedir}/src/main/31xdb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/post320-treenode/scala @@ -450,6 +452,7 @@ ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala ${project.basedir}/src/main/311until320-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -498,6 +501,7 @@ ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala ${project.basedir}/src/main/311until320-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -541,7 +545,9 @@ ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320until322/scala ${project.basedir}/src/main/post320-treenode/scala @@ -584,7 +590,54 @@ ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320until322/scala + ${project.basedir}/src/main/post320-treenode/scala + + + + + + + + + tools + aggregator + tests-spark310+ + + + + release322 + + + buildver + 322 + + + + ${spark322.version} + ${spark322.version} + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-profile-src-31+ + add-source + generate-sources + + + ${project.basedir}/src/main/301+-nondb/scala + ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/311+-all/scala + ${project.basedir}/src/main/311+-nondb/scala + ${project.basedir}/src/main/311until330-all/scala + ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/322+/scala ${project.basedir}/src/main/post320-treenode/scala @@ -627,6 +680,7 @@ ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/322+/scala ${project.basedir}/src/main/330+/scala 
${project.basedir}/src/main/post320-treenode/scala @@ -675,6 +729,7 @@ ${project.basedir}/src/main/311cdh/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-nondb/scala + ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -783,6 +838,7 @@ 3.1.3-SNAPSHOT 3.2.0 3.2.1-SNAPSHOT + 3.2.2-SNAPSHOT 3.3.0-SNAPSHOT 3.6.0 4.3.0 diff --git a/shims/pom.xml b/shims/pom.xml index 59fd079c448..a9452e71398 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -85,6 +85,10 @@ release321 spark321 + + release322 + spark322 + diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala index 31b390c7742..1e71d426f7c 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark301Shims extends Spark30XShims with Spark30Xuntil33XShims { @@ -35,7 +34,8 @@ class Spark301Shims extends Spark30XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark302/src/main/scala/com/nvidia/spark/rapids/shims/spark302/Spark302Shims.scala b/shims/spark302/src/main/scala/com/nvidia/spark/rapids/shims/spark302/Spark302Shims.scala index 8399c2a99f0..dccd459ae4c 100644 --- a/shims/spark302/src/main/scala/com/nvidia/spark/rapids/shims/spark302/Spark302Shims.scala +++ b/shims/spark302/src/main/scala/com/nvidia/spark/rapids/shims/spark302/Spark302Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark302Shims extends Spark30XShims with Spark30Xuntil33XShims { @@ -35,7 +34,8 @@ class Spark302Shims extends Spark30XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark303/src/main/scala/com/nvidia/spark/rapids/shims/spark303/Spark303Shims.scala b/shims/spark303/src/main/scala/com/nvidia/spark/rapids/shims/spark303/Spark303Shims.scala index 41116562b8e..9656e655f74 100644 --- a/shims/spark303/src/main/scala/com/nvidia/spark/rapids/shims/spark303/Spark303Shims.scala +++ b/shims/spark303/src/main/scala/com/nvidia/spark/rapids/shims/spark303/Spark303Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark303Shims extends Spark30XShims with Spark30Xuntil33XShims { @@ -35,7 +34,8 @@ class Spark303Shims extends Spark30XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark304/src/main/scala/com/nvidia/spark/rapids/shims/spark304/Spark304Shims.scala b/shims/spark304/src/main/scala/com/nvidia/spark/rapids/shims/spark304/Spark304Shims.scala index 346d9c2951c..ac3e33cddb3 100644 --- a/shims/spark304/src/main/scala/com/nvidia/spark/rapids/shims/spark304/Spark304Shims.scala +++ b/shims/spark304/src/main/scala/com/nvidia/spark/rapids/shims/spark304/Spark304Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,8 +20,8 @@ import com.nvidia.spark.rapids.ShimVersion import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark304Shims extends Spark30XShims with Spark30Xuntil33XShims { @@ -35,7 +35,11 @@ class Spark304Shims extends Spark30XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseMode(lookupFileMeta, dateTimeRebaseModeFromConf) new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } } diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala index 4f7d30a7681..343270588c2 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark311Shims extends Spark31XShims with Spark30Xuntil33XShims { @@ -37,7 +36,8 @@ class Spark311Shims extends Spark31XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala b/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala index c687755e4a8..ba527ecad93 100644 --- a/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala +++ b/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark311CDHShims extends Spark31XShims with Spark30Xuntil33XShims { @@ -37,7 +36,8 @@ class Spark311CDHShims extends Spark31XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark312/src/main/scala/com/nvidia/spark/rapids/shims/spark312/Spark312Shims.scala b/shims/spark312/src/main/scala/com/nvidia/spark/rapids/shims/spark312/Spark312Shims.scala index 5ada5b10c0a..41f6bd44ca5 100644 --- a/shims/spark312/src/main/scala/com/nvidia/spark/rapids/shims/spark312/Spark312Shims.scala +++ b/shims/spark312/src/main/scala/com/nvidia/spark/rapids/shims/spark312/Spark312Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark312Shims extends Spark31XShims with Spark30Xuntil33XShims { @@ -37,7 +36,8 @@ class Spark312Shims extends Spark31XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive) } diff --git a/shims/spark312db/src/main/scala/com/nvidia/spark/rapids/shims/spark312db/Spark312dbShims.scala b/shims/spark312db/src/main/scala/com/nvidia/spark/rapids/shims/spark312db/Spark312dbShims.scala index 933994f2256..2a8b2830e1d 100644 --- a/shims/spark312db/src/main/scala/com/nvidia/spark/rapids/shims/spark312db/Spark312dbShims.scala +++ b/shims/spark312db/src/main/scala/com/nvidia/spark/rapids/shims/spark312db/Spark312dbShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,8 +20,8 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark312dbShims extends Spark31XdbShims with Spark30Xuntil33XShims { @@ -35,7 +35,11 @@ class Spark312dbShims extends Spark31XdbShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseMode(lookupFileMeta, dateTimeRebaseModeFromConf) new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } } diff --git a/shims/spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/Spark313Shims.scala b/shims/spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/Spark313Shims.scala index 020a3b74027..f9ae5c67869 100644 --- a/shims/spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/Spark313Shims.scala +++ b/shims/spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/Spark313Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.v2._ import org.apache.parquet.schema.MessageType +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -import org.apache.spark.sql.internal.SQLConf class Spark313Shims extends Spark31XShims with Spark30Xuntil33XShims { @@ -35,9 +35,13 @@ class Spark313Shims extends Spark31XShims with Spark30Xuntil33XShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseMode(lookupFileMeta, dateTimeRebaseModeFromConf) new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } override def hasCastFloatTimestampUpcast: Boolean = true diff --git a/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala b/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala index 0d4f9118f2d..2b492a15588 100644 --- a/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala +++ b/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.types.StructType -class Spark320Shims extends Spark32XShims with Spark30Xuntil33XShims { +class Spark320Shims extends Spark320until322Shims with Spark30Xuntil33XShims { 
override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION override def getFileScanRDD( diff --git a/shims/spark321/src/main/scala/com/nvidia/spark/rapids/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/nvidia/spark/rapids/shims/spark321/Spark321Shims.scala index dde084e5bca..ffd469afe83 100644 --- a/shims/spark321/src/main/scala/com/nvidia/spark/rapids/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/nvidia/spark/rapids/shims/spark321/Spark321Shims.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.types.StructType -class Spark321Shims extends Spark32XShims with Spark30Xuntil33XShims { +class Spark321Shims extends Spark320until322Shims with Spark30Xuntil33XShims { override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION override def getFileScanRDD( diff --git a/shims/spark322/pom.xml b/shims/spark322/pom.xml new file mode 100644 index 00000000000..8aa67880097 --- /dev/null +++ b/shims/spark322/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + + com.nvidia + rapids-4-spark-shims_2.12 + 22.02.0-SNAPSHOT + ../pom.xml + + rapids-4-spark-shims-spark322_2.12 + RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.2.2 Shim + The RAPIDS SQL plugin for Apache Spark 3.2.2 Shim + 22.02.0-SNAPSHOT + + + + + + + + + + + maven-antrun-plugin + + + dependency + generate-resources + + + + + + + + + + + + + + + + + + + + run + + + + + + + + + + ${project.build.directory}/extra-resources + + + src/main/resources + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark322.version} + provided + + + diff --git a/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/Spark322Shims.scala b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/Spark322Shims.scala new file mode 100644 index 00000000000..b3ba4baaa41 --- /dev/null +++ b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/Spark322Shims.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims.spark322 + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.shims.v2._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.types.StructType + +class Spark322Shims extends Spark322PlusShims with Spark30Xuntil33XShims { + override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION + + override def getFileScanRDD( + sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + filePartitions: Seq[FilePartition], + readDataSchema: StructType, + metadataColumns: Seq[AttributeReference]): RDD[InternalRow] = { + new FileScanRDD(sparkSession, readFunction, filePartitions) + } +} diff --git a/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/SparkShimServiceProvider.scala b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/SparkShimServiceProvider.scala new file mode 100644 index 00000000000..3b979042d59 --- /dev/null +++ b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/shims/spark322/SparkShimServiceProvider.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark322 + +import com.nvidia.spark.rapids.{SparkShims, SparkShimVersion} + +object SparkShimServiceProvider { + val VERSION = SparkShimVersion(3, 2, 2) + val VERSIONNAMES = Seq(s"$VERSION", s"$VERSION-SNAPSHOT") +} + +class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { + + def matchesVersion(version: String): Boolean = { + SparkShimServiceProvider.VERSIONNAMES.contains(version) + } + + def buildShim: SparkShims = { + new Spark322Shims() + } +} diff --git a/shims/spark322/src/main/scala/com/nvidia/spark/rapids/spark322/RapidsShuffleManager.scala b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/spark322/RapidsShuffleManager.scala new file mode 100644 index 00000000000..0b2ea1ef2c2 --- /dev/null +++ b/shims/spark322/src/main/scala/com/nvidia/spark/rapids/spark322/RapidsShuffleManager.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.spark322 + +import org.apache.spark.SparkConf +import org.apache.spark.sql.rapids.shims.spark322.ProxyRapidsShuffleInternalManager + +/** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */ +sealed class RapidsShuffleManager( + conf: SparkConf, + isDriver: Boolean) extends ProxyRapidsShuffleInternalManager(conf, isDriver) { +} diff --git a/shims/spark322/src/main/scala/org/apache/spark/sql/rapids/shims/spark322/RapidsShuffleInternalManager.scala b/shims/spark322/src/main/scala/org/apache/spark/sql/rapids/shims/spark322/RapidsShuffleInternalManager.scala new file mode 100644 index 00000000000..bb14004326a --- /dev/null +++ b/shims/spark322/src/main/scala/org/apache/spark/sql/rapids/shims/spark322/RapidsShuffleInternalManager.scala @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.shims.spark322 + +import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.shuffle._ +import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase} + +/** + * A shuffle manager optimized for the RAPIDS Plugin For Apache Spark. + * @note This is an internal class to obtain access to the private + * `ShuffleManager` and `SortShuffleManager` classes. + */ +class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean) + extends RapidsShuffleInternalManagerBase(conf, isDriver) { + + def getReader[K, C]( + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, + metrics) + } +} + + +class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean) + extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager { + + def getReader[K, C]( + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter + ): ShuffleReader[K, C] = { + self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, + metrics) + } +} \ No newline at end of file diff --git a/shims/spark322/src/test/scala/com/nvidia/spark/rapids/shims/spark322/Spark322ShimsSuite.scala b/shims/spark322/src/test/scala/com/nvidia/spark/rapids/shims/spark322/Spark322ShimsSuite.scala new file mode 100644 index 00000000000..95295634f4f --- /dev/null +++ b/shims/spark322/src/test/scala/com/nvidia/spark/rapids/shims/spark322/Spark322ShimsSuite.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark322; + +import com.nvidia.spark.rapids.{ShimLoader, SparkShims, SparkShimVersion, TypeSig} +import org.scalatest.FunSuite + +import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType} + +class Spark322ShimsSuite extends FunSuite { + val sparkShims: SparkShims = new SparkShimServiceProvider().buildShim + test("spark shims version") { + assert(sparkShims.getSparkShimVersion === SparkShimVersion(3, 2, 2)) + } + + test("shuffle manager class") { + assert(ShimLoader.getRapidsShuffleManagerClass === + classOf[com.nvidia.spark.rapids.spark322.RapidsShuffleManager].getCanonicalName) + } + + test("TypeSig322") { + val check = TypeSig.DAYTIME + TypeSig.YEARMONTH + assert(check.isSupportedByPlugin(DayTimeIntervalType()) == true) + assert(check.isSupportedByPlugin(YearMonthIntervalType()) == true) + } + +} diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala index 0438d956c0c..c2977bc2df1 100644 --- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,6 +21,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks} @@ -30,7 +31,8 @@ object GpuJoinUtils { buildSide match { case BuildRight => GpuBuildRight case BuildLeft => GpuBuildLeft - case _ => throw new Exception("unknown buildSide Type") + case unknownBuildSide => + throw new IllegalArgumentException(s"unknown buildSide Type: $unknownBuildSide") } } } @@ -90,9 +92,10 @@ case class GpuShuffledHashJoinExec( extends GpuShuffledHashJoinBase( buildSide, condition, - isSkewJoin = isSkewJoin, - cpuLeftKeys, - cpuRightKeys) { + isSkewJoin = isSkewJoin) { - override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil + override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) + + override def requiredChildDistribution: Seq[Distribution] = + Seq(HashClusteredDistribution(cpuLeftKeys), HashClusteredDistribution(cpuRightKeys)) } diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XdbShims.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XdbShims.scala index c9b1211ee22..1d3276849f6 100644 --- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XdbShims.scala +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XdbShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -47,7 +47,7 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} -import org.apache.spark.sql.execution.datasources.{FileIndex, FilePartition, HadoopFsRelation, InMemoryFileIndex, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileIndex, FilePartition, HadoopFsRelation, InMemoryFileIndex, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils @@ -77,9 +77,13 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseMode(lookupFileMeta, dateTimeRebaseModeFromConf) new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } override def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand = AlterTableRecoverPartitionsCommand(tableName) diff --git a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala index 8801e7bc998..7ada30d77e6 100644 --- a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec} import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks} @@ -29,7 +30,7 @@ object GpuJoinUtils { buildSide match { case BuildRight => GpuBuildRight case BuildLeft => GpuBuildLeft - case _ => throw new Exception("unknown buildSide Type") + case unknownBuildSide => throw new Exception(s"unknown buildSide Type: $unknownBuildSide") } } } @@ -89,9 +90,10 @@ case class GpuShuffledHashJoinExec( extends GpuShuffledHashJoinBase( buildSide, condition, - isSkewJoin = isSkewJoin, - cpuLeftKeys, - cpuRightKeys) { + isSkewJoin = isSkewJoin) { override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil + + override def requiredChildDistribution: Seq[Distribution] = + HashClusteredDistribution(cpuLeftKeys) :: HashClusteredDistribution(cpuRightKeys) :: Nil } diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala new file mode 100644 index 00000000000..cd058e6e47a --- /dev/null +++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims.v2 + +import com.nvidia.spark.rapids.GpuHashPartitioningBase + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution} + +case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends GpuHashPartitioningBase(expressions, numPartitions) { + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case h: HashClusteredDistribution => + expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { + case (l, r) => l.semanticEquals(r) + } + case ClusteredDistribution(requiredClustering, _) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case _ => false + } + } + } + +} diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala b/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala index 4684742fc41..8154ded831d 100644 --- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala +++ b/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala @@ -41,7 +41,7 @@ class ParquetRecordMaterializer( parquetSchema, catalystSchema, convertTz, - datetimeRebaseMode, + datetimeRebaseMode, // always LegacyBehaviorPolicy.CORRECTED LegacyBehaviorPolicy.EXCEPTION, false, NoopUpdater) diff --git a/sql-plugin/src/main/311until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/311until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala new file mode 100644 index 00000000000..6d3850d3189 --- /dev/null +++ b/sql-plugin/src/main/311until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims.v2 + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks} + +object GpuJoinUtils { + def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = { + buildSide match { + case BuildRight => GpuBuildRight + case BuildLeft => GpuBuildLeft + case unknownBuildSide => throw new Exception(s"unknown buildSide Type: $unknownBuildSide") + } + } +} + +class GpuShuffledHashJoinMeta( + join: ShuffledHashJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends SparkPlanMeta[ShuffledHashJoinExec](join, conf, parent, rule) { + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = + join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] = + JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition) + + override def tagPlanForGpu(): Unit = { + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + } + + override def convertToGpu(): GpuExec = { + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) + val joinExec = GpuShuffledHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + join.joinType, + GpuJoinUtils.getGpuBuildSide(join.buildSide), + None, + left, + right, + isSkewJoin = false)( + join.leftKeys, + join.rightKeys) + // The GPU does not yet support conditional joins, so conditions are implemented + // as a filter after the join when possible. + condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec) + } +} + +case class GpuShuffledHashJoinExec( + override val leftKeys: Seq[Expression], + override val rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: GpuBuildSide, + override val condition: Option[Expression], + left: SparkPlan, + right: SparkPlan, + override val isSkewJoin: Boolean)( + cpuLeftKeys: Seq[Expression], + cpuRightKeys: Seq[Expression]) + extends GpuShuffledHashJoinBase( + buildSide, + condition, + isSkewJoin = isSkewJoin) { + + override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) + + override def requiredChildDistribution: Seq[Distribution] = + Seq(HashClusteredDistribution(cpuLeftKeys), HashClusteredDistribution(cpuRightKeys)) +} diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/RebaseShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/RebaseShims.scala new file mode 100644 index 00000000000..867cc996281 --- /dev/null +++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/RebaseShims.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.v2 + +import org.apache.spark.sql.internal.SQLConf + +// 320+ rebase related shims +trait RebaseShims { + final def parquetRebaseReadKey: String = + SQLConf.PARQUET_REBASE_MODE_IN_READ.key + final def parquetRebaseWriteKey: String = + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key + final def avroRebaseReadKey: String = + SQLConf.AVRO_REBASE_MODE_IN_READ.key + final def avroRebaseWriteKey: String = + SQLConf.AVRO_REBASE_MODE_IN_WRITE.key + final def parquetRebaseRead(conf: SQLConf): String = + conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ) + final def parquetRebaseWrite(conf: SQLConf): String = + conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE) + def int96ParquetRebaseRead(conf: SQLConf): String = + conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ) + def int96ParquetRebaseWrite(conf: SQLConf): String = + conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE) + def int96ParquetRebaseReadKey: String = + SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key + def int96ParquetRebaseWriteKey: String = + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key + def hasSeparateINT96RebaseConf: Boolean = true +} diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala b/sql-plugin/src/main/320until322/scala/com/nvidia/spark/rapids/shims/v2/Spark320until322Shims.scala similarity index 97% rename from sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala rename to sql-plugin/src/main/320until322/scala/com/nvidia/spark/rapids/shims/v2/Spark320until322Shims.scala index 707aa8bb7e5..2bed17d1ce1 100644 --- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala +++ b/sql-plugin/src/main/320until322/scala/com/nvidia/spark/rapids/shims/v2/Spark320until322Shims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -75,28 +75,7 @@ import org.apache.spark.unsafe.types.CalendarInterval /** * Shim base class that can be compiled with every supported 3.2.x */ -trait Spark32XShims extends SparkShims with Logging { - override final def parquetRebaseReadKey: String = - SQLConf.PARQUET_REBASE_MODE_IN_READ.key - override final def parquetRebaseWriteKey: String = - SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key - override final def avroRebaseReadKey: String = - SQLConf.AVRO_REBASE_MODE_IN_READ.key - override final def avroRebaseWriteKey: String = - SQLConf.AVRO_REBASE_MODE_IN_WRITE.key - override final def parquetRebaseRead(conf: SQLConf): String = - conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ) - override final def parquetRebaseWrite(conf: SQLConf): String = - conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE) - override def int96ParquetRebaseRead(conf: SQLConf): String = - conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ) - override def int96ParquetRebaseWrite(conf: SQLConf): String = - conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE) - override def int96ParquetRebaseReadKey: String = - SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key - override def int96ParquetRebaseWriteKey: String = - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key - override def hasSeparateINT96RebaseConf: Boolean = true +trait Spark320until322Shims extends SparkShims with RebaseShims with Logging { override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec]( "A wrapper of shuffle query stage", @@ -108,7 +87,7 @@ trait Spark32XShims extends SparkShims with Logging { plan.session } - override final def getParquetFilters( + override def getParquetFilters( schema: MessageType, pushDownDate: Boolean, pushDownTimestamp: Boolean, @@ -116,7 +95,10 @@ trait Spark32XShims extends SparkShims with Logging { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters = { + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseMode(lookupFileMeta, dateTimeRebaseModeFromConf) new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) } diff --git a/sql-plugin/src/main/320+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala b/sql-plugin/src/main/320until322/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala similarity index 100% rename from sql-plugin/src/main/320+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala rename to sql-plugin/src/main/320until322/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala diff --git a/sql-plugin/src/main/322+/scala/com/nvidia/spark/rapids/shims/v2/Spark322PlusShims.scala b/sql-plugin/src/main/322+/scala/com/nvidia/spark/rapids/shims/v2/Spark322PlusShims.scala new file mode 100644 index 00000000000..ebbb18bad36 --- /dev/null +++ b/sql-plugin/src/main/322+/scala/com/nvidia/spark/rapids/shims/v2/Spark322PlusShims.scala @@ -0,0 +1,1103 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.v2 + +import java.net.URI +import java.nio.ByteBuffer + +import scala.collection.mutable.ListBuffer + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +import com.nvidia.spark.InMemoryTableScanMeta +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.GpuOverrides.exec +import org.apache.arrow.memory.ReferenceManager +import org.apache.arrow.vector.ValueVector +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.schema.MessageType + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rapids.shims.v2.GpuShuffleExchangeExec +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{Abs, Alias, AnsiCast, Attribute, Cast, DynamicPruningExpression, ElementAt, Expression, ExprId, GetArrayItem, GetMapValue, Lag, Lead, Literal, NamedExpression, NullOrdering, PlanExpression, PythonUDF, RegExpReplace, ScalaUDF, SortDirection, SortOrder, SpecifiedWindowFrame, TimeAdd, WindowExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.Average +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.util.DateFormatter +import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering} +import org.apache.spark.sql.execution.{BaseSubqueryExec, CommandResultExec, FileSourceScanExec, InSubqueryExec, PartitionedFileUtil, SparkPlan, SubqueryBroadcastExec} +import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters +import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.window.WindowExecBase +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.rapids.{GpuAbs, GpuAnsi, GpuAverage, GpuElementAt, GpuFileSourceScanExec, GpuGetArrayItem, GpuGetArrayItemMeta, GpuGetMapValue, 
GpuGetMapValueMeta} +import org.apache.spark.sql.rapids.execution._ +import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.rapids.execution.python.shims.v2.GpuFlatMapGroupsInPandasExecMeta +import org.apache.spark.sql.rapids.shims.v2._ +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types._ +import org.apache.spark.storage.{BlockId, BlockManagerId} +import org.apache.spark.unsafe.types.CalendarInterval + +/** +* Shim base class that can be compiled with every supported 3.2.x +*/ +trait Spark322PlusShims extends SparkShims with RebaseShims with Logging { + + override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec]( + "A wrapper of shuffle query stage", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.ARRAY + + TypeSig.STRUCT + TypeSig.MAP).nested(), TypeSig.all), + (exec, conf, p, r) => new GpuCustomShuffleReaderMeta(exec, conf, p, r)) + + override final def sessionFromPlan(plan: SparkPlan): SparkSession = { + plan.session + } + + override def getParquetFilters( + schema: MessageType, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownStartWith: Boolean, + pushDownInFilterThreshold: Int, + caseSensitive: Boolean, + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseSpec(lookupFileMeta, dateTimeRebaseModeFromConf) + new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, + pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } + + override final def filesFromFileIndex( + fileIndex: PartitioningAwareFileIndex + ): Seq[FileStatus] = { + fileIndex.allFiles() + } + + override def isEmptyRelation(relation: Any): Boolean = relation match { + case EmptyHashedRelation => true + case arr: Array[InternalRow] if arr.isEmpty => true + case _ => false + } + + override def tryTransformIfEmptyRelation(mode: BroadcastMode): Option[Any] = { + Some(broadcastModeTransform(mode, Array.empty)).filter(isEmptyRelation) + } + + override final def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = + mode.transform(rows) + + override final def newBroadcastQueryStageExec( + old: BroadcastQueryStageExec, + newPlan: SparkPlan): BroadcastQueryStageExec = + BroadcastQueryStageExec(old.id, newPlan, old._canonicalized) + + override final def isExchangeOp(plan: SparkPlanMeta[_]): Boolean = { + // if the child query stage already executed on GPU then we need to keep the + // next operator on GPU in these cases + SQLConf.get.adaptiveExecutionEnabled && (plan.wrapped match { + case _: AQEShuffleReadExec + | _: ShuffledHashJoinExec + | _: BroadcastHashJoinExec + | _: BroadcastExchangeExec + | _: BroadcastNestedLoopJoinExec => true + case _ => false + }) + } + + override final def isAqePlan(p: SparkPlan): Boolean = p match { + case _: AdaptiveSparkPlanExec | + _: QueryStageExec | + _: AQEShuffleReadExec => true + case _ => false + } + + override def getBuildSide(join: HashJoin): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getDateFormatter(): DateFormatter = { + // TODO verify + DateFormatter() + } + + override def isCustomReaderExec(x: SparkPlan): Boolean = x match { + case _: GpuCustomShuffleReaderExec | _: 
AQEShuffleReadExec => true + case _ => false + } + + override def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand = + RepairTableCommand(tableName, + // These match the one place that this is called, if we start to call this in more places + // we will need to change the API to pass these values in. + enableAddPartitions = true, + enableDropPartitions = false) + + override def shouldFailDivOverflow(): Boolean = SQLConf.get.ansiEnabled + + override def leafNodeDefaultParallelism(ss: SparkSession): Int = { + Spark32XShimsUtils.leafNodeDefaultParallelism(ss) + } + + override def shouldFallbackOnAnsiTimestamp(): Boolean = SQLConf.get.ansiEnabled + + override def getLegacyStatisticalAggregate(): Boolean = + SQLConf.get.legacyStatisticalAggregate + + + override def getScalaUDFAsExpression( + function: AnyRef, + dataType: DataType, + children: Seq[Expression], + inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Nil, + outputEncoder: Option[ExpressionEncoder[_]] = None, + udfName: Option[String] = None, + nullable: Boolean = true, + udfDeterministic: Boolean = true): Expression = { + ScalaUDF(function, dataType, children, inputEncoders, outputEncoder, udfName, nullable, + udfDeterministic) + } + + override def getMapSizesByExecutorId( + shuffleId: Int, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, + startMapIndex, endMapIndex, startPartition, endPartition) + } + + override def getGpuBroadcastNestedLoopJoinShim( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { + GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) + } + + override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuBroadcastHashJoinExec => true + case _ => false + } + } + + override def isWindowFunctionExec(plan: SparkPlan): Boolean = plan.isInstanceOf[WindowExecBase] + + override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuShuffledHashJoinExec => true + case _ => false + } + } + + override def getFileSourceMaxMetadataValueLength(sqlConf: SQLConf): Int = + sqlConf.maxMetadataStringLength + + override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( + GpuOverrides.expr[Cast]( + "Convert a column of one type of data into another type", + new CastChecks(), + (cast, conf, p, r) => new CastExprMeta[Cast](cast, + SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r, + doFloatToIntCheck = true, stringToAnsiDate = true)), + GpuOverrides.expr[AnsiCast]( + "Convert a column of one type of data into another type", + new CastChecks { + import TypeSig._ + // nullChecks are the same + + override val booleanChecks: TypeSig = integral + fp + BOOLEAN + STRING + override val sparkBooleanSig: TypeSig = numeric + BOOLEAN + STRING + + override val integralChecks: TypeSig = gpuNumeric + BOOLEAN + STRING + override val sparkIntegralSig: TypeSig = numeric + BOOLEAN + STRING + + override val fpChecks: TypeSig = (gpuNumeric + BOOLEAN + STRING) + .withPsNote(TypeEnum.STRING, fpToStringPsNote) + override val sparkFpSig: TypeSig = numeric + BOOLEAN + STRING + + override val dateChecks: TypeSig = TIMESTAMP + DATE + STRING + override val sparkDateSig: TypeSig = TIMESTAMP + DATE + STRING + + 
override val timestampChecks: TypeSig = TIMESTAMP + DATE + STRING + override val sparkTimestampSig: TypeSig = TIMESTAMP + DATE + STRING + + // stringChecks are the same, but adding in PS note + private val fourDigitYearMsg: String = "Only 4 digit year parsing is available. To " + + s"enable parsing anyways set ${RapidsConf.HAS_EXTENDED_YEAR_VALUES} to false." + override val stringChecks: TypeSig = gpuNumeric + BOOLEAN + STRING + BINARY + + TypeSig.psNote(TypeEnum.DATE, fourDigitYearMsg) + + TypeSig.psNote(TypeEnum.TIMESTAMP, fourDigitYearMsg) + + // binaryChecks are the same + override val decimalChecks: TypeSig = gpuNumeric + STRING + override val sparkDecimalSig: TypeSig = numeric + BOOLEAN + STRING + + // calendarChecks are the same + + override val arrayChecks: TypeSig = + ARRAY.nested(commonCudfTypes + DECIMAL_128_FULL + NULL + ARRAY + BINARY + STRUCT) + + psNote(TypeEnum.ARRAY, "The array's child type must also support being cast to " + + "the desired child type") + override val sparkArraySig: TypeSig = ARRAY.nested(all) + + override val mapChecks: TypeSig = + MAP.nested(commonCudfTypes + DECIMAL_128_FULL + NULL + ARRAY + BINARY + STRUCT + MAP) + + psNote(TypeEnum.MAP, "the map's key and value must also support being cast to the " + + "desired child types") + override val sparkMapSig: TypeSig = MAP.nested(all) + + override val structChecks: TypeSig = + STRUCT.nested(commonCudfTypes + DECIMAL_128_FULL + NULL + ARRAY + BINARY + STRUCT) + + psNote(TypeEnum.STRUCT, "the struct's children must also support being cast to the " + + "desired child type(s)") + override val sparkStructSig: TypeSig = STRUCT.nested(all) + + override val udtChecks: TypeSig = none + override val sparkUdtSig: TypeSig = UDT + }, + (cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, ansiEnabled = true, conf = conf, + parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = true)), + GpuOverrides.expr[Average]( + "Average aggregate operator", + ExprChecks.fullAgg( + TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL, + TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL, + // NullType is not technically allowed by Spark, but in practice in 3.2.0 + // it can show up + Seq(ParamCheck("input", + TypeSig.integral + TypeSig.fp + TypeSig.DECIMAL_128_FULL + TypeSig.NULL, + TypeSig.numericAndInterval + TypeSig.NULL))), + (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) { + override def tagAggForGpu(): Unit = { + // For Decimal Average the SUM adds a precision of 10 to avoid overflowing + // then it divides by the count with an output scale that is 4 more than the input + // scale. With how our divide works to match Spark, this means that we will need a + // precision of 5 more. So 38 - 10 - 5 = 23 + val dataType = a.child.dataType + dataType match { + case dt: DecimalType => + if (dt.precision > 23) { + if (conf.needDecimalGuarantees) { + willNotWorkOnGpu("GpuAverage cannot guarantee proper overflow checks for " + + s"a precision large than 23. 
The current precision is ${dt.precision}") + } else { + logWarning("Decimal overflow guarantees disabled for " + + s"Average(${a.child.dataType}) produces $dt with an " + + s"intermediate precision of ${dt.precision + 15}") + } + } + case _ => // NOOP + } + GpuOverrides.checkAndTagFloatAgg(dataType, conf, this) + } + + override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = + GpuAverage(childExprs.head) + + // Average is not supported in ANSI mode right now, no matter the type + override val ansiTypeToCheck: Option[DataType] = None + }), + GpuOverrides.expr[Abs]( + "Absolute value", + ExprChecks.unaryProjectAndAstInputMatchesOutput( + TypeSig.implicitCastsAstTypes, TypeSig.gpuNumeric + TypeSig.DECIMAL_128_FULL, + TypeSig.numeric), + (a, conf, p, r) => new UnaryAstExprMeta[Abs](a, conf, p, r) { + val ansiEnabled = SQLConf.get.ansiEnabled + + override def tagSelfForAst(): Unit = { + if (ansiEnabled && GpuAnsi.needBasicOpOverflowCheck(a.dataType)) { + willNotWorkInAst("AST unary minus does not support ANSI mode.") + } + } + + // ANSI support for ABS was added in 3.2.0 SPARK-33275 + override def convertToGpu(child: Expression): GpuExpression = GpuAbs(child, ansiEnabled) + }), + GpuOverrides.expr[RegExpReplace]( + "RegExpReplace support for string literal input patterns", + ExprChecks.projectOnly(TypeSig.STRING, TypeSig.STRING, + Seq(ParamCheck("str", TypeSig.STRING, TypeSig.STRING), + ParamCheck("regex", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING), + ParamCheck("rep", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING), + ParamCheck("pos", TypeSig.lit(TypeEnum.INT) + .withPsNote(TypeEnum.INT, "only a value of 1 is supported"), + TypeSig.lit(TypeEnum.INT)))), + (a, conf, p, r) => new GpuRegExpReplaceMeta(a, conf, p, r)), + // Spark 3.2.0-specific LEAD expression, using custom OffsetWindowFunctionMeta. + GpuOverrides.expr[Lead]( + "Window function that returns N entries ahead of this one", + ExprChecks.windowOnly( + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL + + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all, + Seq( + ParamCheck("input", + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + + TypeSig.NULL + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), + ParamCheck("offset", TypeSig.INT, TypeSig.INT), + ParamCheck("default", + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL + + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all) + ) + ), + (lead, conf, p, r) => new OffsetWindowFunctionMeta[Lead](lead, conf, p, r) { + override def convertToGpu(): GpuExpression = + GpuLead(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu()) + }), + // Spark 3.2.0-specific LAG expression, using custom OffsetWindowFunctionMeta. 
+ GpuOverrides.expr[Lag]( + "Window function that returns N entries behind this one", + ExprChecks.windowOnly( + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL + + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all, + Seq( + ParamCheck("input", + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + + TypeSig.NULL + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), + ParamCheck("offset", TypeSig.INT, TypeSig.INT), + ParamCheck("default", + (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL + + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all) + ) + ), + (lag, conf, p, r) => new OffsetWindowFunctionMeta[Lag](lag, conf, p, r) { + override def convertToGpu(): GpuExpression = { + GpuLag(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu()) + } + }), + GpuOverrides.expr[GetArrayItem]( + "Gets the field at `ordinal` in the Array", + ExprChecks.binaryProject( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL + + TypeSig.DECIMAL_128_FULL + TypeSig.MAP).nested(), + TypeSig.all, + ("array", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY + + TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.MAP), + TypeSig.ARRAY.nested(TypeSig.all)), + ("ordinal", TypeSig.lit(TypeEnum.INT), TypeSig.INT)), + (in, conf, p, r) => new GpuGetArrayItemMeta(in, conf, p, r){ + override def convertToGpu(arr: Expression, ordinal: Expression): GpuExpression = + GpuGetArrayItem(arr, ordinal, SQLConf.get.ansiEnabled) + }), + GpuOverrides.expr[GetMapValue]( + "Gets Value from a Map based on a key", + ExprChecks.binaryProject(TypeSig.STRING, TypeSig.all, + ("map", TypeSig.MAP.nested(TypeSig.STRING), TypeSig.MAP.nested(TypeSig.all)), + ("key", TypeSig.lit(TypeEnum.STRING), TypeSig.all)), + (in, conf, p, r) => new GpuGetMapValueMeta(in, conf, p, r){ + override def convertToGpu(map: Expression, key: Expression): GpuExpression = + GpuGetMapValue(map, key, SQLConf.get.ansiEnabled) + }), + GpuOverrides.expr[ElementAt]( + "Returns element of array at given(1-based) index in value if column is array. 
" + + "Returns value for the given key in value if column is map.", + ExprChecks.binaryProject( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL + + TypeSig.DECIMAL_128_FULL + TypeSig.MAP).nested(), TypeSig.all, + ("array/map", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY + + TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.MAP) + + TypeSig.MAP.nested(TypeSig.STRING) + .withPsNote(TypeEnum.MAP ,"If it's map, only string is supported."), + TypeSig.ARRAY.nested(TypeSig.all) + TypeSig.MAP.nested(TypeSig.all)), + ("index/key", (TypeSig.lit(TypeEnum.INT) + TypeSig.lit(TypeEnum.STRING)) + .withPsNote(TypeEnum.INT, "ints are only supported as array indexes, " + + "not as maps keys") + .withPsNote(TypeEnum.STRING, "strings are only supported as map keys, " + + "not array indexes"), + TypeSig.all)), + (in, conf, p, r) => new BinaryExprMeta[ElementAt](in, conf, p, r) { + override def tagExprForGpu(): Unit = { + // To distinguish the supported nested type between Array and Map + val checks = in.left.dataType match { + case _: MapType => + // Match exactly with the checks for GetMapValue + ExprChecks.binaryProject(TypeSig.STRING, TypeSig.all, + ("map", TypeSig.MAP.nested(TypeSig.STRING), TypeSig.MAP.nested(TypeSig.all)), + ("key", TypeSig.lit(TypeEnum.STRING), TypeSig.all)) + case _: ArrayType => + // Match exactly with the checks for GetArrayItem + ExprChecks.binaryProject( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL + + TypeSig.DECIMAL_128_FULL + TypeSig.MAP).nested(), + TypeSig.all, + ("array", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY + + TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.MAP), + TypeSig.ARRAY.nested(TypeSig.all)), + ("ordinal", TypeSig.lit(TypeEnum.INT), TypeSig.INT)) + case _ => throw new IllegalStateException("Only Array or Map is supported as input.") + } + checks.tag(this) + } + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { + GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled) + } + }), + GpuOverrides.expr[Literal]( + "Holds a static value from the query", + ExprChecks.projectAndAst( + TypeSig.astTypes, + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.CALENDAR + + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT + TypeSig.DAYTIME) + .nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT), + TypeSig.all), + (lit, conf, p, r) => new LiteralExprMeta(lit, conf, p, r)), + GpuOverrides.expr[TimeAdd]( + "Adds interval to timestamp", + ExprChecks.binaryProject(TypeSig.TIMESTAMP, TypeSig.TIMESTAMP, + ("start", TypeSig.TIMESTAMP, TypeSig.TIMESTAMP), + ("interval", TypeSig.lit(TypeEnum.DAYTIME) + TypeSig.lit(TypeEnum.CALENDAR), + TypeSig.DAYTIME + TypeSig.CALENDAR)), + (timeAdd, conf, p, r) => new BinaryExprMeta[TimeAdd](timeAdd, conf, p, r) { + override def tagExprForGpu(): Unit = { + GpuOverrides.extractLit(timeAdd.interval).foreach { lit => + lit.dataType match { + case CalendarIntervalType => + val intvl = lit.value.asInstanceOf[CalendarInterval] + if (intvl.months != 0) { + willNotWorkOnGpu("interval months isn't supported") + } + case _: DayTimeIntervalType => // Supported + } + } + checkTimeZoneId(timeAdd.timeZoneId) + } + + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuTimeAdd(lhs, rhs) + }), + GpuOverrides.expr[SpecifiedWindowFrame]( + "Specification of the width of the group (or \"frame\") of 
input rows " +
+        "around which a window function is evaluated",
+      ExprChecks.projectOnly(
+        TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DAYTIME,
+        TypeSig.numericAndInterval,
+        Seq(
+          ParamCheck("lower",
+            TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DAYTIME,
+            TypeSig.numericAndInterval),
+          ParamCheck("upper",
+            TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DAYTIME,
+            TypeSig.numericAndInterval))),
+      (windowFrame, conf, p, r) => new GpuSpecifiedWindowFrameMeta(windowFrame, conf, p, r)),
+    GpuOverrides.expr[WindowExpression](
+      "Calculates a return value for every input row of a table based on a group (or " +
+        "\"window\") of rows",
+      ExprChecks.windowOnly(
+        (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL +
+          TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
+        TypeSig.all,
+        Seq(ParamCheck("windowFunction",
+          (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.NULL +
+            TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
+          TypeSig.all),
+          ParamCheck("windowSpec",
+            TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL_64 +
+              TypeSig.DAYTIME, TypeSig.numericAndInterval))),
+      (windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r)),
+    GpuScalaUDFMeta.exprMeta
+  ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
+
+  override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
+    Seq(
+      GpuOverrides.exec[WindowInPandasExec](
+        "The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between" +
+          " the Java process and the Python process. It also supports scheduling GPU resources" +
+          " for the Python process when enabled. For now it only supports row based window frame.",
+        ExecChecks(
+          (TypeSig.commonCudfTypes + TypeSig.ARRAY).nested(TypeSig.commonCudfTypes),
+          TypeSig.all),
+        (winPy, conf, p, r) => new GpuWindowInPandasExecMetaBase(winPy, conf, p, r) {
+          override val windowExpressions: Seq[BaseExprMeta[NamedExpression]] =
+            winPy.windowExpression.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+
+          override def convertToGpu(): GpuExec = {
+            GpuWindowInPandasExec(
+              windowExpressions.map(_.convertToGpu()),
+              partitionSpec.map(_.convertToGpu()),
+              // leave ordering expression on the CPU, it's not used for GPU computation
+              winPy.orderSpec,
+              childPlans.head.convertIfNeeded()
+            )(winPy.partitionSpec)
+          }
+        }).disabledByDefault("it only supports row based frame for now"),
+      GpuOverrides.exec[FileSourceScanExec](
+        "Reading data from files, often from Hive tables",
+        ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
+          TypeSig.ARRAY + TypeSig.DECIMAL_128_FULL).nested(), TypeSig.all),
+        (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
+
+          // Replaces SubqueryBroadcastExec inside dynamic pruning filters with GPU counterpart
+          // if possible. Instead of regarding filters as childExprs of the current Meta, we create
+          // a new meta for SubqueryBroadcastExec. The reason is that the GPU replacement of
+          // FileSourceScan is independent of the replacement of the partitionFilters. It is
+          // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
+          // are on the GPU. And vice versa.
+ private lazy val partitionFilters = wrapped.partitionFilters.map { filter => + filter.transformDown { + case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) + if inSub.plan.isInstanceOf[SubqueryBroadcastExec] => + + val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf) + subBcMeta.tagForExplain() + val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec] + dpe.copy(inSub.copy(plan = gpuSubBroadcast)) + } + } + + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) + + override def convertToCpu(): SparkPlan = { + wrapped.copy(partitionFilters = partitionFilters) + } + + override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options + + val location = replaceWithAlluxioPathIfNeeded( + conf, + wrapped.relation, + partitionFilters, + wrapped.dataFilters) + + val newRelation = HadoopFsRelation( + location, + wrapped.relation.partitionSchema, + wrapped.relation.dataSchema, + wrapped.relation.bucketSpec, + GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), + options)(sparkSession) + + GpuFileSourceScanExec( + newRelation, + wrapped.output, + wrapped.requiredSchema, + partitionFilters, + wrapped.optionalBucketSet, + wrapped.optionalNumCoalescedBuckets, + wrapped.dataFilters, + wrapped.tableIdentifier)(conf) + } + }), + GpuOverrides.exec[InMemoryTableScanExec]( + "Implementation of InMemoryTableScanExec to use GPU accelerated Caching", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), + (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)), + GpuOverrides.exec[SortMergeJoinExec]( + "Sort merge join, replacing with shuffled hash join", + JoinTypeChecks.equiJoinExecChecks, + (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), + GpuOverrides.exec[BroadcastHashJoinExec]( + "Implementation of join using broadcast data", + JoinTypeChecks.equiJoinExecChecks, + (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), + GpuOverrides.exec[ShuffledHashJoinExec]( + "Implementation of join using hashed shuffled data", + JoinTypeChecks.equiJoinExecChecks, + (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)), + GpuOverrides.exec[ArrowEvalPythonExec]( + "The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" + + " Java process and the Python process. 
It also supports scheduling GPU resources" + + " for the Python process when enabled", + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), + (e, conf, p, r) => + new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) { + val udfs: Seq[BaseExprMeta[PythonUDF]] = + e.udfs.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val resultAttrs: Seq[BaseExprMeta[Attribute]] = + e.resultAttrs.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override val childExprs: Seq[BaseExprMeta[_]] = udfs ++ resultAttrs + + override def replaceMessage: String = "partially run on GPU" + override def noReplacementPossibleMessage(reasons: String): String = + s"cannot run even partially on the GPU because $reasons" + + override def convertToGpu(): GpuExec = + GpuArrowEvalPythonExec(udfs.map(_.convertToGpu()).asInstanceOf[Seq[GpuPythonUDF]], + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded(), + e.evalType) + }), + GpuOverrides.exec[MapInPandasExec]( + "The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the" + + " Java process and the Python process. It also supports scheduling GPU resources" + + " for the Python process when enabled.", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), + (mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r)), + GpuOverrides.exec[FlatMapGroupsInPandasExec]( + "The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the" + + " Java process and the Python process. It also supports scheduling GPU resources" + + " for the Python process when enabled.", + ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + (flatPy, conf, p, r) => new GpuFlatMapGroupsInPandasExecMeta(flatPy, conf, p, r)), + GpuOverrides.exec[AggregateInPandasExec]( + "The backend for an Aggregation Pandas UDF, this accelerates the data transfer between" + + " the Java process and the Python process. 
It also supports scheduling GPU resources" +
+          " for the Python process when enabled.",
+        ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
+        (aggPy, conf, p, r) => new GpuAggregateInPandasExecMeta(aggPy, conf, p, r)),
+      GpuOverrides.exec[BatchScanExec](
+        "The backend for most file input",
+        ExecChecks(
+          (TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY +
+            TypeSig.DECIMAL_128_FULL).nested(),
+          TypeSig.all),
+        (p, conf, parent, r) => new SparkPlanMeta[BatchScanExec](p, conf, parent, r) {
+          override val childScans: scala.Seq[ScanMeta[_]] =
+            Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this)))
+
+          override def tagPlanForGpu(): Unit = {
+            if (!p.runtimeFilters.isEmpty) {
+              willNotWorkOnGpu("Runtime filtering (DPP) on datasource V2 is not supported")
+            }
+          }
+
+          override def convertToGpu(): GpuExec =
+            GpuBatchScanExec(p.output, childScans.head.convertToGpu())
+        })
+    ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
+  }
+
+  override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
+    GpuOverrides.scan[ParquetScan](
+      "Parquet parsing",
+      (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuParquetScanBase.tagSupport(this)
+          // we are being overly cautious here, as Parquet does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+        }
+
+        override def convertToGpu(): Scan = {
+          GpuParquetScan(a.sparkSession,
+            a.hadoopConf,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.pushedFilters,
+            a.options,
+            a.partitionFilters,
+            a.dataFilters,
+            conf)
+        }
+      }),
+    GpuOverrides.scan[OrcScan](
+      "ORC parsing",
+      (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuOrcScanBase.tagSupport(this)
+          // we are being overly cautious here, as Orc does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("Orc does not support Runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+        }
+
+        override def convertToGpu(): Scan =
+          GpuOrcScan(a.sparkSession,
+            a.hadoopConf,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.options,
+            a.pushedFilters,
+            a.partitionFilters,
+            a.dataFilters,
+            conf)
+      }),
+    GpuOverrides.scan[CSVScan](
+      "CSV parsing",
+      (a, conf, p, r) => new ScanMeta[CSVScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuCSVScan.tagSupport(this)
+          // we are being overly cautious here, as Csv does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("Csv does not support Runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+        }
+
+        override def convertToGpu(): Scan =
+          GpuCSVScan(a.sparkSession,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.options,
+            a.partitionFilters,
+            a.dataFilters,
+            conf.maxReadBatchSizeRows,
+            conf.maxReadBatchSizeBytes)
+      })
+  ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap
+
+  override def getPartitionFileNames(
+      partitions: Seq[PartitionDirectory]): Seq[String] = {
+    val files = partitions.flatMap(partition => partition.files)
+    files.map(_.getPath.getName)
+  }
+
+  override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = {
+    partitions.map(_.files.map(_.getLen).sum).sum
+  }
+
+  override def getPartitionedFiles(
+      partitions: Array[PartitionDirectory]): 
Array[PartitionedFile] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + } + } + } + + override def getPartitionSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] = { + partitions.flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } + } + } + + override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { + FilePartition(index, files) + } + + override def copyBatchScanExec( + batchScanExec: GpuBatchScanExec, + queryUsesInputFile: Boolean): GpuBatchScanExec = { + val scanCopy = batchScanExec.scan match { + case parquetScan: GpuParquetScan => + parquetScan.copy(queryUsesInputFile=queryUsesInputFile) + case orcScan: GpuOrcScan => + orcScan.copy(queryUsesInputFile=queryUsesInputFile) + case _ => throw new RuntimeException("Wrong format") // never reach here + } + batchScanExec.copy(scan=scanCopy) + } + + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf) + } + + override def getGpuColumnarToRowTransition(plan: SparkPlan, + exportColumnRdd: Boolean): GpuColumnarToRowExecParent = { + val serName = plan.conf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER) + val serClass = ShimLoader.loadClass(serName) + if (serClass == classOf[com.nvidia.spark.ParquetCachedBatchSerializer]) { + GpuColumnarToRowTransitionExec(plan, exportColumnRdd) + } else { + GpuColumnarToRowExec(plan, exportColumnRdd) + } + } + + override def checkColumnNameDuplication( + schema: StructType, + colType: String, + resolver: Resolver): Unit = { + GpuSchemaUtils.checkColumnNameDuplication(schema, colType, resolver) + } + + override def getGpuShuffleExchangeExec( + gpuOutputPartitioning: GpuPartitioning, + child: SparkPlan, + cpuOutputPartitioning: Partitioning, + cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = { + val shuffleOrigin = cpuShuffle.map(_.shuffleOrigin).getOrElse(ENSURE_REQUIREMENTS) + GpuShuffleExchangeExec(gpuOutputPartitioning, child, shuffleOrigin)(cpuOutputPartitioning) + } + + override def getGpuShuffleExchangeExec( + queryStage: ShuffleQueryStageExec): GpuShuffleExchangeExecBase = { + queryStage.shuffle.asInstanceOf[GpuShuffleExchangeExecBase] + } + + override def sortOrder( + child: Expression, + direction: SortDirection, + nullOrdering: NullOrdering): SortOrder = SortOrder(child, direction, nullOrdering, Seq.empty) + + override def copySortOrderWithNewChild(s: SortOrder, child: Expression) = { + s.copy(child = child) + } + + override def alias(child: Expression, name: String)( + exprId: ExprId, + qualifier: Seq[String], + explicitMetadata: Option[Metadata]): Alias = { + Alias(child, name)(exprId, qualifier, explicitMetadata) + } + + override def shouldIgnorePath(path: String): Boolean = { + HadoopFSUtilsShim.shouldIgnorePath(path) + } + + override def getLegacyComplexTypeToString(): Boolean = { + 
SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING) + } + + // Arrow version changed between Spark versions + override def getArrowDataBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { + val arrowBuf = vec.getDataBuffer() + (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) + } + + override def getArrowValidityBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { + val arrowBuf = vec.getValidityBuffer + (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) + } + + override def createTable(table: CatalogTable, + sessionCatalog: SessionCatalog, + tableLocation: Option[URI], + result: BaseRelation) = { + val newTable = table.copy( + storage = table.storage.copy(locationUri = tableLocation), + // We will use the schema of resolved.relation as the schema of the table (instead of + // the schema of df). It is important since the nullability may be changed by the relation + // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). + schema = result.schema) + // Table location is already validated. No need to check it again during table creation. + sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + } + + override def getArrowOffsetsBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { + val arrowBuf = vec.getOffsetBuffer + (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) + } + + /** matches SPARK-33008 fix in 3.1.1 */ + override def shouldFailDivByZero(): Boolean = SQLConf.get.ansiEnabled + + override def replaceWithAlluxioPathIfNeeded( + conf: RapidsConf, + relation: HadoopFsRelation, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): FileIndex = { + + val alluxioPathsReplace: Option[Seq[String]] = conf.getAlluxioPathsToReplace + + if (alluxioPathsReplace.isDefined) { + // alluxioPathsReplace: Seq("key->value", "key1->value1") + // turn the rules to the Map with eg + // { s3:/foo -> alluxio://0.1.2.3:19998/foo, + // gs:/bar -> alluxio://0.1.2.3:19998/bar, + // /baz -> alluxio://0.1.2.3:19998/baz } + val replaceMapOption = alluxioPathsReplace.map(rules => { + rules.map(rule => { + val split = rule.split("->") + if (split.size == 2) { + split(0).trim -> split(1).trim + } else { + throw new IllegalArgumentException(s"Invalid setting for " + + s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") + } + }).toMap + }) + + replaceMapOption.map(replaceMap => { + + def isDynamicPruningFilter(e: Expression): Boolean = + e.find(_.isInstanceOf[PlanExpression[_]]).isDefined + + val partitionDirs = relation.location.listFiles( + partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) + + // replacement func to check if the file path is prefixed with the string user configured + // if yes, replace it + val replaceFunc = (f: Path) => { + val pathStr = f.toString + val matchedSet = replaceMap.keySet.filter(reg => pathStr.startsWith(reg)) + if (matchedSet.size > 1) { + // never reach here since replaceMap is a Map + throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + + s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " + + s"for each file path") + } else if (matchedSet.size == 1) { + new Path(pathStr.replaceFirst(matchedSet.head, replaceMap(matchedSet.head))) + } else { + f + } + } + + // replace all of input files + val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => { + replacePartitionDirectoryFiles(partitionDir, replaceFunc) + }) + + // replace all of rootPaths which are already unique + val rootPaths = 
relation.location.rootPaths.map(replaceFunc) + + val parameters: Map[String, String] = relation.options + + // infer PartitionSpec + val partitionSpec = GpuPartitioningUtils.inferPartitioning( + relation.sparkSession, + rootPaths, + inputFiles, + parameters, + Option(relation.dataSchema), + replaceFunc) + + // generate a new InMemoryFileIndex holding paths with alluxio schema + new InMemoryFileIndex( + relation.sparkSession, + inputFiles, + parameters, + Option(relation.dataSchema), + userSpecifiedPartitionSpec = Some(partitionSpec)) + }).getOrElse(relation.location) + + } else { + relation.location + } + } + + override def replacePartitionDirectoryFiles(partitionDir: PartitionDirectory, + replaceFunc: Path => Path): Seq[Path] = { + partitionDir.files.map(f => replaceFunc(f.getPath)) + } + + /** + * Case class ShuffleQueryStageExec holds an additional field shuffleOrigin + * affecting the unapply method signature + */ + override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = { + case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e + case BroadcastQueryStageExec(_, e: ReusedExchangeExec, _) => e + } + + /** dropped by SPARK-34234 */ + override def attachTreeIfSupported[TreeType <: TreeNode[_], A]( + tree: TreeType, + msg: String)( + f: => A + ): A = { + identity(f) + } + + override def hasAliasQuoteFix: Boolean = true + + override def hasCastFloatTimestampUpcast: Boolean = true + + override def findOperators(plan: SparkPlan, predicate: SparkPlan => Boolean): Seq[SparkPlan] = { + def recurse( + plan: SparkPlan, + predicate: SparkPlan => Boolean, + accum: ListBuffer[SparkPlan]): Seq[SparkPlan] = { + if (predicate(plan)) { + accum += plan + } + plan match { + case a: AdaptiveSparkPlanExec => recurse(a.executedPlan, predicate, accum) + case qs: BroadcastQueryStageExec => recurse(qs.broadcast, predicate, accum) + case qs: ShuffleQueryStageExec => recurse(qs.shuffle, predicate, accum) + case c: CommandResultExec => recurse(c.commandPhysicalPlan, predicate, accum) + case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption + } + accum + } + recurse(plan, predicate, new ListBuffer[SparkPlan]()) + } + + override def skipAssertIsOnTheGpu(plan: SparkPlan): Boolean = plan match { + case _: CommandResultExec => true + case _ => false + } + + override def registerKryoClasses(kryo: Kryo): Unit = { + kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch], + new KryoJavaSerializer()) + kryo.register(classOf[SerializeBatchDeserializeHostBuffer], + new KryoJavaSerializer()) + } + + override def getAdaptiveInputPlan(adaptivePlan: AdaptiveSparkPlanExec): SparkPlan = { + adaptivePlan.initialPlan + } + + override def columnarAdaptivePlan(a: AdaptiveSparkPlanExec, + goal: CoalesceSizeGoal): SparkPlan = { + a.copy(supportsColumnar = true) + } + + override def supportsColumnarAdaptivePlans: Boolean = true +} diff --git a/sql-plugin/src/main/322+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala b/sql-plugin/src/main/322+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala new file mode 100644 index 00000000000..a0667518b48 --- /dev/null +++ b/sql-plugin/src/main/322+/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/v2/ShimVectorizedColumnReader.scala @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet.rapids.shims.v2
+
+import java.time.ZoneId
+import java.util.TimeZone
+
+import org.apache.parquet.column.ColumnDescriptor
+import org.apache.parquet.column.page.PageReadStore
+import org.apache.parquet.schema.{GroupType, Type}
+
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.execution.datasources.parquet.{ParentContainerUpdater, ParquetRowConverter, ParquetToSparkSchemaConverter, VectorizedColumnReader}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.types.StructType
+
+class ShimParquetRowConverter(
+    schemaConverter: ParquetToSparkSchemaConverter,
+    parquetType: GroupType,
+    catalystType: StructType,
+    convertTz: Option[ZoneId],
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value, // always LegacyBehaviorPolicy.CORRECTED
+    int96RebaseMode: LegacyBehaviorPolicy.Value, // always LegacyBehaviorPolicy.EXCEPTION
+    int96CDPHive3Compatibility: Boolean,
+    updater: ParentContainerUpdater
+) extends ParquetRowConverter(
+  schemaConverter,
+  parquetType,
+  catalystType,
+  convertTz,
+  RebaseSpec(datetimeRebaseMode), // no need to rebase, so set originTimeZone as default
+  RebaseSpec(int96RebaseMode), // no need to rebase, so set originTimeZone as default
+  updater)
+
+class ShimVectorizedColumnReader(
+    index: Int,
+    columns: java.util.List[ColumnDescriptor],
+    types: java.util.List[Type],
+    pageReadStore: PageReadStore,
+    convertTz: ZoneId,
+    datetimeRebaseMode: String, // always LegacyBehaviorPolicy.CORRECTED
+    int96RebaseMode: String, // always LegacyBehaviorPolicy.EXCEPTION
+    int96CDPHive3Compatibility: Boolean
+) extends VectorizedColumnReader(
+  columns.get(index),
+  types.get(index).getLogicalTypeAnnotation,
+  pageReadStore.getPageReader(columns.get(index)),
+  pageReadStore.getRowIndexes().orElse(null),
+  convertTz,
+  datetimeRebaseMode,
+  TimeZone.getDefault.getID, // use the default zone because no rebase is needed
+  int96RebaseMode,
+  TimeZone.getDefault.getID) // use the default zone because rebase would throw an exception
\ No newline at end of file
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala
new file mode 100644
index 00000000000..bff9643f44d
--- /dev/null
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuHashPartitioning.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.v2 + +import com.nvidia.spark.rapids.GpuHashPartitioningBase + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} + +case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends GpuHashPartitioningBase(expressions, numPartitions) { + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case ClusteredDistribution(requiredClustering, _) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case _ => false + } + } + } +} diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala similarity index 86% rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala rename to sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala index 0438d956c0c..176609fedca 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,6 +21,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks} @@ -30,7 +31,7 @@ object GpuJoinUtils { buildSide match { case BuildRight => GpuBuildRight case BuildLeft => GpuBuildLeft - case _ => throw new Exception("unknown buildSide Type") + case unknownBuildSide => throw new Exception(s"unknown buildSide Type: $unknownBuildSide") } } } @@ -90,9 +91,10 @@ case class GpuShuffledHashJoinExec( extends GpuShuffledHashJoinBase( buildSide, condition, - isSkewJoin = isSkewJoin, - cpuLeftKeys, - cpuRightKeys) { + isSkewJoin = isSkewJoin) { - override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil + override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) + + override def requiredChildDistribution: Seq[Distribution] = + Seq(ClusteredDistribution(cpuLeftKeys), ClusteredDistribution(cpuRightKeys)) } diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala index e83f418701c..9d670155aed 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,6 +17,7 @@ package com.nvidia.spark.rapids.shims.v2 import com.nvidia.spark.rapids._ +import org.apache.parquet.schema.MessageType import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession @@ -24,10 +25,11 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.types.StructType -trait Spark33XShims extends Spark32XShims { +trait Spark33XShims extends Spark322PlusShims { override def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] = null override def dateFormatInRead(csvOpts: CSVOptions): Option[String] = { @@ -46,4 +48,20 @@ trait Spark33XShims extends Spark32XShims { metadataColumns: Seq[AttributeReference]): RDD[InternalRow] = { new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns) } + + override def getParquetFilters( + schema: MessageType, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownStartWith: Boolean, + pushDownInFilterThreshold: Int, + caseSensitive: Boolean, + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters = { + val datetimeRebaseMode = DataSourceUtils + .datetimeRebaseSpec(lookupFileMeta, dateTimeRebaseModeFromConf) + new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, + pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala similarity index 80% rename from sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala index 689895c6e6d..9ede14a8c35 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala @@ -20,32 +20,17 @@ import ai.rapids.cudf.{DType, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.shims.v2.ShimExpression import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution} import org.apache.spark.sql.rapids.GpuMurmur3Hash import org.apache.spark.sql.types.{DataType, IntegerType} import org.apache.spark.sql.vectorized.ColumnarBatch -case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) - extends GpuExpression with ShimExpression with GpuPartitioning { +abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitions: Int) + extends GpuExpression with ShimExpression with GpuPartitioning with Serializable { override def children: Seq[Expression] = expressions override def nullable: Boolean = false override def dataType: DataType = IntegerType - override def satisfies0(required: Distribution): Boolean = { - super.satisfies0(required) || { - required match { - case h: HashClusteredDistribution => - expressions.length == 
h.expressions.length && expressions.zip(h.expressions).forall { - case (l, r) => l.semanticEquals(r) - } - case ClusteredDistribution(requiredClustering, _) => - expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) - case _ => false - } - } - } - def partitionInternalAndClose(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = { val types = GpuColumnVector.extractTypes(batch) val partedTable = withResource(batch) { batch => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 5ba4c9b5b7f..7723c369fd4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal import ai.rapids.cudf.DType import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF} -import com.nvidia.spark.rapids.shims.v2.{AQEUtils, GpuSpecifiedWindowFrameMeta, GpuWindowExpressionMeta, OffsetWindowFunctionMeta} +import com.nvidia.spark.rapids.shims.v2.{AQEUtils, GpuHashPartitioning, GpuSpecifiedWindowFrameMeta, GpuWindowExpressionMeta, OffsetWindowFunctionMeta} import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index e94ea0e77ca..8500efa6c01 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -341,11 +341,9 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte ParquetMetadataConverter.range(file.start, file.start + file.length)) val fileSchema = footer.getFileMetaData.getSchema val pushedFilters = if (enableParquetFilterPushDown) { - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footer.getFileMetaData.getKeyValueMetaData.get, rebaseMode) val parquetFilters = ShimLoader.getSparkShims.getParquetFilters(fileSchema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, - isCaseSensitive, datetimeRebaseMode) + isCaseSensitive, footer.getFileMetaData.getKeyValueMetaData.get, rebaseMode) filters.flatMap(parquetFilters.createFilter).reduceOption(FilterApi.and) } else { None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala index 6dbef428ee1..e81317b501e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala @@ -22,16 +22,13 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.FullOuter -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} import org.apache.spark.sql.rapids.execution.GpuHashJoin import org.apache.spark.sql.vectorized.ColumnarBatch abstract class GpuShuffledHashJoinBase( buildSide: GpuBuildSide, override val condition: Option[Expression], - val isSkewJoin: Boolean, - cpuLeftKeys: Seq[Expression], - cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuHashJoin { 
+ val isSkewJoin: Boolean) extends ShimBinaryExecNode with GpuHashJoin { import GpuMetric._ override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL @@ -44,9 +41,6 @@ abstract class GpuShuffledHashJoinBase( JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME), JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS)) ++ spillMetrics - override def requiredChildDistribution: Seq[Distribution] = - HashClusteredDistribution(cpuLeftKeys) :: HashClusteredDistribution(cpuRightKeys) :: Nil - override protected def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException( "GpuShuffledHashJoin does not support the execute() code path.") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 28849df9cb0..1e6bd047b03 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.rapids.GpuFileSourceScanExec import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase} import org.apache.spark.sql.sources.BaseRelation @@ -109,7 +108,8 @@ trait SparkShims { pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, caseSensitive: Boolean, - datetimeRebaseMode: LegacyBehaviorPolicy.Value): ParquetFilters + lookupFileMeta: String => String, + dateTimeRebaseModeFromConf: String): ParquetFilters def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean def isGpuShuffledHashJoin(plan: SparkPlan): Boolean diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 7a31265f643..b0e88e25136 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode +import com.nvidia.spark.rapids.shims.v2.{GpuHashPartitioning, ShimUnaryExecNode} import org.apache.spark.{MapOutputStatistics, ShuffleDependency} import org.apache.spark.rdd.RDD
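For context, the thread running through the hunks above is the reshaped SparkShims.getParquetFilters contract: callers such as GpuParquetFileFilterHandler no longer resolve the datetime rebase policy themselves, but hand the shim the Parquet footer's key/value metadata lookup plus the session-level rebase mode string, and each shim resolves the version-specific value behind the shim boundary (DataSourceUtils.datetimeRebaseMode in the new 320until322 tree, DataSourceUtils.datetimeRebaseSpec in the 322+ and 330+ trees) before constructing ParquetFilters. The following is a minimal, self-contained sketch of that parameter flow only; the object, trait, method, and metadata-key names in it are hypothetical stand-ins, not the plugin's or Spark's real classes.

// Hypothetical sketch of the reshaped shim contract; names are illustrative stand-ins.
object RebaseShimSketch {

  // Mirrors the new getParquetFilters parameter shape: the shim receives the raw
  // ingredients instead of a pre-computed LegacyBehaviorPolicy value.
  trait RebaseAwareFilterShim {
    def buildFilterDescription(
        lookupFileMeta: String => String,
        dateTimeRebaseModeFromConf: String): String
  }

  // A 3.2.2+/3.3.0-style shim would call DataSourceUtils.datetimeRebaseSpec(lookupFileMeta, conf)
  // at this point, while a 3.2.0/3.2.1 shim calls DataSourceUtils.datetimeRebaseMode. This
  // stand-in only shows that the decision now happens inside the shim.
  class Spark322LikeShim extends RebaseAwareFilterShim {
    override def buildFilterDescription(
        lookupFileMeta: String => String,
        dateTimeRebaseModeFromConf: String): String = {
      val fileLevelHint = Option(lookupFileMeta("writer.datetime.rebase")) // hypothetical key
      s"rebase resolved from file=$fileLevelHint, conf=$dateTimeRebaseModeFromConf"
    }
  }

  def main(args: Array[String]): Unit = {
    // Caller side: in the patch, GpuParquetFileFilterHandler passes the footer metadata getter
    // directly (footer.getFileMetaData.getKeyValueMetaData.get) together with the conf value.
    val fakeFooterMeta = Map("writer.datetime.rebase" -> "CORRECTED") // hypothetical stand-in
    val lookup: String => String = key => fakeFooterMeta.getOrElse(key, null)
    println(new Spark322LikeShim().buildFilterDescription(lookup, "EXCEPTION"))
  }
}

The 3.2.2 split exists because the ParquetFilters constructor on Spark 3.2.2 and later consumes the result of datetimeRebaseSpec rather than datetimeRebaseMode, which is exactly what the new 320until322 and 322+ source trees encode.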