
[FEA] Support org.apache.spark.sql.catalyst.expressions.ArrayExists #4815

Closed
viadea opened this issue Feb 17, 2022 · 3 comments · Fixed by #4973
Labels
feature request New feature or request P1 Nice to have for release

Comments


viadea commented Feb 17, 2022

I wish we could support org.apache.spark.sql.catalyst.expressions.ArrayExists.

Mini repro:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val arrayData = Seq(
    Row("John",List("apple","orange","banana"),1),
    Row("David",List("apple","orange","banana"),2),
    Row("Harry",List("apple","other"),1)
)

val arraySchema = new StructType().add("name",StringType).add("fruits", ArrayType(StringType)).add("favorite",IntegerType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData),arraySchema)
df.write.mode("overwrite").format("parquet").save("/tmp/testparquet")
val df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")

spark.sql("select name, exists(fruits, x -> x == 'other') as has_other from df2").show()

Unsupported messages:

! <ArrayExists> exists(fruits#92, lambdafunction((lambda x#207 = other), lambda x#207, false)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.ArrayExists
@viadea viadea added feature request New feature or request ? - Needs Triage Need team to review and classify labels Feb 17, 2022

revans2 commented Feb 18, 2022

This should end up being an ArrayTransform on the lambda function followed by an array reduction using any. We could hack it just like we do for array_max and array_min, but CUDF is putting in rapidsai/cudf#9621 and we should be able to switch over to that instead. They also support the any aggregation in that new API. Also the API is not specific to list/arrays so we could avoid copying the result of the higher order function into an array. We probably could just do the reduction directly on the result and the offsets in the input.
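A minimal sketch of the approach described above, in plain Python rather than cuDF (the function and variable names are illustrative, not plugin or cuDF APIs): apply the predicate element-wise, then run an ANY reduction per list segment directly against the list column's offsets, without copying the transform result into a new list column.

```python
def exists_via_segmented_any(values, offsets, pred):
    """values: flattened child values of a list column;
    offsets: row boundaries, values[offsets[i]:offsets[i+1]] is row i."""
    flags = [pred(v) for v in values]  # element-wise transform
    # segmented ANY reduction over each row's slice of the flat result
    return [any(flags[offsets[i]:offsets[i + 1]])
            for i in range(len(offsets) - 1)]

# Two rows: ["apple", "other"] and ["apple", "orange"]
values = ["apple", "other", "apple", "orange"]
offsets = [0, 2, 4]
print(exists_via_segmented_any(values, offsets, lambda x: x == "other"))
# -> [True, False]
```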

@jlowe jlowe added P1 Nice to have for release and removed ? - Needs Triage Need team to review and classify labels Feb 22, 2022

gerashegalov commented Mar 10, 2022

IIUC we could implement ArrayAggregate on top of rapidsai/cudf#9621.

exists(arrayExpr, pred)
is equivalent to
aggregate(arrayExpr, false, (acc, x) -> acc or pred(x))

For example:

>>> data=[ [[1,2,3],], [[1,3],] ]
>>> spark.createDataFrame(data).createOrReplaceTempView("df")
>>> spark.sql("select aggregate(_1, false, (acc, x) -> acc or x=2) exists2 from df").show()
...
        ! <ArrayAggregate> aggregate(_1#26, false, lambdafunction((lambda acc#28 OR (lambda x#29L = 2)), lambda acc#28, lambda x#29L, false), lambdafunction(lambda id#30, lambda id#30, false)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.ArrayAggregate

+-------+
|exists2|
+-------+
|   true|
|  false|
+-------+       
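The equivalence above can be checked with a plain-Python fold (an illustrative sketch for non-null inputs, not the plugin implementation):

```python
from functools import reduce

def exists(arr, pred):
    # aggregate(arrayExpr, false, (acc, x) -> acc or pred(x))
    return reduce(lambda acc, x: acc or pred(x), arr, False)

print(exists([1, 2, 3], lambda x: x == 2))  # True
print(exists([1, 3], lambda x: x == 2))     # False
```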


revans2 commented Mar 14, 2022

We do not support the generic aggregate API yet, and rapidsai/cudf#9621 is not enough to get us there 100%. I do not want to try to implement ArrayAggregate before ArrayExists. ArrayExists is simple to do once rapidsai/cudf#10417 is merged in. ArrayAggregate, at a minimum, would require us to do pattern matching on the higher order function and the initial value to know whether we can do it at all, and which aggregate to use.

For this specific case it is not too bad. We would have to be sure that zero, the initial value, is a scalar that is false. We would also have to make sure that the aggregate higher order function matches exactly acc or pred(x) so we can run pred(x) as a transform operation followed by the any aggregation (with nulls included). But what if the higher order function is pred(x) or acc? We would then need two pattern matching rules, or we would have to normalize the expression in a way that always works for this kind of pattern matching. And what if they write it with an if/else instead, (acc, x) -> if(acc, acc, x=2), or a CASE WHEN?

And this is just to try to match the code for exists and a few things that are equivalent to it. It does not include what we would have to do for other operations, like (acc, x) -> if(isNull(x), acc, acc or x=2), which is a null-excluding version of the any aggregation. Or how about (acc, x) -> acc or if(isNull(x), false, x=2), or (acc, x) -> (acc or not(isNull(x = 2))) and (acc or x=2)?

If we want to support ArrayAggregate we really need example queries from customers showing what they want supported, so we can get the patterns right, and even then, if they change the query in a subtle way, we would end up falling back to the CPU. Also, for floating point we are likely never to be able to support it with segmented reductions, because there is an inherent order to the higher order function, our aggregations cannot guarantee any kind of order, and some operators, especially for floating point, require a set order.
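To illustrate the pattern-matching problem, here is a plain-Python sketch (hypothetical, not plugin code): three lambdas that all compute the same exists result for non-null inputs, yet each has a different expression tree, so each would need its own matching rule.

```python
from functools import reduce

# Three different expression trees that all compute "exists x = 2"
# for non-null inputs; a pattern matcher must recognize every form.
forms = {
    "acc or pred(x)": lambda acc, x: acc or x == 2,
    "pred(x) or acc": lambda acc, x: x == 2 or acc,
    "if/else":        lambda acc, x: acc if acc else x == 2,
}

for name, f in forms.items():
    assert reduce(f, [1, 2, 3], False) is True, name
    assert reduce(f, [1, 3], False) is False, name
```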

gerashegalov added a commit that referenced this issue Mar 21, 2022
This PR implements ArrayExists. It has two major phases:
1. first apply the lambda function to produce an array of Booleans
2. run a segmented ANY reduction to check whether any of the values are true

Spark 3.x default is the 3VL logic:
- if any element is true the array maps to true
- if no element is true and there is at least one null, the array maps to null
- if no element is true and none is null, the array maps to false

Legacy mode 2VL:
- if any element is true the array maps to true
- if no element is true, the array maps to false
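A plain-Python sketch of the two semantics (illustrative only; `flags` stands for the phase-1 result of applying the lambda to each element, with None representing a null predicate result):

```python
def exists_3vl(flags):
    """Spark 3.x default three-valued logic: true beats null, null beats false."""
    if any(f is True for f in flags):
        return True
    if any(f is None for f in flags):
        return None
    return False

def exists_2vl(flags):
    """Legacy mode two-valued logic: nulls never produce a null result."""
    return any(f is True for f in flags)

print(exists_3vl([False, True, None]))  # True
print(exists_3vl([False, None]))        # None
print(exists_3vl([False, False]))       # False
print(exists_2vl([False, None]))        # False
```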

Closes #4815

Signed-off-by: Gera Shegalov <gera@apache.org>