diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index 9168873c317..7db790149d4 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -151,13 +151,15 @@ def test_division_fallback_on_decimal(data_gen):
             f.col('a') / f.col('b')),
         'Divide')
 
-@pytest.mark.parametrize('lhs', [DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2)], ids=idfn)
-@pytest.mark.parametrize('rhs', [DecimalGen(4, 1)], ids=idfn)
+@approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases.
+@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(4, 1), DecimalGen(5, 0), DecimalGen(5, 1), DecimalGen(10, 5)], ids=idfn)
+@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, float_gen, DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
 def test_division_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).select(
             f.col('a') / f.col('b')),
-        conf=allow_negative_scale_of_decimal_conf)
+        conf=copy_and_update(allow_negative_scale_of_decimal_conf,
+            {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}))
 
 @pytest.mark.parametrize('data_gen', integral_gens + [decimal_gen_default, decimal_gen_scale_precision,
         decimal_gen_same_scale_precision, decimal_gen_64bit, decimal_gen_18_3, decimal_gen_30_2,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala
index 1ff9e0825d5..3025808a239 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala
@@ -213,4 +213,16 @@ object DecimalUtil extends Arm {
       case t => t.defaultSize
     }
   }
+
+
+  /**
+   * Get the number of decimal places needed to hold the integral type held by this column
+   */
+  def getPrecisionForIntegralType(input: DType): Int = input match {
+    case DType.INT8 => 3 // -128 to 127
+    case DType.INT16 => 5 // -32768 to 32767
+    case DType.INT32 => 10 // -2147483648 to 2147483647
+    case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
+    case t => throw new IllegalArgumentException(s"Unsupported type $t")
+  }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index 403a8cf71aa..d07e32e7017 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1116,22 +1116,11 @@ object GpuCast extends Arm {
     }
   }
 
-  /**
-   * Get the number of decimal places needed to hold the integral type held by this column
-   */
-  private def getPrecisionForIntegralInput(input: ColumnView): Int = input.getType match {
-    case DType.INT8 => 3 // -128 to 127
-    case DType.INT16 => 5 // -32768 to 32767
-    case DType.INT32 => 10 // -2147483648 to 2147483647
-    case DType.INT64 => 19 // -9223372036854775808 to 9223372036854775807
-    case t => throw new IllegalArgumentException(s"Unsupported type $t")
-  }
-
   private def castIntegralsToDecimal(
       input: ColumnView,
       dt: DecimalType,
       ansiMode: Boolean): ColumnVector = {
-    val prec = getPrecisionForIntegralInput(input)
+    val prec = DecimalUtil.getPrecisionForIntegralType(input.getType)
     // Cast input to decimal
     val inputDecimalType = new DecimalType(prec, 0)
     withResource(input.castTo(DecimalUtil.createCudfDecimal(prec, 0))) { castedInput =>
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index a75374c286a..62d49b03fe5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -912,9 +912,57 @@ object GpuOverrides extends Logging {
       "CheckOverflow after arithmetic operations between DecimalType data",
       ExprChecks.unaryProjectInputMatchesOutput(TypeSig.DECIMAL_128_FULL,
         TypeSig.DECIMAL_128_FULL),
-      (a, conf, p, r) => new UnaryExprMeta[CheckOverflow](a, conf, p, r) {
-        override def convertToGpu(child: Expression): GpuExpression =
-          GpuCheckOverflow(child, wrapped.dataType, wrapped.nullOnOverflow)
+      (a, conf, p, r) => new ExprMeta[CheckOverflow](a, conf, p, r) {
+        private[this] def extractOrigParam(expr: BaseExprMeta[_]): BaseExprMeta[_] =
+          expr.wrapped match {
+            case PromotePrecision(_: Cast) =>
+              // Strip out the promote precision and the cast so we get as close to the original
+              // values as we can.
+              val castExpr = expr.childExprs.head
+              castExpr.childExprs.head
+            case _ => expr
+          }
+        private[this] lazy val binExpr = childExprs.head
+        private[this] lazy val lhs = extractOrigParam(binExpr.childExprs.head)
+        private[this] lazy val rhs = extractOrigParam(binExpr.childExprs(1))
+
+        override def tagExprForGpu(): Unit = {
+          a.child match {
+            case _: Divide =>
+              // Division of Decimal types is a little odd. Spark will cast the inputs
+              // to a common wider value where the scale is the max of the two input scales, and
+              // the precision is the max of the two input non-scale portions + the new scale. Then it
+              // will do the divide as a BigDecimal value but lie about the return type. Then here
+              // in CheckOverflow it will reset the scale and check the precision so that it knows
+              // the result fits in the final desired type.
+              // Here we try to strip out the extra casts, etc. to get as close to the original
+              // query as possible. This lets us then calculate what CUDF needs to get the correct
+              // answer, which in some cases is a lot smaller. Our GpuDecimalDivide handles the
+              // overflow checking/etc.
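+              // As a hypothetical example (not a case taken from this patch): for original
+              // operands of DecimalType(5, 3) and DecimalType(4, 1) Spark wants a
+              // DecimalType(11, 8) result, and the intermediate precision computed below works
+              // out to 12, well under DType.DECIMAL128_MAX_PRECISION, so the divide can stay
+              // on the GPU.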
+              val l = GpuDecimalDivide.asDecimalType(lhs.wrapped.asInstanceOf[Expression].dataType)
+              val r = GpuDecimalDivide.asDecimalType(rhs.wrapped.asInstanceOf[Expression].dataType)
+              val intermediatePrecision =
+                GpuDecimalDivide.nonRoundedIntermediateArgPrecision(l, r, a.dataType)
+
+              if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
+                binExpr.willNotWorkOnGpu(s"The intermediate precision of $intermediatePrecision " +
+                    s"that is required to guarantee no overflow issues for this divide is too " +
+                    s"large to be supported on the GPU")
+              }
+            case _ => // NOOP
+          }
+        }
+
+        override def convertToGpu(): GpuExpression = {
+          a.child match {
+            case _: Divide =>
+              // Get as close to the original divide as possible
+              GpuDecimalDivide(lhs.convertToGpu(), rhs.convertToGpu(), wrapped.dataType)
+            case _ =>
+              GpuCheckOverflow(childExprs.head.convertToGpu(),
+                wrapped.dataType, wrapped.nullOnOverflow)
+          }
+        }
       }),
     expr[ToDegrees](
      "Converts radians to degrees",
@@ -2002,46 +2050,15 @@ object GpuOverrides extends Logging {
         ("rhs", TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
             TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL)),
       (a, conf, p, r) => new BinaryExprMeta[Divide](a, conf, p, r) {
-        override def tagExprForGpu(): Unit = {
-          // Division of Decimal types is a little odd. Spark will cast the inputs
-          // to a common wider value where scale is max of the two input scales, and precision is
-          // max of the two input non-scale portions + the new scale. Then it will do the divide,
-          // which the rules for it are a little complex, but lie about it
-          // in the return type of the Divide operator. Then in CheckOverflow it will reset the
-          // scale and check the precision so that they know it fits in final desired result.
-          // We would like to avoid all of this if possible because having a temporary intermediate
-          // value that can have a scale quite a bit larger than the final result reduces the
-          // maximum precision that we could support, as we don't have unlimited precision. But
-          // sadly because of how the logical plan is compiled down to the physical plan we have
-          // lost what the original types were and cannot recover it. As such for now we are going
-          // to do what Spark does, but we have to recompute/recheck the temporary precision to be
-          // sure it will fit on the GPU. In addition to this we have it a little harder because
-          // the decimal divide itself will do rounding on the result before it is returned,
-          // effectively calculating an extra digit of precision. Because cudf does not support this
-          // right now we actually increase the scale (and corresponding precision) to get an extra
-          // decimal place so we can round it in GpuCheckOverflow
-          val Seq(leftDataType, rightDataType) = childExprs.flatMap(_.typeMeta.dataType)
-          (leftDataType, rightDataType) match {
-            case (l: DecimalType, r: DecimalType) =>
-              val outputScale = GpuDivideUtil.outputDecimalScale(l, r)
-              val outputPrecision = GpuDivideUtil.outputDecimalPrecision(l, r, outputScale)
-              if (outputPrecision > DType.DECIMAL128_MAX_PRECISION) {
-                willNotWorkOnGpu("The final output precision of the divide is too " +
-                    s"large to be supported on the GPU $outputPrecision")
-              }
-              val intermediatePrecision =
-                GpuDivideUtil.intermediateDecimalPrecision(l, r, outputScale)
-
-              if (intermediatePrecision > DType.DECIMAL128_MAX_PRECISION) {
-                willNotWorkOnGpu("The intermediate output precision of the divide is too " +
-                    s"large to be supported on the GPU $intermediatePrecision")
-              }
-            case _ => // NOOP
-          }
-        }
-
+        // Division of Decimal types is a little odd. To work around some issues with
+        // what Spark does, the tagging/checks are in CheckOverflow instead of here.
         override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
-          GpuDivide(lhs, rhs)
+          a.dataType match {
+            case _: DecimalType =>
+              throw new IllegalStateException("Decimal Divide should be converted in CheckOverflow")
+            case _ =>
+              GpuDivide(lhs, rhs)
+          }
       }),
     expr[IntegralDivide](
       "Division with a integer result",
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
index 9b8395d27fc..c9c2366d33b 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
@@ -438,18 +438,18 @@ case class GpuMultiply(
 }
 
 object GpuDivModLike extends Arm {
-  def replaceZeroWithNull(v: GpuColumnVector): ColumnVector = {
+  def replaceZeroWithNull(v: ColumnVector): ColumnVector = {
     var zeroScalar: Scalar = null
     var nullScalar: Scalar = null
     var zeroVec: ColumnVector = null
     var nullVec: ColumnVector = null
     try {
-      val dtype = v.getBase.getType
+      val dtype = v.getType
       zeroScalar = makeZeroScalar(dtype)
       nullScalar = Scalar.fromNull(dtype)
       zeroVec = ColumnVector.fromScalar(zeroScalar, 1)
       nullVec = ColumnVector.fromScalar(nullScalar, 1)
-      v.getBase.findAndReplaceAll(zeroVec, nullVec)
+      v.findAndReplaceAll(zeroVec, nullVec)
     } finally {
       if (zeroScalar != null) {
         zeroScalar.close()
@@ -547,6 +547,14 @@ object GpuDivModLike extends Arm {
       }
     }
   }
+
+  def divByZeroError(): Nothing = {
+    throw new ArithmeticException("divide by zero")
+  }
+
+  def divOverflowError(): Nothing = {
+    throw new ArithmeticException("Overflow in integral divide.")
+  }
 }
 
 trait GpuDivModLike extends CudfBinaryArithmetic {
@@ -560,14 +568,6 @@ trait GpuDivModLike extends CudfBinaryArithmetic {
 
   import GpuDivModLike._
 
-  private def divByZeroError(): Nothing = {
-    throw new ArithmeticException("divide by zero")
-  }
-
-  private def divOverflowError(): Nothing = {
-    throw new ArithmeticException("Overflow in integral divide.")
-  }
-
   override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
     if (failOnError) {
       withResource(makeZeroScalar(rhs.getBase.getType)) { zeroScalar =>
@@ -583,7 +583,7 @@ trait GpuDivModLike extends CudfBinaryArithmetic {
       if (checkDivideOverflow && isDivOverflow(lhs, rhs)) {
         divOverflowError()
       }
-      withResource(replaceZeroWithNull(rhs)) { replaced =>
+      withResource(replaceZeroWithNull(rhs.getBase)) { replaced =>
         super.doColumnar(lhs, GpuColumnVector.from(replaced, rhs.dataType))
       }
     }
@@ -593,7 +593,7 @@ trait GpuDivModLike extends CudfBinaryArithmetic {
       if (checkDivideOverflow && isDivOverflow(lhs, rhs)) {
         divOverflowError()
       }
-      withResource(replaceZeroWithNull(rhs)) { replaced =>
+      withResource(replaceZeroWithNull(rhs.getBase)) { replaced =>
         super.doColumnar(lhs, GpuColumnVector.from(replaced, rhs.dataType))
       }
     }
@@ -616,53 +616,156 @@ trait GpuDivModLike extends CudfBinaryArithmetic {
   }
 }
 
-object GpuDivideUtil {
+/**
+ * A version of Divide specifically for DecimalType that does not force the left and right to be
+ * the same type. This lets us calculate the correct result on a wider range of values without
+ * the need for unbounded precision in the processing.
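+ *
+ * For example (illustrative numbers only, not a case from this patch): for
+ * DecimalType(5, 3) / DecimalType(4, 1), where Spark expects a DecimalType(11, 8) result, the
+ * divide is done by cudf as DecimalType(12, 10) / DecimalType(12, 1), giving a
+ * DecimalType(12, 9) intermediate that is then rounded to the final DecimalType(11, 8).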
+ */
+case class GpuDecimalDivide(
+    left: Expression,
+    right: Expression,
+    dataType: DecimalType,
+    failOnError: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero()) extends
+    ShimExpression with GpuExpression {
+
+  override def toString: String = s"($left / $right)"
+
+  override def sql: String = s"(${left.sql} / ${right.sql})"
+
+  private[this] lazy val lhsType: DecimalType = GpuDecimalDivide.asDecimalType(left.dataType)
+  private[this] lazy val rhsType: DecimalType = GpuDecimalDivide.asDecimalType(right.dataType)
+  // This is the type that the LHS will be cast to. The precision will match the precision of
+  // the intermediate rhs (to make CUDF happy doing the divide), but the scale will be shifted
+  // enough so CUDF produces the desired output scale
+  private[this] lazy val intermediateLhsType =
+    GpuDecimalDivide.intermediateLhsType(lhsType, rhsType, dataType)
+  // This is the type that the RHS will be cast to. The precision will match the precision of the
+  // intermediate lhs (to make CUDF happy doing the divide), but the scale will be the same
+  // as the input RHS scale.
+  private[this] lazy val intermediateRhsType =
+    GpuDecimalDivide.intermediateRhsType(lhsType, rhsType, dataType)
+
+  // This is the data type that CUDF will return as the output of the divide. It should be
+  // very close to outputType, but with the scale increased by 1 so that we can round the result
+  // and produce the same answer as Spark.
+  private[this] lazy val intermediateResultType =
+    GpuDecimalDivide.intermediateResultType(dataType)
+
+  private[this] def divByZeroFixes(rhs: ColumnVector): ColumnVector = {
+    if (failOnError) {
+      withResource(GpuDivModLike.makeZeroScalar(rhs.getType)) { zeroScalar =>
+        if (rhs.contains(zeroScalar)) {
+          GpuDivModLike.divByZeroError()
+        }
+      }
+      rhs.incRefCount()
+    } else {
+      GpuDivModLike.replaceZeroWithNull(rhs)
+    }
+  }
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    val castLhs = withResource(GpuExpressionsUtils.columnarEvalToColumn(left, batch)) { lhs =>
+      GpuCast.doCast(lhs.getBase, lhs.dataType(), intermediateLhsType, ansiMode = failOnError,
+        legacyCastToString = false, stringToDateAnsiModeEnabled = false)
+    }
+    val ret = withResource(castLhs) { castLhs =>
+      val castRhs = withResource(GpuExpressionsUtils.columnarEvalToColumn(right, batch)) { rhs =>
+        withResource(divByZeroFixes(rhs.getBase)) { fixed =>
+          GpuCast.doCast(fixed, rhs.dataType(), intermediateRhsType, ansiMode = failOnError,
+            legacyCastToString = false, stringToDateAnsiModeEnabled = false)
+        }
+      }
+      withResource(castRhs) { castRhs =>
+        castLhs.div(castRhs, GpuColumnVector.getNonNestedRapidsType(intermediateResultType))
+      }
+    }
+    withResource(ret) { ret =>
+      // Here we cast the output of CUDF to the final result. This will handle overflow checks
+      // to see if the divide is too large to fit in the expected type. This should never happen
+      // in the common case with us. It will also handle rounding the result to the final scale
+      // to match what Spark does.
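+      // (For example, with a DecimalType(11, 8) result type the intermediate here is
+      // DecimalType(12, 9), and the extra digit of scale is rounded away by this cast.)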
+      GpuColumnVector.from(GpuCast.doCast(ret, intermediateResultType, dataType,
+        ansiMode = failOnError, legacyCastToString = false, stringToDateAnsiModeEnabled = false),
+        dataType)
+    }
+  }
+
+  override def nullable: Boolean = true
+
+  override def children: Seq[Expression] = Seq(left, right)
+}
+
+object GpuDecimalDivide {
   // For Spark the final desired output is
   // new_scale = max(6, lhs.scale + rhs.precision + 1)
   // new_precision = lhs.precision - lhs.scale + rhs.scale + new_scale
   // But Spark will round the final result, so we need at least one more
   // decimal place on the scale to be able to do the rounding too.
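+  //
+  // Using the same illustrative numbers as the class docs above (hypothetical values, not
+  // taken from this patch): for DecimalType(5, 3) / DecimalType(4, 1),
+  // new_scale = max(6, 3 + 4 + 1) = 8 and new_precision = 5 - 3 + 1 + 8 = 11, so Spark wants a
+  // DecimalType(11, 8) result. The helpers below then pick the intermediate types used for the
+  // actual cudf divide.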
-  // That rounding happens in `GpuCheckOverflow`
-  def outputDecimalScale(l: DecimalType, r: DecimalType): Int =
-    math.max(6, l.scale + r.precision + 1) + 1
+  def asDecimalType(t: DataType): DecimalType = t match {
+    case dt: DecimalType => dt
+    case ByteType | ShortType | IntegerType | LongType =>
+      val prec = DecimalUtil.getPrecisionForIntegralType(GpuColumnVector.getNonNestedRapidsType(t))
+      DecimalType(prec, 0)
+    case _ =>
+      throw new IllegalArgumentException(
+        s"Internal Error: type $t cannot automatically be cast to a supported DecimalType")
+  }
 
-  def outputDecimalPrecision(l: DecimalType, r: DecimalType, outputScale: Int): Int =
-    l.precision - l.scale + r.scale + outputScale
+  def lhsNeededScale(rhs: DecimalType, outputType: DecimalType): Int =
+    outputType.scale + rhs.scale + 1
 
-  def outputDecimalType(l: DecimalType, r: DecimalType): DataType = {
-    val outputScale = outputDecimalScale(l, r)
-    DecimalType(outputDecimalPrecision(l, r, outputScale), outputScale)
+  def lhsNeededPrecision(lhs: DecimalType, rhs: DecimalType, outputType: DecimalType): Int = {
+    val neededLhsScale = lhsNeededScale(rhs, outputType)
+    (lhs.precision - lhs.scale) + neededLhsScale
   }
 
-  // In CUDF a divide's output is the same precision as the input, but the scale
-  // is lhs.scale - rhs.scale.
+  def nonRoundedIntermediateArgPrecision(
+      lhs: DecimalType,
+      rhs: DecimalType,
+      outputType: DecimalType): Int = {
+    val neededLhsPrecision = lhsNeededPrecision(lhs, rhs, outputType)
+    math.max(neededLhsPrecision, rhs.precision)
+  }
 
-  // Spark casts the inputs to the same wider type, but we do not
-  // know what the original lhs and rhs were. We need to make sure that we are going to provide
-  // enough information to CUDF without overflowing to get the desired output scale and
-  // precision based off of the inputs.
-  //
-  // To do this we get the output scale, and add it to the precision and scale for the
-  // LHS, as an intermediate value. The RHS intermediate just needs to make sure that it matches
-  // the same precision as the LHS so that CUDF is happy.
+  def intermediateArgPrecision(lhs: DecimalType, rhs: DecimalType, outputType: DecimalType): Int =
+    math.min(
+      nonRoundedIntermediateArgPrecision(lhs, rhs, outputType),
+      DType.DECIMAL128_MAX_PRECISION)
 
-  def intermediateDecimalScale(l: DecimalType, outputScale: Int): Int = l.scale + outputScale
+  def intermediateLhsType(
+      lhs: DecimalType,
+      rhs: DecimalType,
+      outputType: DecimalType): DecimalType = {
+    val precision = intermediateArgPrecision(lhs, rhs, outputType)
+    val scale = lhsNeededScale(rhs, outputType)
+    DecimalType(precision, scale)
+  }
 
-  def intermediateDecimalPrecision(l: DecimalType, r: DecimalType, outputScale: Int): Int = {
-    // In practice r.precision == l.precision, but we want to future proof it a bit.
-    math.max(l.precision + outputScale, r.precision)
+  def intermediateRhsType(
+      lhs: DecimalType,
+      rhs: DecimalType,
+      outputType: DecimalType): DecimalType = {
+    val precision = intermediateArgPrecision(lhs, rhs, outputType)
+    DecimalType(precision, rhs.scale)
   }
 
-  def intermediateDecimalType(l: DecimalType, r: DecimalType, outputScale: Int): DecimalType =
-    new DecimalType(
-      intermediateDecimalPrecision(l, r, outputScale),
-      intermediateDecimalScale(l, outputScale))
+  def intermediateResultType(outputType: DecimalType): DecimalType = {
+    // If the user says that this will not overflow we will still
+    // try to do rounding for a correct answer, unless we cannot
+    // because it is already a scale of 38
+    DecimalType(
+      math.min(outputType.precision + 1, DType.DECIMAL128_MAX_PRECISION),
+      math.min(outputType.scale + 1, DType.DECIMAL128_MAX_PRECISION))
+  }
 }
 
 case class GpuDivide(left: Expression, right: Expression,
     failOnErrorOverride: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero()) extends GpuDivModLike {
+  assert(!left.dataType.isInstanceOf[DecimalType],
+    "DecimalType divides need to be handled by GpuDecimalDivide")
 
   override lazy val failOnError: Boolean = failOnErrorOverride
 
@@ -670,89 +773,9 @@ case class GpuDivide(left: Expression, right: Expression,
 
   override def symbol: String = "/"
 
-  override def binaryOp: BinaryOp = (left.dataType, right.dataType) match {
-    case (_: DecimalType, _: DecimalType) => BinaryOp.DIV
-    case _ => BinaryOp.TRUE_DIV
-  }
-
-  // Override the output type as a special case for decimal
-  override def dataType: DataType = (left.dataType, right.dataType) match {
-    case (l: DecimalType, r: DecimalType) => GpuDivideUtil.outputDecimalType(l, r)
-    case _ => super.dataType
-  }
-
-  override def outputTypeOverride: DType =
-    GpuColumnVector.getNonNestedRapidsType(dataType)
-
-  @transient private[this] lazy val lhsDec = left.dataType.asInstanceOf[DecimalType]
-  @transient private[this] lazy val rhsDec = right.dataType.asInstanceOf[DecimalType]
-  @transient private[this] lazy val outputScale =
-    GpuDivideUtil.outputDecimalScale(lhsDec, rhsDec)
-
-  @transient private[this] lazy val intermediateLhsType: DecimalType =
-    GpuDivideUtil.intermediateDecimalType(lhsDec, rhsDec, outputScale)
-
-  @transient private[this] lazy val intermediateLhsCudfType =
-    GpuColumnVector.getNonNestedRapidsType(intermediateLhsType)
+  override def binaryOp: BinaryOp = BinaryOp.TRUE_DIV
 
-  @transient private[this] lazy val intermediateRhsType =
-    DecimalType(intermediateLhsType.precision, rhsDec.scale)
-
-  @transient private[this] lazy val intermediateRhsCudfType =
-    GpuColumnVector.getNonNestedRapidsType(intermediateRhsType)
-
-  override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
-    // LHS and RHS are the same general
-    if (left.dataType.isInstanceOf[DecimalType]) {
-      // This will always be an upcast for the LHS (because at a minimum need to shift it over)
-      withResource(lhs.getBase.castTo(intermediateLhsCudfType)) { upcastLhs =>
-        val upcastRhs = if (right.dataType.equals(intermediateRhsType)) {
-          rhs.getBase.incRefCount()
-        } else {
-          rhs.getBase.castTo(intermediateRhsCudfType)
-        }
-        withResource(upcastRhs) { upcastRhs =>
-          super.doColumnar(GpuColumnVector.from(upcastLhs, intermediateLhsType),
-            GpuColumnVector.from(upcastRhs, intermediateRhsType))
-        }
-      }
-    } else {
-      super.doColumnar(lhs, rhs)
-    }
-  }
-
-  override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
-    // LHS and RHS are the same type
-    if (left.dataType.isInstanceOf[DecimalType]) {
-      // This will always be an upcast for the LHS (because at a minimum need to shift it over)
-      withResource(lhs.getBase.castTo(intermediateLhsCudfType)) { upcastLhs =>
-        withResource(GpuScalar(rhs.getValue, intermediateRhsType)) { upcastRhs =>
-          super.doColumnar(GpuColumnVector.from(upcastLhs, intermediateLhsType), upcastRhs)
-        }
-      }
-    } else {
-      super.doColumnar(lhs, rhs)
-    }
-  }
-
-  override def doColumnar(lhs: GpuScalar, rhs: GpuColumnVector): ColumnVector = {
-    // LHS and RHS are the same type
-    if (left.dataType.isInstanceOf[DecimalType]) {
-      // This will always be an upcast for the LHS (because at a minimum need to shift it over)
-      withResource(GpuScalar(lhs.getValue, intermediateLhsType)) { upcastLhs =>
-        val upcastRhs = if (right.dataType.equals(intermediateRhsType)) {
-          rhs.getBase.incRefCount()
-        } else {
-          rhs.getBase.castTo(intermediateRhsCudfType)
-        }
-        withResource(upcastRhs) { upcastRhs =>
-          super.doColumnar(upcastLhs, GpuColumnVector.from(upcastRhs, intermediateRhsType))
-        }
-      }
-    } else {
-      super.doColumnar(lhs, rhs)
-    }
-  }
+  override def outputTypeOverride: DType = GpuColumnVector.getNonNestedRapidsType(dataType)
 }
 
 case class GpuIntegralDivide(left: Expression, right: Expression) extends GpuDivModLike {