diff --git a/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala b/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala
index d03d8db9938..cda1f7b5d30 100644
--- a/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala
+++ b/shims/spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/Spark320Shims.scala
@@ -109,12 +109,6 @@ class Spark320Shims extends Spark32XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}
- override def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase = {
- GpuBroadcastExchangeExec(mode, child)
- }
-
override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml
index cffd609dbda..b670d1a0dc6 100644
--- a/sql-plugin/pom.xml
+++ b/sql-plugin/pom.xml
@@ -108,6 +108,12 @@
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>
                 <version>${spark311cdh.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-core_${scala.binary.version}</artifactId>
+                    </exclusion>
+                </exclusions>
                 <scope>provided</scope>
             </dependency>
             <dependency>
diff --git a/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
deleted file mode 100644
index 1ca92a8cd7c..00000000000
--- a/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nvidia.spark.rapids.shims.v2
-
-import java.util.UUID
-
-import com.nvidia.spark.rapids.GpuMetric
-
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
-import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBaseWithFuture
-
-case class GpuBroadcastExchangeExec(
- override val mode: BroadcastMode,
- child: SparkPlan) extends GpuBroadcastExchangeExecBaseWithFuture(mode, child)
- with BroadcastExchangeLike {
-
- override def runId: UUID = _runId
-
- override def runtimeStatistics: Statistics = {
- Statistics(
- sizeInBytes = metrics("dataSize").value,
- rowCount = Some(metrics(GpuMetric.NUM_OUTPUT_ROWS).value))
- }
-
- override def doCanonicalize(): SparkPlan = {
- GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
- }
-}
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
similarity index 52%
rename from sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
rename to sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
index 900632b6425..85eb1bd1221 100644
--- a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
@@ -13,35 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.nvidia.spark.rapids.shims.v2
-import java.util.UUID
+import scala.concurrent.Promise
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
-import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
-import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase
-
-case class GpuBroadcastExchangeExec(
- override val mode: BroadcastMode,
- child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) with BroadcastExchangeLike {
- override def runId: UUID = _runId
+/**
+ * This shim handles the completion future differences between
+ * Apache Spark and Databricks.
+ */
+trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
+ @transient
+ protected lazy val promise = Promise[Broadcast[Any]]()
/**
* For registering callbacks on `relationFuture`.
* Note that calling this field will not start the execution of broadcast job.
*/
- override def doCompletionFuture: concurrent.Future[Broadcast[Any]] = promise.future
-
- override def runtimeStatistics: Statistics = {
- val dataSize = metrics("dataSize").value
- Statistics(dataSize)
- }
-
- override def doCanonicalize(): SparkPlan = {
- GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
- }
+ @transient
+ lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future
}
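
The `ShimBroadcastExchangeLike` trait above exists because Apache Spark's `BroadcastExchangeLike` exposes the completion future as `completionFuture`, while the Databricks runtime asks for `doCompletionFuture` (see the 311db variant later in this diff). The Promise/Future handshake itself is version-independent; here is a minimal, Spark-free sketch of it (all names in the snippet are invented for the demo):

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    object CompletionFutureSketch extends App {
      // The shim hands out promise.future before any work starts, so callers
      // can register callbacks without triggering the broadcast job...
      val promise = Promise[String]()
      val completionFuture = promise.future

      // ...and the background broadcast task completes it exactly once.
      promise.trySuccess("broadcast relation ready")

      println(Await.result(completionFuture, 1.second))
    }
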
diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
deleted file mode 100644
index 52e6f0efeef..00000000000
--- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nvidia.spark.rapids.shims.v2
-
-import java.util.UUID
-
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
-import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBaseWithFuture
-
-case class GpuBroadcastExchangeExec(
- override val mode: BroadcastMode,
- child: SparkPlan)
- extends GpuBroadcastExchangeExecBaseWithFuture(mode, child)
- with BroadcastExchangeLike {
-
- override def runId: UUID = _runId
-
- override def runtimeStatistics: Statistics = {
- val dataSize = metrics("dataSize").value
- Statistics(dataSize)
- }
-
- override def doCanonicalize(): SparkPlan = {
- GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
- }
-}
diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
index 6e8ffd15394..433344189b3 100644
--- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch
class GpuBroadcastHashJoinMeta(
diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
new file mode 100644
index 00000000000..85eb1bd1221
--- /dev/null
+++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims.v2
+
+import scala.concurrent.Promise
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+
+/**
+ * This shim handles the completion future differences between
+ * Apache Spark and Databricks.
+ */
+trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
+ @transient
+ protected lazy val promise = Promise[Broadcast[Any]]()
+
+ /**
+ * For registering callbacks on `relationFuture`.
+ * Note that calling this field will not start the execution of broadcast job.
+ */
+ @transient
+ lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future
+}
diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
index 668448bdd63..a464a35df0b 100644
--- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
+++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
@@ -64,6 +64,11 @@ trait Spark30XShims extends SparkShims {
plan.sqlContext.sparkSession
}
+ override def newBroadcastQueryStageExec(
+ old: BroadcastQueryStageExec,
+ newPlan: SparkPlan): BroadcastQueryStageExec =
+ BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)
+
override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
index 5ea7a459fda..e59ff6be512 100644
--- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
+++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
@@ -115,12 +115,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}
- override def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase = {
- GpuBroadcastExchangeExec(mode, child)
- }
-
override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan,
diff --git a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
index 43c4f57ae91..be6c29043bd 100644
--- a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch
class GpuBroadcastHashJoinMeta(
diff --git a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
index 47f251172f4..4fca3eed77d 100644
--- a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
+++ b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
@@ -100,12 +100,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}
- override def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase = {
- GpuBroadcastExchangeExec(mode, child)
- }
-
override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan,
diff --git a/sql-plugin/src/main/301until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala b/sql-plugin/src/main/301until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
index 6a743a294e9..7e3d35678d8 100644
--- a/sql-plugin/src/main/301until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
+++ b/sql-plugin/src/main/301until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
@@ -74,6 +74,10 @@ trait Spark30XShims extends SparkShims {
def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any =
mode.transform(rows)
+ override def newBroadcastQueryStageExec(
+ old: BroadcastQueryStageExec,
+ newPlan: SparkPlan): BroadcastQueryStageExec = BroadcastQueryStageExec(old.id, newPlan)
+
override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
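
Note the arity difference this new shim method hides: the Spark 3.0.x constructor above is `BroadcastQueryStageExec(id, plan)`, while the Databricks and 3.2.0 shims elsewhere in this diff pass a third `_canonicalized` argument. Plugin code stays version-agnostic by always going through the shim, roughly like this (a sketch that assumes the surrounding plugin classpath):

    import com.nvidia.spark.rapids.ShimLoader
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec

    // Rebuild an AQE broadcast stage around a replacement plan without
    // caring which BroadcastQueryStageExec constructor this build has.
    def rebuildStage(old: BroadcastQueryStageExec, newPlan: SparkPlan): BroadcastQueryStageExec =
      ShimLoader.getSparkShims.newBroadcastQueryStageExec(old, newPlan)
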
diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
index 8e78f5a0bd7..e8a5bd3eacd 100644
--- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch
class GpuBroadcastHashJoinMeta(
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
index 45877781079..3e704e8df63 100644
--- a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuHashJoin, JoinTypeChecks, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch
class GpuBroadcastHashJoinMeta(
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
new file mode 100644
index 00000000000..8ee4ac10084
--- /dev/null
+++ b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/ShimBroadcastExchangeLike.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims.v2
+
+import scala.concurrent.Promise
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+
+/**
+ * This shim handles the completion future differences between
+ * Apache Spark and Databricks.
+ */
+trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
+ @transient
+ protected lazy val promise = Promise[Broadcast[Any]]()
+
+ override def doCompletionFuture: concurrent.Future[Broadcast[Any]] = promise.future
+}
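
This is the Databricks side of the same shim: rather than defining `completionFuture` itself, it overrides the `doCompletionFuture` hook that the Databricks `BroadcastExchangeLike` declares. Side by side, as the two members appear in this diff:

    // Apache Spark shims (301+-nondb, 301db):
    lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future

    // Databricks 311db shim:
    override def doCompletionFuture: concurrent.Future[Broadcast[Any]] = promise.future
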
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
index d4e0930c77f..bf163d417c9 100644
--- a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
+++ b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
@@ -64,6 +64,11 @@ trait Spark30XShims extends SparkShims {
plan.sqlContext.sparkSession
}
+ override def newBroadcastQueryStageExec(
+ old: BroadcastQueryStageExec,
+ newPlan: SparkPlan): BroadcastQueryStageExec =
+ BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)
+
override def getDateFormatter(): DateFormatter = {
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
}
diff --git a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
index baf611f8213..a1e42a2be31 100644
--- a/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
+++ b/sql-plugin/src/main/311db/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
@@ -101,12 +101,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}
- override def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase = {
- GpuBroadcastExchangeExec(mode, child)
- }
-
override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
diff --git a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
index f5126c702b5..9916a189a53 100644
--- a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
+++ b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/SparkBaseShims.scala
@@ -100,12 +100,6 @@ abstract class SparkBaseShims extends Spark31XShims {
GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
}
- override def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase = {
- GpuBroadcastExchangeExec(mode, child)
- }
-
override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = {
plan match {
case _: GpuBroadcastHashJoinExec => true
diff --git a/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala b/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala
index 04201a929ff..b3a382536a7 100644
--- a/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala
+++ b/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/v2/Spark32XShims.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, QueryStageExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -97,6 +97,11 @@ trait Spark32XShims extends SparkShims {
override final def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any =
mode.transform(rows)
+ override final def newBroadcastQueryStageExec(
+ old: BroadcastQueryStageExec,
+ newPlan: SparkPlan): BroadcastQueryStageExec =
+ BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)
+
override final def isExchangeOp(plan: SparkPlanMeta[_]): Boolean = {
// if the child query stage already executed on GPU then we need to keep the
// next operator on GPU in these cases
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala
index c1f9ecc18a1..f6fcada886f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase
+import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExec
abstract class GpuBroadcastJoinMeta[INPUT <: SparkPlan](plan: INPUT,
conf: RapidsConf,
@@ -28,24 +28,24 @@ abstract class GpuBroadcastJoinMeta[INPUT <: SparkPlan](plan: INPUT,
def canBuildSideBeReplaced(buildSide: SparkPlanMeta[_]): Boolean = {
buildSide.wrapped match {
- case bqse: BroadcastQueryStageExec => bqse.plan.isInstanceOf[GpuBroadcastExchangeExecBase] ||
+ case bqse: BroadcastQueryStageExec => bqse.plan.isInstanceOf[GpuBroadcastExchangeExec] ||
bqse.plan.isInstanceOf[ReusedExchangeExec] &&
bqse.plan.asInstanceOf[ReusedExchangeExec]
- .child.isInstanceOf[GpuBroadcastExchangeExecBase]
- case reused: ReusedExchangeExec => reused.child.isInstanceOf[GpuBroadcastExchangeExecBase]
- case _: GpuBroadcastExchangeExecBase => true
+ .child.isInstanceOf[GpuBroadcastExchangeExec]
+ case reused: ReusedExchangeExec => reused.child.isInstanceOf[GpuBroadcastExchangeExec]
+ case _: GpuBroadcastExchangeExec => true
case _ => buildSide.canThisBeReplaced
}
}
def verifyBuildSideWasReplaced(buildSide: SparkPlan): Unit = {
val buildSideOnGpu = buildSide match {
- case bqse: BroadcastQueryStageExec => bqse.plan.isInstanceOf[GpuBroadcastExchangeExecBase] ||
+ case bqse: BroadcastQueryStageExec => bqse.plan.isInstanceOf[GpuBroadcastExchangeExec] ||
bqse.plan.isInstanceOf[ReusedExchangeExec] &&
bqse.plan.asInstanceOf[ReusedExchangeExec]
- .child.isInstanceOf[GpuBroadcastExchangeExecBase]
- case reused: ReusedExchangeExec => reused.child.isInstanceOf[GpuBroadcastExchangeExecBase]
- case _: GpuBroadcastExchangeExecBase => true
+ .child.isInstanceOf[GpuBroadcastExchangeExec]
+ case reused: ReusedExchangeExec => reused.child.isInstanceOf[GpuBroadcastExchangeExec]
+ case _: GpuBroadcastExchangeExec => true
case _ => false
}
if (!buildSideOnGpu) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index ea62ab4ab52..c35b4d32138 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv}
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastToCpuExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuBroadcastToCpuExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
@@ -157,13 +157,14 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
// don't need to recurse down and optimize them again
case ColumnarToRowExec(e: BroadcastQueryStageExec) =>
e.plan match {
- case ReusedExchangeExec(_, b: GpuBroadcastExchangeExecBase) =>
+ case ReusedExchangeExec(output, b: GpuBroadcastExchangeExec) =>
// we can't directly re-use a GPU broadcast exchange to feed a CPU broadcast
// hash join but Spark will sometimes try and do this (see
// https://issues.apache.org/jira/browse/SPARK-35093 for more information) so we
// need to convert the output to rows in the driver before broadcasting the data
// to the executors
- GpuBroadcastToCpuExec(b.mode, b.child)
+ val newChild = ReusedExchangeExec(output, GpuBroadcastToCpuExec(b.mode, b.child))
+ ShimLoader.getSparkShims.newBroadcastQueryStageExec(e, newChild)
case _ => getColumnarToRowExec(e)
}
case ColumnarToRowExec(e: ShuffleQueryStageExec) =>
@@ -171,7 +172,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case ColumnarToRowExec(bb: GpuBringBackToHost) =>
optimizeAdaptiveTransitions(bb.child, Some(bb)) match {
- case e: GpuBroadcastExchangeExecBase => e
+ case e: GpuBroadcastExchangeExec => e
case e: GpuShuffleExchangeExecBase => e
case other => getColumnarToRowExec(other)
}
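
The SPARK-35093 workaround above rewrites the shape of the adaptive plan rather than swapping a single node. An assumed before/after, for illustration only:

    // Before: AQE tries to feed a reused *GPU* broadcast into a CPU join.
    //   BroadcastHashJoinExec (CPU)
    //     +- BroadcastQueryStageExec
    //          +- ReusedExchangeExec
    //               +- GpuBroadcastExchangeExec
    //
    // After optimizeAdaptiveTransitions: the stage is rebuilt through
    // newBroadcastQueryStageExec so the reused exchange now broadcasts rows.
    //   BroadcastHashJoinExec (CPU)
    //     +- BroadcastQueryStageExec
    //          +- ReusedExchangeExec
    //               +- GpuBroadcastToCpuExec
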
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
index 232fb645bc1..b369761988e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
+import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{FileIndex, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -136,11 +136,6 @@ trait SparkShims {
condition: Option[Expression],
targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase
-
- def getGpuBroadcastExchangeExec(
- mode: BroadcastMode,
- child: SparkPlan): GpuBroadcastExchangeExecBase
-
def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan,
@@ -149,6 +144,10 @@ trait SparkShims {
def getGpuShuffleExchangeExec(
queryStage: ShuffleQueryStageExec): GpuShuffleExchangeExecBase
+ def newBroadcastQueryStageExec(
+ old: BroadcastQueryStageExec,
+ newPlan: SparkPlan): BroadcastQueryStageExec
+
def getMapSizesByExecutorId(
shuffleId: Int,
startMapIndex: Int,
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index 68d66d0dd86..48e3524de30 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -20,7 +20,7 @@ import java.io._
import java.util.UUID
import java.util.concurrent._
-import scala.concurrent.{ExecutionContext, Promise}
+import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
-import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode
+import com.nvidia.spark.rapids.shims.v2.{ShimBroadcastExchangeLike, ShimUnaryExecNode}
import org.apache.spark.SparkException
import org.apache.spark.broadcast.Broadcast
@@ -36,9 +36,10 @@ import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange}
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -256,30 +257,13 @@ class GpuBroadcastMeta(
}
override def convertToGpu(): GpuExec = {
- ShimLoader.getSparkShims.getGpuBroadcastExchangeExec(
- exchange.mode, childPlans.head.convertIfNeeded())
+ GpuBroadcastExchangeExec(exchange.mode, childPlans.head.convertIfNeeded())
}
-
-}
-
-abstract class GpuBroadcastExchangeExecBaseWithFuture(
- mode: BroadcastMode,
- child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) {
-
- /**
- * For registering callbacks on `relationFuture`.
- * Note that calling this field will not start the execution of broadcast job.
- */
- @transient
- lazy val completionFuture: concurrent.Future[Broadcast[Any]] = promise.future
}
-/**
- * In some versions of databricks we need to return the completionFuture in a different way.
- */
abstract class GpuBroadcastExchangeExecBase(
- val mode: BroadcastMode,
- child: SparkPlan) extends Exchange with ShimUnaryExecNode with GpuExec {
+ mode: BroadcastMode,
+ child: SparkPlan) extends ShimBroadcastExchangeLike with ShimUnaryExecNode with GpuExec {
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
@@ -294,14 +278,13 @@ abstract class GpuBroadcastExchangeExecBase(
// For now all broadcasts produce a single batch. We might need to change that at some point
override def outputBatching: CoalesceGoal = RequireSingleBatch
- @transient
- protected lazy val promise = Promise[Broadcast[Any]]()
-
@transient
protected val timeout: Long = SQLConf.get.broadcastTimeout
val _runId: UUID = UUID.randomUUID()
+ override def runId: UUID = _runId
+
@transient
lazy val relationFuture: Future[Broadcast[Any]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
@@ -448,8 +431,13 @@ abstract class GpuBroadcastExchangeExecBase(
ex)
}
}
-}
+ override def runtimeStatistics: Statistics = {
+ Statistics(
+ sizeInBytes = metrics("dataSize").value,
+ rowCount = Some(metrics(GpuMetric.NUM_OUTPUT_ROWS).value))
+ }
+}
object GpuBroadcastExchangeExecBase {
/**
@@ -481,3 +469,10 @@ object GpuBroadcastExchangeExecBase {
newDaemonCachedThreadPool("gpu-broadcast-exchange",
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
}
+
+case class GpuBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
+ extends GpuBroadcastExchangeExecBase(mode, child) {
+ override def doCanonicalize(): SparkPlan = {
+ GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+ }
+}
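
With `runtimeStatistics` now living in the base class, AQE reads both the broadcast size and the row count from the exchange's metrics instead of the size alone. A toy illustration of the `Statistics` value it produces (the numbers are made up):

    import org.apache.spark.sql.catalyst.plans.logical.Statistics

    // e.g. a 1 MiB broadcast containing 10,000 rows
    val stats = Statistics(sizeInBytes = BigInt(1L << 20), rowCount = Some(BigInt(10000)))
    assert(stats.rowCount.contains(BigInt(10000)))
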
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastToCpuExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastToCpuExec.scala
index 139f8902adf..1e6818b0c4a 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastToCpuExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastToCpuExec.scala
@@ -15,15 +15,17 @@
*/
package org.apache.spark.sql.rapids.execution
+import java.util
import java.util.Optional
import java.util.concurrent.{Callable, Future}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
-import ai.rapids.cudf.{HostColumnVector, NvtxColor}
-import com.nvidia.spark.rapids.{GpuMetric, MetricRange, NvtxWithMetrics, RapidsHostColumnVector, ShimLoader}
+import ai.rapids.cudf.{DType, HostColumnVector, HostColumnVectorCore, HostMemoryBuffer, NvtxColor}
+import ai.rapids.cudf.JCudfSerialization.{SerializedColumnHeader, SerializedTableHeader}
+import com.nvidia.spark.rapids.{Arm, GpuMetric, MetricRange, NvtxWithMetrics, RapidsHostColumnVector, ShimLoader}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import org.apache.spark.SparkException
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.KnownSizeEstimation
@@ -45,8 +48,8 @@ import org.apache.spark.util.KnownSizeEstimation
* @param mode Broadcast mode
* @param child Input to broadcast
*/
-case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkPlan)
- extends GpuBroadcastExchangeExecBaseWithFuture(mode, child) {
+case class GpuBroadcastToCpuExec(mode: BroadcastMode, child: SparkPlan)
+ extends GpuBroadcastExchangeExecBase(mode, child) {
import GpuMetric._
@@ -58,6 +61,7 @@ case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkP
val collectTime = gpuLongMetric(COLLECT_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val broadcastTime = gpuLongMetric("broadcastTime")
+ val dataTypes = child.output.map(_.dataType).toArray
val task = new Callable[Broadcast[Any]]() {
override def call(): Broadcast[Any] = {
@@ -81,36 +85,27 @@ case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkP
}
// deserialize to host buffers in the driver and then convert to rows
- val dataTypes = child.output.map(_.dataType)
-
- val gpuBatches = serializedBatches.safeMap { cb =>
- val hostColumns = (0 until cb.header.getNumColumns).map { i =>
- val columnHeader = cb.header.getColumnHeader(i)
- val hcv = new HostColumnVector(
- columnHeader.getType,
- columnHeader.rowCount,
- Optional.of(columnHeader.nullCount),
- cb.buffer,
- null, null, List.empty.asJava)
- new RapidsHostColumnVector(dataTypes(i), hcv)
+ val gpuBatches = withResource(serializedBatches) { _ =>
+ serializedBatches.safeMap { cb =>
+ val hostColumns = GpuBroadcastToCpuExec.buildHostColumns(cb.header, cb.buffer,
+ dataTypes)
+ val rowCount = cb.header.getNumRows
+ new ColumnarBatch(hostColumns.toArray, rowCount)
}
- val rowCount = hostColumns.headOption.map(_.getRowCount.toInt).getOrElse(0)
- new ColumnarBatch(hostColumns.toArray, rowCount)
}
val broadcasted = try {
- val rows = new ListBuffer[InternalRow]()
- gpuBatches.foreach(cb => rows.appendAll(cb.rowIterator().asScala))
-
- val numRows = rows.length
+ val numRows = gpuBatches.map(_.numRows).sum
checkRowLimit(numRows)
numOutputRows += numRows
val relation = withResource(new NvtxWithMetrics(
"broadcast build", NvtxColor.DARK_GREEN, buildTime)) { _ =>
val toUnsafe = UnsafeProjection.create(output, output)
- val unsafeRows = rows.iterator.map(toUnsafe)
- val relation = ShimLoader.getSparkShims
+ val unsafeRows = gpuBatches.flatMap {
+ _.rowIterator().asScala.map(r => toUnsafe(r).copy())
+ }
+ val relation = ShimLoader.getSparkShims
.broadcastModeTransform(mode, unsafeRows.toArray)
val dataSize = relation match {
@@ -167,4 +162,142 @@ case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkP
GpuBroadcastExchangeExecBase.executionContext.submit[Broadcast[Any]](task)
}
+ override def doCanonicalize(): SparkPlan = {
+ GpuBroadcastToCpuExec(mode.canonicalized, child.canonicalized)
+ }
+}
+
+object GpuBroadcastToCpuExec extends Arm {
+ case class ColumnOffsets(validity: Long, offsets: Long, data: Long, dataLen: Long)
+
+ private def buildHostColumns(
+ header: SerializedTableHeader,
+ buffer: HostMemoryBuffer,
+ dataTypes: Array[DataType]): Array[RapidsHostColumnVector] = {
+ assert(dataTypes.length == header.getNumColumns)
+ val columnOffsets = buildColumnOffsets(header, buffer)
+ closeOnExcept(new ArrayBuffer[RapidsHostColumnVector](header.getNumColumns)) { hostColumns =>
+ (0 until header.getNumColumns).foreach { i =>
+ val columnHeader = header.getColumnHeader(i)
+ val hcv = buildHostColumn(columnHeader, columnOffsets, buffer)
+ hostColumns += new RapidsHostColumnVector(dataTypes(i), hcv)
+ }
+ assert(columnOffsets.isEmpty)
+ hostColumns.toArray
+ }
+ }
+
+ // TODO: The methods below either replicate private functionality in cudf
+ // or should be moved to cudf.
+
+ private def buildHostColumn(
+ columnHeader: SerializedColumnHeader,
+ columnOffsets: util.ArrayDeque[ColumnOffsets],
+ buffer: HostMemoryBuffer): HostColumnVector = {
+ val offsetsInfo = columnOffsets.remove()
+ closeOnExcept(new ArrayBuffer[HostColumnVectorCore](columnHeader.getNumChildren)) { children =>
+ val childHeaders = columnHeader.getChildren
+ if (childHeaders != null) {
+ childHeaders.foreach { childHeader =>
+ children += buildHostColumn(childHeader, columnOffsets, buffer)
+ }
+ }
+ val dtype = columnHeader.getType
+ val numRows = columnHeader.getRowCount
+ val nullCount = columnHeader.getNullCount
+
+ // Slice up the host buffer for this column vector's buffers.
+ val dataBuffer = if (dtype.isNestedType) {
+ null
+ } else {
+ buffer.slice(offsetsInfo.data, offsetsInfo.dataLen)
+ }
+ val validityBuffer = if (nullCount > 0) {
+ // one bit per row
+ val validitySize = (numRows + 7) / 8
+ buffer.slice(offsetsInfo.validity, validitySize)
+ } else {
+ null
+ }
+ val offsetsBuffer = if (dtype.hasOffsets) {
+ // one 32-bit integer offset per row plus one additional offset at the end
+ val offsetsSize = if (numRows > 0) (numRows + 1) * Integer.BYTES else 0
+ buffer.slice(offsetsInfo.offsets, offsetsSize)
+ } else {
+ null
+ }
+
+ new HostColumnVector(dtype, numRows, Optional.of(nullCount), dataBuffer, validityBuffer,
+ offsetsBuffer, children.asJava)
+ }
+ }
+
+ /** Build a list of column offset descriptors using a pre-order traversal of the columns. */
+ private def buildColumnOffsets(
+ header: SerializedTableHeader,
+ buffer: HostMemoryBuffer): util.ArrayDeque[ColumnOffsets] = {
+ val numTopColumns = header.getNumColumns
+ val offsets = new util.ArrayDeque[ColumnOffsets]
+ var bufferOffset = 0L
+ (0 until numTopColumns).foreach { i =>
+ val columnHeader = header.getColumnHeader(i)
+ bufferOffset = buildColumnOffsetsForColumn(columnHeader, buffer, offsets, bufferOffset)
+ }
+ offsets
+ }
+
+ /** Append a list of column offset descriptors using a pre-order traversal of the column. */
+ private def buildColumnOffsetsForColumn(
+ columnHeader: SerializedColumnHeader,
+ buffer: HostMemoryBuffer,
+ offsetsList: util.ArrayDeque[ColumnOffsets],
+ startBufferOffset: Long): Long = {
+ var bufferOffset = startBufferOffset
+ val rowCount = columnHeader.getRowCount
+ var validity = 0L
+ var offsets = 0L
+ var data = 0L
+ var dataLen = 0L
+ if (columnHeader.getNullCount > 0) {
+ val validityLen = padFor64ByteAlignment((rowCount + 7) / 8)
+ validity = bufferOffset
+ bufferOffset += validityLen
+ }
+
+ val dtype = columnHeader.getType
+ if (dtype.hasOffsets) {
+ if (rowCount > 0) {
+ val offsetsLen = (rowCount + 1) * Integer.BYTES
+ offsets = bufferOffset
+ val startOffset = buffer.getInt(bufferOffset)
+ val endOffset = buffer.getInt(bufferOffset + (rowCount * Integer.BYTES))
+ bufferOffset += padFor64ByteAlignment(offsetsLen)
+ if (dtype.equals(DType.STRING)) {
+ dataLen = endOffset - startOffset
+ data = bufferOffset
+ bufferOffset += padFor64ByteAlignment(dataLen)
+ }
+ }
+ } else if (dtype.getSizeInBytes > 0) {
+ dataLen = dtype.getSizeInBytes * rowCount
+ data = bufferOffset
+ bufferOffset += padFor64ByteAlignment(dataLen)
+ }
+ offsetsList.add(ColumnOffsets(
+ validity = validity,
+ offsets = offsets,
+ data = data,
+ dataLen = dataLen))
+
+ val children = columnHeader.getChildren
+ if (children != null) {
+ children.foreach { child =>
+ bufferOffset = buildColumnOffsetsForColumn(child, buffer, offsetsList, bufferOffset)
+ }
+ }
+
+ bufferOffset
+ }
+
+ private def padFor64ByteAlignment(addr: Long): Long = ((addr + 63) / 64) * 64
}
diff --git a/tests-spark310+/pom.xml b/tests-spark310+/pom.xml
index 2e1e7c3945d..1f711a7652b 100644
--- a/tests-spark310+/pom.xml
+++ b/tests-spark310+/pom.xml
@@ -121,6 +121,10 @@
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>
                 <version>${spark311cdh.version}</version>
                 <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-core_${scala.binary.version}</artifactId>
+                    </exclusion>
                     <exclusion>
                         <groupId>org.apache.arrow</groupId>
                         <artifactId>*</artifactId>
diff --git a/tests/pom.xml b/tests/pom.xml
index 5fe97eb2995..234d93336c2 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -130,6 +130,10 @@
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>
                 <version>${spark311cdh.version}</version>
                 <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-core_${scala.binary.version}</artifactId>
+                    </exclusion>
                     <exclusion>
                         <groupId>org.apache.arrow</groupId>
                         <artifactId>*</artifactId>
diff --git a/udf-compiler/pom.xml b/udf-compiler/pom.xml
index afea0ce5871..03e9665a380 100644
--- a/udf-compiler/pom.xml
+++ b/udf-compiler/pom.xml
@@ -148,6 +148,12 @@
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>
                 <version>${spark311cdh.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-core_${scala.binary.version}</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.curator</groupId>
diff --git a/udf-examples/pom.xml b/udf-examples/pom.xml
index be17d06edcb..9d3ac1f6073 100644
--- a/udf-examples/pom.xml
+++ b/udf-examples/pom.xml
@@ -163,6 +163,12 @@
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>
                 <version>${spark311cdh.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-core_${scala.binary.version}</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.curator</groupId>