
Support GpuCollectList and GpuCollectSet as TypedImperativeAggregate #2971

Merged: 17 commits, Aug 3, 2021
Changes from 14 commits
5 changes: 3 additions & 2 deletions docs/configs.md
@@ -294,8 +294,8 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Year"></a>spark.rapids.sql.expression.Year|`year`|Returns the year from a date or timestamp|true|None|
<a name="sql.expression.AggregateExpression"></a>spark.rapids.sql.expression.AggregateExpression| |Aggregate expression|true|None|
<a name="sql.expression.Average"></a>spark.rapids.sql.expression.Average|`avg`, `mean`|Average aggregate operator|true|None|
<a name="sql.expression.CollectList"></a>spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of non-unique elements, only supported in rolling window in current.|true|None|
<a name="sql.expression.CollectSet"></a>spark.rapids.sql.expression.CollectSet|`collect_set`|Collect a set of unique elements, only supported in rolling window in current.|true|None|
<a name="sql.expression.CollectList"></a>spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of non-unique elements, not supported in reduction.|true|None|
<a name="sql.expression.CollectSet"></a>spark.rapids.sql.expression.CollectSet|`collect_set`|Collect a set of unique elements, not supported in reduction.|true|None|
<a name="sql.expression.Count"></a>spark.rapids.sql.expression.Count|`count`|Count aggregate operator|true|None|
<a name="sql.expression.First"></a>spark.rapids.sql.expression.First|`first_value`, `first`|first aggregate operator|true|None|
<a name="sql.expression.Last"></a>spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None|
@@ -327,6 +327,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.UnionExec"></a>spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
<a name="sql.exec.CustomShuffleReaderExec"></a>spark.rapids.sql.exec.CustomShuffleReaderExec|A wrapper of shuffle query stage|true|None|
<a name="sql.exec.HashAggregateExec"></a>spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None|
<a name="sql.exec.ObjectHashAggregateExec"></a>spark.rapids.sql.exec.ObjectHashAggregateExec|The backend for hash based aggregations supporting TypedImperativeAggregate functions|true|None|
<a name="sql.exec.SortAggregateExec"></a>spark.rapids.sql.exec.SortAggregateExec|The backend for sort based aggregations|true|None|
<a name="sql.exec.DataWritingCommandExec"></a>spark.rapids.sql.exec.DataWritingCommandExec|Writing data|true|None|
<a name="sql.exec.BatchScanExec"></a>spark.rapids.sql.exec.BatchScanExec|The backend for most file input|true|None|
137 changes: 80 additions & 57 deletions docs/supported_ops.md
@@ -495,8 +495,8 @@ Accelerator supports are described below.
<th>UDT</th>
</tr>
<tr>
<td>SortAggregateExec</td>
<td>The backend for sort based aggregations</td>
<td>ObjectHashAggregateExec</td>
<td>The backend for hash based aggregations supporting TypedImperativeAggregate functions</td>
<td>None</td>
<td>S</td>
<td>S</td>
@@ -512,10 +512,33 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS (missing nested BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
</tr>
<tr>
<td>SortAggregateExec</td>
<td>The backend for sort based aggregations</td>
<td>None</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (not allowed for grouping expressions; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>DataWritingCommandExec</td>
@@ -817,29 +840,6 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td>WindowInPandasExec</td>
<td>The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.</td>
<td>This is disabled by default because it only supports row based frame for now</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Executor</th>
<th>Description</th>
<th>Notes</th>
@@ -863,6 +863,29 @@ Accelerator supports are described below.
<th>UDT</th>
</tr>
<tr>
<td>WindowInPandasExec</td>
<td>The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.</td>
<td>This is disabled by default because it only supports row based frame for now</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>WindowExec</td>
<td>Window-operator backend</td>
<td>None</td>
@@ -18305,9 +18328,9 @@ Accelerator support is described below.
<tr>
<td rowSpan="6">CollectList</td>
<td rowSpan="6">`collect_list`</td>
<td rowSpan="6">Collect a list of non-unique elements, only supported in rolling window in current.</td>
<td rowSpan="6">Collect a list of non-unique elements, not supported in reduction.</td>
<td rowSpan="6">None</td>
<td rowSpan="2">aggregation</td>
<td rowSpan="2">reduction</td>
<td>input</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -18350,25 +18373,25 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="2">reduction</td>
<td rowSpan="2">aggregation</td>
<td>input</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -18387,7 +18410,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -18438,9 +18461,9 @@ Accelerator support is described below.
<tr>
<td rowSpan="6">CollectSet</td>
<td rowSpan="6">`collect_set`</td>
<td rowSpan="6">Collect a set of unique elements, only supported in rolling window in current.</td>
<td rowSpan="6">Collect a set of unique elements, not supported in reduction.</td>
<td rowSpan="6">None</td>
<td rowSpan="2">aggregation</td>
<td rowSpan="2">reduction</td>
<td>input</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -18483,19 +18506,19 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="2">reduction</td>
<td rowSpan="2">aggregation</td>
<td>input</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -18520,7 +18543,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
119 changes: 119 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -375,6 +375,125 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
.agg(f.sum('c')),
conf=conf)

_repeat_agg_column_for_collect_op = [
RepeatSeqGen(BooleanGen(), length=15),
RepeatSeqGen(IntegerGen(), length=15),
RepeatSeqGen(LongGen(), length=15),
RepeatSeqGen(ShortGen(), length=15),
RepeatSeqGen(DateGen(), length=15),
RepeatSeqGen(TimestampGen(), length=15),
RepeatSeqGen(ByteGen(), length=15),
RepeatSeqGen(StringGen(), length=15),
RepeatSeqGen(FloatGen(), length=15),
RepeatSeqGen(DoubleGen(), length=15),
RepeatSeqGen(DecimalGen(precision=8, scale=3), length=15),
# case to verify the NAN_UNEQUAL strategy
RepeatSeqGen(FloatGen().with_special_case(math.nan, 200.0), length=5),
]

_gen_data_for_collect_op = [[
('a', RepeatSeqGen(LongGen(), length=20)),
('b', value_gen),
('c', LongRangeGen())] for value_gen in _repeat_agg_column_for_collect_op
]

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', [True, False], ids=idfn)
def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.count('b')),
Collaborator: It seems every collect_* function is wrapped in a sort_array. Is that on purpose? Could a comment be added somewhere on why? Especially because we have @ignore_order, so I was curious.
Collaborator (author): It is because @ignore_order only ensures the order between rows, while in these cases we also need to take care of the order of each array produced by the collect ops. I added a comment about this to the test file.
Collaborator: Makes sense. I'd extend ignore_order to do the sorting after the collect for the array case, so the aggregate could also be tested in another way a user is likely to invoke it. I'm OK if you want to do that as a follow-up.
Collaborator: I would be fine with an @ignore_array_order or something like that. I'd rather not have @ignore_order cover both.
Collaborator: But do it as a follow-on issue, if we do it at all.

conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()})
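To make the review discussion above concrete, here is a small standalone sketch (illustrative only, not part of this PR; the data is made up). @ignore_order tolerates rows coming back in a different order between the CPU and GPU runs, but the elements inside each collected array are also produced in a nondeterministic order, so the tests canonicalize them with sort_array before comparing.

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 3), (1, 2), (2, 5)], ['a', 'b'])

# Without sort_array, [2, 3] and [3, 2] would compare unequal even though they hold
# the same collected elements.
df.groupby('a').agg(f.sort_array(f.collect_list('b')).alias('collected')).show()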

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_set(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_set('b')), f.count('b')))

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_with_single_distinct(data_gen):
# test collect_ops with other distinct aggregations
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')),
f.sort_array(f.collect_set('b')),
f.countDistinct('c'),
f.count('c')))

# test distinct collect with other aggregations
sql = """select a,
sort_array(collect_list(distinct b)),
sort_array(collect_set(b)),
count(distinct b),
count(c)
from tbl group by a"""
assert_gpu_and_cpu_are_equal_sql(
df_fun=lambda spark: gen_df(spark, data_gen, length=100),
table_name="tbl", sql=sql)

# Queries with multiple distinct aggregations will fall back to the CPU if they also contain
# collect aggregations, because the Spark optimizer inserts expressions like `If` and `First`
# when rewriting distinct aggregates, while `GpuIf` and `GpuFirst` don't support the data type
# produced by collect aggregations (ArrayType).
Collaborator: Can you add references to the issue to support these for GpuIf and GpuFirst? If they do not exist, then could you please file them?
Collaborator (author): Yes, I filed the issue.
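To illustrate the comment above with plain Spark (an illustrative sketch, not part of this PR): the rewrite that introduces `If` and `First` can be observed by explaining a query with more than one distinct aggregate.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame([(1, 2, 3)], ['a', 'b', 'c']).createOrReplaceTempView('tbl')

# With more than one DISTINCT aggregate, Spark rewrites the plan (RewriteDistinctAggregates),
# and the expanded plan contains if(...) and first(...) expressions around the aggregates.
spark.sql("""
    select a, collect_list(b), count(distinct b), count(distinct c)
    from tbl group by a
""").explain(True)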

@approximate_float
@ignore_order(local=True)
@allow_non_gpu('SortAggregateExec',
'SortArray', 'Alias', 'Literal', 'First', 'If', 'EqualTo', 'Count',
'CollectList', 'CollectSet', 'AggregateExpression')
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_with_multi_distinct_fallback(data_gen):
sql = """select a,
sort_array(collect_list(b)),
sort_array(collect_set(b)),
count(distinct b),
count(distinct c)
from tbl group by a"""
assert_gpu_and_cpu_are_equal_sql(
Collaborator: Typically when we have a fallback test, we want an assertion that verifies part of the code actually did fall back, like assert_gpu_sql_fallback_collect.
Collaborator (author): I added the check for the fallback capture.

df_fun=lambda spark: gen_df(spark, data_gen, length=100),
table_name="tbl",
sql=sql)

@approximate_float
@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec',
'HashPartitioning', 'SortArray', 'Alias', 'Literal',
'Count', 'CollectList', 'CollectSet', 'AggregateExpression')
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('conf', [_nans_float_conf_partial, _nans_float_conf_final], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
Collaborator: Please always copy conf before doing an update. We have seen issues with global values being modified by tests doing this, and it is just good practice.
Collaborator (author): Fixed.

# test without Distinct
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.sort_array(f.collect_set('b'))),
conf=conf)
# test with single Distinct
assert_gpu_and_cpu_are_equal_collect(
Collaborator: Here too, if the tests are slightly different, then let's have a different test function for each test case.
Collaborator (author): Refined.

lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')),
f.sort_array(f.collect_set('b')),
f.countDistinct('c'),
f.count('c')),
conf=conf)
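A minimal sketch of the conf-copy pattern the reviewer asked for (the local_conf name is illustrative; the merged revision may do this through a shared helper instead):

def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
    # Copy before mutating so the shared module-level conf dict is never modified in place.
    local_conf = dict(conf)
    local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
    # ... run the same assertions as above, passing conf=local_conf ...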

@approximate_float
@ignore_order
@incompat