Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix canonicalization of GpuFileSourceScanExec, GpuShuffleCoalesceExec #1310

Merged
merged 1 commit into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ class Spark300Shims extends SparkShims {
wrapped.optionalBucketSet,
None,
wrapped.dataFilters,
wrapped.tableIdentifier,
conf)
wrapped.tableIdentifier)(conf)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
Expand Down Expand Up @@ -401,7 +400,7 @@ class Spark300Shims extends SparkShims {
override def copyFileSourceScanExec(
scanExec: GpuFileSourceScanExec,
queryUsesInputFile: Boolean): GpuFileSourceScanExec = {
scanExec.copy(queryUsesInputFile=queryUsesInputFile)
scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf)
}

override def getGpuColumnarToRowTransition(plan: SparkPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ class Spark301dbShims extends Spark301Shims {
// TODO: Does Databricks have coalesced bucketing implemented?
None,
wrapped.dataFilters,
wrapped.tableIdentifier,
conf)
wrapped.tableIdentifier)(conf)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
Expand Down Expand Up @@ -211,7 +210,7 @@ class Spark301dbShims extends Spark301Shims {
override def copyFileSourceScanExec(
scanExec: GpuFileSourceScanExec,
queryUsesInputFile: Boolean): GpuFileSourceScanExec = {
scanExec.copy(queryUsesInputFile=queryUsesInputFile)
scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf)
}

override def getGpuShuffleExchangeExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ class Spark310Shims extends Spark301Shims {
wrapped.optionalBucketSet,
wrapped.optionalNumCoalescedBuckets,
wrapped.dataFilters,
wrapped.tableIdentifier,
conf)
wrapped.tableIdentifier)(conf)
}
}),
GpuOverrides.exec[InMemoryTableScanExec](
Expand Down Expand Up @@ -270,7 +269,7 @@ class Spark310Shims extends Spark301Shims {
override def copyFileSourceScanExec(
scanExec: GpuFileSourceScanExec,
queryUsesInputFile: Boolean): GpuFileSourceScanExec = {
scanExec.copy(queryUsesInputFile=queryUsesInputFile)
scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf)
}

override def getGpuColumnarToRowTransition(plan: SparkPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @note This should ALWAYS appear in the plan after a GPU shuffle when RAPIDS shuffle is
* not being used.
*/
case class GpuShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsConf)
case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long)
extends UnaryExecNode with GpuExec {

import GpuMetricNames._
Expand All @@ -59,11 +59,11 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: Rapid

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val metricsMap = metrics
val targetBatchByteSize = rapidsConf.gpuTargetBatchSizeBytes
val targetSize = targetBatchByteSize
val sparkSchema = GpuColumnVector.extractTypes(schema)

child.executeColumnar().mapPartitions { iter =>
new GpuShuffleCoalesceIterator(iter, targetBatchByteSize, sparkSchema, metricsMap)
new GpuShuffleCoalesceIterator(iter, targetSize, sparkSchema, metricsMap)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
if (GpuShuffleEnv.isRapidsShuffleEnabled) {
GpuCoalesceBatches(plan, TargetSize(conf.gpuTargetBatchSizeBytes))
} else {
GpuShuffleCoalesceExec(plan, conf)
GpuShuffleCoalesceExec(plan, conf.gpuTargetBatchSizeBytes)
}
}

Expand Down Expand Up @@ -301,7 +301,8 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match {
case exec: GpuShuffleExchangeExecBase =>
// always follow a GPU shuffle with a shuffle coalesce
GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), conf)
GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)),
conf.gpuTargetBatchSizeBytes)
case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ case class GpuFileSourceScanExec(
optionalNumCoalescedBuckets: Option[Int],
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier],
@transient rapidsConf: RapidsConf,
queryUsesInputFile: Boolean = false)
queryUsesInputFile: Boolean = false)(@transient val rapidsConf: RapidsConf)
extends GpuDataSourceScanExec with GpuExec {

private val isParquetFileFormat: Boolean = relation.fileFormat.isInstanceOf[ParquetFileFormat]
Expand Down Expand Up @@ -541,8 +540,7 @@ case class GpuFileSourceScanExec(
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
rapidsConf,
queryUsesInputFile)
queryUsesInputFile)(rapidsConf)
}
}

Expand Down