diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh
index 4704aff2e9d..261dd6bec0a 100755
--- a/jenkins/databricks/build.sh
+++ b/jenkins/databricks/build.sh
@@ -26,7 +26,6 @@ BUILD_PROFILES=${BUILD_PROFILES:-'databricks312,!snapshot-shims'}
 BASE_SPARK_VERSION=${BASE_SPARK_VERSION:-'3.1.2'}
 BUILDVER=$(echo ${BASE_SPARK_VERSION} | sed 's/\.//g')db
 # the version of Spark used when we install the Databricks jars in .m2
-# 3.1.0-databricks is add because its actually based on Spark 3.1.1
 BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=${BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS:-$BASE_SPARK_VERSION}
 SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=$BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS-databricks
 # something like spark_3_1 or spark_3_0
diff --git a/jenkins/databricks/params.py b/jenkins/databricks/params.py
index 5093c61cdf5..11bcc67d86c 100644
--- a/jenkins/databricks/params.py
+++ b/jenkins/databricks/params.py
@@ -23,10 +23,6 @@
 source_tgz = 'spark-rapids-ci.tgz'
 tgz_dest = '/home/ubuntu/spark-rapids-ci.tgz'
 base_spark_pom_version = '3.0.1'
-# this is odd but with Databricks 8.2 it reports Spark version 3.1.0
-# but its really 3.1.1, so base Spark pom will be 3.1.1 but we want to
-# install them as 3.1.0-databricks. Other Databricks versions
-# this is the same as base_spark_pom_version.
 base_spark_version_to_install_databricks_jars = base_spark_pom_version
 clusterid = ''
 build_profiles = 'databricks,!snapshot-shims'
diff --git a/pom.xml b/pom.xml
index fdcdfae9c3f..d4d6e07055e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,63 +357,6 @@
     aggregator
-    release311db
-    buildver
-    311db
-    3.4.4
-    spark311db
-    spark311db
-    ${spark311db.version}
-    ${spark311db.version}
-    true
-    ${spark311db.version}
-    ${spark311db.version}
-    org.codehaus.mojo
-    build-helper-maven-plugin
-    add-profile-src-31+
-    add-source
-    generate-sources
-    ${project.basedir}/src/main/301until320-all/scala
-    ${project.basedir}/src/main/311+-all/scala
-    ${project.basedir}/src/main/31xdb/scala
-    ${project.basedir}/src/main/311db/scala
-    ${project.basedir}/src/main/311until320-all/scala
-    ${project.basedir}/src/main/311until320-apache/scala
-    ${project.basedir}/src/main/301until330-all/scala
-    ${project.basedir}/src/main/pre320-treenode/scala
-    aggregator
     release312db
@@ -823,7 +766,6 @@
     3.0.2
     3.0.3
     3.0.4-SNAPSHOT
-    3.1.1-databricks
     3.1.1
     3.1.1.3.1.7270.0-253
     3.1.2
diff --git a/shims/spark311db/pom.xml b/shims/spark311db/pom.xml
deleted file mode 100644
index afc0e73708b..00000000000
--- a/shims/spark311db/pom.xml
+++ /dev/null
@@ -1,188 +0,0 @@
-    4.0.0
-    com.nvidia
-    rapids-4-spark-shims_2.12
-    21.12.0-SNAPSHOT
-    ../pom.xml
-    rapids-4-spark-shims-spark311db_2.12
-    RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.1.1 Databricks Shim
-    The RAPIDS SQL plugin for Apache Spark 3.1.1 Databricks Shim
-    21.12.0-SNAPSHOT
-        maven-antrun-plugin
-            dependency
-            generate-resources
-            run
-        ${project.build.directory}/extra-resources
-        src/main/resources
-        org.apache.spark
-        spark-sql_${scala.binary.version}
-        ${spark311db.version}
-        provided
-        org.apache.spark
-        spark-catalyst_${scala.binary.version}
-        ${spark311db.version}
-        provided
-        org.apache.spark
-        spark-core_${scala.binary.version}
-        ${spark311db.version}
-        provided
-        org.apache.spark
-        spark-annotation_${scala.binary.version}
-        ${spark311db.version}
-        provided
-        org.apache.spark
-        spark-unsafe_${scala.binary.version}
-        ${spark311db.version}
-        provided
-        org.apache.hadoop
-        hadoop-common
-        ${spark311db.version}
-        provided
-        org.apache.hadoop
-        hadoop-mapreduce-client
-        ${spark311db.version}
-        provided
-        org.apache.parquet
-        parquet-hadoop
-        ${spark311db.version}
-        provided
-        org.apache.parquet
-        parquet-common
-        ${spark311db.version}
-        provided
-        org.apache.parquet
-        parquet-column
-        ${spark311db.version}
-        provided
-        org.apache.parquet
-        parquet-format
-        ${spark311db.version}
-        provided
-        org.apache.commons
-        commons-io
-        ${spark311db.version}
-        provided
-        com.esotericsoftware.kryo
-        kryo-shaded-db
-        ${spark311db.version}
-        provided
-        org.apache.arrow
-        arrow-format
-        ${spark311db.version}
-        provided
-        org.apache.arrow
-        arrow-memory
-        ${spark311db.version}
-        provided
-        org.apache.arrow
-        arrow-vector
-        ${spark311db.version}
-        provided
-        org.json4s
-        JsonAST
-        ${spark.version}
-        provided
diff --git a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/Spark311dbShims.scala b/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/Spark311dbShims.scala
deleted file mode 100644
index 00c0f6e7217..00000000000
--- a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/Spark311dbShims.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nvidia.spark.rapids.shims.spark311db
-
-import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.shims.v2._
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
-import org.apache.spark.sql.internal.SQLConf
-
-class Spark311dbShims extends SparkBaseShims with Spark30Xuntil33XShims {
-
-  override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION
-
-  override def getParquetFilters(
-      schema: MessageType,
-      pushDownDate: Boolean,
-      pushDownTimestamp: Boolean,
-      pushDownDecimal: Boolean,
-      pushDownStartWith: Boolean,
-      pushDownInFilterThreshold: Int,
-      caseSensitive: Boolean,
-      datetimeRebaseMode: SQLConf.LegacyBehaviorPolicy.Value): ParquetFilters =
-    new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
-      pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)
-
-}
diff --git a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/SparkShimServiceProvider.scala b/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/SparkShimServiceProvider.scala
deleted file mode 100644
index 42bd881b95a..00000000000
--- a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/SparkShimServiceProvider.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nvidia.spark.rapids.shims.spark311db
-
-import com.nvidia.spark.rapids.{DatabricksShimVersion, SparkShims}
-
-import org.apache.spark.SparkEnv
-
-object SparkShimServiceProvider {
-  val VERSION = DatabricksShimVersion(3, 1, 1)
-}
-
-class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {
-
-  def matchesVersion(version: String): Boolean = {
-    SparkEnv.get.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "").startsWith("8.2.")
-  }
-
-  def buildShim: SparkShims = {
-    new Spark311dbShims()
-  }
-}
diff --git a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/spark311db/RapidsShuffleManager.scala b/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/spark311db/RapidsShuffleManager.scala
deleted file mode 100644
index 8e8f5238daa..00000000000
--- a/shims/spark311db/src/main/scala/com/nvidia/spark/rapids/spark311db/RapidsShuffleManager.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nvidia.spark.rapids.spark311db
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.rapids.shims.spark311db.ProxyRapidsShuffleInternalManager
-
-/** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */
-sealed class RapidsShuffleManager(
-    conf: SparkConf,
-    isDriver: Boolean) extends ProxyRapidsShuffleInternalManager(conf, isDriver)
diff --git a/shims/spark311db/src/main/scala/org/apache/spark/sql/rapids/shims/spark311db/RapidsShuffleInternalManager.scala b/shims/spark311db/src/main/scala/org/apache/spark/sql/rapids/shims/spark311db/RapidsShuffleInternalManager.scala
deleted file mode 100644
index cba1a083d19..00000000000
--- a/shims/spark311db/src/main/scala/org/apache/spark/sql/rapids/shims/spark311db/RapidsShuffleInternalManager.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids.shims.spark311db
-
-import org.apache.spark.{SparkConf, TaskContext}
-import org.apache.spark.shuffle._
-import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}
-
-/**
- * A shuffle manager optimized for the RAPIDS Plugin For Apache Spark.
- * @note This is an internal class to obtain access to the private
- *   `ShuffleManager` and `SortShuffleManager` classes.
- */
-class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends RapidsShuffleInternalManagerBase(conf, isDriver) {
-
-  def getReader[K, C](
-      handle: ShuffleHandle,
-      startMapIndex: Int,
-      endMapIndex: Int,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
-      metrics)
-  }
-}
-
-class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {
-
-  def getReader[K, C](
-      handle: ShuffleHandle,
-      startMapIndex: Int,
-      endMapIndex: Int,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter
-  ): ShuffleReader[K, C] = {
-    self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
-      metrics)
-  }
-}
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
deleted file mode 100644
index 835fc03d16f..00000000000
--- a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nvidia.spark.rapids.shims.v2
-
-import scala.collection.mutable.ListBuffer
-
-import com.nvidia.spark.rapids.{ExecChecks, ExecRule, GpuExec, SparkPlanMeta, SparkShims, TypeSig}
-import com.nvidia.spark.rapids.GpuOverrides.exec
-import org.apache.hadoop.fs.FileStatus
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec
-
-/**
-* Shim base class that can be compiled with every supported 3.0.x
-*/
-trait Spark30XShims extends Spark30Xuntil33XShims {
-  override def parquetRebaseReadKey: String =
-    SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
-  override def parquetRebaseWriteKey: String =
-    SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key
-  override def avroRebaseReadKey: String =
-    SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
-  override def avroRebaseWriteKey: String =
-    SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key
-  override def parquetRebaseRead(conf: SQLConf): String =
-    conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
-  override def parquetRebaseWrite(conf: SQLConf): String =
-    conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
-  override def int96ParquetRebaseRead(conf: SQLConf): String =
-    conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
-  override def int96ParquetRebaseWrite(conf: SQLConf): String =
-    conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
-  override def int96ParquetRebaseReadKey: String =
-    SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
-  override def int96ParquetRebaseWriteKey: String =
-    SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
-  override def hasSeparateINT96RebaseConf: Boolean = true
-
-  override def sessionFromPlan(plan: SparkPlan): SparkSession = {
-    plan.sqlContext.sparkSession
-  }
-
-  override def newBroadcastQueryStageExec(
-      old: BroadcastQueryStageExec,
-      newPlan: SparkPlan): BroadcastQueryStageExec =
-    BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)
-
-  override def getDateFormatter(): DateFormatter = {
-    DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
-  }
-
-  override def isExchangeOp(plan: SparkPlanMeta[_]): Boolean = {
-    // if the child query stage already executed on GPU then we need to keep the
-    // next operator on GPU in these cases
-    SQLConf.get.adaptiveExecutionEnabled && (plan.wrapped match {
-      case _: CustomShuffleReaderExec
-           | _: ShuffledHashJoinExec
-           | _: BroadcastHashJoinExec
-           | _: BroadcastExchangeExec
-           | _: BroadcastNestedLoopJoinExec => true
-      case _ => false
-    })
-  }
-
-  override def isAqePlan(p: SparkPlan): Boolean = p match {
-    case _: AdaptiveSparkPlanExec |
-         _: QueryStageExec |
-         _: CustomShuffleReaderExec => true
-    case _ => false
-  }
-
-  override def isCustomReaderExec(x: SparkPlan): Boolean = x match {
-    case _: GpuCustomShuffleReaderExec | _: CustomShuffleReaderExec => true
-    case _ => false
-  }
-
-  override def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[CustomShuffleReaderExec](
-    "A wrapper of shuffle query stage",
-    ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + TypeSig.ARRAY +
-        TypeSig.STRUCT + TypeSig.MAP).nested(), TypeSig.all),
-    (exec, conf, p, r) => new GpuCustomShuffleReaderMeta(exec, conf, p, r))
-
-  override def findOperators(plan: SparkPlan, predicate: SparkPlan => Boolean): Seq[SparkPlan] = {
-    def recurse(
-        plan: SparkPlan,
-        predicate: SparkPlan => Boolean,
-        accum: ListBuffer[SparkPlan]): Seq[SparkPlan] = {
-      if (predicate(plan)) {
-        accum += plan
-      }
-      plan match {
-        case a: AdaptiveSparkPlanExec => recurse(a.executedPlan, predicate, accum)
-        case qs: BroadcastQueryStageExec => recurse(qs.broadcast, predicate, accum)
-        case qs: ShuffleQueryStageExec => recurse(qs.shuffle, predicate, accum)
-        case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption
-      }
-      accum
-    }
-    recurse(plan, predicate, new ListBuffer[SparkPlan]())
-  }
-
-  override def skipAssertIsOnTheGpu(plan: SparkPlan): Boolean = false
-
-  override def shouldFailDivOverflow(): Boolean = false
-
-  override def leafNodeDefaultParallelism(ss: SparkSession): Int = {
-    ss.sparkContext.defaultParallelism
-  }
-
-  override def shouldFallbackOnAnsiTimestamp(): Boolean = false
-}
diff --git a/sql-plugin/src/main/311db/scala/org/apache/spark/rapids/shims/v2/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/311db/scala/org/apache/spark/rapids/shims/v2/GpuShuffleExchangeExec.scala
deleted file mode 100644
index 68da0979bef..00000000000
--- a/sql-plugin/src/main/311db/scala/org/apache/spark/rapids/shims/v2/GpuShuffleExchangeExec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.rapids.shims.v2
-
-import scala.concurrent.Future
-
-import com.nvidia.spark.rapids.GpuPartitioning
-
-import org.apache.spark.MapOutputStatistics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
-import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase
-
-case class GpuShuffleExchangeExec(
-    gpuOutputPartitioning: GpuPartitioning,
-    child: SparkPlan,
-    shuffleOrigin: ShuffleOrigin)(
-    cpuOutputPartitioning: Partitioning)
-  extends GpuShuffleExchangeExecBase(gpuOutputPartitioning, child) with ShuffleExchangeLike {
-
-  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
-
-  override val outputPartitioning: Partitioning = cpuOutputPartitioning
-
-  // 'mapOutputStatisticsFuture' is only needed when enable AQE.
-  override def doMapOutputStatisticsFuture: Future[MapOutputStatistics] = {
-    if (inputBatchRDD.getNumPartitions == 0) {
-      Future.successful(null)
-    } else {
-      sparkContext.submitMapStage(shuffleDependencyColumnar)
-    }
-  }
-
-  override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
-
-  override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
-
-  override def getShuffleRDD(
-      partitionSpecs: Array[ShufflePartitionSpec],
-      partitionSizes: Option[Array[Long]]): RDD[_] = {
-    throw new UnsupportedOperationException
-  }
-
-  // DB SPECIFIC - throw if called since we don't know how its used
-  override def withNewOutputPartitioning(outputPartitioning: Partitioning) = {
-    throw new UnsupportedOperationException
-  }
-
-  override def runtimeStatistics: Statistics = {
-    // note that Spark will only use the sizeInBytes statistic but making the rowCount
-    // available here means that we can more easily reference it in GpuOverrides when
-    // planning future query stages when AQE is on
-    Statistics(
-      sizeInBytes = metrics("dataSize").value,
-      rowCount = Some(metrics("numOutputRows").value)
-    )
-  }
-}
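For reference, the removed `SparkShimServiceProvider` above illustrates how Databricks shims are selected: they match on the `spark.databricks.clusterUsageTags.sparkVersion` runtime tag (the removed 311db shim matched the `8.2.` prefix) rather than the Spark version string, because Databricks 8.2 reported Spark 3.1.0 even though it was based on 3.1.1. Below is a minimal, self-contained sketch of that matching logic; the `DatabricksShimMatching` object, the `matchesDatabricksRuntime` helper, and the `9.1.` prefix used for the surviving 312db shim are illustrative assumptions, not code from this patch.

```scala
import org.apache.spark.SparkConf

// Simplified sketch of how a Databricks shim provider could decide whether it
// applies to the current cluster. The real providers read the tag from SparkEnv;
// here a SparkConf is passed in so the example runs on its own.
object DatabricksShimMatching {

  // Databricks reports its runtime version in this conf tag, which is a more
  // reliable signal than the Spark version string it advertises.
  private val RuntimeTag = "spark.databricks.clusterUsageTags.sparkVersion"

  // Hypothetical helper: the removed 311db shim matched the "8.2." prefix;
  // "9.1." is assumed here for the remaining 312db shim.
  def matchesDatabricksRuntime(conf: SparkConf, prefix: String): Boolean =
    conf.get(RuntimeTag, "").startsWith(prefix)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set(RuntimeTag, "9.1.x-gpu-ml-scala2.12")
    println(matchesDatabricksRuntime(conf, "8.2.")) // false: the removed 311db shim no longer applies
    println(matchesDatabricksRuntime(conf, "9.1.")) // true: the 312db shim would be selected
  }
}
```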