From 5ed86dd3e29907c8160edf2ae1b958cfa42a7eb5 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Mon, 21 Mar 2022 14:02:04 -0700 Subject: [PATCH] Implement support for ArrayExists expression (#4973) This PR implements ArrayExists, it has two major phases 1. first apply function to produce array of Booleans 2. run segmented reduce ANY to if any of the values are true Spark 3.x default is the 3VL logic: - if any element is true the array maps to true - if no element is true and there is at least one null, the array maps to null - if no element is true and none is null, the array maps to false Legacy mode 2VL: - if any element is true the array maps to true - if no element is true , the array maps to false Closes #4815 Signed-off-by: Gera Shegalov --- docs/configs.md | 1 + docs/supported_ops.md | 218 ++++++++++++------ .../src/main/python/array_test.py | 25 ++ .../nvidia/spark/rapids/GpuOverrides.scala | 19 ++ .../spark/rapids/higherOrderFunctions.scala | 157 +++++++++++-- 5 files changed, 324 insertions(+), 96 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 7f61853a2d6..61ff2d91d2a 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -148,6 +148,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.And|`and`|Logical AND|true|None| spark.rapids.sql.expression.AnsiCast| |Convert a column of one type of data into another type|true|None| spark.rapids.sql.expression.ArrayContains|`array_contains`|Returns a boolean if the array contains the passed in key|true|None| +spark.rapids.sql.expression.ArrayExists|`exists`|Return true if any element satisfies the predicate LambdaFunction|true|None| spark.rapids.sql.expression.ArrayMax|`array_max`|Returns the maximum value in the array|true|None| spark.rapids.sql.expression.ArrayMin|`array_min`|Returns the minimum value in the array|true|None| spark.rapids.sql.expression.ArrayTransform|`transform`|Transform elements in an array using the transform function. This is similar to a `map` in functional programming|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 510d672cedb..b94adb770b5 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -2030,12 +2030,12 @@ are limited. -ArrayMax -`array_max` -Returns the maximum value in the array -None -project -input +ArrayExists +`exists` +Return true if any element satisfies the predicate LambdaFunction +None +project +argument @@ -2050,31 +2050,52 @@ are limited. -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, STRUCT, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
-result -S -S -S -S -S -S -S -S -PS
UTC is only supported TZ for TIMESTAMP
-S +function S + + + + + + + + + + + + + + + + + + + +result S -NS -NS -NS -NS -NS + + + + + + + + + + + + + + + + Expression @@ -2103,6 +2124,53 @@ are limited. UDT +ArrayMax +`array_max` +Returns the maximum value in the array +None +project +input + + + + + + + + + + + + + + +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, STRUCT, UDT
+ + + + + +result +S +S +S +S +S +S +S +S +PS
UTC is only supported TZ for TIMESTAMP
+S +S +S +NS +NS +NS + +NS +NS + + ArrayMin `array_min` Returns the minimum value in the array @@ -2398,6 +2466,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + AtLeastNNonNulls Checks if number of non null/Nan values is greater than a given value @@ -2445,32 +2539,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Atan `atan` Inverse tangent @@ -2767,6 +2835,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + BitLength `bit_length` The bit length of string data @@ -2814,32 +2908,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - BitwiseAnd `&` Returns the bitwise AND of the operands diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index d20120fa380..b80a3211751 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -307,3 +307,28 @@ def test_get_array_struct_fields(data_gen): max_length=6) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, array_struct_gen).selectExpr('a.child0')) + +@pytest.mark.parametrize('data_gen', [ArrayGen(string_gen), ArrayGen(int_gen)]) +@pytest.mark.parametrize('threeVL', [ + pytest.param(False, id='3VL:off'), + pytest.param(True, id='3VL:on'), +]) +def test_array_exists(data_gen, threeVL): + def do_it(spark): + columns = ['a'] + element_type = data_gen.data_type.elementType + if isinstance(element_type, IntegralType): + columns.extend([ + 'exists(a, item -> item % 2 = 0) as exists_even', + 'exists(a, item -> item < 0) as exists_negative', + 'exists(a, item -> item >= 0) as exists_non_negative' + ]) + + if isinstance(element_type, StringType): + columns.extend(['exists(a, entry -> length(entry) > 5) as exists_longer_than_5']) + + return unary_op_df(spark, data_gen).selectExpr(columns) + + assert_gpu_and_cpu_are_equal_collect(do_it, conf= { + 'spark.sql.legacy.followThreeValuedLogicInArrayExists' : threeVL, + }) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a94390349c2..61c66adb941 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2857,6 +2857,25 @@ object GpuOverrides extends Logging { GpuArrayTransform(childExprs.head.convertToGpu(), childExprs(1).convertToGpu()) } }), + expr[ArrayExists]( + "Return true if any element satisfies the predicate LambdaFunction", + ExprChecks.projectOnly(TypeSig.BOOLEAN, TypeSig.BOOLEAN, + Seq( + ParamCheck("argument", + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL + + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP), + TypeSig.ARRAY.nested(TypeSig.all)), + ParamCheck("function", TypeSig.BOOLEAN, TypeSig.BOOLEAN))), + (in, conf, p, r) => new ExprMeta[ArrayExists](in, conf, p, r) { + override def convertToGpu(): GpuExpression = { + GpuArrayExists( + childExprs.head.convertToGpu(), + childExprs(1).convertToGpu(), + SQLConf.get.getConf(SQLConf.LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC) + ) + } + }), + expr[TransformKeys]( "Transform keys in a map using a transform function", ExprChecks.projectOnly(TypeSig.MAP.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala index 2f3ef10a3a0..0d0d79d3980 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala @@ -23,7 +23,7 @@ import ai.rapids.cudf.DType import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression} -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, Metadata} +import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, MapType, Metadata} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -206,28 +206,16 @@ trait GpuSimpleHigherOrderFunction extends GpuHigherOrderFunction with GpuBind { } } -case class GpuArrayTransform( - argument: Expression, - function: Expression, - isBound: Boolean = false, - boundIntermediate: Seq[GpuExpression] = Seq.empty) - extends GpuSimpleHigherOrderFunction { - override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) - - override def prettyName: String = "transform" +trait GpuArrayTransformBase extends GpuSimpleHigherOrderFunction { + def isBound: Boolean + def boundIntermediate: Seq[GpuExpression] - private lazy val inputToLambda: Seq[DataType] = { + protected lazy val inputToLambda: Seq[DataType] = { assert(isBound) boundIntermediate.map(_.dataType) ++ lambdaFunction.arguments.map(_.dataType) } - override def bind(input: AttributeSeq): GpuExpression = { - val (boundFunc, boundArg, boundIntermediate) = bindLambdaFunc(input) - - GpuArrayTransform(boundArg, boundFunc, isBound = true, boundIntermediate) - } - private[this] def makeElementProjectBatch( inputBatch: ColumnarBatch, listColumn: cudf.ColumnVector): ColumnarBatch = { @@ -277,20 +265,147 @@ case class GpuArrayTransform( } } + /* + * Post-process the column view of the array after applying the function parameter + */ + protected def transformListColumnView( + lambdaTransformedCV: cudf.ColumnView): GpuColumnVector + override def columnarEval(batch: ColumnarBatch): Any = { withResource(GpuExpressionsUtils.columnarEvalToColumn(argument, batch)) { arg => val dataCol = withResource( makeElementProjectBatch(batch, arg.getBase)) { cb => GpuExpressionsUtils.columnarEvalToColumn(function, cb) } - withResource(dataCol) { dataCol => - withResource(GpuListUtils.replaceListDataColumnAsView(arg.getBase, dataCol.getBase)) { - retView => - GpuColumnVector.from(retView.copyToColumnVector(), dataType) + withResource(dataCol) { _ => + val cv = GpuListUtils.replaceListDataColumnAsView(arg.getBase, dataCol.getBase) + withResource(cv)(transformListColumnView(_)) + } + } + } +} + + +case class GpuArrayTransform( + argument: Expression, + function: Expression, + isBound: Boolean = false, + boundIntermediate: Seq[GpuExpression] = Seq.empty) extends GpuArrayTransformBase { + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def prettyName: String = "transform" + + override def bind(input: AttributeSeq): GpuExpression = { + val (boundFunc, boundArg, boundIntermediate) = bindLambdaFunc(input) + + GpuArrayTransform(boundArg, boundFunc, isBound = true, boundIntermediate) + } + + override protected def transformListColumnView( + lambdaTransformedCV: cudf.ColumnView): GpuColumnVector = { + GpuColumnVector.from(lambdaTransformedCV.copyToColumnVector(), dataType) + } +} + + +case class GpuArrayExists( + argument: Expression, + function: Expression, + followThreeValuedLogic: Boolean, + isBound: Boolean = false, + boundIntermediate: Seq[GpuExpression] = Seq.empty) extends GpuArrayTransformBase { + + override def dataType: DataType = BooleanType + + override def prettyName: String = "exists" + + override def nullable: Boolean = super.nullable || function.nullable + + override def bind(input: AttributeSeq): GpuExpression = { + val (boundFunc, boundArg, boundIntermediate) = bindLambdaFunc(input) + GpuArrayExists(boundArg, boundFunc, followThreeValuedLogic,isBound = true, boundIntermediate) + } + + private def imputeFalseForEmptyArrays( + transformedCV: cudf.ColumnView, + result: cudf.ColumnView + ): GpuColumnVector = { + withResource(cudf.Scalar.fromBool(false)) { falseScalar => + withResource(cudf.Scalar.fromInt(0)) { zeroScalar => + withResource(transformedCV.countElements()) { elementCounts => + withResource(elementCounts.equalTo(zeroScalar)) { isEmptyList => + GpuColumnVector.from(isEmptyList.ifElse(falseScalar, result), dataType) + } } } } } + + private def existsReduce(columnView: cudf.ColumnView, nullPolicy: cudf.NullPolicy) = { + columnView.listReduce( + cudf.SegmentedReductionAggregation.any(), + nullPolicy, + DType.BOOL8) + } + + private def replaceChildNullsByFalseView(cv: cudf.ColumnView): cudf.ColumnView = { + withResource(cudf.Scalar.fromBool(false)) { falseScalar => + withResource(cv.getChildColumnView(0)) { childView => + withResource(childView.replaceNulls(falseScalar)) { noNullsChildView => + cv.replaceListChild(noNullsChildView) + } + } + } + } + + /* + * The difference between legacyExists and EXCLUDE nulls reduction + * is that the list without valid values (all nulls) should produce false + * which is equivalent to replacing nulls with false after lambda prior + * to aggregation + */ + private def legacyExists(cv: cudf.ColumnView): cudf.ColumnView = { + withResource(replaceChildNullsByFalseView(cv)) { reduceInput => + existsReduce(reduceInput, cudf.NullPolicy.EXCLUDE) + } + } + + /* + * 3VL is true if EXCLUDE nulls reduce is true + * 3VL is false if INCLUDE nulls reduce is false + * 3VL is null if + * EXCLUDE null reduce is false and + * INCLUDE nulls reduce is null + */ + private def threeValueExists(cv: cudf.ColumnView): cudf.ColumnView = { + withResource(existsReduce(cv, cudf.NullPolicy.EXCLUDE)) { existsNullsExcludedCV => + withResource(existsReduce(cv, cudf.NullPolicy.INCLUDE)) { existsNullsIncludedCV => + existsNullsExcludedCV.ifElse(existsNullsExcludedCV, existsNullsIncludedCV) + } + } + } + + private def exists(cv: cudf.ColumnView) = { + if (followThreeValuedLogic) { + threeValueExists(cv) + } else { + legacyExists(cv) + } + } + + override protected def transformListColumnView( + lambdaTransformedCV: cudf.ColumnView + ): GpuColumnVector = { + withResource(exists(lambdaTransformedCV)) { existsCV => + // exists is false for empty arrays + // post process empty arrays until cudf allows specifying + // the initial value for a list reduction (i.e. similar to Scala fold) + // https://github.com/rapidsai/cudf/issues/10455 + imputeFalseForEmptyArrays(lambdaTransformedCV, existsCV) + } + } + } trait GpuMapSimpleHigherOrderFunction extends GpuSimpleHigherOrderFunction with GpuBind {