[BUG] parquet_write_test.py::test_ts_write_fails_datetime_exception failed in spark 3.1.1 and 3.1.2 #4294

Closed · pxLi opened this issue Dec 6, 2021 · 2 comments · Fixed by #4306
Assignees: tgravescs
Labels: bug (Something isn't working), P0 (Must have for release)

Comments

pxLi (Collaborator) commented Dec 6, 2021

Describe the bug

Errors:

```
FAILED ../../src/main/python/parquet_write_test.py::test_ts_write_fails_datetime_exception[CORRECTED-ts_write_data_gen0]
FAILED ../../src/main/python/parquet_write_test.py::test_ts_write_fails_datetime_exception[EXCEPTION-ts_write_data_gen0]

=================================== FAILURES ===================================
_____ test_ts_write_fails_datetime_exception[CORRECTED-ts_write_data_gen0] _____

spark_tmp_path = '/tmp/pyspark_tests//417878/'
ts_write_data_gen = ('INT96', Timestamp)
spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7fa85c0cc760>
rebase = 'CORRECTED'

    @pytest.mark.parametrize('ts_write_data_gen', [('INT96', limited_int96()), ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc)))])
    @pytest.mark.parametrize('rebase', ["CORRECTED","EXCEPTION"])
    def test_ts_write_fails_datetime_exception(spark_tmp_path, ts_write_data_gen, spark_tmp_table_factory, rebase):
        ts_write, gen = ts_write_data_gen
        data_path = spark_tmp_path + '/PARQUET_DATA'
        int96_rebase = "EXCEPTION" if (ts_write == "INT96") else rebase
        date_time_rebase = "EXCEPTION" if (ts_write == "TIMESTAMP_MICROS") else rebase
        if is_before_spark_311() and ts_write == 'INT96':
            all_confs = {'spark.sql.parquet.outputTimestampType': ts_write}
            all_confs.update({'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': date_time_rebase,
                              'spark.sql.legacy.parquet.int96RebaseModeInWrite': int96_rebase})
            assert_gpu_and_cpu_writes_are_equal_collect(
                lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.parquet(path),
                lambda spark, path: spark.read.parquet(path),
                data_path,
                conf=all_confs)
        else:
>           with_gpu_session(
                lambda spark : writeParquetUpgradeCatchException(spark,
                                                                 unary_op_df(spark, gen),
                                                                 data_path,
                                                                 spark_tmp_table_factory,
                                                                 int96_rebase, date_time_rebase, ts_write))

../../src/main/python/parquet_write_test.py:250:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/spark_session.py:103: in with_gpu_session
    return with_spark_session(func, conf=copy)
../../src/main/python/spark_session.py:70: in with_spark_session
    ret = func(_spark)
../../src/main/python/parquet_write_test.py:251: in <lambda>
    lambda spark : writeParquetUpgradeCatchException(spark,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

spark = <pyspark.sql.session.SparkSession object at 0x7fa873014eb0>
df = DataFrame[a: timestamp]
data_path = '/tmp/pyspark_tests//417878//PARQUET_DATA'
spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7fa85c0cc760>
int96_rebase = 'EXCEPTION', datetime_rebase = 'CORRECTED', ts_write = 'INT96'

    def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_factory, int96_rebase, datetime_rebase, ts_write):
        spark.conf.set('spark.sql.parquet.outputTimestampType', ts_write)
        spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', datetime_rebase)
        spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', int96_rebase) # for spark 310
        with pytest.raises(Exception) as e_info:
>           df.coalesce(1).write.format("parquet").mode('overwrite').option("path", data_path).saveAsTable(spark_tmp_table_factory.get())
E           Failed: DID NOT RAISE <class 'Exception'>

../../src/main/python/parquet_write_test.py:228: Failed
```

The EXCEPTION parametrization fails the same way (spark_tmp_path = '/tmp/pyspark_tests//937813/', rebase = 'EXCEPTION', so both int96_rebase and datetime_rebase are 'EXCEPTION'), ending in the same assertion:

```
_____ test_ts_write_fails_datetime_exception[EXCEPTION-ts_write_data_gen0] _____

E           Failed: DID NOT RAISE <class 'Exception'>

../../src/main/python/parquet_write_test.py:228: Failed
```
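For reference, a minimal standalone sketch of the failing scenario outside the test harness (the function name here is hypothetical, the plugin setup is assumed, and the test above only checks that some Exception is raised):

```python
# Hedged repro sketch: assumes a SparkSession with the RAPIDS Accelerator
# enabled (spark.plugins=com.nvidia.spark.SQLPlugin etc. omitted here).
from datetime import datetime, timezone

def write_pre_gregorian_int96(spark, data_path):
    spark.conf.set('spark.sql.parquet.outputTimestampType', 'INT96')
    spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'EXCEPTION')
    # Timestamps before the 1582-10-15 Gregorian cutover need rebasing, so
    # with the rebase mode set to EXCEPTION this write is expected to raise.
    df = spark.createDataFrame(
        [(datetime(1500, 1, 1, tzinfo=timezone.utc),)], 'a timestamp')
    # On Spark 3.1.1/3.1.2 with the GPU plugin the write completed instead
    # of raising, which is the failure shown in the log above.
    df.coalesce(1).write.mode('overwrite').parquet(data_path)
```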
pxLi added the bug (Something isn't working) and ? - Needs Triage (Need team to review and classify) labels on Dec 6, 2021
tgravescs self-assigned this on Dec 6, 2021
tgravescs (Collaborator) commented:

Taking an initial look.

tgravescs (Collaborator) commented:

It looks like this was introduced by #4235, which commonized some of the shim code; there must be a code path that was supposed to stay unique to 3.1.x.
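(For context, the Python integration tests gate this scenario on the Spark version with a helper like the hypothetical sketch below; the real is_before_spark_311 lives in src/main/python/spark_session.py, takes no arguments, and reads the active session, so this is an illustration rather than the actual implementation.)

```python
# Hypothetical sketch (not the plugin's actual shim or test code) of the kind
# of version gate involved: Spark 3.1.x validates INT96 rebase on write, so
# the expect-an-exception path only applies from 3.1.1 onward.
def spark_version_tuple(spark):
    # '3.1.2' or '3.1.2-SNAPSHOT' -> (3, 1, 2)
    return tuple(int(p.split('-')[0]) for p in spark.version.split('.')[:3])

def is_before_spark_311(spark):
    return spark_version_tuple(spark) < (3, 1, 1)
```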

tgravescs added the P0 (Must have for release) label and removed the ? - Needs Triage (Need team to review and classify) label on Dec 6, 2021