Skip to content

Commit

Permalink
Group-by aggregation based optimization for UNBOUNDED collect_set window function (#10248)
Browse files Browse the repository at this point in the history

Signed-off-by: MithunR <mythrocks@gmail.com>
  • Loading branch information
mythrocks authored Feb 12, 2024
1 parent 80f5670 commit a76f5b6
Show file tree
Hide file tree
Showing 4 changed files with 565 additions and 35 deletions.
99 changes: 99 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,105 @@ def test_window_aggs_for_rows_collect_set():
conf={'spark.rapids.sql.window.collectSet.enabled': True})


@ignore_order(local=True)
@allow_non_gpu(*non_utc_allow)
def test_window_aggs_for_fully_unbounded_partitioned_collect_set():
    """
    Test that confirms that `collect_set` window aggregation, when run over UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
    runs through the `GpuUnboundedToUnboundedAggWindowExec` (which optimizes it to run via sort-based group-by
    aggregations).
    Note: This optimization only holds for the partitioned case. Unpartitioned windows are not supported yet.
    """
    # Input table builder; shares the generator used by the other collect_set window tests.
    df_builder = lambda spark: gen_df(spark, _gen_data_for_collect_set, length=2048)
    # The outer sort_array() calls make each collected set's contents order-insensitive,
    # so CPU and GPU results can be compared directly.
    query = '''
        select a, b,
            sort_array(cc_bool),
            sort_array(cc_int),
            sort_array(cc_long),
            sort_array(cc_short),
            sort_array(cc_date),
            sort_array(cc_ts),
            sort_array(cc_byte),
            sort_array(cc_str),
            sort_array(cc_float),
            sort_array(cc_double),
            sort_array(cc_decimal_32),
            sort_array(cc_decimal_64),
            sort_array(cc_decimal_128),
            sort_array(cc_fp_nan)
        from (
            select a, b,
                collect_set(c_bool) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_bool,
                collect_set(c_int) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_int,
                collect_set(c_long) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_long,
                collect_set(c_short) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_short,
                collect_set(c_date) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_date,
                collect_set(c_timestamp) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_ts,
                collect_set(c_byte) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_byte,
                collect_set(c_string) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_str,
                collect_set(c_float) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_float,
                collect_set(c_double) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_double,
                collect_set(c_decimal_32) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_32,
                collect_set(c_decimal_64) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_64,
                collect_set(c_decimal_128) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_128,
                collect_set(c_fp_nan) over
                    (partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_fp_nan
            from window_collect_table
        ) t
    '''
    # Both switches must be on for the optimized exec to be planned; the int96 rebase
    # setting presumably relates to legacy timestamp handling in the comparison — confirm.
    test_conf = {'spark.rapids.sql.window.collectSet.enabled': True,
                 'spark.rapids.sql.window.unboundedAgg.enabled': True,
                 'spark.sql.parquet.int96RebaseModeInWrite': 'LEGACY'}
    # validate_execs_in_gpu_plan asserts that the optimized unbounded-agg exec was actually used.
    assert_gpu_and_cpu_are_equal_sql(
        df_builder,
        "window_collect_table",
        query,
        conf=test_conf,
        validate_execs_in_gpu_plan=['GpuUnboundedToUnboundedAggWindowExec'])


@ignore_order(local=True)
@allow_non_gpu(*non_utc_allow)
def test_window_aggs_for_fully_unbounded_unpartitioned_collect_set():
    """
    Test that confirms that `collect_set` window aggregation, when run over UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
    falls back to GpuWindowExec, if no partition spec is specified.
    """
    # Same shared generator as the partitioned test; only the window spec differs
    # (no "partition by"), which is what forces the fallback path.
    df_builder = lambda spark: gen_df(spark, _gen_data_for_collect_set, length=2048)
    # sort_array() normalizes set ordering so CPU and GPU outputs compare equal.
    query = '''
        select a, b,
            sort_array(cc_int),
            sort_array(cc_long),
            sort_array(cc_short)
        from (
            select a, b,
                collect_set(c_int) over
                    (order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_int,
                collect_set(c_long) over
                    (order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_long,
                collect_set(c_short) over
                    (order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_short
            from window_collect_table
        ) t
    '''
    # Even with the unbounded-agg switch enabled, the plan must contain the plain
    # GpuWindowExec (not the optimized exec) for the unpartitioned case.
    test_conf = {'spark.rapids.sql.window.collectSet.enabled': True,
                 'spark.rapids.sql.window.unboundedAgg.enabled': True,
                 'spark.sql.parquet.int96RebaseModeInWrite': 'LEGACY'}
    assert_gpu_and_cpu_are_equal_sql(
        df_builder,
        "window_collect_table",
        query,
        conf=test_conf,
        validate_execs_in_gpu_plan=['GpuWindowExec'])


# Note, using sort_array() on the CPU, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
Expand Down
Loading

0 comments on commit a76f5b6

Please sign in to comment.