diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 23a74c78ab4..2baaff28f2e 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -200,8 +200,9 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, lambda spark : readParquetCatchException(spark, data_path), conf=all_confs) - -@pytest.mark.parametrize('parquet_gens', [decimal_gens], ids=idfn) +@pytest.mark.parametrize('parquet_gens', [decimal_gens, + [ArrayGen(DecimalGen(7,2), max_length=10)], + [StructGen([['child0', DecimalGen(7, 2)]])]], ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala index d10563ee02d..929c76896c6 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala @@ -390,12 +390,12 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { table: Table, schema: StructType): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - val options = ParquetWriterOptions.builder() - .withDecimalPrecisions(GpuParquetFileFormat.getPrecisionList(schema):_*) + val builder = ParquetWriterOptions.builder() + .withDecimalPrecisions(GpuParquetFileFormat.getPrecisionList(schema): _*) .withStatisticsFrequency(StatisticsFrequency.ROWGROUP) .withTimestampInt96(false) - .build() - withResource(Table.writeParquetChunked(options, buffer)) { writer => + schema.fields.indices.foreach(index => builder.withColumnNames(s"_col$index")) + withResource(Table.writeParquetChunked(builder.build(), buffer)) { writer => writer.write(table) } buffer diff --git a/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala b/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala index ade9b30118a..04f35dead25 100644 --- a/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala +++ b/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.rapids.shims.spark311 -import com.nvidia.spark.rapids.GpuExec +import com.nvidia.spark.rapids.{GpuExec, GpuMetric} import com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer import org.apache.spark.rdd.RDD @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.vectorized.ColumnarBatch case class GpuInMemoryTableScanExec( @@ -54,7 +53,7 @@ case class GpuInMemoryTableScanExec( relation.cacheBuilder.serializer.vectorTypes(attributes, conf) private lazy val columnarInputRDD: RDD[ColumnarBatch] = { - val numOutputRows = 
longMetric("numOutputRows") + val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) val buffers = filteredCachedBatches() relation.cacheBuilder.serializer.asInstanceOf[ParquetCachedBatchSerializer] .gpuConvertCachedBatchToColumnarBatch( diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 55b45d40d74..c2cda9e5ab1 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -106,7 +106,7 @@ private static String hexString(byte[] bytes) { public static synchronized void debug(String name, HostColumnVectorCore hostCol) { DType type = hostCol.getType(); System.err.println("COLUMN " + name + " - " + type); - if (type.getTypeId() == DType.DTypeEnum.DECIMAL64) { + if (type.isDecimalType()) { for (int i = 0; i < hostCol.getRowCount(); i++) { if (hostCol.isNull(i)) { System.err.println(i + " NULL"); @@ -472,8 +472,7 @@ private static DType toRapidsOrNull(DataType type) { if (dt.precision() > DType.DECIMAL64_MAX_PRECISION) { return null; } else { - // Map all DecimalType to DECIMAL64, in case of underlying DType transaction. - return DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale()); + return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale()); } } return null; @@ -864,7 +863,6 @@ public static int[] toIntArray(ai.rapids.cudf.ColumnVector vec) { */ GpuColumnVector(DataType type, ai.rapids.cudf.ColumnVector cudfCv) { super(type); - // TODO need some checks to be sure everything matches this.cudfCv = cudfCv; } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java index 4a64f315eae..7d119adf37a 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java @@ -163,10 +163,16 @@ public final ColumnarMap getMap(int ordinal) { @Override public final Decimal getDecimal(int rowId, int precision, int scale) { assert precision <= DType.DECIMAL64_MAX_PRECISION : "Assert " + precision + " <= DECIMAL64_MAX_PRECISION(" + DType.DECIMAL64_MAX_PRECISION + ")"; - assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL64: "Assert DType to be DECIMAL64"; assert scale == -cudfCv.getType().getScale() : "Assert fetch decimal with its original scale " + scale + " expected " + (-cudfCv.getType().getScale()); - return Decimal.createUnsafe(cudfCv.getLong(rowId), precision, scale); + if (precision <= Decimal.MAX_INT_DIGITS()) { + assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL32 : "type should be DECIMAL32"; + return Decimal.createUnsafe(cudfCv.getInt(rowId), precision, scale); + } else { + assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL64 : "type should be DECIMAL64"; + return Decimal.createUnsafe(cudfCv.getLong(rowId), precision, scale); + } + } @Override diff --git a/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java b/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java index 08bc53053f5..b773ec848a1 100644 --- a/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java +++ b/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java @@ -238,8 +238,9 @@ public Decimal getDecimal(int ordinal, int precision, int scale) { if (isNullAt(ordinal)) { return null; 
} - // TODO when DECIMAL32 is supported a special case will need to be added here - if (precision <= Decimal.MAX_LONG_DIGITS()) { + if (precision <= Decimal.MAX_INT_DIGITS()) { + return Decimal.createUnsafe(getInt(ordinal), precision, scale); + } else if (precision <= Decimal.MAX_LONG_DIGITS()) { return Decimal.createUnsafe(getLong(ordinal), precision, scale); } else { throw new IllegalArgumentException("NOT IMPLEMENTED YET"); diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala new file mode 100644 index 00000000000..6da513b6cc7 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.DType + +import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} + +object DecimalUtil { + + def createCudfDecimal(precision: Int, scale: Int): DType = { + if (precision <= Decimal.MAX_INT_DIGITS) { + DType.create(DType.DTypeEnum.DECIMAL32, -scale) + } else { + DType.create(DType.DTypeEnum.DECIMAL64, -scale) + } + } + + /** + * Return the size in bytes of the Fixed-width data types. + * WARNING: Do not use this method for variable-width data types + */ + private[rapids] def getDataTypeSize(dt: DataType): Int = { + dt match { + case d: DecimalType if d.precision <= Decimal.MAX_INT_DIGITS => 4 + case t => t.defaultSize + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 672c1ac2068..620bbc6de66 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -25,6 +25,7 @@ import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.{Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** Meta-data for cast and ansi_cast. 
*/ @@ -405,9 +406,7 @@ case class GpuCast( } case (from: DecimalType, to: DecimalType) => - withResource(input.copyToColumnVector()) { inputVector => - castDecimalToDecimal(inputVector, from, to) - } + castDecimalToDecimal(input.copyToColumnVector(), from, to) case (_: DecimalType, StringType) => input.castTo(DType.STRING) @@ -444,16 +443,18 @@ case class GpuCast( } withResource(minValue) { minValue => - throwIfAny(inclusiveMin match { - case true => values.lessThan(minValue) - case false => values.lessOrEqualTo(minValue) + throwIfAny(if (inclusiveMin) { + values.lessThan(minValue) + } else { + values.lessOrEqualTo(minValue) }) } withResource(maxValue) { maxValue => - throwIfAny(inclusiveMax match { - case true => values.greaterThan(maxValue) - case false => values.greaterOrEqualTo(maxValue) + throwIfAny(if (inclusiveMax) { + values.greaterThan(maxValue) + } else { + values.greaterOrEqualTo(maxValue) }) } } @@ -478,14 +479,16 @@ case class GpuCast( withResource(minValue) { minValue => withResource(maxValue) { maxValue => - val minPredicate = inclusiveMin match { - case true => values.lessThan(minValue) - case false => values.lessOrEqualTo(minValue) + val minPredicate = if (inclusiveMin) { + values.lessThan(minValue) + } else { + values.lessOrEqualTo(minValue) } withResource(minPredicate) { minPredicate => - val maxPredicate = inclusiveMax match { - case true => values.greaterThan(maxValue) - case false => values.greaterOrEqualTo(maxValue) + val maxPredicate = if (inclusiveMax) { + values.greaterThan(maxValue) + } else { + values.greaterOrEqualTo(maxValue) } withResource(maxPredicate) { maxPredicate => withResource(maxPredicate.or(minPredicate)) { rangePredicate => @@ -1127,17 +1130,17 @@ case class GpuCast( if (dt.scale < 0) { // Rounding is essential when scale is negative, // so we apply HALF_UP rounding manually to keep align with CpuCast. - withResource(checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, 0))) { + withResource(checked.castTo(DecimalUtil.createCudfDecimal(dt.precision, 0))) { scaleZero => scaleZero.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) } } else if (dt.scale > 0) { // Integer will be enlarged during casting if scale > 0, so we cast input to INT64 // before casting it to decimal in case of overflow. withResource(checked.castTo(DType.INT64)) { long => - long.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + long.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale)) } } else { - checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + checked.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale)) } } } @@ -1174,13 +1177,13 @@ case class GpuCast( } withResource(checkedInput) { checked => - val targetType = DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale) + val targetType = DecimalUtil.createCudfDecimal(dt.precision, dt.scale) // If target scale reaches DECIMAL64_MAX_PRECISION, container DECIMAL can not // be created because of precision overflow. In this case, we perform casting op directly. val casted = if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { checked.castTo(targetType) } else { - val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1)) + val containerType = DecimalUtil.createCudfDecimal(dt.precision, (dt.scale + 1)) withResource(checked.castTo(containerType)) { container => container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) } @@ -1200,42 +1203,118 @@ case class GpuCast( from: DecimalType, to: DecimalType): ColumnVector = { - // At first, we conduct overflow check onto input column. 
- // Then, we cast checked input into target decimal type. - val checkedInput = if (to.scale <= from.scale) { - // No need to promote precision unless target scale is larger than the source one, - // which indicates the cast is always valid when to.scale <= from.scale. - input.incRefCount() - } else { - // Check whether there exists overflow during promoting precision or not. - // We do NOT use `Scalar.fromDecimal(-to.scale, math.pow(10, 18).toLong)` here, because - // cuDF binaryOperation on decimal will rescale right input to fit the left one. - // The rescaling may lead to overflow. - val absBound = math.pow(10, DType.DECIMAL64_MAX_PRECISION + from.scale - to.scale).toLong - if (ansiMode) { - assertValuesInRange(input, - minValue = Scalar.fromDecimal(-from.scale, -absBound), - maxValue = Scalar.fromDecimal(-from.scale, absBound), - inclusiveMin = false, inclusiveMax = false) - input.incRefCount() + val isFrom32Bit = DecimalType.is32BitDecimalType(from) + val isTo32Bit = DecimalType.is32BitDecimalType(to) + val cudfDecimal = DecimalUtil.createCudfDecimal(to.precision, to.scale) + + def castCheckedDecimal(checkedInput: ColumnVector): ColumnVector = { + if (to.scale == from.scale) { + if (isFrom32Bit == isTo32Bit) { + checkedInput.incRefCount() + } else { + // the input is already checked, just cast it + checkedInput.castTo(cudfDecimal) + } + } else if (to.scale > from.scale) { + checkedInput.castTo(cudfDecimal) } else { - replaceOutOfRangeValues(input, - minValue = Scalar.fromDecimal(-from.scale, -absBound), - maxValue = Scalar.fromDecimal(-from.scale, absBound), - replaceValue = Scalar.fromNull(input.getType), - inclusiveMin = false, inclusiveMax = false) + withResource(checkedInput.round(to.scale, ai.rapids.cudf.RoundMode.HALF_UP)) { + rounded => rounded.castTo(cudfDecimal) + } } } - withResource(checkedInput) { checked => - to.scale - from.scale match { - case 0 => - checked.incRefCount() - case diff if diff > 0 => - checked.castTo(GpuColumnVector.getNonNestedRapidsType(to)) - case _ => - checked.round(to.scale, ai.rapids.cudf.RoundMode.HALF_UP) + if (to.scale <= from.scale) { + if (!isFrom32Bit && isTo32Bit) { + // check for overflow when 64bit => 32bit + withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput => + castCheckedDecimal(checkedInput) + } + } else { + if (to.scale < 0 && !SQLConf.get.allowNegativeScaleOfDecimalEnabled) { + throw new IllegalStateException(s"Negative scale is not allowed: ${to.scale}. " + + s"You can use spark.sql.legacy.allowNegativeScaleOfDecimal=true " + + s"to enable legacy mode to allow it.") + } + castCheckedDecimal(input) + } + } else { + // from.scale > to.scale + withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput => + castCheckedDecimal(checkedInput) } } } + + def checkForOverflow( + input: ColumnVector, + from: DecimalType, + to: DecimalType, + isFrom32Bit: Boolean): ColumnVector = { + + // Decimal numbers in general terms have two parts, a part before decimal (whole number) + // and a part after decimal (fractional number) + // When moving from a smaller scale to a bigger scale (or 32-bit to 64-bit), the target type is + // able to hold much more values on the fractional side which leaves less room for the whole + // number. In the following examples we have kept the precision constant to keep it simple. 
+ // + // Ex: + // 999999.999 => from.scale = 3 + // 9999.99999 => to.scale = 5 + // + // In the above example the source can have a maximum of 4 digits for the whole number and + // 3 digits for fractional side. We are not worried about the fractional side as the target can + // hold more digits than the source can. What we need to make sure is the source + // doesn't have values that are bigger than the destination whole number side can hold. + // So we calculate the max number that should be in the input column before we can safely cast + // the values without overflowing. If we find values bigger, we handle it depending on if we + // are in ANSI mode or not. + // + // When moving from a bigger scale to a smaller scale (or 64-bit to 32-bit), the target type + // is able to have more digits on the whole number side but less on the fractional + // side. In this case all we need to do is round the value to the new scale. Only, in case we + // are moving from 64-bit to a 32-bit do we need to check for overflow + // + // Ex: + // 9999.99999 => from.scale = 5 + // 999999.999 => to.scale = 3 + // + // Here you can see the "to.scale" can hold less fractional values but more on the whole + // number side so overflow check is unnecessary when the bases are the same i.e. 32-bit to + // 32-bit and 64-bit to 64-bit. Only when we go from a 64-bit number to a 32-bit number in this + // case we need to check for overflow. + // + // Therefore the values of absMax and absMin will be calculated based on the absBoundPrecision + // value to make sure the source has values that don't exceed the upper and lower bounds + val absBoundPrecision = to.precision - to.scale + + // When we support 128 bit Decimals we should add a check for that + // if (isFrom32Bit && prec > Decimal.MAX_INT_DIGITS || + // !isFrom32Bit && prec > Decimal.MAX_LONG_DIGITS) + if (isFrom32Bit && absBoundPrecision > Decimal.MAX_INT_DIGITS) { + return input.incRefCount() + } + val (minValueScalar, maxValueScalar) = if (!isFrom32Bit) { + val absBound = math.pow(10, absBoundPrecision).toLong + (Scalar.fromDecimal(0, -absBound), Scalar.fromDecimal(0, absBound)) + } else { + val absBound = math.pow(10, absBoundPrecision).toInt + (Scalar.fromDecimal(0, -absBound), Scalar.fromDecimal(0, absBound)) + } + val checkedInput = if (ansiMode) { + assertValuesInRange(input, + minValue = minValueScalar, + maxValue = maxValueScalar, + inclusiveMin = false, inclusiveMax = false) + input.incRefCount() + } else { + replaceOutOfRangeValues(input, + minValue = minValueScalar, + maxValue = maxValueScalar, + replaceValue = Scalar.fromNull(input.getType), + inclusiveMin = false, inclusiveMax = false) + } + + checkedInput + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index bb9858f70d2..fd3aebbcb7b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -50,13 +50,13 @@ class AcceleratedColumnarToRowIterator( // for packMap the nth entry is the index of the original input column that we want at // the nth entry. - // TODO When we support DECIMAL32 we will need to add in a special case here - // because defaultSize of DecimalType does not take that into account. 
private val packMap: Array[Int] = schema - .zipWithIndex - .sortWith(_._1.dataType.defaultSize > _._1.dataType.defaultSize) - .map(_._2) - .toArray + .zipWithIndex + .sortWith { + (x, y) => + DecimalUtil.getDataTypeSize(x._1.dataType) > DecimalUtil.getDataTypeSize(y._1.dataType) + }.map(_._2) + .toArray // For unpackMap the nth entry is the index in the row that came back for the original private val unpackMap: Array[Int] = packMap .zipWithIndex diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index 3eb9379ed54..64a80952f39 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -191,7 +191,7 @@ trait CudfBinaryExpression extends GpuBinaryExpression { def outputTypeOverride: DType = null def castOutputAtEnd: Boolean = false - def outputType(l: BinaryOperable, r: BinaryOperable) : DType = { + def outputType(l: BinaryOperable, r: BinaryOperable): DType = { val over = outputTypeOverride if (over == null) { BinaryOperable.implicitConversion(binaryOp, l, r) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala index 31c93290d43..0a9171b0556 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala @@ -86,10 +86,13 @@ case class GpuInSet( } case t: DecimalType => val decs = values.asInstanceOf[Seq[Decimal]] - // When we support DECIMAL32 this will need to change to support that - withResource(HostColumnVector.builder(DType.create(DType.DTypeEnum.DECIMAL64, - t.scale), - decs.size)) { builder => - decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong)) + withResource(HostColumnVector.builder( + DecimalUtil.createCudfDecimal(t.precision, t.scale), decs.size)) { builder => + if (DecimalType.is32BitDecimalType(t)) { + decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong.toInt)) + } else { + decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong)) + } builder.buildAndPutOnDevice() } case _ => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 486b53dd257..d1a8c3a979e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, DecimalType, StructType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, Decimal, DecimalType, StructType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch object GpuParquetFileFormat { @@ -277,7 +277,7 @@ class GpuParquetWriter( new GpuColumnVector(DataTypes.TimestampType, withResource(cv.getBase()) { v => v.castTo(DType.TIMESTAMP_MILLISECONDS) }) - case d: DecimalType if d.precision < 10 => + case d: DecimalType if d.precision <= Decimal.MAX_INT_DIGITS => // There is a bug in Spark that causes a problem if we write Decimals 
with // precision < 10 as Decimal64. // https://issues.apache.org/jira/browse/SPARK-34167 diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 1e39a544713..5d346edf9ec 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -19,16 +19,17 @@ package com.nvidia.spark.rapids import java.io.{File, OutputStream} import java.net.{URI, URISyntaxException} import java.nio.charset.StandardCharsets -import java.util.{Collections, Locale} +import java.util.{Collections, Locale, Optional} import java.util.concurrent._ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.immutable.HashSet -import scala.collection.mutable.{ArrayBuffer, LinkedHashMap, Queue} +import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, LinkedHashMap, ListBuffer, Queue} import scala.math.max import ai.rapids.cudf._ +import ai.rapids.cudf.DType.DTypeEnum import com.google.common.util.concurrent.ThreadFactoryBuilder import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetric._ @@ -44,7 +45,7 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} import org.apache.parquet.hadoop.metadata._ -import org.apache.parquet.schema.{GroupType, MessageType, Types} +import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type, Types} import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -61,10 +62,11 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.InputFileUtils import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, DateType, Decimal, DecimalType, MapType, StringType, StructType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration + /** * Base GpuParquetScan used for common code across Spark versions. Gpu version of * Spark's 'ParquetScan'. 
@@ -197,33 +199,6 @@ object GpuParquetScanBase { meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode") } } - - private[rapids] def convertDecimal32Columns(t: Table): Table = { - val containDecimal32Column = (0 until t.getNumberOfColumns).exists { i => - t.getColumn(i).getType.getTypeId == DType.DTypeEnum.DECIMAL32 - } - // return input table if there exists no DECIMAL32 columns - if (!containDecimal32Column) return t - - val columns = new Array[ColumnVector](t.getNumberOfColumns) - try { - RebaseHelper.withResource(t) { _ => - (0 until t.getNumberOfColumns).foreach { i => - t.getColumn(i).getType match { - case tpe if tpe.getTypeId == DType.DTypeEnum.DECIMAL32 => - columns(i) = t.getColumn(i).castTo( - DType.create(DType.DTypeEnum.DECIMAL64, tpe.getScale)) - case _ => - columns(i) = t.getColumn(i).incRefCount() - } - } - } - new Table(columns: _*) - } finally { - // clean temporary column vectors - columns.safeClose() - } - } } /** @@ -680,25 +655,44 @@ abstract class FileParquetPartitionReaderBase( } } + def getPrecisionsList(fields: Seq[Type]): Seq[Int] = { + fields.filter(field => field.getOriginalType == OriginalType.DECIMAL || !field.isPrimitive()) + .flatMap { field => + if (!field.isPrimitive) { + getPrecisionsList(field.asGroupType().getFields.asScala) + } else { + Seq(field.asPrimitiveType().getDecimalMetadata.getPrecision) + } + } + } + protected def evolveSchemaIfNeededAndClose( inputTable: Table, filePath: String, clippedSchema: MessageType): Table = { - // Convert Decimal32 columns to Decimal64, because spark-rapids only supports Decimal64. - val inTable = GpuParquetScanBase.convertDecimal32Columns(inputTable) - if (readDataSchema.length > inTable.getNumberOfColumns) { + val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala) + // check if there are cols with precision that can be stored in an int + val typeCastingNeeded = precisions.filter(p => p <= Decimal.MAX_INT_DIGITS).nonEmpty + if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) { // Spark+Parquet schema evolution is relatively simple with only adding/removing columns // To type casting or anyting like that val clippedGroups = clippedSchema.asGroupType() val newColumns = new Array[ColumnVector](readDataSchema.length) + val precisionList = scala.collection.mutable.Queue(precisions: _*) try { - withResource(inTable) { table => + withResource(inputTable) { table => var readAt = 0 (0 until readDataSchema.length).foreach(writeAt => { val readField = readDataSchema(writeAt) if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) { - newColumns(writeAt) = table.getColumn(readAt).incRefCount() + val origCol = table.getColumn(readAt) + val col = if (typeCastingNeeded && precisionList.nonEmpty) { + convertDecimal64ToDecimal32Wrapper(origCol, precisionList.dequeue()) + } else { + origCol.incRefCount() + } + newColumns(writeAt) = col readAt += 1 } else { withResource(GpuScalar.from(null, readField.dataType)) { n => @@ -716,7 +710,83 @@ abstract class FileParquetPartitionReaderBase( newColumns.safeClose() } } else { - inTable + inputTable + } + } + + /** + * This method casts the input ColumnView to a new column if it contains Decimal data that + * could be stored in smaller data type. e.g. 
a DecimalType(7,2) stored as 64-bit DecimalType can + * easily be stored in a 32-bit DecimalType + * @param cv The column view that could potentially have Decimal64 columns with + * precision < 10 + * @param precision precisions of all the decimal columns in this Column + * @return + */ + private def convertDecimal64ToDecimal32Wrapper(cv: ColumnVector, precision: Int): ColumnVector = { + def convertDecimal64ToDecimal32( + cv: ColumnView, + precision: Int, + newCols: ArrayBuilder[ColumnView]): ColumnView = { + val dt = cv.getType + if (!dt.isNestedType) { + if (dt.getTypeId == DTypeEnum.DECIMAL64 && precision <= DType.DECIMAL32_MAX_PRECISION) { + // we want to handle the legacy case where Decimals are written as an array of bytes + // cudf reads them back as a 64-bit Decimal + // castTo will create a new ColumnVector which we cannot close until we have copied out + // the entire nested type. So we store them temporarily in this array and close it after + // `copyToColumnVector` is called + val col = cv.castTo(DecimalUtil.createCudfDecimal(precision, -dt.getScale())) + newCols += col + col + } else { + cv + } + } else if (dt == DType.LIST) { + val child = cv.getChildColumnView(0) + val newChild = convertDecimal64ToDecimal32(child, precision, newCols) + if (child == newChild) { + cv + } else { + cv.replaceListChild(newChild) + } + } else if (dt == DType.STRUCT) { + val newColumns = ArrayBuilder.make[ColumnView]() + newColumns.sizeHint(cv.getNumChildren) + val newColIndices = ArrayBuilder.make[Int]() + newColIndices.sizeHint(cv.getNumChildren) + (0 until cv.getNumChildren).foreach { i => + val child = cv.getChildColumnView(i) + val newChild = convertDecimal64ToDecimal32(child, precision, newCols) + if (newChild != child) { + newColumns += newChild + newColIndices += i + } + } + val cols = newColumns.result() + if (cols.nonEmpty) { + // create a new struct column with the new ones + cv.replaceChildrenWithViews(newColIndices.result(), cols) + } else { + cv + } + } else { + throw new IllegalArgumentException("Unknown data type") + } + } + + val newColumns = ArrayBuilder.make[ColumnView]() + val tmp = convertDecimal64ToDecimal32(cv, precision, newColumns) + if (tmp != cv) { + val result = withResource(newColumns.result()) { _ => + tmp.copyToColumnVector() + } + if (tmp.getType.isNestedType) { + tmp.close() + } + result + } else { + tmp.asInstanceOf[ColumnVector].incRefCount() } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index cff73275ee4..09de317bae0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -526,13 +526,23 @@ private object GpuRowToColumnConverter { row: SpecializedGetters, column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { - // Because DECIMAL64 is the only supported decimal DType, we can - // append unscaledLongValue instead of BigDecimal itself to speedup this conversion. 
- builder.append(row.getDecimal(column, precision, scale).toUnscaledLong) - 8 + builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal) + // We are basing our DType.DECIMAL on precision in GpuColumnVector#toRapidsOrNull so we can + // safely assume the underlying vector is Int if precision < 10 otherwise Long + if (precision <= Decimal.MAX_INT_DIGITS) { + 4 + } else { + 8 + } } - override def getNullSize: Double = 8 + VALIDITY + override def getNullSize: Double = { + if (precision <= Decimal.MAX_INT_DIGITS) { + 4 + } else { + 8 + } + VALIDITY + } } } @@ -653,8 +663,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { val attr = pair._1 val colIndex = pair._2 // This only works on fixed width types - // TODO once we support DECIMAL32 we will need a special case in here for it. - val length = attr.dataType.defaultSize + val length = DecimalUtil.getDataTypeSize(attr.dataType) cudfOffset = CudfUnsafeRow.alignOffset(cudfOffset, length) val ret = length match { case 1 => s"Platform.putByte(null, startAddress + $cudfOffset, Platform.getByte($rowBaseObj, $rowBaseOffset + ${sparkValidityOffset + (colIndex * 8)}));" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index e0a61082691..a33f04f749c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -211,22 +211,28 @@ object HostColumnarToGpu extends Logging { b.appendNull() } case (dt: DecimalType, nullable) => - // Because DECIMAL64 is the only supported decimal DType, we can - // append unscaledLongValue instead of BigDecimal itself to speedup this conversion. - // If we know that the value is WritableColumnVector we could - // speed this up even more by getting the unscaled long or int directly. if (nullable) { for (i <- 0 until rows) { if (cv.isNullAt(i)) { b.appendNull() } else { // The precision here matters for cpu column vectors (such as OnHeapColumnVector). 
- b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong) + if (DecimalType.is32BitDecimalType(dt)) { + b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong.toInt) + } else { + b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong) + } } } } else { - for (i <- 0 until rows) { - b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong) + if (DecimalType.is32BitDecimalType(dt)) { + for (i <- 0 until rows) { + b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong.toInt) + } + } else { + for (i <- 0 until rows) { + b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong) + } } } case (t, _) => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala index aeda82eff93..65bebb22516 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala @@ -56,8 +56,14 @@ case class GpuUnscaledValue(child: Expression) extends GpuUnaryExpression { override def toString: String = s"UnscaledValue($child)" override protected def doColumnar(input: GpuColumnVector): ColumnVector = { - withResource(input.getBase.bitCastTo(DType.INT64)) { view => - view.copyToColumnVector() + if (input.getBase.getType.isBackedByInt) { + withResource(input.getBase.bitCastTo(DType.INT32)) { int32View => + int32View.castTo(DType.INT64) + } + } else { + withResource(input.getBase.bitCastTo(DType.INT64)) { view => + view.copyToColumnVector() + } } } } @@ -78,7 +84,7 @@ case class GpuMakeDecimal( override protected def doColumnar(input: GpuColumnVector): ColumnVector = { val base = input.getBase - val outputType = DType.create(DType.DTypeEnum.DECIMAL64, cudfScale) + val outputType = DecimalUtil.createCudfDecimal(precision, sparkScale) if (nullOnOverflow) { val overflowed = withResource(Scalar.fromLong(maxValue)) { limit => base.greaterThan(limit) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/implicits.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/implicits.scala index 96cdc3ff4ab..e470b381da4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/implicits.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/implicits.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids +import ai.rapids.cudf.{ColumnVector, ColumnView, DType} import scala.collection.{mutable, SeqLike} import scala.collection.generic.CanBuildFrom import scala.reflect.ClassTag diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala index c03e6f0b01c..79c8b54bdd0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala @@ -66,7 +66,7 @@ object GpuScalar { case DType.TIMESTAMP_DAYS => v.getInt case DType.TIMESTAMP_MICROSECONDS => v.getLong case DType.STRING => v.getJavaString - case dt: DType if dt.isDecimalType && dt.isBackedByLong => Decimal(v.getBigDecimal) + case dt: DType if dt.isDecimalType => Decimal(v.getBigDecimal) case t => throw new IllegalStateException(s"$t is not a supported rapids scalar type yet") } @@ -90,9 +90,12 @@ object GpuScalar { case b: Boolean => Scalar.fromBool(b) case s: String => Scalar.fromString(s) case s: UTF8String => Scalar.fromString(s.toString) - // make sure all scalars created are backed by DECIMAL64 case dec: Decimal => - 
Scalar.fromDecimal(-dec.scale, dec.toUnscaledLong) + if (dec.precision <= Decimal.MAX_INT_DIGITS) { + Scalar.fromDecimal(-dec.scale, dec.toUnscaledLong.toInt) + } else { + Scalar.fromDecimal(-dec.scale, dec.toUnscaledLong) + } case dec: BigDecimal => Scalar.fromDecimal(-dec.scale, dec.bigDecimal.unscaledValue().longValueExact()) case _ => @@ -117,8 +120,11 @@ object GpuScalar { if (bigDec.precision() > t.asInstanceOf[DecimalType].precision) { throw new IllegalArgumentException(s"BigDecimal $bigDec exceeds precision constraint of $t") } - // make sure all scalars created are backed by DECIMAL64 - Scalar.fromDecimal(-bigDec.scale(), bigDec.unscaledValue().longValueExact()) + if (!DecimalType.is32BitDecimalType(t.asInstanceOf[DecimalType])) { + Scalar.fromDecimal(-bigDec.scale(), bigDec.unscaledValue().longValue()) + } else { + Scalar.fromDecimal(-bigDec.scale(), bigDec.unscaledValue().intValue()) + } case l: Long => t match { case LongType => Scalar.fromLong(l) case TimestampType => Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, l) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala index 62de9d304b2..b914b9378c2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf._ import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.DecimalUtil.createCudfDecimal import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} @@ -40,8 +41,14 @@ case class GpuUnaryMinus(child: Expression) extends GpuUnaryExpression dataType match { case dt: DecimalType => val scale = dt.scale - withResource(Scalar.fromDecimal(-scale, 0L)) { scalar => - scalar.sub(input.getBase) + if (DecimalType.is32BitDecimalType(dt)) { + withResource(Scalar.fromDecimal(-scale, 0)) { scalar => + scalar.sub(input.getBase) + } + } else { + withResource(Scalar.fromDecimal(-scale, 0L)) { scalar => + scalar.sub(input.getBase) + } } case _ => withResource(Scalar.fromByte(0.toByte)) { scalar => @@ -118,6 +125,89 @@ case class GpuMultiply( case (l: DecimalType, r: DecimalType) => GpuMultiplyUtil.decimalDataType(l, r) case _ => super.dataType } + + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { + import DecimalUtil._ + (left.dataType, right.dataType) match { + case (l: DecimalType, r: DecimalType) + if !DecimalType.is32BitDecimalType(dataType) && + DecimalType.is32BitDecimalType(l) && + DecimalType.is32BitDecimalType(r) => { + // we are casting to the smallest 64-bit decimal so the answer doesn't overflow + val decimalType = createCudfDecimal(10, Math.max(l.scale, r.scale)) + val cudfOutputType = GpuColumnVector.getNonNestedRapidsType(dataType) + withResource(lhs.getBase.castTo(decimalType)) { decimalLhs => + withResource(rhs.getBase.castTo(decimalType)) { decimalRhs => + val tmp = decimalLhs.mul(decimalRhs, cudfOutputType) + if (tmp.getType != cudfOutputType) { + withResource(tmp) { tmp => + tmp.castTo(cudfOutputType) + } + } else { + tmp + } + } + } + } + case _ => super.doColumnar(lhs, rhs) + } + } + + override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { + (left.dataType, right.dataType) match { + case (l: DecimalType, r: DecimalType) + if !DecimalType.is32BitDecimalType(dataType) && + 
DecimalType.is32BitDecimalType(l) && + DecimalType.is32BitDecimalType(r) => { + // we are casting to the smallest 64-bit decimal so the answer doesn't overflow + val sparkDecimalType = DecimalType(10, Math.max(l .scale, r.scale)) + val decimalType = GpuColumnVector.getNonNestedRapidsType(sparkDecimalType) + val cudfOutputType = GpuColumnVector.getNonNestedRapidsType(dataType) + withResource(GpuScalar.from(rhs.getBigDecimal().intValue(), sparkDecimalType)) { + decimalRhs => + withResource(lhs.getBase.castTo(decimalType)) { decimalLhs => + val tmp = decimalLhs.mul(decimalRhs, cudfOutputType) + if (tmp.getType != cudfOutputType) { + withResource(tmp) { tmp => + tmp.castTo(cudfOutputType) + } + } else { + tmp + } + } + } + } + case _ => super.doColumnar(lhs, rhs) + } + } + + override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector = { + (left.dataType, right.dataType) match { + case (l: DecimalType, r: DecimalType) + if !DecimalType.is32BitDecimalType(dataType) && + DecimalType.is32BitDecimalType(l) && + DecimalType.is32BitDecimalType(r) => { + // we are casting to the smallest 64-bit decimal so the answer doesn't overflow + val sparkDecimalType = DecimalType(10, Math.max(l .scale, r.scale)) + val decimalType = GpuColumnVector.getNonNestedRapidsType(sparkDecimalType) + val cudfOutputType = GpuColumnVector.getNonNestedRapidsType(dataType) + withResource(GpuScalar.from(lhs.getBigDecimal().intValue(), sparkDecimalType)) { + decimalLhs => + withResource(rhs.getBase.castTo(decimalType)) { decimalRhs => + val tmp = decimalLhs.mul(decimalRhs, cudfOutputType) + if (tmp.getType != cudfOutputType) { + withResource(tmp) { tmp => + tmp.castTo(cudfOutputType) + } + } else { + tmp + } + } + } + } + case _ => super.doColumnar(lhs, rhs) + } + } } object GpuDivModLike { @@ -200,14 +290,14 @@ trait GpuDivModLike extends CudfBinaryArithmetic { } } else { withResource(replaceZeroWithNull(rhs)) { replaced => - super.doColumnar(lhs, GpuColumnVector.from(replaced, right.dataType)) + super.doColumnar(lhs, GpuColumnVector.from(replaced, rhs.dataType)) } } } override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector = { withResource(replaceZeroWithNull(rhs)) { replaced => - super.doColumnar(lhs, GpuColumnVector.from(replaced, right.dataType)) + super.doColumnar(lhs, GpuColumnVector.from(replaced, rhs.dataType)) } } @@ -269,9 +359,23 @@ case class GpuDivide(left: Expression, right: Expression) extends GpuDivModLike override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { (left.dataType, right.dataType) match { case (_: DecimalType, r: DecimalType) => { + val (upcastedLhs, upcastedRhs) = if (!DecimalType.is32BitDecimalType(dataType) && + DecimalType.is32BitDecimalType(r)) { + // we are casting to the smallest 64-bit decimal so the answer doesn't exceed 64-bit + val decimalType = createCudfDecimal(10, r.scale) + (lhs.getBase.castTo(decimalType), rhs.getBase.castTo(decimalType)) + } else { + (lhs.getBase(), rhs.getBase()) + } val newType = getIntermediaryType(r) - withResource(lhs.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(newType))) { - modLhs => super.doColumnar(GpuColumnVector.from(modLhs, newType), rhs) + withResource(upcastedLhs) { upcastedLhs => + withResource(upcastedRhs) { upcastedRhs => + withResource(upcastedLhs.castTo(GpuColumnVector.getNonNestedRapidsType(newType))) { + modLhs => + super.doColumnar(GpuColumnVector.from(modLhs, newType), + GpuColumnVector.from(upcastedRhs, DecimalType(10, r.scale))) + } + } } } case _ => super.doColumnar(lhs, 
rhs) @@ -281,9 +385,23 @@ case class GpuDivide(left: Expression, right: Expression) extends GpuDivModLike override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { (left.dataType, right.dataType) match { case (_: DecimalType, r: DecimalType) => { + val (upcastedLhs, upcastedRhs) = if (!DecimalType.is32BitDecimalType(dataType) && + DecimalType.is32BitDecimalType(r)) { + // we are casting to the smallest 64-bit decimal so the answer doesn't overflow + val sparkDecimalType = DecimalType(10, r.scale) + val decimalType = GpuColumnVector.getNonNestedRapidsType(sparkDecimalType) + (lhs.getBase.castTo(decimalType), + GpuScalar.from(rhs.getBigDecimal().intValue().toLong, sparkDecimalType)) + } else { + (lhs.getBase(), rhs) + } val newType = getIntermediaryType(r) - withResource(lhs.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(newType))) { - modLhs => super.doColumnar(GpuColumnVector.from(modLhs, newType), rhs) + withResource(upcastedLhs) { upcastedLhs => + withResource(upcastedRhs) { upcastedRhs => + withResource(upcastedLhs.castTo(GpuColumnVector.getNonNestedRapidsType(newType))) { + modLhs => super.doColumnar(GpuColumnVector.from(modLhs, newType), upcastedRhs) + } + } } } case _ => super.doColumnar(lhs, rhs) @@ -293,9 +411,23 @@ case class GpuDivide(left: Expression, right: Expression) extends GpuDivModLike override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector = { (left.dataType, right.dataType) match { case (_: DecimalType, r: DecimalType) => { + val (upcastedLhs, upcastedRhs) = if (!DecimalType.is32BitDecimalType(dataType) && + DecimalType.is32BitDecimalType(r)) { + // we are casting to the smallest 64-bit decimal so the answer doesn't overflow + val sparkDecimalType = DecimalType(10, r.scale) + val decimalType = GpuColumnVector.getNonNestedRapidsType(sparkDecimalType) + (GpuScalar.from(lhs.getBigDecimal().intValue().toLong, sparkDecimalType), + rhs.getBase.castTo(decimalType)) + } else { + (lhs, rhs.getBase()) + } val newType = getIntermediaryType(r) - withResource(GpuScalar.from(lhs.getBigDecimal.longValue(), newType)) { modLhs => - super.doColumnar(modLhs, rhs) + withResource(upcastedRhs) { upcastedRhs => + withResource(upcastedLhs) { upcastedLhs => + withResource(GpuScalar.from(upcastedLhs.getBigDecimal.longValue(), newType)) { modLhs => + super.doColumnar(modLhs, upcastedRhs) + } + } } } case _ => super.doColumnar(lhs, rhs) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala index 0c7b316cd53..6e423e907c5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids import java.io.Serializable import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, RoundMode, Scalar, UnaryOp} -import com.nvidia.spark.rapids.{Arm, CudfBinaryExpression, CudfUnaryExpression, FloatUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuUnaryExpression} +import com.nvidia.spark.rapids.{Arm, CudfBinaryExpression, CudfUnaryExpression, DecimalUtil, FloatUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuUnaryExpression} import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, ImplicitCastInputTypes} @@ -156,8 +156,7 @@ case class GpuCeil(child: Expression) extends 
CudfUnaryMathExpression("CEIL") { override def outputTypeOverride: DType = dataType match { case dt: DecimalType => - val scale = dt.scale - DType.create(DType.DTypeEnum.DECIMAL64, -scale) + DecimalUtil.createCudfDecimal(dt.precision, dt.scale) case _ => DType.INT64 } @@ -216,8 +215,7 @@ case class GpuFloor(child: Expression) extends CudfUnaryMathExpression("FLOOR") override def outputTypeOverride: DType = dataType match { case dt: DecimalType => - val scale = dt.scale - DType.create(DType.DTypeEnum.DECIMAL64, -scale) + DecimalUtil.createCudfDecimal(dt.precision, dt.scale) case _ => DType.INT64 } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index f145c56ff75..22d22d7e874 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -496,6 +496,16 @@ class CastOpSuite extends GpuExpressionTestSuite { scale = 2, ansiEnabled = true, customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 2), + precision = 9, scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) + + testCastToDecimal(DataTypes.createDecimalType(8, 0), + scale = 0, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 2), + precision = 18, scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) // fromScale > toScale testCastToDecimal(DataTypes.createDecimalType(18, 1), @@ -505,19 +515,40 @@ class CastOpSuite extends GpuExpressionTestSuite { scale = 2, ansiEnabled = true, customRandGenerator = Some(new scala.util.Random(1234L))) - testCastToDecimal(DataTypes.createDecimalType(18, 18), - scale = 15, + testCastToDecimal(DataTypes.createDecimalType(18, 10), + precision = 9, scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 4), + precision = 18, scale = 15, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 1), + scale = -1, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 7), + precision = 5, scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 7), + precision = 18, scale = 5, customRandGenerator = Some(new scala.util.Random(1234L))) // fromScale < toScale testCastToDecimal(DataTypes.createDecimalType(18, 0), scale = 3, customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(9, 5), + precision = 18, scale = 10, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 3), + precision = 9, scale = 5, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(8, 0), + precision = 8, scale = 3, + customRandGenerator = Some(new scala.util.Random(1234L))) testCastToDecimal(DataTypes.createDecimalType(18, 5), - scale = 10, + precision = 9, scale = 3, customRandGenerator = Some(new scala.util.Random(1234L))) - testCastToDecimal(DataTypes.createDecimalType(18, 10), - scale = 17, + testCastToDecimal(DataTypes.createDecimalType(8, 5), + precision = 18, scale = 7, customRandGenerator = Some(new scala.util.Random(1234L))) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala 
b/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala index 734de64fc83..57bcc54f7e0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.types.{DataTypes, Decimal, DecimalType} class DecimalBinaryOpSuite extends GpuExpressionTestSuite { private val schema = FuzzerUtils.createSchema(Seq( - DecimalType(DType.DECIMAL32_MAX_PRECISION, 4), + DecimalType(DType.DECIMAL32_MAX_PRECISION, 3), DecimalType(DType.DECIMAL32_MAX_PRECISION, 2))) - private val litValue = Decimal(12345.6789) - private val lit = GpuLiteral(litValue, DecimalType(DType.DECIMAL64_MAX_PRECISION, 5)) + private val litValue = Decimal(12345.678) + private val lit = GpuLiteral(litValue, DecimalType(DType.DECIMAL32_MAX_PRECISION, 3)) private val leftExpr = GpuBoundReference(0, schema.head.dataType, nullable = true) private val rightExpr = GpuBoundReference(1, schema(1).dataType, nullable = true) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala index 924bd4de152..16ec73bcbc2 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala @@ -117,11 +117,8 @@ class DecimalUnitTest extends GpuUnitTests { GpuColumnVector.from(dcv, DecimalType(DType.DECIMAL64_MAX_PRECISION + 1, 0)) } } - // assertion error throws because of unsupported DType DECIMAL32 - assertThrows[AssertionError] { - withResource(ColumnVector.decimalFromInts(0, 1)) { dcv => - GpuColumnVector.from(dcv, DecimalType(1, 0)) - } + withResource(ColumnVector.decimalFromInts(0, 1)) { dcv => + GpuColumnVector.from(dcv, DecimalType(1, 0)) } withResource(GpuScalar.from(dec64Data(0), dt64)) { scalar => withResource(GpuColumnVector.from(scalar, 10, dt64)) { cv =>
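Read alongside the DecimalUtil.scala and GpuCast.scala hunks above, here is a minimal standalone Scala sketch of the two rules this patch centralizes: a Spark DecimalType whose precision is at most Decimal.MAX_INT_DIGITS (9) is backed by cudf DECIMAL32 and anything larger by DECIMAL64, and a decimal-to-decimal cast is only safe when every value stays strictly inside ±10^(to.precision − to.scale). The sketch has no cudf or Spark dependency; the names DecimalWidthSketch, widthFor, absBound, and fitsAfterCast are illustrative stand-ins and do not exist in the plugin.

object DecimalWidthSketch {
  // Mirrors Spark's Decimal.MAX_INT_DIGITS (9), which is also cudf's
  // DECIMAL32_MAX_PRECISION: the largest precision whose unscaled value
  // always fits in a 32-bit Int.
  val MaxIntDigits: Int = 9

  sealed trait CudfDecimalWidth
  case object Decimal32 extends CudfDecimalWidth
  case object Decimal64 extends CudfDecimalWidth

  // Same decision as DecimalUtil.createCudfDecimal in the patch: up to 9
  // digits of precision is backed by DECIMAL32, anything larger by DECIMAL64.
  def widthFor(precision: Int): CudfDecimalWidth =
    if (precision <= MaxIntDigits) Decimal32 else Decimal64

  // Same bound as GpuCast.checkForOverflow in the patch: a value can be cast
  // to DecimalType(toPrecision, toScale) only if its absolute value is
  // strictly less than 10^(toPrecision - toScale), i.e. it fits in the
  // whole-number digits the target type has left over for its integral part.
  def absBound(toPrecision: Int, toScale: Int): Long =
    math.pow(10, toPrecision - toScale).toLong

  def fitsAfterCast(value: BigDecimal, toPrecision: Int, toScale: Int): Boolean =
    value.abs < BigDecimal(absBound(toPrecision, toScale))
}

object DecimalWidthSketchExample extends App {
  import DecimalWidthSketch._
  println(widthFor(7))                                   // Decimal32, e.g. DecimalType(7, 2)
  println(widthFor(18))                                  // Decimal64, e.g. DecimalType(18, 2)
  println(fitsAfterCast(BigDecimal("9999.999"), 9, 5))   // true:  |v| < 10^4
  println(fitsAfterCast(BigDecimal("12345.678"), 9, 5))  // false: overflows DecimalType(9, 5)
}

Under those assumptions, the runtime check in checkForOverflow is the column-wise form of fitsAfterCast: rows outside the bound either raise an error (ANSI mode) or are replaced with nulls before the cast is performed.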