
[BUG] test_casting_from_double_to_timestamp failed for DATAGEN_SEED=1702329497 #10017

Closed
jlowe opened this issue Dec 11, 2023 · 3 comments · Fixed by #10092
Labels: bug (Something isn't working)

Comments

jlowe (Member) commented Dec 11, 2023

Saw this fail in a premerge build:

cpu = datetime.datetime(1905, 7, 9, 5, 54, 10, 973000)
gpu = datetime.datetime(1905, 7, 9, 5, 54, 10, 972000)
spark_tmp_path = '/tmp/pyspark_tests//premerge-ci-2-jenkins-rapids-premerge-github-8706-x2fp7-8xlmb-gw3-2052-1840249948/'
data_gen = Double

    @pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None),
                                          DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])])
    @allow_non_gpu(*non_utc_allow_orc_scan)
    def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen):
        # ORC assumes the original double value is in seconds, so we need to convert it to
        # a timestamp (INT64 in microseconds).
        #
        # The 'datetime' module in Python requires 1 <= year <= 9999, and the UTC epoch starts at 1970/1/1.
        # That is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000, about -6.2e10,
        # so capping generated magnitudes at 2^32 (about 4.3e9) stays safely in range; hence max_exp = 32 in DoubleGen.
        #
        # The maximum valid positive number is (10000 - 1970) * 365 * 24 * 3600 = 253234080000, about 2.5e11 (under 2^38),
        # so we add some special cases from 2^33 to 2^37 (8e9 ~ 1e11).
        #
        # In DoubleGen, special_cases=None will generate NaN and INF corner cases.
        # (A standalone sanity check of these bounds follows the traceback below.)
    
        orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp'
        with_cpu_session(
            lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path)
        )
        # the name of the unique column is 'a'; cast it to timestamp type
>       assert_gpu_and_cpu_are_equal_collect(
            lambda spark: spark.read.schema("a timestamp").orc(orc_path)
        )

../../src/main/python/orc_cast_test.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../src/main/python/asserts.py:588: in assert_gpu_and_cpu_are_equal_collect
    _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first, result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)
../../src/main/python/asserts.py:509: in _assert_gpu_and_cpu_are_equal
    assert_equal(from_cpu, from_gpu)
../../src/main/python/asserts.py:107: in assert_equal
    _assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
../../src/main/python/asserts.py:43: in _assert_equal
    _assert_equal(cpu[index], gpu[index], float_check, path + [index])
../../src/main/python/asserts.py:36: in _assert_equal
    _assert_equal(cpu[field], gpu[field], float_check, path + [field])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cpu = datetime.datetime(1905, 7, 9, 5, 54, 10, 973000)
gpu = datetime.datetime(1905, 7, 9, 5, 54, 10, 972000)
float_check = <function get_float_check.<locals>.<lambda> at 0x7f6d0df259d0>
path = [1696, 'a']

    def _assert_equal(cpu, gpu, float_check, path):
        t = type(cpu)
        if (t is Row):
            assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
                assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
                for field in cpu.__fields__:
                    _assert_equal(cpu[field], gpu[field], float_check, path + [field])
            else:
                for index in range(len(cpu)):
                    _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is list):
            assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            for index in range(len(cpu)):
                _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is tuple):
            assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            for index in range(len(cpu)):
                _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is pytypes.GeneratorType):
            index = 0
            # generator has no zip :( so we have to do this the hard way
            done = False
            while not done:
                sub_cpu = None
                sub_gpu = None
                try:
                    sub_cpu = next(cpu)
                except StopIteration:
                    done = True
    
                try:
                    sub_gpu = next(gpu)
                except StopIteration:
                    done = True
    
                if done:
                    assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
                else:
                    _assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
    
                index = index + 1
        elif (t is dict):
            # The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
            # so sort the items to do our best with ignoring the order of dicts
        # use sorted() here: list.sort() sorts in place and returns None
        cpu_items = sorted(cpu.items(), key=_RowCmp)
        gpu_items = sorted(gpu.items(), key=_RowCmp)
            _assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
        elif (t is int):
            assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
        elif (t is float):
            if (math.isnan(cpu)):
                assert math.isnan(gpu), "GPU and CPU float values are different at {}".format(path)
            else:
                assert float_check(cpu, gpu), "GPU and CPU float values are different {}".format(path)
        elif isinstance(cpu, str):
            assert cpu == gpu, "GPU and CPU string values are different at {}".format(path)
        elif isinstance(cpu, datetime):
>           assert cpu == gpu, "GPU and CPU timestamp values are different at {}".format(path)
E           AssertionError: GPU and CPU timestamp values are different at [1696, 'a']

../../src/main/python/asserts.py:87: AssertionError
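As a side note on the bounds math in the test comment above, here is a quick standalone sanity check (my own illustration in plain Python, ignoring leap days just as the comment's estimate does):

```python
# Python's datetime requires 1 <= year <= 9999, so a double interpreted as
# epoch seconds must stay roughly within these magnitudes (leap days ignored).
import math

min_seconds = -1970 * 365 * 24 * 3600           # -62125920000, about -6.2e10
max_seconds = (10000 - 1970) * 365 * 24 * 3600  # 253234080000, about 2.5e11

print(math.log2(-min_seconds))  # ~35.9, so max_exp=32 keeps values in range
print(math.log2(max_seconds))   # ~37.9, so special cases up to 2^37 are valid
```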
jlowe added labels "bug (Something isn't working)" and "? - Needs Triage (Need team to review and classify)" on Dec 11, 2023
mattahrens removed the "? - Needs Triage" label on Dec 12, 2023
gerashegalov (Collaborator) commented:

Specifically, the data_gen1 case is failing:

SPARK_HOME=~/dist/spark-3.3.2-bin-hadoop3 TEST_PARALLEL=0 DATAGEN_SEED=1702329497 TZ=UTC \
    ./integration_tests/run_pyspark_from_build.sh \
    -k 'test_casting_from_double_to_timestamp and data_gen1' --test_oom_injection_mode=never 

gerashegalov (Collaborator) commented:

The minimal repro boils down to the value -2.0348715490275E9; the discrepancy occurs only when the cast is pushed into the datasource via the read schema.

In [6]: spark.conf.set('spark.rapids.sql.enabled', False)
  ...: spark.createDataFrame([(-2.0348715490275E9,)], 'a double').write.mode('overwrite').orc(orc_path)
                                                                               
In [7]: spark.conf.set('spark.rapids.sql.enabled', True)
  ...: gpu = spark.read.orc(orc_path).collect()
  ...: spark.conf.set('spark.rapids.sql.enabled', False)
  ...: cpu = spark.read.orc(orc_path).collect()
  ...: print(f"CPU={cpu} GPU={gpu} equal={cpu==gpu}")
CPU=[Row(a=-2034871549.0275)] GPU=[Row(a=-2034871549.0275)] equal=True

In [8]: spark.conf.set('spark.rapids.sql.enabled', True)
  ...: gpu_cast_ts = spark.read.orc(orc_path).select('a', col('a').cast(TimestampType()).alias('a_cast_ts')).collect()
  ...: spark.conf.set('spark.rapids.sql.enabled', False)
  ...: cpu_cast_ts = spark.read.orc(orc_path).select('a', col('a').cast(TimestampType()).alias('a_cast_ts')).collect()
  ...: print(f"CPU={cpu_cast_ts} GPU={gpu_cast_ts} equal={cpu_cast_ts == gpu_cast_ts}")
CPU=[Row(a=-2034871549.0275, a_cast_ts=datetime.datetime(1905, 7, 9, 5, 54, 10, 972500))] GPU=[Row(a=-2034871549.0275, a_cast_ts=datetime.datetime(1905, 7, 9, 5, 54, 10, 972500))] equal=True

In [9]: spark.conf.set('spark.rapids.sql.enabled', True)
  ...: gpu_ts = spark.read.schema('a timestamp').orc(orc_path).collect()
  ...: spark.conf.set('spark.rapids.sql.enabled', False)
  ...: cpu_ts = spark.read.schema('a timestamp').orc(orc_path).collect()
  ...: print(f"CPU={cpu_ts} GPU={gpu_ts} equal={cpu_ts == gpu_ts}")
CPU=[Row(a=datetime.datetime(1905, 7, 9, 5, 54, 10, 973000))] GPU=[Row(a=datetime.datetime(1905, 7, 9, 5, 54, 10, 972000))] equal=False
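A standalone way to see where the single-millisecond discrepancy comes from (my own sketch in plain Python, not plugin code; the variable names are mine): multiplying the repro value by 1000 lands exactly on a half-millisecond tie, which the two rounding modes described in the next comment break in opposite directions.

```python
import math
from datetime import datetime, timedelta

v = -2.0348715490275E9   # the repro value, interpreted as epoch seconds
millis = v * 1000        # exactly -2034871549027.5, a half-millisecond tie

# java.lang.Math.round semantics (half up, towards positive infinity), as
# applied on the CPU read path per the root cause analysis below:
cpu_millis = math.floor(millis + 0.5)                                   # -2034871549027

# half away from zero, matching the libcudf/GPU behavior:
gpu_millis = int(math.copysign(math.floor(abs(millis) + 0.5), millis))  # -2034871549028

epoch = datetime(1970, 1, 1)
print(epoch + timedelta(milliseconds=cpu_millis))  # 1905-07-09 05:54:10.973000
print(epoch + timedelta(milliseconds=gpu_millis))  # 1905-07-09 05:54:10.972000
```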

gerashegalov added a commit to gerashegalov/spark-rapids that referenced this issue Dec 23, 2023

Fixes NVIDIA#10017

Spark and libcudf `round` behave like round half away from zero.

Reimplementing Math.round from the ORC TimestampFromDouble conversion
using floor.

Signed-off-by: Gera Shegalov <gera@apache.org>
gerashegalov commented Dec 26, 2023

The root cause of the issue is the discrepancy between libcudf and java.lang.Math in how x.5 values are rounded:

 
        // java.lang.Math.round is a true half up, meaning rounding towards positive infinity
        // even for negative numbers
        //   assert(Math.round(-1.5) == -1)
        //   assert(Math.round(1.5) == 2)
        //
        // libcudf and Spark, in contrast, round half away from zero:
        // >> sql("SELECT ROUND(-1.5D, 0), ROUND(-0.5D, 0), ROUND(0.5D, 0)").show(truncate=False)
        // +--------------+--------------+-------------+
        // |round(-1.5, 0)|round(-0.5, 0)|round(0.5, 0)|
        // +--------------+--------------+-------------+
        // |-2.0          |-1.0          |1.0          |
        // +--------------+--------------+-------------+
        //
        // Math.round half up can be implemented in terms of floor
        // Math.round(x) = n iff x is in [n-0.5, n+0.5) iff x+0.5 is in [n,n+1) iff floor(x+0.5) = n
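The identity on the last line is easy to check in isolation. Here is a small sketch (my own, in plain Python; the actual fix lives in the plugin's GPU code) contrasting the two rounding modes:

```python
import math

def java_math_round(x: float) -> int:
    # Math.round(x) = n iff x in [n-0.5, n+0.5) iff floor(x + 0.5) = n
    return math.floor(x + 0.5)

def round_half_away_from_zero(x: float) -> int:
    # libcudf / Spark ROUND semantics
    return int(math.copysign(math.floor(abs(x) + 0.5), x))

assert java_math_round(1.5) == 2
assert java_math_round(-1.5) == -1             # half up: towards +infinity
assert java_math_round(-0.5) == 0

assert round_half_away_from_zero(1.5) == 2
assert round_half_away_from_zero(-1.5) == -2   # matches ROUND(-1.5, 0) above
assert round_half_away_from_zero(-0.5) == -1   # matches ROUND(-0.5, 0) above
```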

gerashegalov added a commit that referenced this issue Dec 26, 2023
Fixes #10017

Spark and libcudf `round` round half away from zero, i.e. half up for positive numbers (round(1.5)=2) and half down for negative ones (round(-1.5)=-2). `Math.round` rounds half up towards positive infinity regardless of the sign: round(1.5)=2, round(-1.5)=-1.

Reimplementing Math.round from the ORC TimestampFromDouble conversion
using floor:
```
Math.round(x) = n iff x is in [n-0.5, n+0.5) iff x+0.5 is in [n,n+1) iff floor(x+0.5) = n
```
Signed-off-by: Gera Shegalov <gera@apache.org>