From 017b0ea71e03339336b5d199ecad4f50961e4948 Mon Sep 17 00:00:00 2001
From: Daniel Tenedorio
Date: Sat, 14 Sep 2024 12:16:35 +0800
Subject: [PATCH] [SPARK-49556][SQL] Add SQL pipe syntax for the SELECT
 operator

### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for the SELECT operator. For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

TABLE t
|> SELECT x, y

0 abc
1 def

TABLE t
|> SELECT x, y
|> SELECT x + LENGTH(y) AS z

3
4

(SELECT * FROM t UNION ALL SELECT * FROM t)
|> SELECT x + LENGTH(y) AS result

3
3
4
4

TABLE t
|> SELECT sum(x) AS result

Error: aggregate functions are not allowed in the pipe operator |> SELECT clause; please use the |> AGGREGATE clause instead
```

### Why are the changes needed?

The SQL pipe operator syntax will let users compose queries in a more flexible fashion.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds a few unit test cases, but mostly relies on golden file test coverage. I did this to make sure the answers remain correct as this feature is implemented, and also so we can inspect the analyzer output plans to make sure they look right as well.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48047 from dtenedor/pipe-select.

Authored-by: Daniel Tenedorio
Signed-off-by: Wenchen Fan
---
 .../resources/error/error-conditions.json | 6 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 1 +
 .../sql/catalyst/parser/SqlBaseParser.g4 | 5 +
 .../sql/catalyst/expressions/PipeSelect.scala | 47 +++
 .../sql/catalyst/parser/AstBuilder.scala | 58 +++-
 .../sql/catalyst/trees/TreePatterns.scala | 1 +
 .../sql/errors/QueryCompilationErrors.scala | 8 +
 .../apache/spark/sql/internal/SQLConf.scala | 9 +
 .../analyzer-results/pipe-operators.sql.out | 318 ++++++++++++++++++
 .../sql-tests/inputs/pipe-operators.sql | 102 ++++++
 .../sql-tests/results/pipe-operators.sql.out | 308 +++++++++++++++++
 .../sql/execution/SparkSqlParserSuite.scala | 19 +-
 .../ThriftServerQueryTestSuite.scala | 3 +-
 13 files changed, 876 insertions(+), 9 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala
 create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 0a9dcd52ea831..a6d8550716b96 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3754,6 +3754,12 @@
     ],
     "sqlState" : "42K03"
   },
+  "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
+    "message" : [
+      "Aggregate function <expr> is not allowed when using the pipe operator |> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
+    ],
+    "sqlState" : "0A000"
+  },
   "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
     "message" : [
       "Invalid pivot value '<value>': value data type <valueType> does not match pivot column data type <pivotType>."
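For reference, a minimal end-to-end usage sketch (not part of the diff) showing how the pieces below fit together; it assumes a build that includes this patch, and uses the conf name from the SQLConf change and the error class added above:

```sql
-- The feature is gated behind a new conf (its default is true only in testing builds).
SET spark.sql.operatorPipeSyntaxEnabled = true;

CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

-- Each |> SELECT consumes the result of the query term to its left.
TABLE t
|> SELECT x, y
|> SELECT x + LENGTH(y) AS z;

-- Aggregate functions in |> SELECT fail analysis with
-- PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION; window functions remain allowed.
TABLE t
|> SELECT sum(x) AS result;
```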
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 9ea213f3bf4a6..96a58b99debeb 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -506,6 +506,7 @@ TILDE: '~'; AMPERSAND: '&'; PIPE: '|'; CONCAT_PIPE: '||'; +OPERATOR_PIPE: '|>'; HAT: '^'; COLON: ':'; DOUBLE_COLON: '::'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 73d5cb55295ab..3ea408ca42703 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -613,6 +613,7 @@ queryTerm operator=INTERSECT setQuantifier? right=queryTerm #setOperation | left=queryTerm {!legacy_setops_precedence_enabled}? operator=(UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation + | left=queryTerm OPERATOR_PIPE operatorPipeRightSide #operatorPipeStatement ; queryPrimary @@ -1471,6 +1472,10 @@ version | stringLit ; +operatorPipeRightSide + : selectClause + ; + // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL. // - Reserved keywords: // Keywords that are reserved and can't be used as identifiers for table, view, column, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala new file mode 100644 index 0000000000000..0b5479cc8f0ee --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE, TreePattern} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Represents a SELECT clause when used with the |> SQL pipe operator. + * We use this to make sure that no aggregate functions exist in the SELECT expressions. 
+ */ +case class PipeSelect(child: Expression) + extends UnaryExpression with RuntimeReplaceable { + final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE) + override def withNewChildInternal(newChild: Expression): Expression = PipeSelect(newChild) + override lazy val replacement: Expression = { + def visit(e: Expression): Unit = e match { + case a: AggregateFunction => + // If we used the pipe operator |> SELECT clause to specify an aggregate function, this is + // invalid; return an error message instructing the user to use the pipe operator + // |> AGGREGATE clause for this purpose instead. + throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a) + case _: WindowExpression => + // Window functions are allowed in pipe SELECT operators, so do not traverse into children. + case _ => + e.children.foreach(visit) + } + visit(child) + child + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7ad7d60e70c96..edcb417da123b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -469,7 +469,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - plan + plan, + isPipeOperatorSelect = false ) } } @@ -1057,7 +1058,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - from + from, + isPipeOperatorSelect = false ) } @@ -1144,7 +1146,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct = false) + isDistinct = false, + isPipeOperatorSelect = false) ScriptTransformation( string(visitStringLit(transformClause.script)), @@ -1165,6 +1168,8 @@ class AstBuilder extends DataTypeAstBuilder * Add a regular (SELECT) query specification to a logical plan. The query specification * is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT), * aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place. + * If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeSelect]] + * expression for future validation of the expressions during analysis. * * Note that query hints are ignored (both by the parser and the builder). */ @@ -1176,7 +1181,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - relation: LogicalPlan): LogicalPlan = withOrigin(ctx) { + relation: LogicalPlan, + isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) { val isDistinct = selectClause.setQuantifier() != null && selectClause.setQuantifier().DISTINCT() != null @@ -1188,7 +1194,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct) + isDistinct, + isPipeOperatorSelect) // Hint selectClause.hints.asScala.foldRight(plan)(withHints) @@ -1202,7 +1209,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - isDistinct: Boolean): LogicalPlan = { + isDistinct: Boolean, + isPipeOperatorSelect: Boolean): LogicalPlan = { // Add lateral views. 
val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate) @@ -1216,7 +1224,20 @@ class AstBuilder extends DataTypeAstBuilder } def createProject() = if (namedExpressions.nonEmpty) { - Project(namedExpressions, withFilter) + val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) { + // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping + // each alias in the project list, so the analyzer can check invariants later. + namedExpressions.map { + case a: Alias => + a.withNewChildren(Seq(PipeSelect(a.child))) + .asInstanceOf[NamedExpression] + case other => + other + } + } else { + namedExpressions + } + Project(newProjectList, withFilter) } else { withFilter } @@ -5755,6 +5776,29 @@ class AstBuilder extends DataTypeAstBuilder visitSetVariableImpl(ctx.query(), ctx.multipartIdentifierList(), ctx.assignmentList()) } + override def visitOperatorPipeStatement(ctx: OperatorPipeStatementContext): LogicalPlan = { + visitOperatorPipeRightSide(ctx.operatorPipeRightSide(), plan(ctx.left)) + } + + private def visitOperatorPipeRightSide( + ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = { + if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) { + operationNotAllowed("Operator pipe SQL syntax using |>", ctx) + } + Option(ctx.selectClause).map { c => + withSelectQuerySpecification( + ctx = ctx, + selectClause = c, + lateralView = new java.util.ArrayList[LateralViewContext](), + whereClause = null, + aggregationClause = null, + havingClause = null, + windowClause = null, + relation = left, + isPipeOperatorSelect = true) + }.get + } + /** * Check plan for any parameters. * If it finds any throws UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index cbbfccfcab5e8..826ac52c2b817 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -72,6 +72,7 @@ object TreePattern extends Enumeration { val NOT: Value = Value val NULL_CHECK: Value = Value val NULL_LITERAL: Value = Value + val PIPE_OPERATOR_SELECT: Value = Value val SERIALIZE_FROM_OBJECT: Value = Value val OR: Value = Value val OUTER_REFERENCE: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index e4c8c76e958f8..f1f8be3d15751 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -4104,4 +4104,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("functionName" -> functionName) ) } + + def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable = { + new AnalysisException( + errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + messageParameters = Map( + "expr" -> expr.toString), + origin = expr.origin) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5853e4b66dcc0..c3a42dfd62a04 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -5012,6 +5012,15 @@ object SQLConf { .stringConf .createWithDefault("versionAsOf") + val OPERATOR_PIPE_SYNTAX_ENABLED = + buildConf("spark.sql.operatorPipeSyntaxEnabled") + .doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " + + "pipe marker |> to indicate separation between clauses of SQL in a manner that describes " + + "the sequence of steps that the query performs in a composable fashion.") + .version("4.0.0") + .booleanConf + .createWithDefault(Utils.isTesting) + val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") .internal() .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " + diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out new file mode 100644 index 0000000000000..ab0635fef048b --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -0,0 +1,318 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +drop table if exists t +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t + + +-- !query +create table t(x int, y string) using csv +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`t`, false + + +-- !query +insert into t values (0, 'abc'), (1, 'def') +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/t, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/t], Append, `spark_catalog`.`default`.`t`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/t), [x, y] ++- Project [cast(col1#x as int) AS x#x, cast(col2#x as string) AS y#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +drop table if exists other +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other + + +-- !query +create table other(a int, b int) using json +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`other`, false + + +-- !query +insert into other values (1, 1), (1, 2), (2, 4) +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/other, false, JSON, [path=file:[not included in comparison]/{warehouse_dir}/other], Append, `spark_catalog`.`default`.`other`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/other), [a, b] ++- Project [cast(col1#x as int) AS a#x, cast(col2#x as int) AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +drop table if exists st +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st + + +-- !query +create table st(x int, col struct) using parquet +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`st`, false + + +-- !query +insert into st values (1, (2, 3)) +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/st, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/st], Append, `spark_catalog`.`default`.`st`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in 
comparison]/{warehouse_dir}/st), [x, col] ++- Project [cast(col1#x as int) AS x#x, named_struct(i1, cast(col2#x.col1 as int), i2, cast(col2#x.col2 as int)) AS col#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +table t +|> select 1 as x +-- !query analysis +Project [pipeselect(1) AS x#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select x, y +-- !query analysis +Project [x#x, y#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select x, y +|> select x + length(y) as z +-- !query analysis +Project [pipeselect((x#x + length(y#x))) AS z#x] ++- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +values (0), (1) tab(col) +|> select col * 2 as result +-- !query analysis +Project [pipeselect((col#x * 2)) AS result#x] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +(select * from t union all select * from t) +|> select x + length(y) as result +-- !query analysis +Project [pipeselect((x#x + length(y#x))) AS result#x] ++- Union false, false + :- Project [x#x, y#x] + : +- SubqueryAlias spark_catalog.default.t + : +- Relation spark_catalog.default.t[x#x,y#x] csv + +- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +(table t + |> select x, y + |> select x) +union all +select x from t where x < 1 +-- !query analysis +Union false, false +:- Project [x#x] +: +- Project [x#x, y#x] +: +- SubqueryAlias spark_catalog.default.t +: +- Relation spark_catalog.default.t[x#x,y#x] csv ++- Project [x#x] + +- Filter (x#x < 1) + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +(select col from st) +|> select col.i1 +-- !query analysis +Project [col#x.i1 AS i1#x] ++- Project [col#x] + +- SubqueryAlias spark_catalog.default.st + +- Relation spark_catalog.default.st[x#x,col#x] parquet + + +-- !query +table st +|> select st.col.i1 +-- !query analysis +Project [col#x.i1 AS i1#x] ++- SubqueryAlias spark_catalog.default.st + +- Relation spark_catalog.default.st[x#x,col#x] parquet + + +-- !query +table t +|> select (select a from other where x = a limit 1) as result +-- !query analysis +Project [pipeselect(scalar-subquery#x [x#x]) AS result#x] +: +- GlobalLimit 1 +: +- LocalLimit 1 +: +- Project [a#x] +: +- Filter (outer(x#x) = a#x) +: +- SubqueryAlias spark_catalog.default.other +: +- Relation spark_catalog.default.other[a#x,b#x] json ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +select (values (0) tab(col) |> select col) as result +-- !query analysis +Project [scalar-subquery#x [] AS result#x] +: +- Project [col#x] +: +- SubqueryAlias tab +: +- LocalRelation [col#x] ++- OneRowRelation + + +-- !query +table t +|> select (select any_value(a) from other where x = a limit 1) as result +-- !query analysis +Project [pipeselect(scalar-subquery#x [x#x]) AS result#x] +: +- GlobalLimit 1 +: +- LocalLimit 1 +: +- Aggregate [any_value(a#x, false) AS any_value(a)#x] +: +- Filter (outer(x#x) = a#x) +: +- SubqueryAlias spark_catalog.default.other +: +- Relation spark_catalog.default.other[a#x,b#x] json ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select x + length(x) as z, z + 1 as plus_one +-- !query analysis +Project 
[z#x, pipeselect((z#x + 1)) AS plus_one#x] ++- Project [x#x, y#x, pipeselect((x#x + length(cast(x#x as string)))) AS z#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select first_value(x) over (partition by y) as result +-- !query analysis +Project [result#x] ++- Project [x#x, y#x, _we0#x, pipeselect(_we0#x) AS result#x] + +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#x], [y#x] + +- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2 +-- !query analysis +Project [a2#x] ++- Project [(1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, x#x, a2#x] + +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) + _we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, pipeselect(_we2#x) AS a2#x] + +- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we2#x], [y#x], [z#x ASC NULLS FIRST] + +- Window [sum(x#x) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#xL, avg(y#x) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] + +- Project [x#x, y#x, (x#x + 1) AS _w1#x, z#x] + +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +table t +|> select x, count(*) over () +|> select x +-- !query analysis +Project [x#x] ++- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Project [x#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select distinct x, y +-- !query analysis +Distinct ++- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select sum(x) as result +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" + } ] +} + + +-- !query +table t +|> select y, length(y) + sum(x) as result +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : 
"sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 39, + "fragment" : "sum(x)" + } ] +} + + +-- !query +drop table t +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t + + +-- !query +drop table other +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other + + +-- !query +drop table st +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql new file mode 100644 index 0000000000000..7d0966e7f2095 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -0,0 +1,102 @@ +-- Prepare some test data. +-------------------------- +drop table if exists t; +create table t(x int, y string) using csv; +insert into t values (0, 'abc'), (1, 'def'); + +drop table if exists other; +create table other(a int, b int) using json; +insert into other values (1, 1), (1, 2), (2, 4); + +drop table if exists st; +create table st(x int, col struct) using parquet; +insert into st values (1, (2, 3)); + +-- Selection operators: positive tests. +--------------------------------------- + +-- Selecting a constant. +table t +|> select 1 as x; + +-- Selecting attributes. +table t +|> select x, y; + +-- Chained pipe SELECT operators. +table t +|> select x, y +|> select x + length(y) as z; + +-- Using the VALUES list as the source relation. +values (0), (1) tab(col) +|> select col * 2 as result; + +-- Using a table subquery as the source relation. +(select * from t union all select * from t) +|> select x + length(y) as result; + +-- Enclosing the result of a pipe SELECT operation in a table subquery. +(table t + |> select x, y + |> select x) +union all +select x from t where x < 1; + +-- Selecting struct fields. +(select col from st) +|> select col.i1; + +table st +|> select st.col.i1; + +-- Expression subqueries in the pipe operator SELECT list. +table t +|> select (select a from other where x = a limit 1) as result; + +-- Pipe operator SELECT inside expression subqueries. +select (values (0) tab(col) |> select col) as result; + +-- Aggregations are allowed within expression subqueries in the pipe operator SELECT list as long as +-- no aggregate functions exist in the top-level select list. +table t +|> select (select any_value(a) from other where x = a limit 1) as result; + +-- Lateral column aliases in the pipe operator SELECT list. +table t +|> select x + length(x) as z, z + 1 as plus_one; + +-- Window functions are allowed in the pipe operator SELECT list. +table t +|> select first_value(x) over (partition by y) as result; + +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2; + +table t +|> select x, count(*) over () +|> select x; + +-- DISTINCT is supported. +table t +|> select distinct x, y; + +-- Selection operators: negative tests. +--------------------------------------- + +-- Aggregate functions are not allowed in the pipe operator SELECT list. +table t +|> select sum(x) as result; + +table t +|> select y, length(y) + sum(x) as result; + +-- Cleanup. 
+----------- +drop table t; +drop table other; +drop table st; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out new file mode 100644 index 0000000000000..7e0b7912105c2 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -0,0 +1,308 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +drop table if exists t +-- !query schema +struct<> +-- !query output + + + +-- !query +create table t(x int, y string) using csv +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into t values (0, 'abc'), (1, 'def') +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table if exists other +-- !query schema +struct<> +-- !query output + + + +-- !query +create table other(a int, b int) using json +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into other values (1, 1), (1, 2), (2, 4) +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table if exists st +-- !query schema +struct<> +-- !query output + + + +-- !query +create table st(x int, col struct) using parquet +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into st values (1, (2, 3)) +-- !query schema +struct<> +-- !query output + + + +-- !query +table t +|> select 1 as x +-- !query schema +struct +-- !query output +1 +1 + + +-- !query +table t +|> select x, y +-- !query schema +struct +-- !query output +0 abc +1 def + + +-- !query +table t +|> select x, y +|> select x + length(y) as z +-- !query schema +struct +-- !query output +3 +4 + + +-- !query +values (0), (1) tab(col) +|> select col * 2 as result +-- !query schema +struct +-- !query output +0 +2 + + +-- !query +(select * from t union all select * from t) +|> select x + length(y) as result +-- !query schema +struct +-- !query output +3 +3 +4 +4 + + +-- !query +(table t + |> select x, y + |> select x) +union all +select x from t where x < 1 +-- !query schema +struct +-- !query output +0 +0 +1 + + +-- !query +(select col from st) +|> select col.i1 +-- !query schema +struct +-- !query output +2 + + +-- !query +table st +|> select st.col.i1 +-- !query schema +struct +-- !query output +2 + + +-- !query +table t +|> select (select a from other where x = a limit 1) as result +-- !query schema +struct +-- !query output +1 +NULL + + +-- !query +select (values (0) tab(col) |> select col) as result +-- !query schema +struct +-- !query output +0 + + +-- !query +table t +|> select (select any_value(a) from other where x = a limit 1) as result +-- !query schema +struct +-- !query output +1 +NULL + + +-- !query +table t +|> select x + length(x) as z, z + 1 as plus_one +-- !query schema +struct +-- !query output +1 2 +2 3 + + +-- !query +table t +|> select first_value(x) over (partition by y) as result +-- !query schema +struct +-- !query output +0 +1 + + +-- !query +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2 +-- !query schema +struct +-- !query output +2.0 + + +-- !query +table t +|> select x, count(*) over () +|> select x +-- !query schema +struct +-- !query output +0 +1 + + +-- !query +table t +|> select distinct x, y +-- !query schema +struct +-- !query output +0 abc +1 def + + +-- !query +table t +|> select sum(x) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : 
"PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" + } ] +} + + +-- !query +table t +|> select y, length(y) + sum(x) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 39, + "fragment" : "sum(x)" + } ] +} + + +-- !query +drop table t +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table other +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table st +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index decfb5555dd87..a80444feb68ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, Un import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Concat, GreaterThan, Literal, NullsFirst, SortOrder, UnresolvedWindowExpression, UnspecifiedFrame, WindowSpecDefinition, WindowSpecReference} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, PROJECT, UNRESOLVED_RELATION} import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, RefreshResource} -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StringType import org.apache.spark.util.ArrayImplicits._ @@ -880,4 +881,20 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { parser.parsePlan("SELECT\u30001") // Unicode ideographic space } // scalastyle:on + + test("Operator pipe SQL syntax") { + withSQLConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED.key -> "true") { + // Basic selection. + // Here we check that every parsed plan contains a projection and a source relation or + // inline table. 
+ def checkPipeSelect(query: String): Unit = { + val plan: LogicalPlan = parser.parsePlan(query) + assert(plan.containsPattern(PROJECT)) + assert(plan.containsAnyPattern(UNRESOLVED_RELATION, LOCAL_RELATION)) + } + checkPipeSelect("TABLE t |> SELECT 1 AS X") + checkPipeSelect("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z") + checkPipeSelect("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result") + } + } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 026b2388c593c..331572e62f566 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -103,7 +103,8 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ // SPARK-42921 "timestampNTZ/datetime-special-ansi.sql", // SPARK-47264 - "collations.sql" + "collations.sql", + "pipe-operators.sql" ) override def runQueries(
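A usage note on the parser gate added in AstBuilder (not part of the diff): when the conf is disabled, the syntax is rejected at parse time via operationNotAllowed. A hedged sketch; the exact error text is an assumption:

```sql
SET spark.sql.operatorPipeSyntaxEnabled = false;

-- Fails to parse: visitOperatorPipeRightSide calls
-- operationNotAllowed("Operator pipe SQL syntax using |>", ctx),
-- so an "Operation not allowed" ParseException is expected here.
TABLE t
|> SELECT x;
```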