Decimal32 support (NVIDIA#1717)
* Add support for Decimal32

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed unary_minus

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* unscaledLong fix

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* More support for Decimal32

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* implicit for casting dec32todec64

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* cleanup

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* refactored castDecimalToDecimal

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed legacy decimal read for non-nested

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed casting and added more tests

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* added nested tests for reading legacy decimals

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* removed implicit

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* struct working

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Lists working

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* divide not working

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* cleanup

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* division working but problem with casting

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* moved div code to GpuModLike

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* code cleanup

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* some more fixes

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* some more fixes

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* addressed review comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* added more comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* park

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* properly cast scalar

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed gpu metric

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* upmerged

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed castFloatsToDecimals to pick the right precision

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* addressed review comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Fixed memory leak

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* removed length restriction

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed test failure due to upmerge

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

Co-authored-by: Raza Jafri <rjafri@nvidia.com>
razajafri authored Apr 13, 2021
1 parent 2a68db5 commit 2d9c2e2
Showing 23 changed files with 554 additions and 168 deletions.
5 changes: 3 additions & 2 deletions integration_tests/src/main/python/parquet_test.py
@@ -200,8 +200,9 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase,
lambda spark : readParquetCatchException(spark, data_path),
conf=all_confs)


@pytest.mark.parametrize('parquet_gens', [decimal_gens], ids=idfn)
@pytest.mark.parametrize('parquet_gens', [decimal_gens,
[ArrayGen(DecimalGen(7,2), max_length=10)],
[StructGen([['child0', DecimalGen(7, 2)]])]], ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@@ -390,12 +390,12 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
table: Table,
schema: StructType): ParquetBufferConsumer = {
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
val options = ParquetWriterOptions.builder()
.withDecimalPrecisions(GpuParquetFileFormat.getPrecisionList(schema):_*)
val builder = ParquetWriterOptions.builder()
.withDecimalPrecisions(GpuParquetFileFormat.getPrecisionList(schema): _*)
.withStatisticsFrequency(StatisticsFrequency.ROWGROUP)
.withTimestampInt96(false)
.build()
withResource(Table.writeParquetChunked(options, buffer)) { writer =>
schema.fields.indices.foreach(index => builder.withColumnNames(s"_col$index"))
withResource(Table.writeParquetChunked(builder.build(), buffer)) { writer =>
writer.write(table)
}
buffer
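
The serializer change above keeps the parquet writer options as a builder until every column has been handled: decimal precisions come from the schema, and each top-level column gets a synthetic, position-based name before build() is called. Below is a condensed, hypothetical sketch of the corrected flow, written as if it sat inside the serializer class above (so ParquetBufferConsumer, GpuParquetFileFormat, and withResource from Arm are already in scope); the method name is illustrative and the statistics-frequency and INT96 settings from the real code are dropped for brevity.

// Hypothetical, trimmed-down mirror of the writer setup in this file.
private def writeTableForCache(table: Table, schema: StructType): ParquetBufferConsumer = {
  val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
  val builder = ParquetWriterOptions.builder()
    .withDecimalPrecisions(GpuParquetFileFormat.getPrecisionList(schema): _*)
  // Register a synthetic "_col<i>" name for every top-level column *before* build().
  schema.fields.indices.foreach(i => builder.withColumnNames(s"_col$i"))
  // build() runs only after all names and precisions have been registered.
  withResource(Table.writeParquetChunked(builder.build(), buffer)) { writer =>
    writer.write(table)
  }
  buffer
}
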
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark311

import com.nvidia.spark.rapids.GpuExec
import com.nvidia.spark.rapids.{GpuExec, GpuMetric}
import com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer

import org.apache.spark.rdd.RDD
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuInMemoryTableScanExec(
@@ -54,7 +53,7 @@ case class GpuInMemoryTableScanExec(
relation.cacheBuilder.serializer.vectorTypes(attributes, conf)

private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS)
val buffers = filteredCachedBatches()
relation.cacheBuilder.serializer.asInstanceOf[ParquetCachedBatchSerializer]
.gpuConvertCachedBatchToColumnarBatch(
@@ -106,7 +106,7 @@ private static String hexString(byte[] bytes) {
public static synchronized void debug(String name, HostColumnVectorCore hostCol) {
DType type = hostCol.getType();
System.err.println("COLUMN " + name + " - " + type);
if (type.getTypeId() == DType.DTypeEnum.DECIMAL64) {
if (type.isDecimalType()) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
@@ -472,8 +472,7 @@ private static DType toRapidsOrNull(DataType type) {
if (dt.precision() > DType.DECIMAL64_MAX_PRECISION) {
return null;
} else {
// Map all DecimalType to DECIMAL64, in case of underlying DType transaction.
return DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale());
return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale());
}
}
return null;
@@ -864,7 +863,6 @@ public static int[] toIntArray(ai.rapids.cudf.ColumnVector vec) {
*/
GpuColumnVector(DataType type, ai.rapids.cudf.ColumnVector cudfCv) {
super(type);
// TODO need some checks to be sure everything matches
this.cudfCv = cudfCv;
}

@@ -163,10 +163,16 @@ public final ColumnarMap getMap(int ordinal) {
@Override
public final Decimal getDecimal(int rowId, int precision, int scale) {
assert precision <= DType.DECIMAL64_MAX_PRECISION : "Assert " + precision + " <= DECIMAL64_MAX_PRECISION(" + DType.DECIMAL64_MAX_PRECISION + ")";
assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL64: "Assert DType to be DECIMAL64";
assert scale == -cudfCv.getType().getScale() :
"Assert fetch decimal with its original scale " + scale + " expected " + (-cudfCv.getType().getScale());
return Decimal.createUnsafe(cudfCv.getLong(rowId), precision, scale);
if (precision <= Decimal.MAX_INT_DIGITS()) {
assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL32 : "type should be DECIMAL32";
return Decimal.createUnsafe(cudfCv.getInt(rowId), precision, scale);
} else {
assert cudfCv.getType().getTypeId() == DType.DTypeEnum.DECIMAL64 : "type should be DECIMAL64";
return Decimal.createUnsafe(cudfCv.getLong(rowId), precision, scale);
}

}

@Override
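
The getDecimal change above dispatches on precision: up to Decimal.MAX_INT_DIGITS (9 digits) the column is a cudf DECIMAL32 and the unscaled value is read as an Int, otherwise it must be a DECIMAL64 and is read as a Long. A small round-trip sketch using only Spark's public Decimal API (the object and value names here are illustrative, not part of the commit):

import org.apache.spark.sql.types.Decimal

// Hypothetical demo of the DECIMAL32 path: a decimal with precision <= 9 is fully
// described by a 32-bit unscaled value plus its precision and scale.
object Decimal32RoundTrip {
  def main(args: Array[String]): Unit = {
    val original = Decimal("12345.67")            // precision 7, scale 2
    // 12345.67 * 10^2 = 1234567 fits in an Int because 7 <= Decimal.MAX_INT_DIGITS (9).
    val unscaled: Int = original.toUnscaledLong.toInt
    val restored = Decimal.createUnsafe(unscaled, 7, 2)
    assert(restored.toString == original.toString)  // both print "12345.67"
  }
}
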
@@ -238,8 +238,9 @@ public Decimal getDecimal(int ordinal, int precision, int scale) {
if (isNullAt(ordinal)) {
return null;
}
// TODO when DECIMAL32 is supported a special case will need to be added here
if (precision <= Decimal.MAX_LONG_DIGITS()) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
return Decimal.createUnsafe(getInt(ordinal), precision, scale);
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
return Decimal.createUnsafe(getLong(ordinal), precision, scale);
} else {
throw new IllegalArgumentException("NOT IMPLEMENTED YET");
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.DType

import org.apache.spark.sql.types.{DataType, Decimal, DecimalType}

object DecimalUtil {

def createCudfDecimal(precision: Int, scale: Int): DType = {
if (precision <= Decimal.MAX_INT_DIGITS) {
DType.create(DType.DTypeEnum.DECIMAL32, -scale)
} else {
DType.create(DType.DTypeEnum.DECIMAL64, -scale)
}
}

/**
* Return the size in bytes of the Fixed-width data types.
* WARNING: Do not use this method for variable-width data types
*/
private[rapids] def getDataTypeSize(dt: DataType): Int = {
dt match {
case d: DecimalType if d.precision <= Decimal.MAX_INT_DIGITS => 4
case t => t.defaultSize
}
}
}
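
A quick usage sketch of the helper above (the demo object name is made up; DType and Decimal come from cudf and Spark respectively): precisions up to Decimal.MAX_INT_DIGITS (9) map to 32-bit cudf decimals, anything larger up to DECIMAL64_MAX_PRECISION (18) maps to 64-bit ones, and cudf records the scale negated relative to Spark.

import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.DecimalUtil

// Hypothetical demo object; the assertions restate the rule encoded in createCudfDecimal.
object DecimalUtilDemo {
  def main(args: Array[String]): Unit = {
    // Precision 7 <= 9 digits: backed by a 32-bit cudf decimal.
    assert(DecimalUtil.createCudfDecimal(7, 2).getTypeId == DType.DTypeEnum.DECIMAL32)
    // Precision 15 needs more than 9 digits but fits in 18: 64-bit cudf decimal.
    assert(DecimalUtil.createCudfDecimal(15, 2).getTypeId == DType.DTypeEnum.DECIMAL64)
    // cudf stores the scale negated relative to Spark's DecimalType scale.
    assert(DecimalUtil.createCudfDecimal(7, 2).getScale == -2)
  }
}
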
