Write int96 to parquet #1068

Merged: 6 commits, Dec 1, 2020
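For context: INT96 is the legacy Parquet timestamp encoding that Spark selects via spark.sql.parquet.outputTimestampType=INT96, and this PR teaches the RAPIDS plugin to write it on the GPU instead of falling back to the CPU. An INT96 value is 12 bytes: nanoseconds within the day followed by a Julian day number. A minimal sketch of that layout in Python (illustrative only, not the plugin's code path; the helper name is mine):

import struct
from datetime import datetime, timezone

JULIAN_UNIX_EPOCH = 2440588  # Julian day number of 1970-01-01

def timestamp_to_int96(ts):
    # Split into whole days since the Unix epoch plus nanoseconds within
    # the day, then pack as Parquet INT96: an 8-byte little-endian
    # nanos-of-day followed by a 4-byte Julian day number.
    delta = ts - datetime(1970, 1, 1, tzinfo=timezone.utc)
    nanos_of_day = delta.seconds * 10**9 + delta.microseconds * 1000
    return struct.pack('<qi', nanos_of_day, JULIAN_UNIX_EPOCH + delta.days)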
integration_tests/src/main/python/parquet_write_test.py (22 additions & 35 deletions)
@@ -29,28 +29,33 @@
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]

writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'}

parquet_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen, timestamp_gen]]

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"])
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list, ts_type, reader_confs):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
all_confs.update(writer_confs)
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=all_confs)
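Outside the test harness, the new INT96 case of this round-trip amounts to something like the sketch below (stock PySpark; the session setup and /tmp path are assumptions, not part of the test):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# The two writer_confs above plus the output type under test.
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'CORRECTED')
spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'CORRECTED')
spark.conf.set('spark.sql.parquet.outputTimestampType', 'INT96')

df = spark.sql("SELECT timestamp'2020-12-01 12:34:56.789' AS ts")
df.coalesce(1).write.mode('overwrite').parquet('/tmp/int96_roundtrip')
spark.read.parquet('/tmp/int96_roundtrip').show(truncate=False)

With the plugin on the classpath, assert_gpu_and_cpu_writes_are_equal_collect performs this write once on the GPU and once on the CPU, then compares what a plain read gets back from each.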

@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS'])
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED'])
@ignore_order
def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):
@@ -61,6 +66,7 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):
lambda spark, path: spark.read.parquet(path),
data_path,
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_type})

parquet_part_write_gens = [
@@ -74,15 +80,15 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):
@pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS'])
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list, ts_type, reader_confs):
gen_list = [('a', RepeatSeqGen(parquet_gen, 10)),
('b', parquet_gen)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
all_confs.update(writer_confs)
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').parquet(path),
lambda spark, path: spark.read.parquet(path),
@@ -105,12 +111,12 @@ def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list, re
conf=all_confs)

@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"])
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_write_save_table(spark_tmp_path, parquet_gens, ts_type, spark_tmp_table_factory):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type}
all_confs={'spark.sql.parquet.outputTimestampType': ts_type}
all_confs.update(writer_confs)
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.format("parquet").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.parquet(path),
@@ -124,32 +130,31 @@ def write_parquet_sql_from(spark, df, data_path, write_to_table):
spark.sql(write_cmd)

@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"])
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_write_sql_save_table(spark_tmp_path, parquet_gens, ts_type, spark_tmp_table_factory):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type}
all_confs={'spark.sql.parquet.outputTimestampType': ts_type}
all_confs.update(writer_confs)
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: write_parquet_sql_from(spark, gen_df(spark, gen_list).coalesce(1), path, spark_tmp_table_factory.get()),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=all_confs)

def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_factory, ts_rebase, ts_write):
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', ts_rebase)
spark.conf.set('spark.sql.parquet.outputTimestampType', ts_write)
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', ts_rebase)
spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', ts_rebase) # for Spark 3.1.0
with pytest.raises(Exception) as e_info:
df.coalesce(1).write.format("parquet").mode('overwrite').option("path", data_path).saveAsTable(spark_tmp_table_factory.get())
assert e_info.match(r".*SparkUpgradeException.*")

# TODO - https://github.com/NVIDIA/spark-rapids/issues/1130 to handle TIMESTAMP_MILLIS
parquet_ts_write_options = ['TIMESTAMP_MICROS']

@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_write', ['INT96', 'TIMESTAMP_MICROS'])
@pytest.mark.parametrize('ts_rebase', ['EXCEPTION'])
def test_ts_write_fails_datetime_exception(spark_tmp_path, ts_write, ts_rebase, spark_tmp_table_factory):
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
gen = TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + '/PARQUET_DATA'
with_gpu_session(
lambda spark : writeParquetUpgradeCatchException(spark, unary_op_df(spark, gen), data_path, spark_tmp_table_factory, ts_rebase, ts_write))
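The generator's shift to pre-Gregorian dates (years 1 through 1582) is what forces the failure: timestamps before the Gregorian cutover differ between the hybrid Julian calendar that older Spark and parquet-mr versions wrote and the proleptic Gregorian calendar Spark 3.x uses, so rebase mode EXCEPTION refuses to write them. A stand-alone illustration (values and path are mine):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Writing a pre-1582-10-15 timestamp with rebase mode EXCEPTION fails with
# org.apache.spark.SparkUpgradeException in the error chain rather than
# silently rebasing the value.
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'EXCEPTION')
spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'EXCEPTION')
spark.conf.set('spark.sql.parquet.outputTimestampType', 'INT96')

old_ts = spark.sql("SELECT timestamp'1500-01-01 00:00:00' AS ts")
try:
    old_ts.write.mode('overwrite').parquet('/tmp/pre_gregorian')
except Exception as e:
    print(type(e).__name__)  # the Py4J error wraps SparkUpgradeException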
@@ -168,32 +173,14 @@ def test_ts_write_twice_fails_exception(spark_tmp_path, spark_tmp_table_factory)
with_gpu_session(
lambda spark : writeParquetNoOverwriteCatchException(spark, unary_op_df(spark, gen), data_path, table_name))

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['LEGACY'])
def test_parquet_write_legacy_fallback(spark_tmp_path, ts_write, ts_rebase, spark_tmp_table_factory):
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': "CORRECTED",
'spark.sql.parquet.outputTimestampType': ts_write}
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("parquet").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.parquet(path),
data_path,
'DataWritingCommandExec',
conf=all_confs)

@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('ts_write', ['INT96'])
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'EXCEPTION', 'LEGACY'])
def test_parquet_write_int96_fallback(spark_tmp_path, ts_write, ts_rebase, spark_tmp_table_factory):
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': "CORRECTED",
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write}
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("parquet").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
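To run just these cases, a typical invocation (assuming the integration-test environment is already set up with the plugin jar on the Spark classpath; the exact wrapper script may differ by branch) looks like:

pytest integration_tests/src/main/python/parquet_write_test.py -k 'test_write_round_trip and INT96'

The -k filter works because pytest folds the parametrize values, including the INT96 id, into each generated test name.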
GpuParquetFileFormat.scala
@@ -106,7 +106,8 @@ object GpuParquetFileFormat {
outputTimestampType: ParquetOutputTimestampType.Value): Boolean = {
outputTimestampType match {
case ParquetOutputTimestampType.TIMESTAMP_MICROS |
ParquetOutputTimestampType.TIMESTAMP_MILLIS => true
ParquetOutputTimestampType.TIMESTAMP_MILLIS |
ParquetOutputTimestampType.INT96 => true
case _ => false
}
}
@@ -270,6 +271,7 @@ class GpuParquetWriter(
val builder = ParquetWriterOptions.builder()
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
.withTimestampInt96(outputTimestampType == ParquetOutputTimestampType.INT96)
dataSchema.foreach(entry => {
if (entry.nullable) {
builder.withColumnNames(entry.name)
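withTimestampInt96 is the cudf writer-option switch this PR wires through; when false the writer keeps emitting INT64 timestamps. For comparison, the same INT96-versus-INT64 choice exists in pyarrow's Parquet writer (this snippet is illustrative and independent of the plugin):

import datetime
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'ts': [datetime.datetime(2020, 12, 1, 12, 34, 56)]})
# Store the timestamp column as legacy INT96 instead of INT64 microseconds.
pq.write_table(table, '/tmp/int96_pyarrow.parquet',
               use_deprecated_int96_timestamps=True)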
ParquetWriterSuite.scala
@@ -76,19 +76,6 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
}
}

testExpectedException[IllegalArgumentException](
"int96 timestamps not supported",
_.getMessage.startsWith("Part of the plan is not columnar"),
frameFromParquet("timestamp-date-test-msec.parquet"),
new SparkConf().set("spark.sql.parquet.outputTimestampType", "INT96")) {
val tempFile = File.createTempFile("int96", "parquet")
tempFile.delete()
frame => {
frame.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
frame
}
}

testExpectedGpuException(
"Old dates in EXCEPTION mode",
classOf[SparkException],