[SPARK-49557][SQL] Add SQL pipe syntax for the WHERE operator
### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for the WHERE operator.

For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

CREATE TABLE other(a INT, b INT) USING JSON;
INSERT INTO other VALUES (1, 1), (1, 2), (2, 4);

TABLE t
|> WHERE x + LENGTH(y) < 4;

0	abc

TABLE t
|> WHERE (SELECT ANY_VALUE(a) FROM other WHERE x = a LIMIT 1) = 1;

1	def

TABLE t
|> WHERE SUM(x) = 1;

Error: aggregate functions are not allowed in the pipe operator |> WHERE clause
```
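
For comparison, the first pipe query above can also be written with the existing DataFrame API. The following is a minimal sketch, assuming a local SparkSession and that the example table `t` has already been created:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the DataFrame-API equivalent of
//   TABLE t |> WHERE x + LENGTH(y) < 4
// Assumes a local SparkSession and that table `t` from the example exists.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.table("t").where("x + length(y) < 4").show()
// Per the example above, this should return the single row (0, 'abc').
```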

### Why are the changes needed?

The SQL pipe operator syntax will let users compose queries in a more flexible fashion.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds a few unit test cases, but mostly relies on golden file test coverage. This approach verifies that the answers stay correct as the feature is implemented, and also lets us inspect the analyzer output plans to confirm they look right.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48091 from dtenedor/pipe-where.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
dtenedor authored and gengliangwang committed Sep 20, 2024
1 parent f76a9b1 commit bdea091
Showing 6 changed files with 658 additions and 4 deletions.
@@ -1492,6 +1492,7 @@ version

 operatorPipeRightSide
     : selectClause
+    | whereClause
     ;
 
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
@@ -5876,7 +5876,20 @@ class AstBuilder extends DataTypeAstBuilder
         windowClause = null,
         relation = left,
         isPipeOperatorSelect = true)
-    }.get
+    }.getOrElse(Option(ctx.whereClause).map { c =>
+      // Add a table subquery boundary between the new filter and the input plan if one does not
+      // already exist. This helps the analyzer behave as if we had added the WHERE clause after a
+      // table subquery containing the input plan.
+      val withSubqueryAlias = left match {
+        case s: SubqueryAlias =>
+          s
+        case u: UnresolvedRelation =>
+          u
+        case _ =>
+          SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
+      }
+      withWhereClause(c, withSubqueryAlias)
+    }.get)
   }

/**
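The boundary logic above is why chained `|> WHERE` operators produce nested `__auto_generated_subquery_name` aliases in the golden files below: each filter sees its input as if it were a table subquery. Here is a self-contained sketch of the same pattern using simplified stand-ins for Catalyst's plan classes; the class and method names below are illustrative, not Spark's actual API:

```scala
// Simplified stand-ins for Catalyst logical plan nodes (illustrative only).
sealed trait Plan
case class UnresolvedRelation(name: String) extends Plan
case class SubqueryAlias(alias: String, child: Plan) extends Plan
case class Filter(condition: String, child: Plan) extends Plan

object PipeWhereSketch {
  // Wrap the input in a subquery alias unless a boundary already exists,
  // mirroring the match in the diff above: SubqueryAlias and
  // UnresolvedRelation already act as boundaries, so only other plans
  // get a generated alias.
  def withSubqueryBoundary(left: Plan): Plan = left match {
    case s: SubqueryAlias      => s
    case u: UnresolvedRelation => u
    case other                 => SubqueryAlias("__auto_generated_subquery_name", other)
  }

  def pipeWhere(condition: String, left: Plan): Plan =
    Filter(condition, withSubqueryBoundary(left))

  def main(args: Array[String]): Unit = {
    // Two chained |> WHERE operators over table t: the second filter sits
    // above a generated alias wrapping the first, matching the golden file
    // output for the chained-filter test case below.
    val t = UnresolvedRelation("t")
    val plan = pipeWhere("x + length(y) < 3", pipeWhere("x + length(y) < 4", t))
    println(plan)
  }
}
```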
@@ -255,6 +255,55 @@ Distinct
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select *
-- !query analysis
Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select * except (y)
-- !query analysis
Project [x#x]
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ *
-- !query analysis
Repartition 3, true
+- Project [x#x, y#x]
   +- SubqueryAlias spark_catalog.default.t
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ distinct x
-- !query analysis
Repartition 3, true
+- Distinct
   +- Project [x#x]
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ all x
-- !query analysis
Repartition 3, true
+- Project [x#x]
   +- SubqueryAlias spark_catalog.default.t
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select sum(x) as result
@@ -297,6 +346,229 @@ org.apache.spark.sql.AnalysisException
}


-- !query
table t
|> where true
-- !query analysis
Filter true
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where x + length(y) < 4
-- !query analysis
Filter ((x#x + length(y#x)) < 4)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where x + length(y) < 4
|> where x + length(y) < 3
-- !query analysis
Filter ((x#x + length(y#x)) < 3)
+- SubqueryAlias __auto_generated_subquery_name
   +- Filter ((x#x + length(y#x)) < 4)
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select x, sum(length(y)) as sum_len from t group by x)
|> where x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
   +- Aggregate [x#x], [x#x, sum(length(y#x)) AS sum_len#xL]
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where t.x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where spark_catalog.default.t.x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select col from st)
|> where col.i1 = 1
-- !query analysis
Filter (col#x.i1 = 1)
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [col#x]
      +- SubqueryAlias spark_catalog.default.st
         +- Relation spark_catalog.default.st[x#x,col#x] parquet


-- !query
table st
|> where st.col.i1 = 2
-- !query analysis
Filter (col#x.i1 = 2)
+- SubqueryAlias spark_catalog.default.st
   +- Relation spark_catalog.default.st[x#x,col#x] parquet


-- !query
table t
|> where exists (select a from other where x = a limit 1)
-- !query analysis
Filter exists#x [x#x]
:  +- GlobalLimit 1
:     +- LocalLimit 1
:        +- Project [a#x]
:           +- Filter (outer(x#x) = a#x)
:              +- SubqueryAlias spark_catalog.default.other
:                 +- Relation spark_catalog.default.other[a#x,b#x] json
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where (select any_value(a) from other where x = a limit 1) = 1
-- !query analysis
Filter (scalar-subquery#x [x#x] = 1)
:  +- GlobalLimit 1
:     +- LocalLimit 1
:        +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
:           +- Filter (outer(x#x) = a#x)
:              +- SubqueryAlias spark_catalog.default.other
:                 +- Relation spark_catalog.default.other[a#x,b#x] json
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where sum(x) = 1
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_WHERE_CONDITION",
  "sqlState" : "42903",
  "messageParameters" : {
    "condition" : "\"(sum(x) = 1)\"",
    "expressionList" : "sum(spark_catalog.default.t.x)"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 1,
    "stopIndex" : 27,
    "fragment" : "table t\n|> where sum(x) = 1"
  } ]
}


-- !query
table t
|> where y = 'abc' or length(y) + sum(x) = 1
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_WHERE_CONDITION",
  "sqlState" : "42903",
  "messageParameters" : {
    "condition" : "\"((y = abc) OR ((length(y) + sum(x)) = 1))\"",
    "expressionList" : "sum(spark_catalog.default.t.x)"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 1,
    "stopIndex" : 52,
    "fragment" : "table t\n|> where y = 'abc' or length(y) + sum(x) = 1"
  } ]
}


-- !query
table t
|> where first_value(x) over (partition by y) = 1
-- !query analysis
org.apache.spark.sql.AnalysisException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_1034",
  "messageParameters" : {
    "clauseName" : "WHERE"
  }
}


-- !query
select * from t where first_value(x) over (partition by y) = 1
-- !query analysis
org.apache.spark.sql.AnalysisException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_1034",
  "messageParameters" : {
    "clauseName" : "WHERE"
  }
}


-- !query
table t
|> select x, length(y) as z
|> where x + length(y) < 4
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
  "sqlState" : "42703",
  "messageParameters" : {
    "objectName" : "`y`",
    "proposal" : "`x`, `z`"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 57,
    "stopIndex" : 57,
    "fragment" : "y"
  } ]
}


-- !query
(select x, sum(length(y)) as sum_len from t group by x)
|> where sum(length(y)) = 3
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
  "sqlState" : "42703",
  "messageParameters" : {
    "objectName" : "`y`",
    "proposal" : "`x`, `sum_len`"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 77,
    "stopIndex" : 77,
    "fragment" : "y"
  } ]
}


-- !query
drop table t
-- !query analysis
