You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
Describe the bug
test_exact_percentile_groupby_partial_fallback_to_cpu failed in premerge with DATAGEN_SEED=1713928179
Steps/Code to reproduce bug
TEST_PARALLEL=0 DATAGEN_SEED=1713928179 ./integration_tests/run_pyspark_from_build.sh -s -k test_exact_percentile_groupby_partial_fallback_to_cpu
=================================== FAILURES ===================================
_ test_exact_percentile_groupby_partial_fallback_to_cpu[false-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]] _
data_gen = [('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]
replace_mode = 'final|complete', use_obj_hash_agg = 'false'
@ignore_order
@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning',
'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec',
'Percentile')
@pytest.mark.parametrize('data_gen', exact_percentile_groupby_cpu_fallback_data_gen, ids=idfn)
@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494')
def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg):
cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault']
exist_clz, non_exist_clz = [], []
# For aggregations without distinct, Databricks runtime removes the partial Aggregate stage (
# map-side combine). There only exists an AggregateExec in Databricks runtimes. So, we need to
# set the expected exist_classes according to runtime.
if is_databricks_runtime():
if replace_mode == 'partial':
exist_clz, non_exist_clz = cpu_clz, gpu_clz
else:
exist_clz, non_exist_clz = gpu_clz, cpu_clz
else:
exist_clz = cpu_clz + gpu_clz
> assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen).groupby('key').agg(
f.expr('percentile(val, 0.1)'),
f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'),
f.expr('percentile(val, 0.1, abs(freq))'),
f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')),
exist_classes=','.join(exist_clz),
non_exist_classes=','.join(non_exist_clz),
conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
)
../../src/main/python/hash_aggregate_test.py:1031:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/asserts.py:419: in assert_cpu_and_gpu_are_equal_collect_with_capture
assert_equal(from_cpu, from_gpu)
../../src/main/python/asserts.py:107: in assert_equal
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
../../src/main/python/asserts.py:43: in _assert_equal
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
../../src/main/python/asserts.py:36: in _assert_equal
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cpu = None, gpu = 0.0
float_check = <function get_float_check.<locals>.<lambda> at 0x7ff7b804b940>
path = [87, 'percentile(val, 0.1, abs(freq))']
def _assert_equal(cpu, gpu, float_check, path):
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
done = False
while not done:
sub_cpu = None
sub_gpu = None
try:
sub_cpu = next(cpu)
except StopIteration:
done = True
try:
sub_gpu = next(gpu)
except StopIteration:
done = True
if done:
assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
else:
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
index = index + 1
elif (t is dict):
# The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
# so sort the items to do our best with ignoring the order of dicts
cpu_items = list(cpu.items()).sort(key=_RowCmp)
gpu_items = list(gpu.items()).sort(key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
elif (t is int):
assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
elif (t is float):
if (math.isnan(cpu)):
assert math.isnan(gpu), "GPU and CPU float values are different at {}".format(path)
else:
assert float_check(cpu, gpu), "GPU and CPU float values are different {}".format(path)
elif isinstance(cpu, str):
assert cpu == gpu, "GPU and CPU string values are different at {}".format(path)
elif isinstance(cpu, datetime):
assert cpu == gpu, "GPU and CPU timestamp values are different at {}".format(path)
elif isinstance(cpu, date):
assert cpu == gpu, "GPU and CPU date values are different at {}".format(path)
elif isinstance(cpu, bool):
assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
elif isinstance(cpu, Decimal):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif isinstance(cpu, bytearray):
assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
elif isinstance(cpu, timedelta):
# Used by interval type DayTimeInterval for Pyspark 3.3.0+
assert cpu == gpu, "GPU and CPU timedelta values are different at {}".format(path)
elif (cpu == None):
> assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
E AssertionError: GPU and CPU are not both null at [87, 'percentile(val, 0.1, abs(freq))']
../../src/main/python/asserts.py:100: AssertionError
_ test_exact_percentile_groupby_partial_fallback_to_cpu[true-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]] _
data_gen = [('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]
replace_mode = 'final|complete', use_obj_hash_agg = 'true'
@ignore_order
@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning',
'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec',
'Percentile')
@pytest.mark.parametrize('data_gen', exact_percentile_groupby_cpu_fallback_data_gen, ids=idfn)
@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494')
def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg):
cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault']
exist_clz, non_exist_clz = [], []
# For aggregations without distinct, Databricks runtime removes the partial Aggregate stage (
# map-side combine). There only exists an AggregateExec in Databricks runtimes. So, we need to
# set the expected exist_classes according to runtime.
if is_databricks_runtime():
if replace_mode == 'partial':
exist_clz, non_exist_clz = cpu_clz, gpu_clz
else:
exist_clz, non_exist_clz = gpu_clz, cpu_clz
else:
exist_clz = cpu_clz + gpu_clz
> assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen).groupby('key').agg(
f.expr('percentile(val, 0.1)'),
f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'),
f.expr('percentile(val, 0.1, abs(freq))'),
f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')),
exist_classes=','.join(exist_clz),
non_exist_classes=','.join(non_exist_clz),
conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
)
../../src/main/python/hash_aggregate_test.py:1031:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/asserts.py:419: in assert_cpu_and_gpu_are_equal_collect_with_capture
assert_equal(from_cpu, from_gpu)
../../src/main/python/asserts.py:107: in assert_equal
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
../../src/main/python/asserts.py:43: in _assert_equal
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
../../src/main/python/asserts.py:36: in _assert_equal
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cpu = None, gpu = 0.0
float_check = <function get_float_check.<locals>.<lambda> at 0x7ff7c9c5c310>
path = [87, 'percentile(val, 0.1, abs(freq))']
def _assert_equal(cpu, gpu, float_check, path):
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
done = False
while not done:
sub_cpu = None
sub_gpu = None
try:
sub_cpu = next(cpu)
except StopIteration:
done = True
try:
sub_gpu = next(gpu)
except StopIteration:
done = True
if done:
assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
else:
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
index = index + 1
elif (t is dict):
# The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
# so sort the items to do our best with ignoring the order of dicts
cpu_items = list(cpu.items()).sort(key=_RowCmp)
gpu_items = list(gpu.items()).sort(key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
elif (t is int):
assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
elif (t is float):
if (math.isnan(cpu)):
assert math.isnan(gpu), "GPU and CPU float values are different at {}".format(path)
else:
assert float_check(cpu, gpu), "GPU and CPU float values are different {}".format(path)
elif isinstance(cpu, str):
assert cpu == gpu, "GPU and CPU string values are different at {}".format(path)
elif isinstance(cpu, datetime):
assert cpu == gpu, "GPU and CPU timestamp values are different at {}".format(path)
elif isinstance(cpu, date):
assert cpu == gpu, "GPU and CPU date values are different at {}".format(path)
elif isinstance(cpu, bool):
assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
elif isinstance(cpu, Decimal):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif isinstance(cpu, bytearray):
assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
elif isinstance(cpu, timedelta):
# Used by interval type DayTimeInterval for Pyspark 3.3.0+
assert cpu == gpu, "GPU and CPU timedelta values are different at {}".format(path)
elif (cpu == None):
> assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
E AssertionError: GPU and CPU are not both null at [87, 'percentile(val, 0.1, abs(freq))']
../../src/main/python/asserts.py:100: AssertionError
=============================== warnings summary ===============================
../../src/main/python/parquet_testing_test.py:134
/home/haoyangl/spark-rapids/integration_tests/src/main/python/parquet_testing_test.py:134: UserWarning: Skipping parquet-testing tests. Unable to locate data in any of: /home/haoyangl/spark-rapids/integration_tests/src/test/resources/parquet-testing/data/*.parquet, /home/haoyangl/spark-rapids/integration_tests/src/test/resources/parquet-testing/bad_data/*.parquet, /home/haoyangl/spark-rapids/thirdparty/parquet-testing/data/*.parquet, /home/haoyangl/spark-rapids/thirdparty/parquet-testing/bad_data/*.parquet
warnings.warn("Skipping parquet-testing tests. Unable to locate data in any of: " + locations)
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
- generated xml file: /home/haoyangl/spark-rapids/integration_tests/target/run_dir-20240424153239-vaes/TEST-pytest-1713943959273627587.xml -
=========================== short test summary info ============================
FAILED ../../src/main/python/hash_aggregate_test.py::test_exact_percentile_groupby_partial_fallback_to_cpu[false-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]][DATAGEN_SEED=1713928179, TZ=UTC, IGNORE_ORDER, ALLOW_NON_GPU(ObjectHashAggregateExec,SortAggregateExec,ShuffleExchangeExec,HashPartitioning,AggregateExpression,Alias,Cast,Literal,ProjectExec,Percentile)]
FAILED ../../src/main/python/hash_aggregate_test.py::test_exact_percentile_groupby_partial_fallback_to_cpu[true-final|complete-[('key', RepeatSeq(Integer)), ('val', Integer), ('freq', Long(not_null))]][DATAGEN_SEED=1713928179, TZ=UTC, INJECT_OOM, IGNORE_ORDER, ALLOW_NON_GPU(ObjectHashAggregateExec,SortAggregateExec,ShuffleExchangeExec,HashPartitioning,AggregateExpression,Alias,Cast,Literal,ProjectExec,Percentile)]
========== 2 failed, 6 passed, 27053 deselected, 1 warning in 16.90s ===========
Expected behavior
It should pass. Set datagen seed to a fixed value first.
The text was updated successfully, but these errors were encountered:
Ok, looks like a proper bug. Likely in ~~libcudf~~ spark-rapids-jni. I've identified the case. I'll narrow it down in the ~~CUDF~~ spark-rapids-jni percentile code.
Describe the bug
test_exact_percentile_groupby_partial_fallback_to_cpu failed in premerge with DATAGEN_SEED=1713928179
Steps/Code to reproduce bug
Expected behavior
It should pass. Set datagen seed to a fixed value first.
The text was updated successfully, but these errors were encountered: