From 611e69c810864252263e85f56b82914115aecd2c Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 5 Aug 2021 15:41:09 -0500
Subject: [PATCH] Simple nested additions v2 (#3155)

Signed-off-by: Robert (Bobby) Evans
---
 docs/supported_ops.md                         | 98 +++++++++----------
 .../src/main/python/conditionals_test.py      | 10 ++
 integration_tests/src/main/python/data_gen.py |  5 +
 .../src/main/python/hash_aggregate_test.py    | 36 +++++--
 .../src/main/python/window_function_test.py   | 10 +-
 .../nvidia/spark/rapids/GpuOverrides.scala    | 41 +++++---
 6 files changed, 125 insertions(+), 75 deletions(-)

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 8032e79779e..7ba2a05ce4b 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
[Generated type-support tables: the table markup for these hunks (around lines
7699-7741, 18081-18503, 18731-18795 and 19109-19306 of supported_ops.md) is not
recoverable here. The surviving cell values show ARRAY, STRUCT and MAP entries
moving from NS, "PS* (missing nested BINARY, CALENDAR, MAP, UDT)" or
"PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT)" to
"PS* (missing nested BINARY, CALENDAR, UDT)" (or the variant that still lists
NULL as missing), and eight cells moving from NS to S*.]
diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py
index 9f766346917..b2988c2b3ff 100644
--- a/integration_tests/src/main/python/conditionals_test.py
+++ b/integration_tests/src/main/python/conditionals_test.py
@@ -56,6 +56,16 @@ def test_if_else(data_gen):
                 'IF(a, {}, c)'.format(null_lit)),
             conf = allow_negative_scale_of_decimal_conf)
 
+# Map scalars are not really supported by Spark from Python without jumping through a lot of hoops,
+# so for now we are going to skip them.
+@pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn)
+def test_if_else_map(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr(
+                'IF(TRUE, b, c)',
+                'IF(a, b, c)'),
+            conf = allow_negative_scale_of_decimal_conf)
+
 @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
 def test_case_when(data_gen):
     num_cmps = 20
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 703ccdaf236..9c03e139e4e 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -652,6 +652,11 @@ def _mark_as_lit(data, data_type):
         # Sadly you cannot create a literal from just a dict/tuple in pyspark
         children = zip(data, data_type.fields)
         return f.struct([_mark_as_lit(x, fd.dataType).alias(fd.name) for x, fd in children])
+    elif isinstance(data_type, DateType) and data is not None:
+        # Due to https://bugs.python.org/issue13305 we need to zero-pad years prior to 1000,
+        # but this works for all of them
+        dateString = data.strftime("%Y-%m-%d").zfill(10)
+        return f.lit(dateString).cast(data_type)
     else:
         # lit does not take a data type so we might have to cast it
         return f.lit(data).cast(data_type)
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 337ab3816fe..56a593b901d 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -399,20 +399,29 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
     ('c', LongRangeGen())] for value_gen in _repeat_agg_column_for_collect_op
 ]
 
-# We wrapped sort_array functions on collect_list/collect_set because the orders of collected lists/sets are not
-# deterministic. The annotation `ignore_order` only affects on the order between rows, while with collect ops we also
-# need to guarantee the consistency of the row-wise order (the orders within each array produced by collect ops).
-@approximate_float
+_repeat_agg_column_for_collect_list_op = [
+    RepeatSeqGen(ArrayGen(int_gen), length=15),
+    RepeatSeqGen(all_basic_struct_gen, length=15),
+    RepeatSeqGen(simple_string_to_string_map_gen, length=15)
+]
+
+_gen_data_for_collect_list_op = _gen_data_for_collect_op + [[
+    ('a', RepeatSeqGen(LongGen(), length=20)),
+    ('b', value_gen),
+    ('c', LongRangeGen())] for value_gen in _repeat_agg_column_for_collect_list_op
+]
+
+# To avoid ordering issues with collect_list we do it all in a single task
 @ignore_order(local=True)
-@incompat
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
 @pytest.mark.parametrize('use_obj_hash_agg', [True, False], ids=idfn)
 def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: gen_df(spark, data_gen, length=100)
+        lambda spark: gen_df(spark, data_gen, length=100).coalesce(1)
             .groupby('a')
-            .agg(f.sort_array(f.collect_list('b')), f.count('b')),
-        conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()})
+            .agg(f.collect_list('b')),
+        conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower(),
+              'spark.sql.shuffle.partitions': '1'})
 
 @approximate_float
 @ignore_order(local=True)
@@ -699,6 +708,17 @@ def test_count_distinct_with_nan_floats(data_gen):
     string_gen, boolean_gen, date_gen, timestamp_gen]
 
+@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
+def test_first_last_reductions_extra_types(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+            # Coalesce is to make sure that first and last, which are non-deterministic,
+            # become deterministic for this comparison
+            lambda spark : unary_op_df(spark, data_gen)\
+                    .coalesce(1).selectExpr(
+                'first(a)',
+                'last(a)'),
+            conf = allow_negative_scale_of_decimal_conf)
+
 @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
 @pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
     condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))])
diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index 6080f463f88..e3bf4aab5f0 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -674,7 +674,9 @@ def test_window_aggs_for_ranges_timestamps(data_gen):
         ['child_int', IntegerGen()],
         ['child_time', DateGen()],
         ['child_string', StringGen()],
-        ['child_decimal', DecimalGen(precision=8, scale=3)]]))]
+        ['child_decimal', DecimalGen(precision=8, scale=3)]])),
+    ('c_array', ArrayGen(int_gen)),
+    ('c_map', simple_string_to_string_map_gen)]
 
 # SortExec does not support array type, so sort the result locally.
@@ -708,7 +710,11 @@ def test_window_aggs_for_rows_collect_list():
         collect_list(c_decimal) over
             (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_decimal,
         collect_list(c_struct) over
-            (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_struct
+            (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_struct,
+        collect_list(c_array) over
+            (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_array,
+        collect_list(c_map) over
+            (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_map
         from window_collect_table
         ''')
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 090aa561a26..1218e29b1da 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -845,11 +845,11 @@ object GpuOverrides {
         "\"window\") of rows",
       ExprChecks.windowOnly(
         (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL +
-          TypeSig.ARRAY + TypeSig.STRUCT ).nested(),
+          TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
         TypeSig.all,
         Seq(ParamCheck("windowFunction",
           (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL +
-            TypeSig.ARRAY + TypeSig.STRUCT).nested(),
+            TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
           TypeSig.all),
           ParamCheck("windowSpec",
             TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL,
@@ -1772,14 +1772,14 @@ object GpuOverrides {
     expr[If](
       "IF expression",
       ExprChecks.projectNotLambda(
-        (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
+        (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
         TypeSig.all,
         Seq(ParamCheck("predicate", TypeSig.BOOLEAN, TypeSig.BOOLEAN),
           ParamCheck("trueValue",
-            (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
+            (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
             TypeSig.all),
           ParamCheck("falseValue",
-            (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
+            (_commonTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
             TypeSig.all))),
       (a, conf, p, r) => new ExprMeta[If](a, conf, p, r) {
         override def convertToGpu(): GpuExpression = {
@@ -1893,13 +1893,13 @@ object GpuOverrides {
     expr[AggregateExpression](
       "Aggregate expression",
       ExprChecks.fullAgg(
-        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
-          TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT),
+        (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
+          TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
         TypeSig.all,
         Seq(ParamCheck(
           "aggFunc",
-          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
-            TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT),
+          (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
+            TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
           TypeSig.all)),
         Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))),
       (a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) {
@@ -2101,16 +2101,24 @@ object GpuOverrides {
       }),
     expr[First](
       "first aggregate operator",
-      ExprChecks.aggNotWindow(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all,
-        Seq(ParamCheck("input", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all))),
+      ExprChecks.aggNotWindow(
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+        TypeSig.all,
+        Seq(ParamCheck("input",
+          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+          TypeSig.all))),
       (a, conf, p, r) => new ExprMeta[First](a, conf, p, r) {
         override def convertToGpu(): GpuExpression =
           GpuFirst(childExprs.head.convertToGpu(), a.ignoreNulls)
       }),
     expr[Last](
       "last aggregate operator",
-      ExprChecks.aggNotWindow(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all,
-        Seq(ParamCheck("input", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all))),
+      ExprChecks.aggNotWindow(
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+        TypeSig.all,
+        Seq(ParamCheck("input",
+          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+          TypeSig.all))),
       (a, conf, p, r) => new ExprMeta[Last](a, conf, p, r) {
         override def convertToGpu(): GpuExpression =
           GpuLast(childExprs.head.convertToGpu(), a.ignoreNulls)
@@ -2691,11 +2699,12 @@ object GpuOverrides {
       "Collect a list of non-unique elements, not supported in reduction.",
       // GpuCollectList is not yet supported in Reduction context.
       ExprChecks.aggNotReduction(
-        TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT),
+        TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL +
+          TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP),
         TypeSig.ARRAY.nested(TypeSig.all),
         Seq(ParamCheck("input",
-          TypeSig.commonCudfTypes + TypeSig.DECIMAL +
-            TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL),
+          (TypeSig.commonCudfTypes + TypeSig.DECIMAL +
+            TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
           TypeSig.all))),
       (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
         override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
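
For reference, here is a minimal PySpark sketch (not part of the patch) of the operations the
integration tests above now exercise: IF/ELSE over map columns, collect_list of array and map
values in group-by and window aggregations, and first/last reductions over decimals. The data
and names are made up for illustration; it assumes a session with the RAPIDS Accelerator plugin
already enabled (for example spark.plugins=com.nvidia.spark.SQLPlugin), and without the plugin
the same code simply runs on the CPU.

from pyspark.sql import SparkSession, Window, functions as f

spark = SparkSession.builder.appName("nested-additions-sketch").getOrCreate()

# A small frame with map, array and decimal columns, loosely mirroring the test generators.
df = (spark.createDataFrame(
        [(1, True, {"k1": "v1"}, [1, 2], "7.250"),
         (1, False, {"k2": "v2"}, [3], "8.125")],
        "a INT, p BOOLEAN, m MAP<STRING,STRING>, arr ARRAY<INT>, d STRING")
      .withColumn("d", f.col("d").cast("decimal(8,3)")))

# IF/ELSE choosing between map columns (conditionals_test.py::test_if_else_map).
df.selectExpr("IF(p, m, m) AS picked_map").show(truncate=False)

# collect_list of array and map values in a group-by (test_hash_groupby_collect_list).
df.groupBy("a").agg(f.collect_list("arr"), f.collect_list("m")).show(truncate=False)

# collect_list of array/map values as a window function
# (window_function_test.py::test_window_aggs_for_rows_collect_list).
w = (Window.partitionBy("a").orderBy("p")
     .rowsBetween(Window.currentRow, Window.unboundedFollowing))
df.select(f.collect_list("arr").over(w), f.collect_list("m").over(w)).show(truncate=False)

# first/last reductions over decimals (First/Last now accept DECIMAL).
df.coalesce(1).agg(f.first("d"), f.last("d")).show()

As in the tests, coalescing to a single partition is what makes collect_list, first and last
deterministic enough to compare CPU and GPU results.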
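
Also for reference, a small sketch (not part of the patch) of the CPython quirk behind the
data_gen.py change above: per https://bugs.python.org/issue13305, %Y is not zero-padded for
years below 1000 on common platforms, so the date string is padded before being passed to
Spark as a literal.

from datetime import date

d = date(3, 1, 1)
s = d.strftime("%Y-%m-%d")
print(s)            # on glibc-based platforms this is '3-01-01', not '0003-01-01'
print(s.zfill(10))  # '0003-01-01' -- padded to a full 10-character ISO date, as _mark_as_lit does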