
Commit a88bf8a

Unshim GpuRowBasedScalaUDF (#5013)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Mar 23, 2022
1 parent 5641e0a commit a88bf8a
Showing 6 changed files with 101 additions and 166 deletions.

This file was deleted.

@@ -594,8 +594,7 @@ abstract class Spark31XShims extends SparkShims with Spark31Xuntil33XShims with
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
- }),
- GpuScalaUDFMeta.exprMeta
+ })
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -337,8 +337,7 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
- }),
- GpuScalaUDFMeta.exprMeta
+ })
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -488,8 +488,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
ParamCheck("windowSpec",
TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL_64 +
TypeSig.DAYTIME, TypeSig.numericAndInterval))),
- (windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r)),
- GpuScalaUDFMeta.exprMeta
+ (windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
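For context on the `.map(...).toMap` lines above: every shim builds its expression-override table by keying each rule on the Expression subclass it replaces. Below is a minimal, self-contained sketch of that pattern, using a stand-in `Rule` trait (the plugin's real `ExprRule` also carries type checks, docs, and a converter):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Stand-in for the plugin's ExprRule: a rule knows which Expression
// subclass it can replace with a GPU implementation.
trait Rule { def getClassFor: Class[_] }

// Build the Class -> rule lookup exactly as the shims do above, so the
// planner can find the override for an expression by its runtime class.
def buildRuleMap(rules: Seq[Rule]): Map[Class[_ <: Expression], Rule] =
  rules.map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
```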
@@ -2448,6 +2448,7 @@ object GpuOverrides extends Logging {
childExprs.map(_.convertToGpu()),
a.evalType, a.udfDeterministic, a.resultId)
}),
+ GpuScalaUDFMeta.exprMeta,
expr[Rand](
"Generate a random column with i.i.d. uniformly distributed values in [0, 1)",
ExprChecks.projectOnly(TypeSig.DOUBLE, TypeSig.DOUBLE,
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,13 +19,13 @@ package org.apache.spark.sql.rapids
import java.lang.invoke.SerializedLambda

import com.nvidia.spark.RapidsUDF
- import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuExpression, GpuRowBasedUserDefinedFunction, GpuUserDefinedFunction, RapidsConf, RapidsMeta}
+ import com.nvidia.spark.rapids._

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
- import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF, SpecializedGetters}
+ import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, ScalaUDF, SpecializedGetters}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
- import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+ import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType, MapType, StructType}

case class GpuScalaUDF(
function: RapidsUDF,
@@ -40,23 +40,113 @@ case class GpuScalaUDF(
override val name: String = udfName.getOrElse("???")
}

- abstract class GpuRowBasedScalaUDFBase(
+ object GpuScalaUDFMeta {
def exprMeta: ExprRule[ScalaUDF] = GpuOverrides.expr[ScalaUDF](
"User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface " +
"to get better performance.",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(expr, conf, p, r) => new ExprMeta(expr, conf, p, r) {
lazy val opRapidsFunc = GpuScalaUDF.getRapidsUDFInstance(expr.function)

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
val udfName = expr.udfName.getOrElse("UDF")
val udfClass = expr.function.getClass
willNotWorkOnGpu(s"neither $udfName implemented by $udfClass provides " +
s"a GPU implementation, nor the conf `${RapidsConf.ENABLE_CPU_BASED_UDF.key}` " +
s"is enabled")
}
}

override def convertToGpu(): GpuExpression = {
// It can come here only when at least one option as below is true.
// 1. UDF implements a RAPIDS accelerated interface.
// 2. The conf "spark.rapids.sql.rowBasedUDF.enabled" is enabled.
opRapidsFunc.map { rapidsFunc =>
GpuScalaUDF(
rapidsFunc,
expr.dataType,
childExprs.map(_.convertToGpu()),
expr.udfName,
expr.nullable,
expr.udfDeterministic)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
GpuRowBasedScalaUDF(
expr.function,
expr.dataType,
childExprs.map(_.convertToGpu()),
expr.inputEncoders,
expr.outputEncoder,
expr.udfName,
expr.nullable,
expr.udfDeterministic)
}
}
})
}
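As a usage sketch (not part of this commit, and assuming the spark-rapids jar and a GPU are available): with the plugin loaded, a plain Scala UDF that does not implement `RapidsUDF` can still be planned on the GPU once the row-based fallback conf named above is enabled. The session settings here are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder()
  .appName("row-based-udf-sketch")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // RapidsConf.ENABLE_CPU_BASED_UDF: allows GpuRowBasedScalaUDF as a
  // fallback for UDFs with no RAPIDS accelerated implementation.
  .config("spark.rapids.sql.rowBasedUDF.enabled", "true")
  .getOrCreate()

// A plain Scala UDF; without the conf above, tagExprForGpu would report
// willNotWorkOnGpu for it.
val plusOne = udf((x: Int) => x + 1)
spark.range(10).select(plusOne(col("id").cast("int"))).show()
```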

case class GpuRowBasedScalaUDF(
sparkFunc: AnyRef,
dataType: DataType,
children: Seq[Expression],
inputEncoders: Seq[Option[ExpressionEncoder[_]]],
outputEncoder: Option[ExpressionEncoder[_]],
- udfName: Option[String]) extends GpuRowBasedUserDefinedFunction {
+ udfName: Option[String],
+ nullable: Boolean,
+ udfDeterministic: Boolean) extends GpuRowBasedUserDefinedFunction {

/**
* Create the converter which converts the catalyst data type to the scala data type.
* This converter will be used for the UDF input type conversion.
* We use `CatalystTypeConverters` to create the converter for:
* - UDF which doesn't provide inputEncoders, e.g., untyped Scala UDF and Java UDF
* - type which isn't supported by `ExpressionEncoder`, e.g., Any
* - primitive types, in order to use `identity` for better performance
* For other cases like case class, Option[T], we use `ExpressionEncoder` instead since
* `CatalystTypeConverters` doesn't support these data types.
*
* @param i the index of the child
* @param dataType the output data type of the i-th child
* @return the converter
*/
- def createInputConverter(i: Int, dataType: DataType): Any => Any
+ def createInputConverter(i: Int, dataType: DataType): Any => Any = {
val useEncoder =
!(inputEncoders.isEmpty || // for untyped Scala UDF and Java UDF
inputEncoders(i).isEmpty || // for types aren't supported by encoder, e.g. Any
inputPrimitives(i)) // for primitive types

if (useEncoder) {
val enc = inputEncoders(i).get
val fromRow = enc.createDeserializer()
if (enc.isSerializedAsStructForTopLevel) {
row: Any => fromRow(row.asInstanceOf[InternalRow])
} else {
val inputRow = new GenericInternalRow(1)
value: Any => { inputRow.update(0, value); fromRow(inputRow) }
}
} else { // use CatalystTypeConverters
CatalystTypeConverters.createToScalaConverter(dataType)
}
}
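To see the `CatalystTypeConverters` branch in isolation (a standalone sketch, not from the diff): it produces the Scala value the UDF expects from the catalyst representation, here for `array<int>`. The encoder branch is only needed for types such as case classes or `Option[T]`, where a deserializer rebuilds the user object from an `InternalRow`.

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// catalyst array<int> (ArrayData) -> Scala Seq[Int] for the UDF.
val toScala = CatalystTypeConverters.createToScalaConverter(ArrayType(IntegerType))
val converted = toScala(ArrayData.toArrayData(Array(1, 2, 3)))
assert(converted == Seq(1, 2, 3))
```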

/**
* Need nulls check when there are array types with nulls in the input.
* This is for `https://github.com/NVIDIA/spark-rapids/issues/3942`.
*/
override val checkNull: Boolean = children.exists(child => hasArrayWithNulls(child.dataType))

private def hasArrayWithNulls(dt: DataType): Boolean = dt match {
case ArrayType(et, hasNull) => hasNull || hasArrayWithNulls(et)
case MapType(kt, vt, _) => hasArrayWithNulls(kt) || hasArrayWithNulls(vt)
case StructType(fields) => fields.exists(f => hasArrayWithNulls(f.dataType))
case _ => false
}
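Restating the traversal above outside the class (same logic, lifted out only so it compiles standalone) shows which input types force the null check:

```scala
import org.apache.spark.sql.types._

def hasArrayWithNulls(dt: DataType): Boolean = dt match {
  case ArrayType(et, hasNull) => hasNull || hasArrayWithNulls(et)
  case MapType(kt, vt, _) => hasArrayWithNulls(kt) || hasArrayWithNulls(vt)
  case StructType(fields) => fields.exists(f => hasArrayWithNulls(f.dataType))
  case _ => false
}

hasArrayWithNulls(ArrayType(IntegerType, containsNull = true))   // true
hasArrayWithNulls(ArrayType(IntegerType, containsNull = false))  // false
// ArrayType(StringType) defaults containsNull = true, so this is true too:
hasArrayWithNulls(StructType(Seq(StructField("a", ArrayType(StringType)))))
hasArrayWithNulls(StringType)                                    // false
```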

override def toString: String = s"$name(${children.mkString(", ")})"

@@ -519,46 +519,6 @@ abstract class GpuRowBasedScalaUDFBase(

}

- abstract class ScalaUDFMetaBase(
- expr: ScalaUDF,
- conf: RapidsConf,
- parent: Option[RapidsMeta[_, _, _]],
- rule: DataFromReplacementRule) extends ExprMeta(expr, conf, parent, rule) {
-
- lazy val opRapidsFunc = GpuScalaUDF.getRapidsUDFInstance(expr.function)
-
- override def tagExprForGpu(): Unit = {
- if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
- val udfName = expr.udfName.getOrElse("UDF")
- val udfClass = expr.function.getClass
- willNotWorkOnGpu(s"neither $udfName implemented by $udfClass provides " +
- s"a GPU implementation, nor the conf `${RapidsConf.ENABLE_CPU_BASED_UDF.key}` " +
- s"is enabled")
- }
- }
-
- override def convertToGpu(): GpuExpression = {
- // It can come here only when at least one option as below is true.
- // 1. UDF implements a RAPIDS accelerated interface.
- // 2. The conf "spark.rapids.sql.rowBasedUDF.enabled" is enabled.
- opRapidsFunc.map { rapidsFunc =>
- GpuScalaUDF(
- rapidsFunc,
- expr.dataType,
- childExprs.map(_.convertToGpu()),
- expr.udfName,
- expr.nullable,
- expr.udfDeterministic)
- }.getOrElse {
- // This `require` is just for double check.
- require(conf.isCpuBasedUDFEnabled)
- rowBasedScalaUDF
- }
- }
-
- protected def rowBasedScalaUDF: GpuRowBasedScalaUDFBase
- }

object GpuScalaUDF {
/**
* Determine if the UDF function implements the [[com.nvidia.spark.RapidsUDF]] interface,
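A simplified sketch of the detection that doc comment describes (the real method also unwraps Scala lambdas via `SerializedLambda` to locate the implementing class; that part is omitted here):

```scala
import com.nvidia.spark.RapidsUDF

// If the user's function object itself implements RapidsUDF, the plugin can
// run the columnar GPU path; otherwise it falls back or stays on the CPU.
def getRapidsUDFInstance(function: AnyRef): Option[RapidsUDF] = function match {
  case f: RapidsUDF => Some(f)
  case _ => None
}
```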
