[SPARK-49556][SQL] Add SQL pipe syntax for the SELECT operator #48047

Closed · wants to merge 11 commits
common/utils/src/main/resources/error/error-conditions.json (6 additions, 0 deletions)
@@ -3707,6 +3707,12 @@
],
"sqlState" : "42K03"
},
"PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
"message" : [
"Aggregate function <expr> is not allowed when using the pipe operator |> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
],
"sqlState" : "0A000"
},
"PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
"message" : [
"Invalid pivot value '<value>': value data type <valueType> does not match pivot column data type <pivotType>."
SqlBaseLexer.g4
@@ -503,6 +503,7 @@ TILDE: '~';
AMPERSAND: '&';
PIPE: '|';
CONCAT_PIPE: '||';
OPERATOR_PIPE: '|>';
HAT: '^';
COLON: ':';
DOUBLE_COLON: '::';
SqlBaseParser.g4
@@ -599,6 +599,7 @@ queryTerm
operator=INTERSECT setQuantifier? right=queryTerm #setOperation
| left=queryTerm {!legacy_setops_precedence_enabled}?
operator=(UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation
| left=queryTerm OPERATOR_PIPE operatorPipeRightSide #operatorPipeStatement
Contributor:

I had a hard time understanding this recursive parser rule. How does it match consecutive pipe operators? And what is the operator precedence when classic SQL query syntax is mixed with the new pipe syntax?

Contributor:

I'm not familiar enough with ANTLR. So this recursive parser rule matches the SQL string from the end? E.g., it finds the first operatorPipeRightSide from the end, and then tries to match a chain of pipe operators?

Contributor (Author):

Sure, no problem, I can try to explain it.

ANTLR tokenizes each SQL query it receives, converting the input string into a sequence of tokens (using SqlBaseLexer.g4). Then the parser's job (in this file) is to convert that sequence of tokens into an initial unresolved logical plan representing the parse tree.

To do so, the parser checks each rule in the listed sequence, one by one, comparing the tokens at the current index in the sequence with the required tokens from the rule. If the rule matches (that is, all keywords and other components in the rule map to corresponding input tokens), the parser generates the rule's unresolved logical plan tree using the logic in AstBuilder.scala.

In this case, we define the new token OPERATOR_PIPE: '|>'; in SqlBaseLexer.g4. Then we add a new alternative to the existing queryTerm rule that allows any syntax matching an existing queryTerm to appear on the left side of this |> token, with the syntax of operatorPipeRightSide on the right side (which in this PR is limited to only a selectClause).

ANTLR grammars allow left-recursive rules, wherein any alternative may begin with a reference to the same rule, so the queryTerm on the left side may match any valid existing syntax for a queryTerm such as TABLE t, a table subquery, etc. Since we extend queryTerm to also match queryTerm OPERATOR_PIPE operatorPipeRightSide, this alternative implements the recursion that lets us chain multiple pipe operators together. For example, in TABLE t |> SELECT x |> LIMIT 2, TABLE t matches a queryTerm, then TABLE t |> SELECT x matches another, and finally the entire query matches (using the new recursive #operatorPipeStatement alternative twice).

If a rule does not match, the parser moves on to try the next rule in the sequence, and so on, similar to a Scala pattern match. This defines the precedence of the rules relative to each other: the ones appearing first in the list in SqlBaseParser.g4 apply first.
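To make the left recursion concrete, here is a minimal Scala sketch (illustrative names only, not ANTLR's actual machinery): ANTLR rewrites a left-recursive rule into a loop that matches one base queryTerm and then folds each |> suffix onto the tree built so far, so the leftmost pipe binds innermost.

sealed trait QueryTerm
case class BaseQuery(text: String) extends QueryTerm
case class PipeStep(left: QueryTerm, rightSide: String) extends QueryTerm

// Match one base term, then fold every "|> <rightSide>" suffix onto the
// tree accumulated so far.
def parsePipeChain(base: String, suffixes: Seq[String]): QueryTerm =
  suffixes.foldLeft(BaseQuery(base): QueryTerm)(PipeStep.apply)

// "TABLE t |> SELECT x |> LIMIT 2" nests as:
//   PipeStep(PipeStep(BaseQuery("TABLE t"), "SELECT x"), "LIMIT 2")
val tree = parsePipeChain("TABLE t", Seq("SELECT x", "LIMIT 2"))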

Member:

So the parser generates a basic parse tree, and AstBuilder.scala transforms that into an unresolved logical plan? Thanks for the clear and detailed explanation! I'm adding SQL syntax too and this is very helpful.

;

queryPrimary
@@ -1457,6 +1458,10 @@ version
| stringLit
;

operatorPipeRightSide
: selectClause
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
// - Reserved keywords:
// Keywords that are reserved and can't be used as identifiers for table, view, column,
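Since operatorPipeRightSide above currently admits only selectClause, |> SELECT is the only pipe operator this PR can parse; supporting other clauses means adding alternatives to that rule. A sketch of the resulting behavior in a spark-shell session, assuming spark.sql.operatorPipeSyntaxEnabled is true and a table t with an INT column x exists:

spark.sql("TABLE t |> SELECT x")                       // parses: one pipe step
spark.sql("TABLE t |> SELECT x |> SELECT x + 1 AS y")  // parses: chained pipes
// spark.sql("TABLE t |> WHERE x > 0")  // would fail to parse: WHERE is not
//                                      // yet an alternative of operatorPipeRightSide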
Analyzer.scala
@@ -344,7 +344,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
extendedResolutionRules : _*),
Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
Seq(ResolveCommandsWithIfExists, RemovePipeOperators) ++
postHocResolutionRules: _*),
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
@@ -2727,6 +2727,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
t => t.containsAnyPattern(AGGREGATE_EXPRESSION, PYTHON_UDF) && t.containsPattern(PROJECT),
ruleId) {
case Project(projectList, child) if containsAggregates(projectList) =>
if (child.isInstanceOf[PipeOperatorSelect]) {
// If we used the pipe operator |> SELECT clause to specify an aggregate function, this is
// invalid; return an error message instructing the user to use the pipe operator
// |> AGGREGATE clause for this purpose instead.
throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(projectList.head)
}
Aggregate(Nil, projectList, child)
}
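The user-visible effect of this check, again assuming a table t with an INT column x: a plain projection through |> SELECT analyzes fine, while an aggregate function under it now fails with the new error condition.

spark.sql("TABLE t |> SELECT x + 1")   // OK: no aggregate function
spark.sql("TABLE t |> SELECT SUM(x)")  // fails: AnalysisException with error
// class PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION, directing the user
// to the pipe operator |> AGGREGATE clause instead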

@@ -2747,6 +2753,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}

/**
* Removes placeholder PipeOperator* logical plan nodes and checks invariants.
*/
object RemovePipeOperators extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(PIPE_OPERATOR_SELECT), ruleId) {
case PipeOperatorSelect(child) =>
child
}
}
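// Sketch with illustrative plan names: for "TABLE t |> SELECT x", the plan
// enters the Post-Hoc Resolution batch as
//   Project([x], PipeOperatorSelect(Relation(t)))
// and leaves RemovePipeOperators as
//   Project([x], Relation(t))
// The placeholder exists only so that GlobalAggregates (above) can detect
// that a Project came from |> SELECT; after that check it carries no
// information, hence the unconditional removal here.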

/**
* This rule finds aggregate expressions that are not in an aggregate operator. For example,
* those in a HAVING clause or ORDER BY clause. These expressions are pushed down to the
AstBuilder.scala
@@ -5677,4 +5677,31 @@ class AstBuilder extends DataTypeAstBuilder
withOrigin(ctx) {
visitSetVariableImpl(ctx.query(), ctx.multipartIdentifierList(), ctx.assignmentList())
}

override def visitOperatorPipeStatement(ctx: OperatorPipeStatementContext): LogicalPlan = {
visitOperatorPipeRightSide(ctx.operatorPipeRightSide(), plan(ctx.left))
}

private def visitOperatorPipeRightSide(
ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
Option(ctx.selectClause).map { c =>
withSelectQuerySpecification(
ctx = ctx,
selectClause = c,
lateralView = new java.util.ArrayList[LateralViewContext](),
whereClause = null,
aggregationClause = null,
havingClause = null,
windowClause = null,
left) match {
// The input should always be a projection since we only pass a context for the SELECT
// clause here and pass "null" for all other clauses.
case p: Project =>
p.copy(child = PipeOperatorSelect(p.child))
}
}.get
}
}
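Tracing one pipe step through this builder (a sketch of the unresolved plan shapes, assuming withSelectQuerySpecification yields a Project when given only a SELECT clause, as the comment above asserts):

// Parsing "TABLE t |> SELECT x":
//   plan(ctx.left)                    => UnresolvedRelation(t)  (from "TABLE t")
//   withSelectQuerySpecification(...) => Project([x], UnresolvedRelation(t))
//   p.copy(child = PipeOperatorSelect(p.child))
//     => Project([x], PipeOperatorSelect(UnresolvedRelation(t)))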
New file:
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, TreePattern}

/**
* Represents a SELECT clause when used with the |> SQL pipe operator.
* We use this operator to make sure that no aggregate functions exist in the SELECT expressions.
*/
case class PipeOperatorSelect(child: LogicalPlan) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT)
override def output: Seq[Attribute] = child.output
override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
PipeOperatorSelect(newChild)
}
RuleIdCollection.scala
@@ -47,6 +47,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$RemovePipeOperators" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions" ::
"org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases" ::
TreePattern.scala
@@ -72,6 +72,7 @@ object TreePattern extends Enumeration {
val NOT: Value = Value
val NULL_CHECK: Value = Value
val NULL_LITERAL: Value = Value
val PIPE_OPERATOR_SELECT: Value = Value
val SERIALIZE_FROM_OBJECT: Value = Value
val OR: Value = Value
val OUTER_REFERENCE: Value = Value
QueryCompilationErrors.scala
@@ -4096,4 +4096,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with CompilationErrors
messageParameters = Map("functionName" -> functionName)
)
}

def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable = {
new AnalysisException(
errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
messageParameters = Map(
"expr" -> expr.toString),
origin = expr.origin)
}
}
SQLConf.scala
@@ -4989,6 +4989,15 @@ object SQLConf {
.stringConf
.createWithDefault("versionAsOf")

val OPERATOR_PIPE_SYNTAX_ENABLED =
buildConf("spark.sql.operatorPipeSyntaxEnabled")
.doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " +
"pipe marker |> to indicate separation between clauses of SQL in a manner that describes " +
"the sequence of steps that the query performs in a composable fashion.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
.internal()
.doc("If true, the old bogus percentile_disc calculation is used. The old calculation " +