Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cost-based optimizer: Implement simple cost model that demonstrates benefits with NDS queries #2744

Merged
merged 12 commits into from
Jun 23, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField, WindowFrame, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftAnti, LeftSemi}
import org.apache.spark.sql.execution.{GlobalLimitExec, LocalLimitExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec}
import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec}
import org.apache.spark.sql.execution.{GlobalLimitExec, LocalLimitExec, ProjectExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, CustomShuffleReaderExec, QueryStageExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
Expand Down Expand Up @@ -67,7 +67,6 @@ class CostBasedOptimizer extends Optimizer with Logging {
optimizations
}


/**
* Walk the plan and determine CPU and GPU costs for each operator and then make decisions
* about whether operators should run on CPU or GPU.
Expand All @@ -87,44 +86,47 @@ class CostBasedOptimizer extends Optimizer with Logging {
optimizations: ListBuffer[Optimization],
finalOperator: Boolean): (Double, Double) = {

// get the CPU and GPU cost of the child plan(s)
val childCosts = plan.childPlans
.map(child => recursivelyOptimize(
conf,
cpuCostModel,
gpuCostModel,
child,
optimizations,
finalOperator = false))

val (childCpuCosts, childGpuCosts) = childCosts.unzip

// get the CPU and GPU cost of this operator (excluding cost of children)
val operatorCpuCost = cpuCostModel.getCost(plan)
val operatorGpuCost = gpuCostModel.getCost(plan)

// get the CPU and GPU cost of the child plan(s)
val childCosts = plan.childPlans
.map(child => recursivelyOptimize(
conf,
cpuCostModel,
gpuCostModel,
child,
optimizations,
finalOperator = false))
val (childCpuCosts, childGpuCosts) = childCosts.unzip

// calculate total (this operator + children)
val totalCpuCost = operatorCpuCost + childCpuCosts.sum
var totalGpuCost = operatorGpuCost + childGpuCosts.sum
showCosts(plan, "Operator costs", operatorCpuCost, operatorGpuCost)
showCosts(plan, "Operator + child costs", totalCpuCost, totalGpuCost)

plan.estimatedOutputRows = RowCountPlanVisitor.visit(plan)

// determine how many transitions between CPU and GPU are taking place between
// the child operators and this operator
val numTransitions = plan.childPlans
.count(_.canThisBeReplaced != plan.canThisBeReplaced)
.count(canRunOnGpu(_) != canRunOnGpu(plan))
showCosts(plan, s"numTransitions=$numTransitions", totalCpuCost, totalGpuCost)

if (numTransitions > 0) {
// there are transitions between CPU and GPU so we need to calculate the transition costs
// and also make decisions based on those costs to see whether any parts of the plan would
// have been better off just staying on the CPU

// is this operator on the GPU?
if (plan.canThisBeReplaced) {
if (canRunOnGpu(plan)) {
// at least one child is transitioning from CPU to GPU so we calculate the
// transition costs
val transitionCost = plan.childPlans.filter(!_.canThisBeReplaced)
.map(transitionToGpuCost(conf, _)).sum
val transitionCost = plan.childPlans
.filterNot(canRunOnGpu)
.map(transitionToGpuCost(conf, _)).sum

// if the GPU cost including transition is more than the CPU cost then avoid this
// transition and reset the GPU cost
Expand All @@ -134,20 +136,22 @@ class CostBasedOptimizer extends Optimizer with Logging {
plan.costPreventsRunningOnGpu()
// reset GPU cost
totalGpuCost = totalCpuCost
showCosts(plan, s"Avoid transition to GPU", totalCpuCost, totalGpuCost)
} else {
// add transition cost to total GPU cost
totalGpuCost += transitionCost
showCosts(plan, s"transitionFromCpuCost=$transitionCost", totalCpuCost, totalGpuCost)
}
} else {
// at least one child is transitioning from GPU to CPU so we evaulate each of this
// at least one child is transitioning from GPU to CPU so we evaluate each of this
// child plans to see if it was worth running on GPU now that we have the cost of
// transitioning back to CPU
plan.childPlans.zip(childCosts).foreach {
case (child, childCosts) =>
val (childCpuCost, childGpuCost) = childCosts
val transitionCost = transitionToCpuCost(conf, child)
val childGpuTotal = childGpuCost + transitionCost
if (child.canThisBeReplaced && !isExchangeOp(child)
if (canRunOnGpu(child) && !isExchangeOp(child)
&& childGpuTotal > childCpuCost) {
// force this child plan back onto CPU
optimizations.append(ReplaceSection(
Expand All @@ -158,39 +162,71 @@ class CostBasedOptimizer extends Optimizer with Logging {

// recalculate the transition costs because child plans may have changed
val transitionCost = plan.childPlans
.filter(_.canThisBeReplaced)
.map(transitionToCpuCost(conf, _)).sum
.filter(canRunOnGpu)
.map(transitionToCpuCost(conf, _)).sum
totalGpuCost += transitionCost
showCosts(plan, s"transitionFromGpuCost=$transitionCost", totalCpuCost, totalGpuCost)
}
}

// special behavior if this is the final operator in the plan because we always have the
// cost of going back to CPU at the end
if (finalOperator && plan.canThisBeReplaced) {
totalGpuCost += transitionToCpuCost(conf, plan)
if (finalOperator && canRunOnGpu(plan)) {
val transitionCost = transitionToCpuCost(conf, plan)
totalGpuCost += transitionCost
showCosts(plan, s"final operator, transitionFromGpuCost=$transitionCost",
totalCpuCost, totalGpuCost)
}

if (totalGpuCost > totalCpuCost) {
// we have reached a point where we have transitioned onto GPU for part of this
// plan but with no benefit from doing so, so we want to undo this and go back to CPU
if (plan.canThisBeReplaced && !isExchangeOp(plan)) {
if (canRunOnGpu(plan) && !isExchangeOp(plan)) {
// this plan would have been on GPU so we move it and onto CPU and recurse down
// until we reach a part of the plan that is already on CPU and then stop
optimizations.append(ReplaceSection(plan, totalCpuCost, totalGpuCost))
plan.recursiveCostPreventsRunningOnGpu()
// reset the costs because this section of the plan was not moved to GPU
totalGpuCost = totalCpuCost
showCosts(plan, s"ReplaceSection: ${plan}", totalCpuCost, totalGpuCost)
}
}

if (!plan.canThisBeReplaced || isExchangeOp(plan)) {
if (!canRunOnGpu(plan) || isExchangeOp(plan)) {
// reset the costs because this section of the plan was not moved to GPU
totalGpuCost = totalCpuCost
showCosts(plan, s"Reset costs (not on GPU / exchange)", totalCpuCost, totalGpuCost)
}

showCosts(plan, "END", totalCpuCost, totalGpuCost)
(totalCpuCost, totalGpuCost)
}

/**
 * Log a trace-level line comparing the accumulated CPU and GPU costs at a
 * point in the plan traversal, tagged with the wrapped operator's class name.
 *
 * @param plan the plan node that the costs refer to
 * @param message context describing where in the optimization this comparison occurs
 * @param cpuCost total CPU cost computed so far for this node
 * @param gpuCost total GPU cost computed so far for this node
 */
private def showCosts(
    plan: SparkPlanMeta[_],
    message: String,
    cpuCost: Double,
    gpuCost: Double): Unit = {
  // choose a comparison symbol so the log line reads naturally
  val sign = if (cpuCost == gpuCost) {
    "=="
  } else if (cpuCost < gpuCost) {
    "<"
  } else {
    ">"
  }
  // fixed: the message previously ended with a stray ')' that had no
  // matching '(' — removed so the emitted log text is balanced
  logTrace(s"CBO [${plan.wrapped.getClass.getSimpleName}] $message: " +
    s"cpuCost=$cpuCost $sign gpuCost=$gpuCost")
}

/**
 * Determine whether this plan node is expected to execute on the GPU.
 *
 * AdaptiveSparkPlanExec is a special case: it is always tagged as "cannot
 * replace" and exposes no child plans to inspect, so (hacky as it is) we
 * optimistically assume it runs on the GPU.
 */
private def canRunOnGpu(plan: SparkPlanMeta[_]): Boolean = {
  val isAdaptiveRoot = plan.wrapped match {
    case _: AdaptiveSparkPlanExec => true
    case _ => false
  }
  // for every other operator, defer to the standard replacement decision
  isAdaptiveRoot || plan.canThisBeReplaced
}

private def transitionToGpuCost(conf: RapidsConf, plan: SparkPlanMeta[SparkPlan]): Double = {
val rowCount = RowCountPlanVisitor.visit(plan).map(_.toDouble)
.getOrElse(conf.defaultRowCount.toDouble)
Expand Down Expand Up @@ -218,13 +254,13 @@ class CostBasedOptimizer extends Optimizer with Logging {
// if the child query stage already executed on GPU then we need to keep the
// next operator on GPU in these cases
SQLConf.get.adaptiveExecutionEnabled && (plan.wrapped match {
case _: CustomShuffleReaderExec
| _: ShuffledHashJoinExec
| _: BroadcastHashJoinExec
| _: BroadcastExchangeExec
| _: BroadcastNestedLoopJoinExec => true
case _ => false
})
case _: CustomShuffleReaderExec
| _: ShuffledHashJoinExec
| _: BroadcastHashJoinExec
| _: BroadcastExchangeExec
| _: BroadcastNestedLoopJoinExec => true
case _ => false
})
}
}

Expand All @@ -250,9 +286,21 @@ class CpuCostModel(conf: RapidsConf) extends CostModel {
val rowCount = RowCountPlanVisitor.visit(plan).map(_.toDouble)
.getOrElse(conf.defaultRowCount.toDouble)

val operatorCost = plan.conf
.getCpuOperatorCost(plan.wrapped.getClass.getSimpleName)
.getOrElse(conf.defaultCpuOperatorCost) * rowCount
val operatorCost = plan.wrapped match {
case _: ProjectExec =>
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
// GpuProject cost is zero (in our cost model) and we don't want to encourage moving to
// the GPU just to do a trivial projection, so we pretend the overhead of a
// CPU projection (beyond evaluating the expressions) is also zero
0
case _: UnionExec =>
// union does not further process data produced by its children
0
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case _ => plan.conf
.getCpuOperatorCost(plan.wrapped.getClass.getSimpleName)
.getOrElse(conf.defaultCpuOperatorCost) * rowCount
}

val exprEvalCost = plan.childExprs
.map(expr => exprCost(expr.asInstanceOf[BaseExprMeta[Expression]], rowCount))
Expand Down Expand Up @@ -299,9 +347,18 @@ class GpuCostModel(conf: RapidsConf) extends CostModel {
val rowCount = RowCountPlanVisitor.visit(plan).map(_.toDouble)
.getOrElse(conf.defaultRowCount.toDouble)

val operatorCost = plan.conf
.getGpuOperatorCost(plan.wrapped.getClass.getSimpleName)
.getOrElse(conf.defaultGpuOperatorCost) * rowCount
val operatorCost = plan.wrapped match {
case _: ProjectExec =>
// The cost of a GPU projection is mostly the cost of evaluating the expressions
// to produce the projected columns
0
case _: UnionExec =>
// union does not further process data produced by its children
0
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case _ => plan.conf
.getGpuOperatorCost(plan.wrapped.getClass.getSimpleName)
.getOrElse(conf.defaultGpuOperatorCost) * rowCount
}

val exprEvalCost = plan.childExprs
.map(expr => exprCost(expr.asInstanceOf[BaseExprMeta[Expression]], rowCount))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,25 +1153,25 @@ object RapidsConf {

val OPTIMIZER_DEFAULT_CPU_OPERATOR_COST = conf("spark.rapids.sql.optimizer.cpu.exec.default")
.internal()
.doc("Default per-row CPU cost of executing an operator")
.doc("Default per-row CPU cost of executing an operator, in seconds")
.doubleConf
.createWithDefault(0.0)
.createWithDefault(0.0002)

val OPTIMIZER_DEFAULT_CPU_EXPRESSION_COST = conf("spark.rapids.sql.optimizer.cpu.expr.default")
.internal()
.doc("Default per-row CPU cost of evaluating an expression")
.doc("Default per-row CPU cost of evaluating an expression, in seconds")
.doubleConf
.createWithDefault(0.0)

val OPTIMIZER_DEFAULT_GPU_OPERATOR_COST = conf("spark.rapids.sql.optimizer.gpu.exec.default")
.internal()
.doc("Default per-row GPU cost of executing an operator")
.doc("Default per-row GPU cost of executing an operator, in seconds")
.doubleConf
.createWithDefault(0.0)
.createWithDefault(0.0001)

val OPTIMIZER_DEFAULT_GPU_EXPRESSION_COST = conf("spark.rapids.sql.optimizer.gpu.expr.default")
.internal()
.doc("Default per-row GPU cost of evaluating an expression")
.doc("Default per-row GPU cost of evaluating an expression, in seconds")
.doubleConf
.createWithDefault(0.0)

Expand Down
Loading