
[SPARK-49556][SQL] Add SQL pipe syntax for the SELECT operator #48047

Closed · wants to merge 11 commits
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3748,6 +3748,12 @@
],
"sqlState" : "42K03"
},
"PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
"message" : [
"Aggregate function <expr> is not allowed when using the pipe operator |> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
],
"sqlState" : "0A000"
},
"PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
"message" : [
"Invalid pivot value '<value>': value data type <valueType> does not match pivot column data type <pivotType>."
SqlBaseLexer.g4
@@ -505,6 +505,7 @@ TILDE: '~';
AMPERSAND: '&';
PIPE: '|';
CONCAT_PIPE: '||';
OPERATOR_PIPE: '|>';
HAT: '^';
COLON: ':';
DOUBLE_COLON: '::';
SqlBaseParser.g4
@@ -604,6 +604,7 @@ queryTerm
operator=INTERSECT setQuantifier? right=queryTerm #setOperation
operator=INTERSECT setQuantifier? right=queryTerm #setOperation
| left=queryTerm {!legacy_setops_precedence_enabled}?
operator=(UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation
| left=queryTerm OPERATOR_PIPE operatorPipeRightSide #operatorPipeStatement
Contributor:

I had a hard time understanding this recursive parser rule. How does it match consecutive pipe operators? And what is the operator precedence when classic SQL query syntax is mixed with the new pipe syntax?

Contributor:

I'm not familiar enough with ANTLR. So does this recursive parser rule match the SQL string from the end? E.g., does it find the first operatorPipeRightSide from the end, and then try to match a chain of pipe operators?

Contributor Author:

Sure, no problem, I can try to explain it.

ANTLR tokenizes each SQL query it receives, converting the input string into a sequence of tokens (using SqlBaseLexer.g4). Then the parser's job (in this file) is to convert that sequence of tokens into an initial unresolved logical plan representing the parse tree.

To do so, the parser checks each rule in the listed sequence, one by one, comparing the provided tokens at the current index in the sequence with the required tokens from the rule. If the rule matches (that is, all keywords and other components in the rule map to corresponding input tokens), then the parser generates the rule's unresolved logical plan tree using the logic in AstBuilder.scala.

In this case, we define the new token OPERATOR_PIPE: '|>'; in SqlBaseLexer.g4. Then we add a new option to the existing queryTerm rule to allow any syntax matching an existing queryTerm to appear on the left side of this |> token and the syntax of operatorPipeRightSide on the right side (which in this PR is limited to only a selectClause).

ANTLR allows left-recursive rules, in which an alternative may begin with a reference to the same rule, so the queryTerm on the left side may match any valid existing syntax for a queryTerm, such as TABLE t, a table subquery, etc. Since we extend queryTerm to also match queryTerm OPERATOR_PIPE operatorPipeRightSide, this alternative implements the recursion that lets us chain multiple pipe operators together. For example, in TABLE t |> SELECT x |> LIMIT 2, TABLE t matches a queryTerm, then TABLE t |> SELECT x matches another, and finally the entire query matches (using the new recursive #operatorPipeStatement alternative twice).

Otherwise, if the rule does not match, then the parser moves on to try the next rule in the sequence, and so on, similar to a Scala pattern-match. This defines the precedence of the rules amongst each other: the ones appearing first in the list in SqlBaseParser.g4 apply first.
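
To make the chaining concrete, here is a minimal sketch (assuming a spark-shell session on a build with this change, the pipe syntax conf enabled, and a hypothetical table t). Note that only |> SELECT is wired up in this PR, so the runnable lines chain two SELECTs:

// Nesting for the example above: ((TABLE t) |> SELECT x) |> LIMIT 2
// Chaining that stays within this PR's scope:
spark.sql("CREATE TABLE t(x INT) USING PARQUET")       // hypothetical table
spark.sql("TABLE t |> SELECT x AS y |> SELECT y + 1")  // matches #operatorPipeStatement twice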

Member:

So the parser generates a basic parse tree, and AstBuilder.scala transforms that into an unresolved logical plan? Thanks for the clear and detailed explanation! I'm adding SQL syntax too and this is very helpful.

;

queryPrimary
@@ -1462,6 +1463,10 @@ version
| stringLit
;

operatorPipeRightSide
: selectClause
;
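
As a quick illustration of what this rule admits so far (a sketch; spark-shell session, pipe syntax conf enabled, hypothetical table t):

// Only a selectClause may follow |> at this point:
spark.sql("TABLE t |> SELECT x, x + 1 AS y")  // parses
// Anything else after |> (e.g. WHERE or LIMIT) is not in the grammar yet
// and would be rejected by the parser.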

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
// - Reserved keywords:
// Keywords that are reserved and can't be used as identifiers for table, view, column,
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE, TreePattern}
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
* Represents a SELECT clause when used with the |> SQL pipe operator.
* We use this to make sure that no aggregate functions exist in the SELECT expressions.
*/
case class PipeSelect(child: Expression)
extends UnaryExpression with RuntimeReplaceable {
final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE)
override def withNewChildInternal(newChild: Expression): Expression = PipeSelect(newChild)
override lazy val replacement: Expression = {
def visit(e: Expression): Unit = e match {
case a: AggregateFunction =>
// If we used the pipe operator |> SELECT clause to specify an aggregate function, this is
// invalid; return an error message instructing the user to use the pipe operator
// |> AGGREGATE clause for this purpose instead.
throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a)
case _: WindowExpression =>
// Window functions are allowed in pipe SELECT operators, so do not traverse into children.
case _ =>
e.children.foreach(visit)
}
visit(child)
child
}
}
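
For illustration, a sketch of the behavior this check produces (spark-shell session, pipe syntax conf enabled, hypothetical table t):

// An aggregate function under |> SELECT fails analysis:
spark.sql("TABLE t |> SELECT SUM(x)")
//   => AnalysisException [PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION]
// A window expression succeeds: visit() does not traverse into
// WindowExpression children, so the embedded SUM is never reached:
spark.sql("TABLE t |> SELECT SUM(x) OVER (ORDER BY x)")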
AstBuilder.scala
@@ -425,7 +425,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
plan
plan,
isPipeOperatorSelect = false
)
}
}
@@ -1013,7 +1014,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
from
from,
isPipeOperatorSelect = false
)
}

@@ -1100,7 +1102,8 @@
aggregationClause,
havingClause,
windowClause,
isDistinct = false)
isDistinct = false,
isPipeOperatorSelect = false)

ScriptTransformation(
string(visitStringLit(transformClause.script)),
@@ -1121,6 +1124,8 @@ class AstBuilder extends DataTypeAstBuilder
* Add a regular (SELECT) query specification to a logical plan. The query specification
* is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT),
aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) take place.
If 'isPipeOperatorSelect' is true, each projected expression is wrapped with a [[PipeSelect]]
expression so the expressions can be validated later during analysis.
*
* Note that query hints are ignored (both by the parser and the builder).
*/
@@ -1132,7 +1137,8 @@
aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
relation: LogicalPlan,
isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) {
val isDistinct = selectClause.setQuantifier() != null &&
selectClause.setQuantifier().DISTINCT() != null

@@ -1144,7 +1150,8 @@
aggregationClause,
havingClause,
windowClause,
isDistinct)
isDistinct,
isPipeOperatorSelect)

// Hint
selectClause.hints.asScala.foldRight(plan)(withHints)
@@ -1158,7 +1165,8 @@
aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
isDistinct: Boolean): LogicalPlan = {
isDistinct: Boolean,
isPipeOperatorSelect: Boolean): LogicalPlan = {
// Add lateral views.
val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate)

@@ -1172,7 +1180,20 @@
}

def createProject() = if (namedExpressions.nonEmpty) {
Project(namedExpressions, withFilter)
val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
// If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping
// each alias in the project list, so the analyzer can check invariants later.
namedExpressions.map {
case a: Alias =>
a.withNewChildren(Seq(PipeSelect(a.child)))
.asInstanceOf[NamedExpression]
case other =>
other
}
} else {
namedExpressions
}
Project(newProjectList, withFilter)
} else {
withFilter
}
@@ -5711,6 +5732,29 @@ class AstBuilder extends DataTypeAstBuilder
visitSetVariableImpl(ctx.query(), ctx.multipartIdentifierList(), ctx.assignmentList())
}

override def visitOperatorPipeStatement(ctx: OperatorPipeStatementContext): LogicalPlan = {
visitOperatorPipeRightSide(ctx.operatorPipeRightSide(), plan(ctx.left))
}

private def visitOperatorPipeRightSide(
ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
Option(ctx.selectClause).map { c =>
withSelectQuerySpecification(
ctx = ctx,
selectClause = c,
lateralView = new java.util.ArrayList[LateralViewContext](),
whereClause = null,
aggregationClause = null,
havingClause = null,
windowClause = null,
relation = left,
isPipeOperatorSelect = true)
}.get
}

/**
* Check plan for any parameters.
* If it finds any throws UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT.
@@ -72,6 +72,7 @@ object TreePattern extends Enumeration {
val NOT: Value = Value
val NULL_CHECK: Value = Value
val NULL_LITERAL: Value = Value
val PIPE_OPERATOR_SELECT: Value = Value
val SERIALIZE_FROM_OBJECT: Value = Value
val OR: Value = Value
val OUTER_REFERENCE: Value = Value
@@ -4096,4 +4096,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("functionName" -> functionName)
)
}

def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable = {
new AnalysisException(
errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
messageParameters = Map(
"expr" -> expr.toString),
origin = expr.origin)
}
}
SQLConf.scala
@@ -4989,6 +4989,15 @@ object SQLConf {
.stringConf
.createWithDefault("versionAsOf")

val OPERATOR_PIPE_SYNTAX_ENABLED =
buildConf("spark.sql.operatorPipeSyntaxEnabled")
.doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " +
"pipe marker |> to indicate separation between clauses of SQL in a manner that describes " +
"the sequence of steps that the query performs in a composable fashion.")
.version("4.0.0")
.booleanConf
.createWithDefault(Utils.isTesting)
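
A minimal sketch of exercising the new flag (spark-shell session; table t hypothetical):

// The conf defaults to true only when Utils.isTesting, so set it
// explicitly elsewhere before using |>:
spark.conf.set("spark.sql.operatorPipeSyntaxEnabled", "true")
spark.sql("TABLE t |> SELECT x")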

val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
.internal()
.doc("If true, the old bogus percentile_disc calculation is used. The old calculation " +