diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
index 43540feef2e..941aabdadc6 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
@@ -18,16 +18,11 @@ package com.nvidia.spark.rapids.shims.spark300
 
 import com.nvidia.spark.rapids._
 
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec}
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase}
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.rapids.execution.GpuHashJoin
 
 object GpuJoinUtils {
   def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -85,79 +80,6 @@ case class GpuShuffledHashJoinExec(
   extends GpuShuffledHashJoinBase(
     leftKeys,
     rightKeys,
-    joinType,
+    buildSide,
     condition,
-    left,
-    right,
-    isSkewJoin)
-  with GpuHashJoin {
-  import GpuMetric._
-
-  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
-  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
-  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
-    BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
-    BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
-    JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
-    FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      "GpuShuffledHashJoin does not support the execute() code path.")
-  }
-
-  override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
-    case GpuBuildLeft => Seq(RequireSingleBatch, null)
-    case GpuBuildRight => Seq(null, RequireSingleBatch)
-  }
-
-  override def doExecuteColumnar() : RDD[ColumnarBatch] = {
-    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
-    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
-    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
-    val totalTime = gpuLongMetric(TOTAL_TIME)
-    val buildTime = gpuLongMetric(BUILD_TIME)
-    val streamTime = gpuLongMetric(STREAM_TIME)
-    val joinTime = gpuLongMetric(JOIN_TIME)
-    val filterTime = gpuLongMetric(FILTER_TIME)
-    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
-
-    val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
-
-    streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
-      (streamIter, buildIter) => {
-        var combinedSize = 0
-
-        val startTime = System.nanoTime()
-        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
-          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
-          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
-            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
-            withResource(combined) { combined =>
-              combinedSize =
-                GpuColumnVector.extractColumns(combined)
-                  .map(_.getBase.getDeviceMemorySize).sum.toInt
-              GpuColumnVector.from(combined)
-            }
-          }
-        }
-
-        val delta = System.nanoTime() - startTime
-        buildTime += delta
-        totalTime += delta
-        buildDataSize += combinedSize
-        val context = TaskContext.get()
-        context.addTaskCompletionListener[Unit](_ => builtTable.close())
-
-        doJoin(builtTable, streamIter, boundCondition,
-          numOutputRows, joinOutputRows, numOutputBatches,
-          streamTime, joinTime, filterTime, totalTime)
-      }
-    }
-  }
-}
+    isSkewJoin = isSkewJoin)
diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
index 21f76b2f29f..ebf511cce29 100644
--- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
+++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
@@ -18,17 +18,12 @@ package com.nvidia.spark.rapids.shims.spark301db
 
 import com.nvidia.spark.rapids._
 
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
-import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.rapids.execution.GpuHashJoin
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object GpuJoinUtils {
   def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -80,74 +75,10 @@ case class GpuShuffledHashJoinExec(
     buildSide: GpuBuildSide,
     condition: Option[Expression],
     left: SparkPlan,
-    right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
-  import GpuMetric._
-
-  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
-  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
-  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
-    BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
-    BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
-    JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
-    FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      "GpuShuffledHashJoin does not support the execute() code path.")
-  }
-
-  override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
-    case GpuBuildLeft => Seq(RequireSingleBatch, null)
-    case GpuBuildRight => Seq(null, RequireSingleBatch)
-  }
-
-  override def doExecuteColumnar() : RDD[ColumnarBatch] = {
-    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
-    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
-    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
-    val totalTime = gpuLongMetric(TOTAL_TIME)
-    val buildTime = gpuLongMetric(BUILD_TIME)
-    val streamTime = gpuLongMetric(STREAM_TIME)
-    val joinTime = gpuLongMetric(JOIN_TIME)
-    val filterTime = gpuLongMetric(FILTER_TIME)
-    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
-
-    val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
-
-    streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
-      (streamIter, buildIter) => {
-        var combinedSize = 0
-
-        val startTime = System.nanoTime()
-        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
-          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
-          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
-            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
-            withResource(combined) { combined =>
-              combinedSize =
-                GpuColumnVector.extractColumns(combined)
-                  .map(_.getBase.getDeviceMemorySize).sum.toInt
-              GpuColumnVector.from(combined)
-            }
-          }
-        }
-
-        val delta = System.nanoTime() - startTime
-        buildTime += delta
-        totalTime += delta
-        buildDataSize += combinedSize
-        val context = TaskContext.get()
-        context.addTaskCompletionListener[Unit](_ => builtTable.close())
-
-        doJoin(builtTable, streamIter, boundCondition,
-          numOutputRows, joinOutputRows, numOutputBatches,
-          streamTime, joinTime, filterTime, totalTime)
-      }
-    }
-  }
-}
+    right: SparkPlan)
+  extends GpuShuffledHashJoinBase(
+    leftKeys,
+    rightKeys,
+    buildSide,
+    condition,
+    isSkewJoin = false)
diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
index 30c50804c86..fa2b3439681 100644
--- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
+++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
@@ -18,17 +18,12 @@ package com.nvidia.spark.rapids.shims.spark311
 
 import com.nvidia.spark.rapids._
 
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
-import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase}
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.rapids.execution.GpuHashJoin
 
 object GpuJoinUtils {
   def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -86,79 +81,6 @@ case class GpuShuffledHashJoinExec(
   extends GpuShuffledHashJoinBase(
     leftKeys,
     rightKeys,
-    joinType,
+    buildSide,
     condition,
-    left,
-    right,
-    isSkewJoin)
-  with GpuHashJoin {
-  import GpuMetric._
-
-  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
-  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
-  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
-    BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
-    BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
-    JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
-    FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      "GpuShuffledHashJoin does not support the execute() code path.")
-  }
-
-  override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
-    case GpuBuildLeft => Seq(RequireSingleBatch, null)
-    case GpuBuildRight => Seq(null, RequireSingleBatch)
-  }
-
-  override def doExecuteColumnar() : RDD[ColumnarBatch] = {
-    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
-    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
-    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
-    val totalTime = gpuLongMetric(TOTAL_TIME)
-    val buildTime = gpuLongMetric(BUILD_TIME)
-    val streamTime = gpuLongMetric(STREAM_TIME)
-    val joinTime = gpuLongMetric(JOIN_TIME)
-    val filterTime = gpuLongMetric(FILTER_TIME)
-    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
-
-    val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
-
-    streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
-      (streamIter, buildIter) => {
-        var combinedSize = 0
-
-        val startTime = System.nanoTime()
-        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
-          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
-          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
-            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
-            withResource(combined) { combined =>
-              combinedSize =
-                GpuColumnVector.extractColumns(combined)
-                  .map(_.getBase.getDeviceMemorySize).sum.toInt
-              GpuColumnVector.from(combined)
-            }
-          }
-        }
-
-        val delta = System.nanoTime() - startTime
-        buildTime += delta
-        totalTime += delta
-        buildDataSize += combinedSize
-        val context = TaskContext.get()
-        context.addTaskCompletionListener[Unit](_ => builtTable.close())
-
-        doJoin(builtTable, streamIter, boundCondition,
-          numOutputRows, joinOutputRows, numOutputBatches,
-          streamTime, joinTime, filterTime, totalTime)
-      }
-    }
-  }
-}
+    isSkewJoin = isSkewJoin)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala
new file mode 100644
index 00000000000..efa0f84d316
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
+import org.apache.spark.sql.execution.BinaryExecNode
+import org.apache.spark.sql.rapids.execution.GpuHashJoin
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class GpuShuffledHashJoinBase(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    buildSide: GpuBuildSide,
+    condition: Option[Expression],
+    val isSkewJoin: Boolean) extends BinaryExecNode with GpuHashJoin {
+  import GpuMetric._
+
+  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
+  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
+  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
+    BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
+    BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
+    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
+    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
+    JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
+    FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(
+      "GpuShuffledHashJoin does not support the execute() code path.")
+  }
+
+  override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
+    case GpuBuildLeft => Seq(RequireSingleBatch, null)
+    case GpuBuildRight => Seq(null, RequireSingleBatch)
+  }
+
+  override def doExecuteColumnar() : RDD[ColumnarBatch] = {
+    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
+    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
+    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
+    val totalTime = gpuLongMetric(TOTAL_TIME)
+    val buildTime = gpuLongMetric(BUILD_TIME)
+    val streamTime = gpuLongMetric(STREAM_TIME)
+    val joinTime = gpuLongMetric(JOIN_TIME)
+    val filterTime = gpuLongMetric(FILTER_TIME)
+    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
+
+    val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
+
+    streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
+      (streamIter, buildIter) => {
+        var combinedSize = 0
+
+        val startTime = System.nanoTime()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            withResource(combined) { combined =>
+              combinedSize =
+                GpuColumnVector.extractColumns(combined)
+                  .map(_.getBase.getDeviceMemorySize).sum.toInt
+              GpuColumnVector.from(combined)
+            }
+          }
+        }
+
+        val delta = System.nanoTime() - startTime
+        buildTime += delta
+        totalTime += delta
+        buildDataSize += combinedSize
+        val context = TaskContext.get()
+        context.addTaskCompletionListener[Unit](_ => builtTable.close())
+
+        doJoin(builtTable, streamIter, boundCondition,
+          numOutputRows, joinOutputRows, numOutputBatches,
+          streamTime, joinTime, filterTime, totalTime)
+      }
+    }
+  }
+
+  override def nodeName: String = {
+    if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
+  }
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleHashJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleHashJoinExec.scala
deleted file mode 100644
index 16e5f7a6d55..00000000000
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleHashJoinExec.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids.execution
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
-
-abstract class GpuShuffledHashJoinBase(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
-    joinType: JoinType,
-    condition: Option[Expression],
-    left: SparkPlan,
-    right: SparkPlan,
-    val isSkewJoin: Boolean) extends BinaryExecNode {
-
-  override def nodeName: String = {
-    if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
-  }
-}
\ No newline at end of file
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala
index 5de8e891450..dd78bbef58f 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions.{col, when}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuShuffledHashJoinBase}
+import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec
 import org.apache.spark.sql.types.{ArrayType, DecimalType, IntegerType, StructField, StructType}
 
 object AdaptiveQueryExecSuite {
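
For reference, the pattern this refactoring settles on is sketched below. This snippet is illustrative only and is not part of the diff: it mirrors the spark300/spark311 shims above, where the version-specific exec keeps its own constructor shape and simply forwards the join keys, GPU build side, optional condition, and skew flag to the shared GpuShuffledHashJoinBase, which now owns the metrics, required child distribution, and doExecuteColumnar() logic. Parameter names follow the shims shown in this change; any details not visible above (such as the full case class parameter list) are assumptions.

    import com.nvidia.spark.rapids._

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.JoinType
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical shim for some Spark version (illustrative sketch, not part of this patch).
    // All execution logic lives in GpuShuffledHashJoinBase; the shim only adapts constructor
    // arguments, so adding support for a new Spark version no longer duplicates the join body.
    case class GpuShuffledHashJoinExec(
        leftKeys: Seq[Expression],
        rightKeys: Seq[Expression],
        joinType: JoinType,
        buildSide: GpuBuildSide,
        condition: Option[Expression],
        left: SparkPlan,
        right: SparkPlan,
        override val isSkewJoin: Boolean)
      extends GpuShuffledHashJoinBase(
        leftKeys,
        rightKeys,
        buildSide,
        condition,
        isSkewJoin = isSkewJoin)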