Skip to content

Commit

Permalink
Simple nested additions v2 (#3155)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Aug 5, 2021
1 parent 081ecfc commit 611e69c
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 75 deletions.
98 changes: 49 additions & 49 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -7699,9 +7699,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -7720,9 +7720,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -7741,9 +7741,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18081,9 +18081,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18123,9 +18123,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18333,9 +18333,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18375,9 +18375,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -18397,9 +18397,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18439,9 +18439,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -18461,9 +18461,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18503,9 +18503,9 @@ Accelerator support is described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down Expand Up @@ -18731,9 +18731,9 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -18752,7 +18752,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -18774,9 +18774,9 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand All @@ -18795,7 +18795,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><em>PS* (missing nested NULL, BINARY, CALENDAR, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
Expand Down Expand Up @@ -19109,7 +19109,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19130,7 +19130,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19152,7 +19152,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19173,7 +19173,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand Down Expand Up @@ -19242,7 +19242,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19263,7 +19263,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19285,7 +19285,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand All @@ -19306,7 +19306,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ def test_if_else(data_gen):
'IF(a, {}, c)'.format(null_lit)),
conf = allow_negative_scale_of_decimal_conf)

# Maps scalars are not really supported by Spark from python without jumping through a lot of hoops
# so for now we are going to skip them
@pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn)
def test_if_else_map(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr(
'IF(TRUE, b, c)',
'IF(a, b, c)'),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
def test_case_when(data_gen):
num_cmps = 20
Expand Down
5 changes: 5 additions & 0 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,11 @@ def _mark_as_lit(data, data_type):
# Sadly you cannot create a literal from just a dict/tuple in pyspark
children = zip(data, data_type.fields)
return f.struct([_mark_as_lit(x, fd.dataType).alias(fd.name) for x, fd in children])
elif isinstance(data_type, DateType) and data is not None:
# Due to https://bugs.python.org/issue13305 we need to zero pad for years prior to 1000,
# but this works for all of them
dateString = data.strftime("%Y-%m-%d").zfill(10)
return f.lit(dateString).cast(data_type)
else:
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)
Expand Down
36 changes: 28 additions & 8 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,29 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
('c', LongRangeGen())] for value_gen in _repeat_agg_column_for_collect_op
]

# We wrapped sort_array functions on collect_list/collect_set because the orders of collected lists/sets are not
# deterministic. The annotation `ignore_order` only affects on the order between rows, while with collect ops we also
# need to guarantee the consistency of the row-wise order (the orders within each array produced by collect ops).
@approximate_float
_repeat_agg_column_for_collect_list_op = [
RepeatSeqGen(ArrayGen(int_gen), length=15),
RepeatSeqGen(all_basic_struct_gen, length=15),
RepeatSeqGen(simple_string_to_string_map_gen, length=15)
]

_gen_data_for_collect_list_op = _gen_data_for_collect_op + [[
('a', RepeatSeqGen(LongGen(), length=20)),
('b', value_gen),
('c', LongRangeGen())] for value_gen in _repeat_agg_column_for_collect_list_op
]

# to avoid ordering issues with collect_list we do it all in a single task
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', [True, False], ids=idfn)
def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
lambda spark: gen_df(spark, data_gen, length=100).coalesce(1)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.count('b')),
conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()})
.agg(f.collect_list('b')),
conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower(),
'spark.sql.shuffle.partitons': '1'})

@approximate_float
@ignore_order(local=True)
Expand Down Expand Up @@ -699,6 +708,17 @@ def test_count_distinct_with_nan_floats(data_gen):
string_gen, boolean_gen, date_gen, timestamp_gen]


@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
def test_first_last_reductions_extra_types(data_gen):
assert_gpu_and_cpu_are_equal_collect(
# Coalesce and sort are to make sure that first and last, which are non-deterministic
# become deterministic
lambda spark : unary_op_df(spark, data_gen)\
.coalesce(1).selectExpr(
'first(a)',
'last(a)'),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))])
Expand Down
10 changes: 8 additions & 2 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,9 @@ def test_window_aggs_for_ranges_timestamps(data_gen):
['child_int', IntegerGen()],
['child_time', DateGen()],
['child_string', StringGen()],
['child_decimal', DecimalGen(precision=8, scale=3)]]))]
['child_decimal', DecimalGen(precision=8, scale=3)]])),
('c_array', ArrayGen(int_gen)),
('c_map', simple_string_to_string_map_gen)]


# SortExec does not support array type, so sort the result locally.
Expand Down Expand Up @@ -708,7 +710,11 @@ def test_window_aggs_for_rows_collect_list():
collect_list(c_decimal) over
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_decimal,
collect_list(c_struct) over
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_struct
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_struct,
collect_list(c_array) over
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_array,
collect_list(c_map) over
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_map
from window_collect_table
''')

Expand Down
Loading

0 comments on commit 611e69c

Please sign in to comment.