[BUG] test_exact_percentile_groupby_partial_fallback_to_cpu failed with DATAGEN_SEED=1713928179 #10738

Closed
thirtiseven opened this issue Apr 24, 2024 · 4 comments · Fixed by #10795
Labels: bug (Something isn't working)

@thirtiseven (Collaborator)
Describe the bug
test_exact_percentile_groupby_partial_fallback_to_cpu failed in the pre-merge CI with DATAGEN_SEED=1713928179.

Steps/Code to reproduce bug

TEST_PARALLEL=0 DATAGEN_SEED=1713928179 ./integration_tests/run_pyspark_from_build.sh -s -k test_exact_percentile_groupby_partial_fallback_to_cpu

=================================== FAILURES ===================================
_ test_exact_percentile_groupby_partial_fallback_to_cpu[false-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]] _

data_gen = [('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]
replace_mode = 'final|complete', use_obj_hash_agg = 'false'

    @ignore_order
    @allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning',
                   'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec',
                   'Percentile')
    @pytest.mark.parametrize('data_gen', exact_percentile_groupby_cpu_fallback_data_gen, ids=idfn)
    @pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn)
    @pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
    @pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494')
    def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg):
        cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault']
        exist_clz, non_exist_clz = [], []
        # For aggregations without distinct, Databricks runtime removes the partial Aggregate stage (
        # map-side combine). There only exists an AggregateExec in Databricks runtimes. So, we need to
        # set the expected exist_classes according to runtime.
        if is_databricks_runtime():
            if replace_mode == 'partial':
                exist_clz, non_exist_clz = cpu_clz, gpu_clz
            else:
                exist_clz, non_exist_clz = gpu_clz, cpu_clz
        else:
            exist_clz = cpu_clz + gpu_clz

>       assert_cpu_and_gpu_are_equal_collect_with_capture(
            lambda spark: gen_df(spark, data_gen).groupby('key').agg(
                f.expr('percentile(val, 0.1)'),
                f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'),
                f.expr('percentile(val, 0.1, abs(freq))'),
                f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')),
            exist_classes=','.join(exist_clz),
            non_exist_classes=','.join(non_exist_clz),
            conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
                  'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
        )

../../src/main/python/hash_aggregate_test.py:1031:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/asserts.py:419: in assert_cpu_and_gpu_are_equal_collect_with_capture
    assert_equal(from_cpu, from_gpu)
../../src/main/python/asserts.py:107: in assert_equal
    _assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
../../src/main/python/asserts.py:43: in _assert_equal
    _assert_equal(cpu[index], gpu[index], float_check, path + [index])
../../src/main/python/asserts.py:36: in _assert_equal
    _assert_equal(cpu[field], gpu[field], float_check, path + [field])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

cpu = None, gpu = 0.0
float_check = <function get_float_check.<locals>.<lambda> at 0x7ff7b804b940>
path = [87, 'percentile(val, 0.1, abs(freq))']

    def _assert_equal(cpu, gpu, float_check, path):
        t = type(cpu)
        if (t is Row):
            assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
                assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
                for field in cpu.__fields__:
                    _assert_equal(cpu[field], gpu[field], float_check, path + [field])
            else:
                for index in range(len(cpu)):
                    _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is list):
            assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            for index in range(len(cpu)):
                _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is tuple):
            assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
            for index in range(len(cpu)):
                _assert_equal(cpu[index], gpu[index], float_check, path + [index])
        elif (t is pytypes.GeneratorType):
            index = 0
            # generator has no zip :( so we have to do this the hard way
            done = False
            while not done:
                sub_cpu = None
                sub_gpu = None
                try:
                    sub_cpu = next(cpu)
                except StopIteration:
                    done = True

                try:
                    sub_gpu = next(gpu)
                except StopIteration:
                    done = True

                if done:
                    assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
                else:
                    _assert_equal(sub_cpu, sub_gpu, float_check, path + [index])

                index = index + 1
        elif (t is dict):
            # The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
            # so sort the items to do our best with ignoring the order of dicts
            cpu_items = list(cpu.items()).sort(key=_RowCmp)
            gpu_items = list(gpu.items()).sort(key=_RowCmp)
            _assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
        elif (t is int):
            assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
        elif (t is float):
            if (math.isnan(cpu)):
                assert math.isnan(gpu), "GPU and CPU float values are different at {}".format(path)
            else:
                assert float_check(cpu, gpu), "GPU and CPU float values are different {}".format(path)
        elif isinstance(cpu, str):
            assert cpu == gpu, "GPU and CPU string values are different at {}".format(path)
        elif isinstance(cpu, datetime):
            assert cpu == gpu, "GPU and CPU timestamp values are different at {}".format(path)
        elif isinstance(cpu, date):
            assert cpu == gpu, "GPU and CPU date values are different at {}".format(path)
        elif isinstance(cpu, bool):
            assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
        elif isinstance(cpu, Decimal):
            assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
        elif isinstance(cpu, bytearray):
            assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
        elif isinstance(cpu, timedelta):
            # Used by interval type DayTimeInterval for Pyspark 3.3.0+
            assert cpu == gpu, "GPU and CPU timedelta values are different at {}".format(path)
        elif (cpu == None):
>           assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
E           AssertionError: GPU and CPU are not both null at [87, 'percentile(val, 0.1, abs(freq))']

../../src/main/python/asserts.py:100: AssertionError
_ test_exact_percentile_groupby_partial_fallback_to_cpu[true-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]] _

data_gen = [('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]
replace_mode = 'final|complete', use_obj_hash_agg = 'true'

(The traceback for this parametrization is identical to the one above, differing only in use_obj_hash_agg, and ends in the same assertion:)

E           AssertionError: GPU and CPU are not both null at [87, 'percentile(val, 0.1, abs(freq))']

../../src/main/python/asserts.py:100: AssertionError
=============================== warnings summary ===============================
../../src/main/python/parquet_testing_test.py:134
  /home/haoyangl/spark-rapids/integration_tests/src/main/python/parquet_testing_test.py:134: UserWarning: Skipping parquet-testing tests. Unable to locate data in any of: /home/haoyangl/spark-rapids/integration_tests/src/test/resources/parquet-testing/data/*.parquet, /home/haoyangl/spark-rapids/integration_tests/src/test/resources/parquet-testing/bad_data/*.parquet, /home/haoyangl/spark-rapids/thirdparty/parquet-testing/data/*.parquet, /home/haoyangl/spark-rapids/thirdparty/parquet-testing/bad_data/*.parquet
    warnings.warn("Skipping parquet-testing tests. Unable to locate data in any of: " + locations)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
- generated xml file: /home/haoyangl/spark-rapids/integration_tests/target/run_dir-20240424153239-vaes/TEST-pytest-1713943959273627587.xml -
=========================== short test summary info ============================
FAILED ../../src/main/python/hash_aggregate_test.py::test_exact_percentile_groupby_partial_fallback_to_cpu[false-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]][DATAGEN_SEED=1713928179, TZ=UTC, IGNORE_ORDER, ALLOW_NON_GPU(ObjectHashAggregateExec,SortAggregateExec,ShuffleExchangeExec,HashPartitioning,AggregateExpression,Alias,Cast,Literal,ProjectExec,Percentile)]
FAILED ../../src/main/python/hash_aggregate_test.py::test_exact_percentile_groupby_partial_fallback_to_cpu[true-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]][DATAGEN_SEED=1713928179, TZ=UTC, INJECT_OOM, IGNORE_ORDER, ALLOW_NON_GPU(ObjectHashAggregateExec,SortAggregateExec,ShuffleExchangeExec,HashPartitioning,AggregateExpression,Alias,Cast,Literal,ProjectExec,Percentile)]
========== 2 failed, 6 passed, 27053 deselected, 1 warning in 16.90s ===========
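
Outside the test harness, the failing aggregation boils down to a single query. Below is a minimal PySpark sketch; the sample rows are illustrative assumptions, not the actual rows produced by DATAGEN_SEED=1713928179, and it assumes a SparkSession already configured with the spark-rapids plugin:

```python
# Minimal sketch of the failing query. Assumes the spark-rapids plugin is
# already on the session; the rows below are illustrative only.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 10, 3), (1, None, 2), (2, 5, 1), (3, None, 4)],
    ['key', 'val', 'freq'])

# CPU Spark returns NULL for a group whose 'val' values are all null
# (key=3 here); the reported bug is the GPU returning 0.0 for such a group.
df.groupBy('key').agg(
    f.expr('percentile(val, 0.1, abs(freq))')).show()
```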

Expected behavior
The test should pass. As an immediate step, pin the datagen seed for this test to a fixed value while the root cause is investigated.
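
If pinning is done the way other datagen-seed issues have been handled, it might look something like the sketch below; the `datagen_overrides` marker and the seed value are assumptions, not a confirmed fix:

```python
# Hypothetical sketch: pin the datagen seed for this one test until the
# underlying percentile bug is fixed. Assumes the integration tests'
# existing `datagen_overrides` marker; the seed value is illustrative.
from marks import datagen_overrides

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/10738')
def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg):
    ...  # existing test body unchanged
```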

@mattahrens (Collaborator)

Is this a dup of #10719?

@mattahrens removed the "? - Needs Triage" (Need team to review and classify) label on Apr 30, 2024
@mythrocks (Collaborator)

> Is this a dup of #10719?

Kinda. I think @thirtiseven's observation is that the fixed seed from #10739 should have allowed this test to pass.

@mythrocks (Collaborator) commented May 8, 2024

Ok, looks like a proper bug, likely in spark-rapids-jni (libcudf was initially suspected). I've identified the case; I'll narrow it down in the spark-rapids-jni percentile code.

@mythrocks (Collaborator)

I have raised NVIDIA/spark-rapids-jni#2029. It looks to me like a bug in how percentiles are derived from the constructed histograms.
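
For context, Spark's exact percentile with a frequency column amounts to sorting the distinct non-null values, accumulating their frequencies, and linearly interpolating at rank p * (total - 1); a group with no non-null values must yield null. Here is a minimal Python sketch of that derivation — illustrative only, not the plugin's or the JNI library's actual implementation:

```python
import math

def percentile_from_histogram(histogram, p):
    """Exact percentile from [(value, frequency)] pairs, Spark-style sketch."""
    # Keep only non-null values with positive frequency, sorted by value.
    items = sorted((v, f) for v, f in histogram if v is not None and f > 0)
    if not items:
        return None  # empty group: the result must be null, not 0.0
    total = sum(f for _, f in items)
    pos = p * (total - 1)              # 0-based, possibly fractional rank
    lo, hi = math.floor(pos), math.ceil(pos)

    def value_at(rank):
        # Value of the rank-th element of the expanded sorted multiset.
        cum = 0
        for v, f in items:
            cum += f
            if rank < cum:
                return v

    lo_val, hi_val = value_at(lo), value_at(hi)
    # Linear interpolation between the bracketing values.
    return lo_val + (pos - lo) * (hi_val - lo_val)
```

Under that reading, `percentile_from_histogram([], 0.1)` returns `None`, matching the CPU result above; a GPU path that yields 0.0 for the same empty histogram would produce exactly the mismatch in this report.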
