Fix merge conflict with branch-21.10 (#3725)
* Fix issues with AQE and DPP enabled on Spark 3.2 [databricks] (#3691)

* Fix issues with AQE and DPP enabled on Spark 3.2

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add canonicalized parameter for 301db shim

* Fix double-close when batch contains multiple columns (see the illustrative sketch below)

* Fix HostColumnVector deserialization
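
An illustrative Scala sketch of the bug class behind the double-close item above (hypothetical class names, not the plugin's actual code): when several columns in a batch share one underlying buffer, closing per column releases the same resource twice, so the guard is to close each distinct resource exactly once.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a host-side buffer that must be released exactly once.
final class SharedBuffer(name: String) extends AutoCloseable {
  private val closed = new AtomicBoolean(false)
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) println(s"released $name")
    else throw new IllegalStateException(s"$name closed twice")
}

final case class Column(buffer: SharedBuffer)

object BatchCloseDemo {
  def main(args: Array[String]): Unit = {
    val shared = new SharedBuffer("host-buffer")
    val batch = Seq(Column(shared), Column(shared)) // two columns, one shared buffer
    // Naive per-column close (batch.foreach(_.buffer.close())) would throw on the second call;
    // close each distinct underlying buffer exactly once instead.
    batch.map(_.buffer).distinct.foreach(_.close())
  }
}
```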

* CDH build stopped working due to missing jars in maven repo (#3722)

fixes #3718

Evidently some jars that were being pulled in through spark-hive -> spark-core -> curator-recipes were removed from the CDH maven repo. We don't use that transitive version anyway, since curator-recipes is explicitly called out in the CDH profiles, so just exclude spark-core when pulling in the spark-hive dependency (see the sql-plugin/pom.xml change below). Build and unit tests pass.

I did see a couple of other dependency warnings but then didn't see them again. I'll rerun with a clean .m2, but that shouldn't block this fix for the build.

For reference, the error was:
`Could not resolve dependencies for project com.nvidia:rapids-4-spark-sql_2.12:jar:21.10.0-SNAPSHOT: Failed to collect dependencies at org.apache.spark:spark-hive_2.12:jar:3.1.1.3.1.7270.0-253 -> org.apache.spark:spark-core_2.12:jar:3.1.1.3.1.7270.0-253 -> org.apache.curator:curator-recipes:jar:4.3.0.7.2.7.0-SNAPSHOT: Failed to read artifact descriptor for org.apache.curator:curator-recipes:jar:4.3.0.7.2.7.0-SNAPSHOT: Could not transfer artifact org.apache.curator:curator-recipes:pom:4.3.0.7.2.7.0-SNAPSHOT from/to cloudera (https://repo.hortonworks.com/nexus/content/groups/public): PKIX path building failed:`

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
jlowe and tgravescs authored Oct 1, 2021
1 parent 13e2099 commit 8f051e1
Showing 28 changed files with 332 additions and 212 deletions.
@@ -109,12 +109,6 @@ class Spark320Shims extends Spark32XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}

override def getGpuBroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan): GpuBroadcastExchangeExecBase = {
GpuBroadcastExchangeExec(mode, child)
}

override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
7 changes: 7 additions & 0 deletions sql-plugin/pom.xml
@@ -108,6 +108,13 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark311cdh.version}</version>
<exclusions>
<!-- spark-core tries to pull a curator-recipes version we don't want -->
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>

This file was deleted.

@@ -13,35 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import java.util.UUID
import scala.concurrent.Promise

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase

case class GpuBroadcastExchangeExec(
override val mode: BroadcastMode,
child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) with BroadcastExchangeLike {

override def runId: UUID = _runId
/**
* This shim handles the completion future differences between
* Apache Spark and Databricks.
*/
trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
@transient
protected lazy val promise = Promise[Broadcast[Any]]()

/**
* For registering callbacks on `relationFuture`.
* Note that calling this field will not start the execution of broadcast job.
*/
override def doCompletionFuture: concurrent.Future[Broadcast[Any]] = promise.future

override def runtimeStatistics: Statistics = {
val dataSize = metrics("dataSize").value
Statistics(dataSize)
}

override def doCanonicalize(): SparkPlan = {
GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
}
@transient
lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future
}

This file was deleted.

@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import scala.concurrent.Promise

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike

/**
* This shim handles the completion future differences between
* Apache Spark and Databricks.
*/
trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
@transient
protected lazy val promise = Promise[Broadcast[Any]]()

/**
* For registering callbacks on `relationFuture`.
* Note that calling this field will not start the execution of broadcast job.
*/
@transient
lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future
}
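
For context, a simplified, self-contained Scala sketch of the promise-backed completion future that the ShimBroadcastExchangeLike trait above exposes: callers register callbacks on completionFuture without starting the work, and whichever code runs the broadcast job fulfills the promise. The names (CompletionFutureLike, CompletionDemo) and the String payload are illustrative simplifications, not the plugin's real types.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

trait CompletionFutureLike {
  // Backing promise; fulfilled by whatever code actually runs the (broadcast) job.
  @transient protected lazy val promise = Promise[String]()
  // Registering callbacks on this future does not start the work.
  @transient lazy val completionFuture: Future[String] = promise.future
}

object CompletionDemo extends CompletionFutureLike {
  def main(args: Array[String]): Unit = {
    completionFuture.foreach(r => println(s"observer saw: $r")) // callback registered up front
    val work = Future { "broadcast-relation" }                  // stands in for relationFuture
    work.onComplete(promise.tryComplete)                        // fulfill the shim's promise
    Await.ready(completionFuture, 5.seconds)
  }
}
```
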
@@ -64,6 +64,11 @@ trait Spark30XShims extends SparkShims {
plan.sqlContext.sparkSession
}

override def newBroadcastQueryStageExec(
old: BroadcastQueryStageExec,
newPlan: SparkPlan): BroadcastQueryStageExec =
BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)

override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
@@ -115,12 +115,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}

override def getGpuBroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan): GpuBroadcastExchangeExecBase = {
GpuBroadcastExchangeExec(mode, child)
}

override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan,
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -100,12 +100,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}

override def getGpuBroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan): GpuBroadcastExchangeExecBase = {
GpuBroadcastExchangeExec(mode, child)
}

override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan,
@@ -74,6 +74,10 @@ trait Spark30XShims extends SparkShims {
def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any =
mode.transform(rows)

override def newBroadcastQueryStageExec(
old: BroadcastQueryStageExec,
newPlan: SparkPlan): BroadcastQueryStageExec = BroadcastQueryStageExec(old.id, newPlan)

override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import scala.concurrent.Promise

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike

/**
* This shim handles the completion future differences between
* Apache Spark and Databricks.
*/
trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
@transient
protected lazy val promise = Promise[Broadcast[Any]]()

override def doCompletionFuture: concurrent.Future[Broadcast[Any]] = promise.future
}
@@ -64,6 +64,11 @@ trait Spark30XShims extends SparkShims {
plan.sqlContext.sparkSession
}

override def newBroadcastQueryStageExec(
old: BroadcastQueryStageExec,
newPlan: SparkPlan): BroadcastQueryStageExec =
BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)

override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
@@ -101,12 +101,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}

override def getGpuBroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan): GpuBroadcastExchangeExecBase = {
GpuBroadcastExchangeExec(mode, child)
}

override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
@@ -100,12 +100,6 @@ abstract class SparkBaseShims extends Spark31XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}

override def getGpuBroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan): GpuBroadcastExchangeExecBase = {
GpuBroadcastExchangeExec(mode, child)
}

override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, QueryStageExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -97,6 +97,11 @@ trait Spark32XShims extends SparkShims {
override final def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any =
mode.transform(rows)

override final def newBroadcastQueryStageExec(
old: BroadcastQueryStageExec,
newPlan: SparkPlan): BroadcastQueryStageExec =
BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)

override final def isExchangeOp(plan: SparkPlanMeta[_]): Boolean = {
// if the child query stage already executed on GPU then we need to keep the
// next operator on GPU in these cases