Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group-by aggregation based optimization for UNBOUNDED collect_set window function #10248

Merged
merged 59 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
f545a29
Add in framework for unbounded to unbounded window agg optimization
revans2 Jan 5, 2024
9594f43
Fix copyright
revans2 Jan 5, 2024
97eced3
Update the GpuWindowExecMeta to support UnboundedToUnboundedAgg
revans2 Jan 5, 2024
4873937
Fix some bugs
revans2 Jan 5, 2024
f825ed8
Add in basic repeat code
revans2 Jan 9, 2024
6325cf2
Fixed a few issues
revans2 Jan 11, 2024
d1f2e59
Added in some more options to try and slice the agg data
revans2 Jan 12, 2024
16720b1
Finished putting these pieces back together again
revans2 Jan 12, 2024
2e4acc8
Initial stab: Unbounded windows with grouped aggs.
mythrocks Jan 13, 2024
54977de
DEBUG
revans2 Jan 16, 2024
3ba57b4
Merge branch 'branch-24.02' into agg_for_window
revans2 Jan 16, 2024
c9a66f8
Aggregating iterator is roughly working.
mythrocks Jan 17, 2024
746035a
Merge remote-tracking branch 'revans2/agg_for_window' into revans-agg…
mythrocks Jan 17, 2024
44a4159
Step
revans2 Jan 17, 2024
57cc117
Some bug fixes
revans2 Jan 17, 2024
bf9057f
Update tests
revans2 Jan 17, 2024
e81385a
Copyright
revans2 Jan 17, 2024
d290594
Some fixes for JDK11 and Scala 2.13
revans2 Jan 17, 2024
d38501d
Hooked up 2nd and 3rd iterator.
mythrocks Jan 18, 2024
70d0643
Able to calculate slice points.
mythrocks Jan 19, 2024
0ddaab7
Partitioning works.
mythrocks Jan 20, 2024
c08cded
Refactored: Moved util functions to single location.
mythrocks Jan 21, 2024
050ea43
Cleaned up PartitionedFirstPass after epiphany.
mythrocks Jan 22, 2024
e8f5e50
Merging works.
mythrocks Jan 22, 2024
70c4d82
Removed debug statements.
mythrocks Jan 22, 2024
b8aa6f4
Added opTime.
mythrocks Jan 22, 2024
852f242
Registered cleanup for shutdown.
mythrocks Jan 23, 2024
7ba4378
Document why the COUNT aggregate needs to be scaled up to INT64.
mythrocks Jan 23, 2024
02b0a93
Moved some variables to private.
mythrocks Jan 24, 2024
031ec09
Added a 1s column, to account for GpuCount(1).
mythrocks Jan 26, 2024
7d968e1
Added test for unbounded case.
mythrocks Jan 26, 2024
b873b82
Refactor: Moved processing to separate function, to make it retry-able.
mythrocks Jan 26, 2024
7f6d897
Fixed leak in merged columns.
mythrocks Jan 26, 2024
94d9120
Fixed leak in preProcess.
mythrocks Jan 27, 2024
bbf3c6c
Fixed leak in upscale.
mythrocks Jan 27, 2024
c278c3b
Fixed leak in removeGroupColumns.
mythrocks Jan 27, 2024
224caeb
Added retriability for second iterator.
mythrocks Jan 29, 2024
7703316
Fixed ordering (nulls-first).
mythrocks Jan 30, 2024
a86ce83
Fixed leak in groupByAggregate.
mythrocks Jan 30, 2024
fbf246e
Fixed leak in sliceAndMakeSpillable.
mythrocks Jan 30, 2024
1d0e5af
Removed code repetition in getTableSlice.
mythrocks Jan 31, 2024
1b99f3b
Removed code marker.
mythrocks Jan 31, 2024
c776c16
Disable unbounded-agg implementation when there are no groups.
mythrocks Jan 31, 2024
b56ac4f
Added tests for collect_set() fallback to GpuWindowExec.
mythrocks Jan 31, 2024
35d307a
Fixed leak in groupByMerge.
mythrocks Feb 2, 2024
223e01b
Fixed memory leak in concat.
mythrocks Feb 2, 2024
617d77f
Clarified todos.
mythrocks Feb 2, 2024
02bfa0b
Merge branch 'branch-24.04' into agg_for_window
revans2 Feb 2, 2024
93d8a3b
Merge remote-tracking branch 'origin/branch-24.04' into revans-agg-fo…
mythrocks Feb 2, 2024
04ce68a
Fixed compile errors in GpuUnboundedToUnboundedAggWindowSuite.
mythrocks Feb 2, 2024
1f83174
Merge remote-tracking branch 'revans2/agg_for_window' into revans-agg…
mythrocks Feb 2, 2024
67bd509
Removed code marker.
mythrocks Feb 2, 2024
f517162
Docs for the utility functions.
mythrocks Feb 2, 2024
861851b
Fixes for Scala 2.13.
mythrocks Feb 2, 2024
482fa82
Fix verify errors.
mythrocks Feb 3, 2024
81099dc
Fix check for grouping columns.
mythrocks Feb 5, 2024
b122cbc
Review fixes: Clarifying docs, case class.
mythrocks Feb 6, 2024
6e674d4
Additional comment.
mythrocks Feb 6, 2024
e59a1b0
Merge remote-tracking branch 'origin/branch-24.04' into revans-agg-fo…
mythrocks Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,105 @@ def test_window_aggs_for_rows_collect_set():
conf={'spark.rapids.sql.window.collectSet.enabled': True})


@ignore_order(local=True)
@allow_non_gpu(*non_utc_allow)
def test_window_aggs_for_fully_unbounded_partitioned_collect_set():
"""
Test that confirms that `collect_set` window aggregation, when run over UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
runs through the `GpuUnboundedToUnboundedAggWindowExec` (which optimizes it to run via sort-based group-by
aggregations).
Note: This optimization only holds for the partitioned case. Unpartitioned windows are not supported yet.
"""
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, _gen_data_for_collect_set, length=2048),
"window_collect_table",
'''
select a, b,
sort_array(cc_bool),
sort_array(cc_int),
sort_array(cc_long),
sort_array(cc_short),
sort_array(cc_date),
sort_array(cc_ts),
sort_array(cc_byte),
sort_array(cc_str),
sort_array(cc_float),
sort_array(cc_double),
sort_array(cc_decimal_32),
sort_array(cc_decimal_64),
sort_array(cc_decimal_128),
sort_array(cc_fp_nan)
from (
select a, b,
collect_set(c_bool) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_bool,
collect_set(c_int) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_int,
collect_set(c_long) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_long,
collect_set(c_short) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_short,
collect_set(c_date) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_date,
collect_set(c_timestamp) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_ts,
collect_set(c_byte) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_byte,
collect_set(c_string) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_str,
collect_set(c_float) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_float,
collect_set(c_double) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_double,
collect_set(c_decimal_32) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_32,
collect_set(c_decimal_64) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_64,
collect_set(c_decimal_128) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_decimal_128,
collect_set(c_fp_nan) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_fp_nan
from window_collect_table
) t
''',
conf={'spark.rapids.sql.window.collectSet.enabled': True,
'spark.rapids.sql.window.unboundedAgg.enabled': True,
'spark.sql.parquet.int96RebaseModeInWrite': 'LEGACY'},
validate_execs_in_gpu_plan=['GpuUnboundedToUnboundedAggWindowExec'])


@ignore_order(local=True)
@allow_non_gpu(*non_utc_allow)
def test_window_aggs_for_fully_unbounded_unpartitioned_collect_set():
"""
Test that confirms that `collect_set` window aggregation, when run over UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
falls back to GpuWindowExec, if no partition spec is specified.
"""
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, _gen_data_for_collect_set, length=2048),
"window_collect_table",
'''
select a, b,
sort_array(cc_int),
sort_array(cc_long),
sort_array(cc_short)
from (
select a, b,
collect_set(c_int) over
(order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_int,
collect_set(c_long) over
(order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_long,
collect_set(c_short) over
(order by b,c_int rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as cc_short
from window_collect_table
) t
''',
conf={'spark.rapids.sql.window.collectSet.enabled': True,
'spark.rapids.sql.window.unboundedAgg.enabled': True,
'spark.sql.parquet.int96RebaseModeInWrite': 'LEGACY'},
validate_execs_in_gpu_plan=['GpuWindowExec'])


# Note, using sort_array() on the CPU, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
Expand Down
Loading
Loading