Support reading decimal columns from parquet files (NVIDIA#1294)
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
sperlingxx authored Jan 5, 2021
1 parent 1b221f9 commit 4ef9d90
Showing 16 changed files with 270 additions and 51 deletions.
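For orientation, the kind of workload this commit enables is a Parquet scan over decimal columns running on the GPU instead of falling back to the CPU. Below is a minimal, hedged sketch: the plugin class name and the spark.rapids.sql.decimalType.enabled key are assumptions inferred from the decimalTypeEnabled flag referenced later in this diff, not settings defined by this commit, and the RAPIDS jars are assumed to be on the classpath.

    from decimal import Decimal
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DecimalType, StructField, StructType

    # Assumed setup: com.nvidia.spark.SQLPlugin is the RAPIDS plugin class, and
    # spark.rapids.sql.decimalType.enabled gates decimal support on the GPU.
    spark = (SparkSession.builder
             .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
             .config("spark.rapids.sql.decimalType.enabled", "true")
             .getOrCreate())

    schema = StructType([StructField("amount", DecimalType(precision=18, scale=3))])
    df = spark.createDataFrame([(Decimal("1234.567"),)], schema)
    df.write.mode("overwrite").parquet("/tmp/decimal_data")  # illustrative path

    # With this change, the scan below is eligible to run on the GPU.
    spark.read.parquet("/tmp/decimal_data").show()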
24 changes: 12 additions & 12 deletions docs/supported_ops.md
@@ -187,13 +187,13 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -486,13 +486,13 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -17043,13 +17043,13 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td>S</td>
<td></td>
<td><b>NS</b></td>
<td></td>
<td><em>PS (missing nested DECIMAL, BINARY)</em></td>
<td><em>PS (missing nested DECIMAL, BINARY)</em></td>
<td><em>PS (missing nested DECIMAL, BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
</tr>
<tr>
<th>Output</th>
71 changes: 47 additions & 24 deletions integration_tests/src/main/python/parquet_test.py
@@ -27,23 +27,30 @@ def read_parquet_df(data_path):
def read_parquet_sql(data_path):
return lambda spark : spark.sql('select * from parquet.`{}`'.format(data_path))


# Override decimal_gens because decimal with negative scale is unsupported in parquet reading
decimal_gens = [DecimalGen(), DecimalGen(precision=7, scale=3), DecimalGen(precision=10, scale=10),
DecimalGen(precision=9, scale=0), DecimalGen(precision=18, scale=15)]
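The override above is needed because Spark's DecimalType permits a negative scale, while the Parquet DECIMAL logical type requires the scale to be between 0 and the precision; the snippet below is only an illustration of what is being excluded (the hypothetical type is not part of the test suite):

    from pyspark.sql.types import DecimalType

    # Hypothetical example: with a negative scale the unscaled digits are multiplied
    # by a power of ten (1234 at scale -3 represents 1,234,000). Spark can model this,
    # but Parquet cannot store it, so such generators are left out of decimal_gens.
    negative_scale_type = DecimalType(precision=7, scale=-3)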

parquet_gens_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen,
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(DecimalGen()),
ArrayGen(ArrayGen(byte_gen)),
StructGen([['child0', ArrayGen(byte_gen)], ['child1', byte_gen], ['child2', float_gen]]),
ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]]))] + map_gens_sample,
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))]
StructGen([['child0', ArrayGen(byte_gen)], ['child1', byte_gen], ['child2', float_gen], ['child3', DecimalGen()]]),
ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]]))] +
map_gens_sample + decimal_gens,
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))]

# test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for
# non-cloud
original_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
multithreaded_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'}
coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
original_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'}
coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]
coalesce_parquet_file_reader_conf]
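Outside the test harness these reader types are ordinary Spark confs; a rough sketch of selecting one for an ad-hoc read follows (the session setup and path are assumed, only the conf key and values come from the dictionaries above):

    from pyspark.sql import SparkSession

    # Assumed standalone usage: COALESCING targets non-cloud storage, MULTITHREADED is
    # aimed at cloud object stores, and PERFILE is the original per-file reader.
    spark = (SparkSession.builder
             .config("spark.rapids.sql.format.parquet.reader.type", "COALESCING")
             .getOrCreate())
    df = spark.read.parquet("/tmp/parquet_data")  # illustrative path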

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@@ -66,9 +73,9 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs,
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled'])
def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
data_gens =[string_gen,
byte_gen, short_gen, int_gen, long_gen, boolean_gen]
data_gens = [string_gen,
byte_gen, short_gen, int_gen, long_gen, boolean_gen] + decimal_gens

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/PARQUET_DATA'
@@ -103,8 +110,8 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, rea
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen,
string_gen, date_gen,
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
# timestamp_gen
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens

@pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@@ -193,11 +200,27 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase,
lambda spark : readParquetCatchException(spark, data_path),
conf=all_confs)


@pytest.mark.parametrize('parquet_gens', [decimal_gens], ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.parquet.writeLegacyFormat': 'true'})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs)


parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))],
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')),
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens,
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')),
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]

@pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@@ -221,7 +244,7 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20'
with_cpu_session(
@@ -295,7 +318,7 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
@@ -320,7 +343,7 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
@@ -403,15 +426,15 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list):


_nested_pruning_schemas = [
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
[["a", StructGen([["c_1", StringGen()]])]]),
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
[["a", StructGen([["c_2", LongGen()]])]]),
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
[["a", StructGen([["c_3", ShortGen()]])]]),
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
[["a", StructGen([["c_1", StringGen()], ["c_3", ShortGen()]])]]),
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
([["a", StructGen([["c_1", StringGen()], ["c_2", LongGen()], ["c_3", ShortGen()]])]],
[["a", StructGen([["c_3", ShortGen()], ["c_2", LongGen()], ["c_1", StringGen()]])]]),
([["ar", ArrayGen(StructGen([["str_1", StringGen()],["str_2", StringGen()]]))]],
[["ar", ArrayGen(StructGen([["str_2", StringGen()]]))]])
@@ -147,8 +147,9 @@ class Spark300Shims extends SparkShims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
TypeSig.ARRAY).nested(), TypeSig.all),
TypeSig.ARRAY + TypeSig.DECIMAL).nested(), TypeSig.all),
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

@@ -202,7 +202,7 @@ class Spark310Shims extends Spark301Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
TypeSig.ARRAY).nested(), TypeSig.all),
TypeSig.ARRAY + TypeSig.DECIMAL).nested(), TypeSig.all),
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{DateType, DecimalType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -229,6 +229,10 @@ object GpuCSVScan {
}
}
// TODO parsedOptions.emptyValueInRead

if (readSchema.exists(_.dataType.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("DecimalType is not supported")
}
}
}

@@ -115,7 +115,7 @@ object GpuOrcScanBase {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
schema.foreach { field =>
if (!GpuColumnVector.isNonNestedSupportedType(field.dataType)) {
if (!GpuOverrides.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
}
}
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.execution.datasources.v2.{AlterNamespaceSetPropertiesExec, AlterTableExec, AtomicReplaceTableExec, BatchScanExec, CreateNamespaceExec, CreateTableExec, DeleteFromTableExec, DescribeNamespaceExec, DescribeTableExec, DropNamespaceExec, DropTableExec, RefreshTableExec, RenameTableExec, ReplaceTableExec, SetCatalogAndNamespaceExec, ShowCurrentNamespaceExec, ShowNamespacesExec, ShowTablePropertiesExec, ShowTablesExec}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
@@ -2193,7 +2194,8 @@ object GpuOverrides {
exec[BatchScanExec](
"The backend for most file input",
ExecChecks(
(TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(),
(TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY +
TypeSig.DECIMAL).nested(),
TypeSig.all),
(p, conf, parent, r) => new SparkPlanMeta[BatchScanExec](p, conf, parent, r) {
override val childScans: scala.Seq[ScanMeta[_]] =
@@ -139,7 +139,8 @@ object GpuParquetScanBase {
allowMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)) {
allowNesting = true,
allowDecimal = meta.conf.decimalTypeEnabled)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
}
}
@@ -197,6 +198,33 @@ object GpuParquetScanBase {
meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
}
}

private[rapids] def convertDecimal32Columns(t: Table): Table = {
val containDecimal32Column = (0 until t.getNumberOfColumns).exists { i =>
t.getColumn(i).getType.getTypeId == DType.DTypeEnum.DECIMAL32
}
// return the input table if there are no DECIMAL32 columns
if (!containDecimal32Column) return t

val columns = new Array[ColumnVector](t.getNumberOfColumns)
try {
RebaseHelper.withResource(t) { _ =>
(0 until t.getNumberOfColumns).foreach { i =>
t.getColumn(i).getType match {
case tpe if tpe.getTypeId == DType.DTypeEnum.DECIMAL32 =>
columns(i) = t.getColumn(i).castTo(
DType.create(DType.DTypeEnum.DECIMAL64, tpe.getScale))
case _ =>
columns(i) = t.getColumn(i).incRefCount()
}
}
}
new Table(columns: _*)
} finally {
// clean temporary column vectors
columns.safeClose()
}
}
}

/**
@@ -657,13 +685,16 @@ abstract class FileParquetPartitionReaderBase(
inputTable: Table,
filePath: String,
clippedSchema: MessageType): Table = {
if (readDataSchema.length > inputTable.getNumberOfColumns) {
// Convert Decimal32 columns to Decimal64, because spark-rapids only supports Decimal64.
val inTable = GpuParquetScanBase.convertDecimal32Columns(inputTable)

if (readDataSchema.length > inTable.getNumberOfColumns) {
// Spark+Parquet schema evolution is relatively simple with only adding/removing columns
// No type casting or anything like that
val clippedGroups = clippedSchema.asGroupType()
val newColumns = new Array[ColumnVector](readDataSchema.length)
try {
withResource(inputTable) { table =>
withResource(inTable) { table =>
var readAt = 0
(0 until readDataSchema.length).foreach(writeAt => {
val readField = readDataSchema(writeAt)
Expand All @@ -686,7 +717,7 @@ abstract class FileParquetPartitionReaderBase(
newColumns.safeClose()
}
} else {
inputTable
inTable
}
}

@@ -1115,6 +1146,7 @@ class MultiFileParquetPartitionReader(
}
val parseOpts = ParquetOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
.enableStrictDecimalType(true)
.includeColumn(readDataSchema.fieldNames:_*).build()

// about to start using the GPU
@@ -1523,6 +1555,7 @@ class MultiFileCloudParquetPartitionReader(
}
val parseOpts = ParquetOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
.enableStrictDecimalType(true)
.includeColumn(readDataSchema.fieldNames: _*).build()

// about to start using the GPU
@@ -1658,6 +1691,7 @@ class ParquetPartitionReader(
}
val parseOpts = ParquetOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
.enableStrictDecimalType(true)
.includeColumn(readDataSchema.fieldNames:_*).build()

// about to start using the GPU
@@ -127,6 +127,7 @@ object HostColumnarToGpu {
if (cv.isNullAt(i)) {
b.appendNull()
} else {
// The precision here matters for cpu column vectors (such as OnHeapColumnVector).
b.append(cv.getDecimal(i, dt.precision, dt.scale).toUnscaledLong)
}
}
@@ -1236,13 +1236,13 @@ object SupportedOpsDocs {
println("<td>S</td>") // DATE
println("<td>S</td>") // TIMESTAMP
println("<td>S</td>") // STRING
println("<td><b>NS</b></td>") // DECIMAL
println("<td>S</td>") // DECIMAL
println("<td></td>") // NULL
println("<td><b>NS</b></td>") // BINARY
println("<td></td>") // CALENDAR
println("<td><em>PS (missing nested DECIMAL, BINARY)</em></td>") // ARRAY
println("<td><em>PS (missing nested DECIMAL, BINARY)</em></td>") // MAP
println("<td><em>PS (missing nested DECIMAL, BINARY)</em></td>") // STRUCT
println("<td><em>PS (missing nested BINARY)</em></td>") // ARRAY
println("<td><em>PS (missing nested BINARY)</em></td>") // MAP
println("<td><em>PS (missing nested BINARY)</em></td>") // STRUCT
println("</tr>")
println("<tr>")
println("<th>Output</th>")
Binary file not shown.