
Remove GpuAttributeReference and GpuSortOrder #253

Merged 5 commits on Jun 24, 2020
@@ -101,8 +101,6 @@ object ApiValidation extends Logging {
     "com.nvidia.spark.rapids.GpuExpression",
   "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression" ->
     "org.apache.spark.sql.rapids.GpuAggregateExpression",
-  "org.apache.spark.sql.catalyst.expressions.AttributeReference" ->
-    "com.nvidia.spark.rapids.GpuAttributeReference",
   "org.apache.spark.sql.execution.command.DataWritingCommand" ->
     "com.nvidia.spark.rapids.GpuDataWritingCommand",
   "org.apache.spark.sql.execution.joins.BuildSide" ->
26 changes: 15 additions & 11 deletions docs/dev/README.md
@@ -172,20 +172,24 @@ On startup use: `--conf [conf key]=[conf value]`. For example:
 [...]
 ```
 
-### Input Expressions and Output Attributes
-For the common case of nodes expecting GPU columnar data as input and
+### Expressions
+For nodes expecting GPU columnar data as input and
 producing GPU columnar data as output, the child node(s) passed to the case
-class constructor should have the `GpuExpression` type.
-
-Note that any attribute references that appear in the node's `output` should
-*not* be `GpuAttributeReference` instances but rather normal
-`AttributeReference` instances. Using `GpuAttributeReference` instances in
-the node's `output` can cause these attributes to find their way into
-non-plugin nodes in the plan that are unaware of `GpuAttributeReference`,
-causing an error when planning the query.
+class constructor should have the `Expression` type. This is a little
+odd because they should all be instances of `GpuExpression` except for
+`AttributeReference` and `SortOrder`. This is needed because `AttributeReference`
+is weaved into a lot of the magic that is built into Spark expressions.
+`SortOrder` is similar as Spark itself will insert `SortOrder` instances into
+the plan automatically in many cases. These are both `Unevaluable` expressions
+so they should never be run columnar or otherwise. These `Expressions` should be
+bound using `GpuBindReferences` which will make sure that all `AttributeReference`
+instances are replaced with `GpuBoundReference` implementations and everything is
+on the GPU. So after calling `GpuBindReferences.bindReferences` you should be able
+to cast the result to `GpuExpression` unless you know you have a SortOrder in there,
+which should be rare.
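The bind-then-cast flow that this new documentation describes can be sketched with a toy expression tree. The classes below (`Expr`, `AttrRef`, `BoundRef`, `GpuAdd`, `Bind`) are illustrative stand-ins, not the plugin's real APIs; only the shape of the flow mirrors `GpuBindReferences`:

```scala
// Toy stand-ins for Expression, AttributeReference, GpuBoundReference and a
// GpuExpression; only the shape of the binding flow mirrors the real plugin.
sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(c: Seq[Expr]): Expr
  // Simplified bottom-up transform: rebuild children, then apply the rule
  // wherever it matches, passing all other nodes through unchanged.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val rebuilt = withChildren(children.map(_.transform(rule)))
    if (rule.isDefinedAt(rebuilt)) rule(rebuilt) else rebuilt
  }
}

trait GpuExpr extends Expr // marker: "safe to evaluate on the GPU"

case class AttrRef(name: String) extends Expr {
  def children: Seq[Expr] = Seq.empty
  def withChildren(c: Seq[Expr]): Expr = this
}

case class BoundRef(ordinal: Int) extends Expr with GpuExpr {
  def children: Seq[Expr] = Seq.empty
  def withChildren(c: Seq[Expr]): Expr = this
}

case class GpuAdd(left: Expr, right: Expr) extends Expr with GpuExpr {
  def children: Seq[Expr] = Seq(left, right)
  def withChildren(c: Seq[Expr]): Expr = GpuAdd(c(0), c(1))
}

object Bind {
  // Like GpuBindReferences: swap every AttrRef for a BoundRef holding its
  // ordinal in the input schema, after which the whole tree is GPU-only.
  def bind(e: Expr, schema: Seq[String]): Expr =
    e.transform { case AttrRef(n) => BoundRef(schema.indexOf(n)) }
}
```

After `Bind.bind(GpuAdd(AttrRef("a"), AttrRef("b")), Seq("a", "b"))` every node in the result is a `GpuExpr`, which is why the real code can cast the bound result to `GpuExpression` when no `SortOrder` is present.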

 ### The GPU Semaphore
-Typically Spark runs a task per CPU core, but there are often many more CPU
+Typically, Spark runs a task per CPU core, but there are often many more CPU
 cores than GPUs. This can lead to situations where Spark wants to run more
 concurrent tasks than can reasonably fit on a GPU. The plugin works around
 this problem with the `GpuSemaphore` object. This object acts as a traffic
7 changes: 2 additions & 5 deletions integration_tests/src/main/python/cache_test.py
@@ -18,16 +18,13 @@
 from data_gen import *
 import pyspark.sql.functions as f
 
-@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/187')
 def test_passing_gpuExpr_as_Expr():
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, string_gen)
             .select(f.col("a")).na.drop()
             .groupBy(f.col("a"))
-            .agg(f.count(f.col("a")))
-            .orderBy("count(a)", ascending=False)
+            .agg(f.count(f.col("a")).alias("count_a"))
+            .orderBy(f.col("count_a").desc(), f.col("a"))
             .cache()
             .limit(50)
     )
-
-
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2020, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,49 +17,73 @@
 package com.nvidia.spark.rapids
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.AttributeSeq
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, SortOrder}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object GpuBindReferences extends Logging {
 
+  private[this] def postBindCheck[A <: Expression](base: A): Unit = {
+    base.foreach { expr =>
+      // The condition is needed to have it match what transform
+      // looks at, otherwise we can check things that would not be modified.
+      if (expr.containsChild.nonEmpty) {
+        expr match {
+          case _: GpuExpression =>
+          case _: SortOrder =>
+          case other =>
+            throw new IllegalArgumentException(
+              s"Bound an expression that shouldn't be here ${other.getClass}")
+        }
+      }
+    }
+  }
+
   // Mostly copied from BoundAttribute.scala so we can do columnar processing
-  def bindReference[A <: GpuExpression](
+  private[this] def bindRefInternal[A <: Expression, R <: Expression](
       expression: A,
-      input: AttributeSeq,
-      allowFailures: Boolean = false): A = {
-    expression.transform { case a: GpuAttributeReference =>
-      val ordinal = input.indexOf(a.exprId)
-      if (ordinal == -1) {
-        if (allowFailures) {
-          a
-        } else {
+      input: AttributeSeq): R = {
+    val ret = expression.transform {
+      case a: AttributeReference =>
Collaborator:
By the time we come here we will only have 2 types in the expression, i.e. AttributeReference and SortOrder?

Collaborator Author:
transform walks through the tree and if the supplied pattern matches something in the tree it will be replaced with what is returned. So the goal of the code is to replace all AttributeReference instances with GpuBoundReference instances. Everything else passes through.
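The replace-and-pass-through behavior described in this reply can be shown with a minimal tree rewrite. `TransformDemo` below uses illustrative names, not Catalyst's actual `transform` implementation:

```scala
object TransformDemo {
  sealed trait Node
  case class Attr(name: String) extends Node   // stand-in for AttributeReference
  case class Bound(ordinal: Int) extends Node  // stand-in for GpuBoundReference
  case class Sort(child: Node) extends Node    // stand-in for SortOrder
  case class Plus(l: Node, r: Node) extends Node

  // Rebuild the tree, applying the rule where it matches; every other node,
  // including Sort, passes through untouched.
  def rewrite(n: Node)(rule: PartialFunction[Node, Node]): Node = {
    val rebuilt = n match {
      case Plus(l, r) => Plus(rewrite(l)(rule), rewrite(r)(rule))
      case Sort(c)    => Sort(rewrite(c)(rule))
      case leaf       => leaf
    }
    rule.applyOrElse(rebuilt, identity[Node])
  }

  // The goal of the real code, in miniature: replace every Attr with a Bound
  // reference into the schema; everything else is left alone.
  def bind(n: Node, schema: Seq[String]): Node =
    rewrite(n) { case Attr(name) => Bound(schema.indexOf(name)) }
}
```

`TransformDemo.bind(Sort(Plus(Attr("a"), Attr("b"))), Seq("a", "b"))` yields `Sort(Plus(Bound(0), Bound(1)))`: the attribute references are replaced while the SortOrder-like wrapper survives.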

-          sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
-        }
-      } else {
-        GpuBoundReference(ordinal, a.dataType, input(ordinal).nullable)
-      }
-    }.asInstanceOf[A]
-  }
+        val ordinal = input.indexOf(a.exprId)
+        if (ordinal == -1) {
+          sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
+        } else {
+          GpuBoundReference(ordinal, a.dataType, input(ordinal).nullable)
+        }
+    }.asInstanceOf[R]
+    postBindCheck(ret)
+    ret
+  }

+  def bindGpuReference[A <: Expression](
+      expression: A,
+      input: AttributeSeq): GpuExpression =
+    bindRefInternal(expression, input)
+
   /**
-   * bindReferences[GpuExpression]: a helper function to bind given expressions to
-   * an input schema where the expressions are GpuExpressions.
+   * A helper function to bind given expressions to an input schema where the expressions are
+   * to be processed on the GPU, and the result type indicates this.
    */
-  def bindReferences[A <: GpuExpression](
+  def bindGpuReferences[A <: Expression](
       expressions: Seq[A],
-      input: AttributeSeq): Seq[A] = {
-    expressions.map(GpuBindReferences.bindReference(_, input))
-  }
+      input: AttributeSeq): Seq[GpuExpression] =
+    expressions.map(GpuBindReferences.bindGpuReference(_, input))
 
+  def bindReference[A <: Expression](
+      expression: A,
+      input: AttributeSeq): A =
+    bindRefInternal(expression, input)
+
   /**
-   * A version of `bindReferences` that takes `AttributeSeq` as its expressions
+   * A helper function to bind given expressions to an input schema where the expressions are
+   * to be processed on the GPU. Most of the time `bindGpuReferences` should be used, unless
+   * you know that the return type is `SortOrder` or is a common trait like `Attribute`.
    */
-  def bindReferences(expressions: AttributeSeq, input: AttributeSeq): Seq[GpuExpression] = {
-    bindReferences(expressions.attrs.map(ref => GpuAttributeReference(
-      ref.name, ref.dataType, ref.nullable, ref.metadata)(ref.exprId, ref.qualifier)),
-      input)
-  }
+  def bindReferences[A <: Expression](
+      expressions: Seq[A],
+      input: AttributeSeq): Seq[A] =
+    expressions.map(GpuBindReferences.bindReference(_, input))
 }

case class GpuBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
@@ -72,7 +72,7 @@ class CastExprMeta[INPUT <: CastBase](
     }
   }
 
-  override def convertToGpu(child: GpuExpression): GpuExpression =
+  override def convertToGpu(child: Expression): GpuExpression =
     GpuCast(child, toType, ansiEnabled, cast.timeZoneId)
 }
 
@@ -179,7 +179,7 @@ object GpuCast {
  * Casts using the GPU
  */
 case class GpuCast(
-    child: GpuExpression,
+    child: Expression,
     dataType: DataType,
     ansiMode: Boolean = false,
     timeZoneId: Option[String] = None)
@@ -37,21 +37,21 @@ class GpuExpandExecMeta(
     rule: ConfKeysAndIncompat)
   extends SparkPlanMeta[ExpandExec](expand, conf, parent, rule) {
 
-  private val gpuProjections: Seq[Seq[ExprMeta[_]]] =
+  private val gpuProjections: Seq[Seq[BaseExprMeta[_]]] =
     expand.projections.map(_.map(GpuOverrides.wrapExpr(_, conf, Some(this))))
 
-  private val outputAttributes: Seq[ExprMeta[_]] =
+  private val outputAttributes: Seq[BaseExprMeta[_]] =
     expand.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
 
-  override val childExprs: Seq[ExprMeta[_]] = gpuProjections.flatten ++ outputAttributes
+  override val childExprs: Seq[BaseExprMeta[_]] = gpuProjections.flatten ++ outputAttributes
 
   /**
   * Convert what this wraps to a GPU enabled version.
   */
   override def convertToGpu(): GpuExec = {
     val projections = gpuProjections.map(_.map(_.convertToGpu()))
-    val attributes = outputAttributes.map(_.convertToGpu()).asInstanceOf[Seq[GpuAttributeReference]]
-    GpuExpandExec(projections, attributes, childPlans.head.convertIfNeeded())
+    GpuExpandExec(projections, expand.output,
+      childPlans.head.convertIfNeeded())
   }
 }
 
@@ -60,12 +60,12 @@ class GpuExpandExecMeta(
  * multiple output rows for an input row.
  * @param projections The group of expressions, all of the group expressions should
  *                    output the same schema specified by the parameter `output`
- * @param resultExpressions Attribute references to Output
+ * @param output Attribute references to Output
  * @param child Child operator
  */
 case class GpuExpandExec(
-    projections: Seq[Seq[GpuExpression]],
-    resultExpressions: Seq[GpuAttributeReference],
+    projections: Seq[Seq[Expression]],
+    output: Seq[Attribute],
     child: SparkPlan)
   extends UnaryExecNode with GpuExec {
 
@@ -81,16 +81,13 @@ case class GpuExpandExec(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
-
   @transient
   override lazy val references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val boundProjections: Seq[Seq[GpuExpression]] =
-      projections.map(GpuBindReferences.bindReferences(_, child.output))
-
+      projections.map(GpuBindReferences.bindGpuReferences(_, child.output))
     child.executeColumnar().mapPartitions { it =>
       new GpuExpandIterator(boundProjections, metrics, it)
     }
@@ -44,7 +44,7 @@ object GpuExpressionsUtils {
     resultCvs
   }
 
-  def getTrimString(trimStr: Option[GpuExpression]): String = trimStr match {
+  def getTrimString(trimStr: Option[Expression]): String = trimStr match {
     case Some(GpuLiteral(data, StringType)) =>
       if (data == null) {
         null
@@ -79,7 +79,7 @@ trait GpuExpression extends Expression with Unevaluable with Arm {
    * we have to jump through some hoops to make this work.
    */
   def disableCoalesceUntilInput(): Boolean =
-    children.exists{
+    children.exists {
       case c: GpuExpression => c.disableCoalesceUntilInput()
       case _ => false // This path should never really happen
     }
@@ -118,7 +118,7 @@ abstract class GpuUnaryExpression extends UnaryExpression with GpuExpression {
   def outputTypeOverride: DType = null
 
   override def columnarEval(batch: ColumnarBatch): Any = {
-    val input = child.asInstanceOf[GpuExpression].columnarEval(batch)
+    val input = child.columnarEval(batch)
Collaborator:
Can you please clarify? From what I understand so far, children could possibly contain a SortOrder?

Collaborator Author:
Nothing that is evaluated can contain a SortOrder. SortOrder is Unevaluable. So it will not work columnar or otherwise. It is up to spark to only insert it into places that it knows will work properly, like SortExec.
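The Unevaluable contract in this reply can be sketched as follows. `SortOrderLike`, `Col`, and `sortRows` are hypothetical stand-ins for SortOrder, a bound column reference, and a SortExec-style consumer, not the real Spark classes:

```scala
object UnevaluableDemo {
  final class UnsupportedEval(msg: String) extends RuntimeException(msg)

  trait Evaluable { def eval(row: Seq[Any]): Any }

  // A bound column reference: evaluating it reads the row.
  case class Col(ordinal: Int) extends Evaluable {
    override def eval(row: Seq[Any]): Any = row(ordinal)
  }

  // A SortOrder-like node: it only carries ordering metadata and, like an
  // Unevaluable Spark expression, throws if anything tries to evaluate it.
  case class SortOrderLike(child: Evaluable, ascending: Boolean) extends Evaluable {
    override def eval(row: Seq[Any]): Any =
      throw new UnsupportedEval("SortOrder is Unevaluable; it only describes an ordering")
  }

  // A SortExec-like consumer: it inspects the child and the direction but
  // never calls eval on the SortOrderLike node itself.
  def sortRows(rows: Seq[Seq[Any]], order: SortOrderLike): Seq[Seq[Any]] = {
    val sorted = rows.sortBy(r => order.child.eval(r).asInstanceOf[Int])
    if (order.ascending) sorted else sorted.reverse
  }
}
```

The design point mirrors Spark: the ordering node is metadata for operators that know how to use it, so evaluation code never needs to handle it, which is why the plugin can leave `SortOrder` untouched during binding.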

     try {
       input match {
         case vec: GpuColumnVector =>
@@ -165,8 +165,8 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression {
     var lhs: Any = null
     var rhs: Any = null
     try {
-      lhs = left.asInstanceOf[GpuExpression].columnarEval(batch)
-      rhs = right.asInstanceOf[GpuExpression].columnarEval(batch)
+      lhs = left.columnarEval(batch)
+      rhs = right.columnarEval(batch)
 
       (lhs, rhs) match {
         case (l: GpuColumnVector, r: GpuColumnVector) => doColumnar(l, r)
@@ -237,11 +237,11 @@ abstract class CudfBinaryOperator extends GpuBinaryOperator with CudfBinaryExpre
 
 trait GpuString2TrimExpression extends String2TrimExpression with GpuExpression {
 
-  override def srcStr: GpuExpression
+  override def srcStr: Expression
 
-  override def trimStr: Option[GpuExpression]
+  override def trimStr: Option[Expression]
 
-  override def children: Seq[GpuExpression] = srcStr +: trimStr.toSeq
+  override def children: Seq[Expression] = srcStr +: trimStr.toSeq
 
   def strippedColumnVector(value: GpuColumnVector, sclarValue: Scalar): GpuColumnVector
 
@@ -294,9 +294,9 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression {
     var val1: Any = null
     var val2: Any = null
     try {
-      val0 = children(0).asInstanceOf[GpuExpression].columnarEval(batch)
-      val1 = children(1).asInstanceOf[GpuExpression].columnarEval(batch)
-      val2 = children(2).asInstanceOf[GpuExpression].columnarEval(batch)
+      val0 = children(0).columnarEval(batch)
+      val1 = children(1).columnarEval(batch)
+      val2 = children(2).columnarEval(batch)
 
       (val0, val1, val2) match {
         case (v0: GpuColumnVector, v1: GpuColumnVector, v2: GpuColumnVector) =>
@@ -36,7 +36,7 @@ class GpuGenerateExecSparkPlanMeta(
     p: Option[RapidsMeta[_, _, _]],
     r: ConfKeysAndIncompat) extends SparkPlanMeta[GenerateExec](gen, conf, p, r) {
 
-  private def exprsFromArray(data: ArrayData, dataType: DataType): Seq[ExprMeta[Expression]] = {
+  private def exprsFromArray(data: ArrayData, dataType: DataType): Seq[BaseExprMeta[Expression]] = {
     (0 until data.numElements()).map { i =>
       Literal(data.get(i, dataType), dataType).asInstanceOf[Expression]
     }.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
@@ -60,7 +60,7 @@ class GpuGenerateExecSparkPlanMeta(
     case _ => Seq.empty
   }
 
-  override val childExprs: Seq[ExprMeta[_]] = arrayExprs
+  override val childExprs: Seq[BaseExprMeta[_]] = arrayExprs
 
   override def tagPlanForGpu(): Unit = {
     // We can only run on the GPU if we are doing a posexplode of an array we are generating
@@ -100,7 +100,7 @@ class GpuGenerateExecSparkPlanMeta(
  */
 case class GpuGenerateExec(
     includePos: Boolean,
-    arrayProject: Seq[GpuExpression],
+    arrayProject: Seq[Expression],
     requiredChildOutput: Seq[Attribute],
     generatorOutput: Seq[Attribute],
     child: SparkPlan
@@ -119,10 +119,11 @@ case class GpuGenerateExec(
     val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
     val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES)
     val totalTime = longMetric(TOTAL_TIME)
-    val boundArrayProjectList = GpuBindReferences.bindReferences(arrayProject, child.output).toArray
+    val boundArrayProjectList =
+      GpuBindReferences.bindGpuReferences(arrayProject, child.output).toArray
     val numArrayColumns = boundArrayProjectList.length
-    val boundOthersProjectList =
-      GpuBindReferences.bindReferences(requiredChildOutput, child.output).toArray
+    val boundOthersProjectList: Array[GpuExpression] =
+      GpuBindReferences.bindGpuReferences(requiredChildOutput, child.output).toArray
     val numOtherColumns = boundOthersProjectList.length
     val numExplodeColumns = if (includePos) 2 else 1
 
@@ -42,10 +42,8 @@ trait GpuHashJoin extends GpuExec with HashJoin {
   protected lazy val (gpuBuildKeys, gpuStreamedKeys) = {
     require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
       "Join keys from two sides should have same types")
-    val lkeys = GpuBindReferences.bindReferences(leftKeys.asInstanceOf[Seq[GpuExpression]],
-      left.output)
-    val rkeys = GpuBindReferences.bindReferences(rightKeys.asInstanceOf[Seq[GpuExpression]],
-      right.output)
+    val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output)
+    val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output)
     buildSide match {
       case BuildLeft => (lkeys, rkeys)
       case BuildRight => (rkeys, lkeys)
@@ -71,7 +69,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
 
   def doJoin(builtTable: Table,
       streamedBatch: ColumnarBatch,
-      condition: Option[GpuExpression],
+      condition: Option[Expression],
       numOutputRows: SQLMetric,
       numJoinOutputRows: SQLMetric,
       numOutputBatches: SQLMetric,