Extend range of supported decimal divide operations (#3891)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Oct 22, 2021
1 parent b747a82 commit e52b6fb
Showing 5 changed files with 222 additions and 179 deletions.
8 changes: 5 additions & 3 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -151,13 +151,15 @@ def test_division_fallback_on_decimal(data_gen):
f.col('a') / f.col('b')),
'Divide')

@pytest.mark.parametrize('lhs', [DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2)], ids=idfn)
@pytest.mark.parametrize('rhs', [DecimalGen(4, 1)], ids=idfn)
@approximate_float # we should get a perfectly correct answer for floats except when casting a decimal to a float in some corner cases.
@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(4, 1), DecimalGen(5, 0), DecimalGen(5, 1), DecimalGen(10, 5)], ids=idfn)
@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
def test_division_mixed(lhs, rhs):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, lhs, rhs).select(
f.col('a') / f.col('b')),
conf=allow_negative_scale_of_decimal_conf)
conf=copy_and_update(allow_negative_scale_of_decimal_conf,
{'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}))

@pytest.mark.parametrize('data_gen', integral_gens + [decimal_gen_default, decimal_gen_scale_precision,
decimal_gen_same_scale_precision, decimal_gen_64bit, decimal_gen_18_3, decimal_gen_30_2,
@@ -213,4 +213,16 @@ object DecimalUtil extends Arm {
case t => t.defaultSize
}
}


/**
* Get the number of decimal digits (precision) needed to hold the integral type held by this column
*/
def getPrecisionForIntegralType(input: DType): Int = input match {
case DType.INT8 => 3 // -128 to 127
case DType.INT16 => 5 // -32768 to 32767
case DType.INT32 => 10 // -2147483648 to 2147483647
case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
case t => throw new IllegalArgumentException(s"Unsupported type $t")
}
}
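
For context, a minimal usage sketch of this lookup (not part of the commit, and assuming it sits in the same package as DecimalUtil); the helper name integralToDecimal is hypothetical, while getPrecisionForIntegralType and createCudfDecimal are the DecimalUtil methods referenced in this diff:

import ai.rapids.cudf.{ColumnVector, ColumnView}

// Hypothetical helper: look up how many decimal digits the integral type needs and cast to a
// scale-0 decimal, e.g. an INT32 column becomes DECIMAL(10, 0) since 2147483647 has 10 digits.
def integralToDecimal(input: ColumnView): ColumnVector = {
  val prec = DecimalUtil.getPrecisionForIntegralType(input.getType)
  input.castTo(DecimalUtil.createCudfDecimal(prec, 0))
}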
13 changes: 1 addition & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1116,22 +1116,11 @@ object GpuCast extends Arm {
}
}

/**
* Get the number of decimal places needed to hold the integral type held by this column
*/
private def getPrecisionForIntegralInput(input: ColumnView): Int = input.getType match {
case DType.INT8 => 3 // -128 to 127
case DType.INT16 => 5 // -32768 to 32767
case DType.INT32 => 10 // -2147483648 to 2147483647
case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
case t => throw new IllegalArgumentException(s"Unsupported type $t")
}

private def castIntegralsToDecimal(
input: ColumnView,
dt: DecimalType,
ansiMode: Boolean): ColumnVector = {
val prec = getPrecisionForIntegralInput(input)
val prec = DecimalUtil.getPrecisionForIntegralType(input.getType)
// Cast input to decimal
val inputDecimalType = new DecimalType(prec, 0)
withResource(input.castTo(DecimalUtil.createCudfDecimal(prec, 0))) { castedInput =>
101 changes: 59 additions & 42 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -912,9 +912,57 @@ object GpuOverrides extends Logging {
"CheckOverflow after arithmetic operations between DecimalType data",
ExprChecks.unaryProjectInputMatchesOutput(TypeSig.DECIMAL_128_FULL,
TypeSig.DECIMAL_128_FULL),
(a, conf, p, r) => new UnaryExprMeta[CheckOverflow](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
GpuCheckOverflow(child, wrapped.dataType, wrapped.nullOnOverflow)
(a, conf, p, r) => new ExprMeta[CheckOverflow](a, conf, p, r) {
private[this] def extractOrigParam(expr: BaseExprMeta[_]): BaseExprMeta[_] =
expr.wrapped match {
case PromotePrecision(_: Cast) =>
// Strip out the promote precision and the cast so we get as close to the original
// values as we can.
val castExpr = expr.childExprs.head
castExpr.childExprs.head
case _ => expr
}
private[this] lazy val binExpr = childExprs.head
private[this] lazy val lhs = extractOrigParam(binExpr.childExprs.head)
private[this] lazy val rhs = extractOrigParam(binExpr.childExprs(1))
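// For a plan shaped like CheckOverflow(Divide(PromotePrecision(Cast(a)), PromotePrecision(Cast(b))))
// these lazy vals resolve lhs and rhs back to the original `a` and `b`, before Spark widened them
// for the divide.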

override def tagExprForGpu(): Unit = {
a.child match {
case _: Divide =>
// Division of Decimal types is a little odd. Spark will cast the inputs
// to a common wider value where the scale is the max of the two input scales, and
// the precision is max of the two input non-scale portions + the new scale. Then it
// will do the divide as a BigDecimal value but lie about the return type. Then here
// in CheckOverflow it will reset the scale and check the precision so that the result is
// known to fit in the final desired type.
// Here we try to strip out the extra casts, etc. to get as close to the original
// query as possible. This lets us then calculate what CUDF needs to get the correct
// answer, which in some cases is a lot smaller. Our GpuDecimalDivide handles the
// overflow checking/etc.
val l = GpuDecimalDivide.asDecimalType(lhs.wrapped.asInstanceOf[Expression].dataType)
val r = GpuDecimalDivide.asDecimalType(rhs.wrapped.asInstanceOf[Expression].dataType)
val intermediatePrecision =
GpuDecimalDivide.nonRoundedIntermediateArgPrecision(l, r, a.dataType)

if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
binExpr.willNotWorkOnGpu(s"The intermediate precision of $intermediatePrecision " +
s"that is required to guarnatee no overflow issues for this divide is too " +
s"large to be supported on the GPU")
}
case _ => // NOOP
}
}

override def convertToGpu(): GpuExpression = {
a.child match {
case _: Divide =>
// Get as close to the original divide as possible
GpuDecimalDivide(lhs.convertToGpu(), rhs.convertToGpu(), wrapped.dataType)
case _ =>
GpuCheckOverflow(childExprs.head.convertToGpu(),
wrapped.dataType, wrapped.nullOnOverflow)
}
}
}),
expr[ToDegrees](
"Converts radians to degrees",
@@ -2002,46 +2050,15 @@ object GpuOverrides extends Logging {
("rhs", TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL)),
(a, conf, p, r) => new BinaryExprMeta[Divide](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
// Division of Decimal types is a little odd. Spark will cast the inputs
// to a common wider value where scale is max of the two input scales, and precision is
// max of the two input non-scale portions + the new scale. Then it will do the divide
// (the rules for which are a little complex), but lie about it
// in the return type of the Divide operator. Then in CheckOverflow it will reset the
// scale and check the precision so that they know it fits in final desired result.
// We would like to avoid all of this if possible because having a temporary intermediate
// value that can have a scale quite a bit larger than the final result reduces the
// maximum precision that we could support, as we don't have unlimited precision. But
// sadly because of how the logical plan is compiled down to the physical plan we have
// lost what the original types were and cannot recover it. As such for now we are going
// to do what Spark does, but we have to recompute/recheck the temporary precision to be
// sure it will fit on the GPU. In addition to this we have it a little harder because
// the decimal divide itself will do rounding on the result before it is returned,
// effectively calculating an extra digit of precision. Because cudf does not support this
// right now we actually increase the scale (and corresponding precision) to get an extra
// decimal place so we can round it in GpuCheckOverflow
val Seq(leftDataType, rightDataType) = childExprs.flatMap(_.typeMeta.dataType)
(leftDataType, rightDataType) match {
case (l: DecimalType, r: DecimalType) =>
val outputScale = GpuDivideUtil.outputDecimalScale(l, r)
val outputPrecision = GpuDivideUtil.outputDecimalPrecision(l, r, outputScale)
if (outputPrecision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu("The final output precision of the divide is too " +
s"large to be supported on the GPU $outputPrecision")
}
val intermediatePrecision =
GpuDivideUtil.intermediateDecimalPrecision(l, r, outputScale)

if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu("The intermediate output precision of the divide is too " +
s"large to be supported on the GPU $intermediatePrecision")
}
case _ => // NOOP
}
}

// Division of Decimal types is a little odd. To work around some issues with
// how Spark handles it, the tagging/checks are done in CheckOverflow instead of here.
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuDivide(lhs, rhs)
a.dataType match {
case _: DecimalType =>
throw new IllegalStateException("Decimal Divide should be converted in CheckOverflow")
case _ =>
GpuDivide(lhs, rhs)
}
}),
expr[IntegralDivide](
"Division with a integer result",