Add in reporting of time taken to transition plan to GPU #3315

Merged (4 commits) on Sep 8, 2021
@@ -3314,17 +3314,24 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {

   // Spark calls this method once for the whole plan when AQE is off. When AQE is on, it
   // gets called once for each query stage (where a query stage is an `Exchange`).
-  override def apply(plan: SparkPlan) :SparkPlan = {
+  override def apply(plan: SparkPlan): SparkPlan = {
     val conf = new RapidsConf(plan.conf)
     if (conf.isSqlEnabled) {
+      val start = System.nanoTime()
       val updatedPlan = if (plan.conf.adaptiveExecutionEnabled) {
         // AQE can cause Spark to inject undesired CPU shuffles into the plan because GPU and CPU
         // distribution expressions are not semantically equal.
         GpuOverrides.removeExtraneousShuffles(plan, conf)
       } else {
         plan
       }
-      applyOverrides(updatedPlan, conf)
+      val ret = applyOverrides(updatedPlan, conf)
+      val end = System.nanoTime()
+      if (conf.shouldExplain) {
+        val timeMs = (end - start) / 1000000.0
Collaborator:

Consider NANOSECONDS.toMillis(end - start)

Also consider a utility function to measure duration to avoid duplicating code

def withDurationLog[T](block: => T, conf: RapidsConf, format: String): T 

Author (Collaborator):

I started off using NANOSECONDS.toMillis, but it returns a long, not a double. If I wanted just millisecond precision I would have measured the values in milliseconds to begin with. I also looked at MILLISECONDS.toNanos(1) to produce the magic number, but decided against it. Happy to move back to that if you think it is better.

I also thought about writing a utility, but the code is only in 2 places and it is 6 lines of code in each place. So best case I would save 3 lines of code total. 12 lines as it is now vs 6 lines for the utility body + 1 line for the utility def + 2 lines to call the utility from each place. It just didn't feel like it was worth it at this time. But for code cleanliness if you want me to I will do it.
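The truncation the author describes is easy to see in isolation. The sketch below is standalone demonstration code, not part of the PR; `PrecisionDemo` is a hypothetical name:

```scala
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}

object PrecisionDemo {
  val elapsedNanos = 1234567L
  // NANOSECONDS.toMillis returns a Long, so sub-millisecond detail is truncated away.
  val truncatedMs: Long = NANOSECONDS.toMillis(elapsedNanos)  // 1
  // Dividing by a Double keeps fractional milliseconds, as the PR does.
  val fractionalMs: Double = elapsedNanos / 1000000.0         // 1.234567
  // MILLISECONDS.toNanos(1) could stand in for the magic number 1000000.
  val viaTimeUnit: Double = elapsedNanos.toDouble / MILLISECONDS.toNanos(1)
}
```

This illustrates why the author kept the `Double` division: `toMillis` would report both a 1.2 ms and a 1.9 ms conversion as "1 ms".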

Collaborator:

I see your point about fractional millis.

It's easily conceivable that we want to instrument more and more durations in code and this PR already needs it twice. Thus I'd favor a util method making it easy.
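The suggested helper might look like the following sketch. It is hypothetical, not the merged implementation: a plain `Boolean` flag stands in for `RapidsConf.shouldExplain`, and `println` stands in for Spark's `logInfo`:

```scala
object DurationLog {
  // Hypothetical sketch of the reviewer's withDurationLog suggestion.
  // The by-name parameter `block` is evaluated exactly once, between the
  // two nanoTime() samples, and its result is returned unchanged.
  def withDurationLog[T](shouldExplain: Boolean, format: String)(block: => T): T = {
    val start = System.nanoTime()
    val ret = block
    val end = System.nanoTime()
    if (shouldExplain) {
      val timeMs = (end - start) / 1000000.0
      println(format.format(timeMs))
    }
    ret
  }
}
```

With something like this in place, each call site would collapse to a single wrapped expression, e.g. `withDurationLog(conf.shouldExplain, "Plan conversion to the GPU took %.2f ms")(applyOverrides(updatedPlan, conf))`.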

+        logInfo(f"Plan conversion to the GPU took $timeMs%.2f ms")
+      }
+      ret
     } else {
       plan
     }
@@ -18,6 +18,8 @@ package com.nvidia.spark.rapids

 import java.lang.reflect.Method
 
+import scala.annotation.tailrec
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder}
@@ -159,6 +161,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       p.withNewChildren(p.children.map(c => optimizeAdaptiveTransitions(c, Some(p))))
   }
 
+  @tailrec
   private def isGpuShuffleLike(execNode: SparkPlan): Boolean = execNode match {
     case _: GpuShuffleExchangeExecBase | _: GpuCustomShuffleReaderExec => true
     case qs: ShuffleQueryStageExec => isGpuShuffleLike(qs.plan)
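The `@tailrec` annotation added above asks the compiler to verify that the recursive call is in tail position, so the recursion compiles to a loop and cannot overflow the stack. A minimal standalone illustration (hypothetical example, not project code):

```scala
import scala.annotation.tailrec

object TailRecDemo {
  // @tailrec causes a compile error if the self-call is not in tail position,
  // guaranteeing constant stack usage for arbitrarily long lists.
  @tailrec
  def lastElem[A](xs: List[A]): A = xs match {
    case x :: Nil  => x
    case _ :: tail => lastElem(tail) // tail call: compiled to a loop
    case Nil       => throw new NoSuchElementException("empty list")
  }
}
```

`isGpuShuffleLike` has the same shape: the only self-call, on `qs.plan`, is the last thing a match arm does.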
@@ -445,7 +448,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       !conf.testingAllowedNonGpu.exists(nonGpuClass =>
         PlanUtils.sameClass(plan, nonGpuClass))) {
       throw new IllegalArgumentException(s"Part of the plan is not columnar " +
-        s"${plan.getClass}\n${plan}")
+        s"${plan.getClass}\n$plan")
     }
     // filter out the output expressions since those are not GPU expressions
     val planOutput = plan.output.toSet
@@ -472,7 +475,7 @@
     }
     // to set to make uniq execs
     val execsFound = PlanUtils.findOperators(plan, planContainsInstanceOf).toSet
-    val execsNotFound = validateExecs.diff(execsFound.map(_.getClass().getSimpleName))
+    val execsNotFound = validateExecs.diff(execsFound.map(_.getClass.getSimpleName))
     require(execsNotFound.isEmpty,
       s"Plan ${plan.toString()} does not contain the following execs: " +
         execsNotFound.mkString(","))
@@ -489,6 +492,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
     this.rapidsConf = new RapidsConf(plan.conf)
     if (rapidsConf.isSqlEnabled) {
+      val start = System.nanoTime()
       var updatedPlan = insertHashOptimizeSorts(plan)
       updatedPlan = updateScansForInput(updatedPlan)
       updatedPlan = insertColumnarFromGpu(updatedPlan)
@@ -513,6 +517,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
           updatedPlan.canonicalized
         validateExecsInGpuPlan(updatedPlan, rapidsConf)
       }
+      val end = System.nanoTime()
+      if (rapidsConf.shouldExplain) {
+        val timeMs = (end - start) / 1000000.0
+        logInfo(f"GPU plan transition optimization took $timeMs%.2f ms")
+      }
       updatedPlan
     } else {
       plan