Extend range of supported decimal divide operations #3891

Merged 2 commits on Oct 22, 2021
8 changes: 5 additions & 3 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -151,13 +151,15 @@ def test_division_fallback_on_decimal(data_gen):
f.col('a') / f.col('b')),
'Divide')

@pytest.mark.parametrize('lhs', [DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2)], ids=idfn)
@pytest.mark.parametrize('rhs', [DecimalGen(4, 1)], ids=idfn)
@approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases.
@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(4, 1), DecimalGen(5, 0), DecimalGen(5, 1), DecimalGen(10, 5)], ids=idfn)
@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
def test_division_mixed(lhs, rhs):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, lhs, rhs).select(
f.col('a') / f.col('b')),
conf=allow_negative_scale_of_decimal_conf)
conf=copy_and_update(allow_negative_scale_of_decimal_conf,
{'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}))

@pytest.mark.parametrize('data_gen', integral_gens + [decimal_gen_default, decimal_gen_scale_precision,
decimal_gen_same_scale_precision, decimal_gen_64bit, decimal_gen_18_3, decimal_gen_30_2,
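
For readers who want to see what the widened test_division_mixed coverage corresponds to outside the integration-test harness, here is a minimal sketch using Spark's Scala API. The literal values, column names, and session setup are illustrative assumptions; the config key comes from the test change above, where it lets the plugin cast decimals to floats for the mixed float/decimal cases.

// Minimal sketch (not part of the PR): a decimal/decimal divide similar to one
// combination covered by test_division_mixed, expressed with Spark's Scala API.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("decimal-divide-sketch").getOrCreate()
import spark.implicits._

// Config added in the test above; allows the plugin to cast decimals to floats.
spark.conf.set("spark.rapids.sql.castDecimalToFloat.enabled", "true")

// Roughly a DecimalGen(5, 3) column divided by a DecimalGen(4, 1) column.
val df = Seq((BigDecimal("12.345"), BigDecimal("678.9"))).toDF("a", "b")
df.select(col("a") / col("b")).show(truncate = false)
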
@@ -213,4 +213,16 @@ object DecimalUtil extends Arm {
case t => t.defaultSize
}
}


/**
* Get the number of decimal places needed to hold a value of the given integral type
*/
def getPrecisionForIntegralType(input: DType): Int = input match {
case DType.INT8 => 3 // -128 to 127
case DType.INT16 => 5 // -32768 to 32767
case DType.INT32 => 10 // -2147483648 to 2147483647
case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
case t => throw new IllegalArgumentException(s"Unsupported type $t")
}
}
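
As a quick plausibility check on the constants in getPrecisionForIntegralType, each one is simply the number of decimal digits in the widest value of the corresponding integral type. A minimal sketch (maxDecimalDigits is a hypothetical helper, not part of the plugin):

// Sketch only: verify the digit counts used above.
def maxDecimalDigits(maxValue: Long): Int = maxValue.toString.length
assert(maxDecimalDigits(Byte.MaxValue) == 3)   // 127
assert(maxDecimalDigits(Short.MaxValue) == 5)  // 32767
assert(maxDecimalDigits(Int.MaxValue) == 10)   // 2147483647
assert(maxDecimalDigits(Long.MaxValue) == 19)  // 9223372036854775807
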
13 changes: 1 addition & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1116,22 +1116,11 @@
}
}

/**
* Get the number of decimal places needed to hold the integral type held by this column
*/
private def getPrecisionForIntegralInput(input: ColumnView): Int = input.getType match {
case DType.INT8 => 3 // -128 to 127
case DType.INT16 => 5 // -32768 to 32767
case DType.INT32 => 10 // -2147483648 to 2147483647
case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
case t => throw new IllegalArgumentException(s"Unsupported type $t")
}

private def castIntegralsToDecimal(
input: ColumnView,
dt: DecimalType,
ansiMode: Boolean): ColumnVector = {
val prec = getPrecisionForIntegralInput(input)
val prec = DecimalUtil.getPrecisionForIntegralType(input.getType)
// Cast input to decimal
val inputDecimalType = new DecimalType(prec, 0)
withResource(input.castTo(DecimalUtil.createCudfDecimal(prec, 0))) { castedInput =>
101 changes: 59 additions & 42 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -912,9 +912,57 @@
"CheckOverflow after arithmetic operations between DecimalType data",
ExprChecks.unaryProjectInputMatchesOutput(TypeSig.DECIMAL_128_FULL,
TypeSig.DECIMAL_128_FULL),
(a, conf, p, r) => new UnaryExprMeta[CheckOverflow](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
GpuCheckOverflow(child, wrapped.dataType, wrapped.nullOnOverflow)
(a, conf, p, r) => new ExprMeta[CheckOverflow](a, conf, p, r) {
private[this] def extractOrigParam(expr: BaseExprMeta[_]): BaseExprMeta[_] =
expr.wrapped match {
case PromotePrecision(_: Cast) =>
// Strip out the promote precision and the cast so we get as close to the original
// values as we can.
val castExpr = expr.childExprs.head
castExpr.childExprs.head
case _ => expr
}
private[this] lazy val binExpr = childExprs.head
private[this] lazy val lhs = extractOrigParam(binExpr.childExprs.head)
private[this] lazy val rhs = extractOrigParam(binExpr.childExprs(1))

override def tagExprForGpu(): Unit = {
a.child match {
case _: Divide =>
// Division of Decimal types is a little odd. Spark will cast the inputs
// to a common wider value where the scale is the max of the two input scales, and
// the precision is the max of the two input non-scale portions + the new scale. Then it
// will do the divide as a BigDecimal value but lie about the return type. Then here
// in CheckOverflow it will reset the scale and check the precision so that it knows
// the result fits in the final desired type.
// Here we try to strip out the extra casts, etc. to get as close to the original
// query as possible. This lets us then calculate what cudf needs to get the correct
// answer, which in some cases is a lot smaller. Our GpuDecimalDivide handles the
// overflow checking, etc.
val l = GpuDecimalDivide.asDecimalType(lhs.wrapped.asInstanceOf[Expression].dataType)
val r = GpuDecimalDivide.asDecimalType(rhs.wrapped.asInstanceOf[Expression].dataType)
val intermediatePrecision =
GpuDecimalDivide.nonRoundedIntermediateArgPrecision(l, r, a.dataType)

if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
binExpr.willNotWorkOnGpu(s"The intermediate precision of $intermediatePrecision " +
s"that is required to guarnatee no overflow issues for this divide is too " +
s"large to be supported on the GPU")
}
case _ => // NOOP
}
}

override def convertToGpu(): GpuExpression = {
a.child match {
case _: Divide =>
// Get as close to the original divide as possible
GpuDecimalDivide(lhs.convertToGpu(), rhs.convertToGpu(), wrapped.dataType)
case _ =>
GpuCheckOverflow(childExprs.head.convertToGpu(),
wrapped.dataType, wrapped.nullOnOverflow)
}
}
}),
expr[ToDegrees](
"Converts radians to degrees",
@@ -2002,46 +2050,15 @@
("rhs", TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL)),
(a, conf, p, r) => new BinaryExprMeta[Divide](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
// Division of Decimal types is a little odd. Spark will cast the inputs
// to a common wider value where scale is max of the two input scales, and precision is
// max of the two input non-scale portions + the new scale. Then it will do the divide,
// which the rules for it are a little complex, but lie about it
// in the return type of the Divide operator. Then in CheckOverflow it will reset the
// scale and check the precision so that they know it fits in final desired result.
// We would like to avoid all of this if possible because having a temporary intermediate
// value that can have a scale quite a bit larger than the final result reduces the
// maximum precision that we could support, as we don't have unlimited precision. But
// sadly because of how the logical plan is compiled down to the physical plan we have
// lost what the original types were and cannot recover it. As such for now we are going
// to do what Spark does, but we have to recompute/recheck the temporary precision to be
// sure it will fit on the GPU. In addition to this we have it a little harder because
// the decimal divide itself will do rounding on the result before it is returned,
// effectively calculating an extra digit of precision. Because cudf does not support this
// right now we actually increase the scale (and corresponding precision) to get an extra
// decimal place so we can round it in GpuCheckOverflow
val Seq(leftDataType, rightDataType) = childExprs.flatMap(_.typeMeta.dataType)
(leftDataType, rightDataType) match {
case (l: DecimalType, r: DecimalType) =>
val outputScale = GpuDivideUtil.outputDecimalScale(l, r)
val outputPrecision = GpuDivideUtil.outputDecimalPrecision(l, r, outputScale)
if (outputPrecision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu("The final output precision of the divide is too " +
s"large to be supported on the GPU $outputPrecision")
}
val intermediatePrecision =
GpuDivideUtil.intermediateDecimalPrecision(l, r, outputScale)

if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu("The intermediate output precision of the divide is too " +
s"large to be supported on the GPU $intermediatePrecision")
}
case _ => // NOOP
}
}

// Division of Decimal types is a little odd. To work around some issues with
// what Spark does, the tagging/checks are in CheckOverflow instead of here.
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuDivide(lhs, rhs)
a.dataType match {
case _: DecimalType =>
throw new IllegalStateException("Decimal Divide should be converted in CheckOverflow")
case _ =>
GpuDivide(lhs, rhs)
}
}),
expr[IntegralDivide](
"Division with a integer result",
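
The comment added to the CheckOverflow meta above explains that Spark first widens both divide operands to a common decimal type: the scale is the max of the two input scales, and the precision is the max of the two integral (non-scale) portions plus that scale. A minimal sketch of that widening rule, using hypothetical names (Dec, widerDecimalType) that are not part of the plugin:

// Illustration only, assuming the widening rule described in the CheckOverflow comment.
case class Dec(precision: Int, scale: Int)

def widerDecimalType(l: Dec, r: Dec): Dec = {
  val scale = math.max(l.scale, r.scale)
  val integral = math.max(l.precision - l.scale, r.precision - r.scale)
  Dec(integral + scale, scale)
}

// e.g. a DecimalGen(5, 3) lhs divided by a DecimalGen(4, 1) rhs from the test above:
// both sides are widened to Dec(6, 3) before the BigDecimal divide.
assert(widerDecimalType(Dec(5, 3), Dec(4, 1)) == Dec(6, 3))

As shown in the diff, GpuDecimalDivide.nonRoundedIntermediateArgPrecision then compares the intermediate precision actually needed against DType.DECIMAL128_MAX_PRECISION (38 digits for 128-bit decimals) and marks the divide as unsupported on the GPU when it would not fit.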