[Spark] Delta SQL parser change to support CLUSTER BY #2328
Closed
Changes from all commits (2 commits)
DeltaSqlParser.scala

@@ -43,6 +43,7 @@ import java.util.Locale
 import scala.collection.JavaConverters._

 import org.apache.spark.sql.catalyst.TimeTravel
+import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByParserUtils, ClusterByPlan, ClusterBySpec}

 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.commands._
@@ -58,7 +59,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{checkDuplicateClauses, string, withOrigin}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddConstraint, AlterTableDropConstraint, AlterTableDropFeature, CloneTableStatement, LogicalPlan, RestoreTableStatement}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
@@ -76,6 +77,8 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {

   override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
     builder.visit(parser.singleStatement()) match {
+      case clusterByPlan: ClusterByPlan =>
+        ClusterByParserUtils(clusterByPlan, delegate).parsePlan(sqlText)
       case plan: LogicalPlan => plan
       case _ => delegate.parsePlan(sqlText)
     }
@@ -406,6 +409,33 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
     RestoreTableStatement(timeTravelTableRelation.asInstanceOf[TimeTravel])
   }

+  /**
+   * Captures any CLUSTER BY clause and creates a [[ClusterByPlan]] logical plan.
+   * The plan will be used as a sentinel for DeltaSqlParser to process it further.
+   */
+  override def visitClusterBy(ctx: ClusterByContext): LogicalPlan = withOrigin(ctx) {
+    val clusterBySpecCtx = ctx.clusterBySpec.asScala.head
(Inline review thread on this line) Author: Yes. This function is invoked when we've matched clusterBySpec.
+    checkDuplicateClauses(ctx.clusterBySpec, "CLUSTER BY", clusterBySpecCtx)
+    val columnNames =
+      clusterBySpecCtx.interleave.asScala
+        .map(_.identifier.asScala.map(_.getText).toSeq)
+        .map(_.asInstanceOf[Seq[String]]).toSeq
+    // get CLUSTER BY clause positions.
+    val startIndex = clusterBySpecCtx.getStart.getStartIndex
+    val stopIndex = clusterBySpecCtx.getStop.getStopIndex
+
+    // get CLUSTER BY parenthesis positions.
+    val parenStartIndex = clusterBySpecCtx.LEFT_PAREN().getSymbol.getStartIndex
+    val parenStopIndex = clusterBySpecCtx.RIGHT_PAREN().getSymbol.getStopIndex
+    ClusterByPlan(
+      ClusterBySpec(columnNames),
+      startIndex,
+      stopIndex,
+      parenStartIndex,
+      parenStopIndex,
+      clusterBySpecCtx)
+  }
+
   /**
    * Time travel the table to the given version or timestamp.
    */
spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala (new file, 155 additions)
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.skipping.clustering.temp

import scala.reflect.ClassTag

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.antlr.v4.runtime.ParserRuleContext

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, LeafNode, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedReference, Transform}
/**
 * A container for clustering information. Copied from OSS Spark.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param columnNames the names of the columns used for clustering.
 */
case class ClusterBySpec(columnNames: Seq[NamedReference]) {
  override def toString: String = toJson

  def toJson: String =
    ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
}
object ClusterBySpec {
  private val mapper = {
    val ret = new ObjectMapper() with ClassTagExtensions
    ret.setSerializationInclusion(Include.NON_ABSENT)
    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    ret.registerModule(DefaultScalaModule)
    ret
  }

  // ClassTag is added to avoid the "same type after erasure" issue with the case class.
  def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = {
    ClusterBySpec(columnNames.map(FieldReference(_)))
  }

  // Convert from a table property back to ClusterBySpec.
  def fromProperty(columns: String): ClusterBySpec = {
    ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
  }
}
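To illustrate the serialization format, here is a hypothetical round-trip through the JSON table-property representation (the snippet and its column names are invented for illustration and are not part of the diff):

import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec

// Hypothetical usage: cluster on a top-level column and a nested field.
val spec = ClusterBySpec(Seq(Seq("a"), Seq("nested", "b")))
val json = spec.toJson // [["a"],["nested","b"]]
val restored = ClusterBySpec.fromProperty(json)
assert(restored.columnNames.map(_.fieldNames.toSeq) == Seq(Seq("a"), Seq("nested", "b")))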
/**
 * A [[LogicalPlan]] representing a CLUSTER BY clause.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param clusterBySpec the ClusterBySpec containing the clustering columns.
 * @param startIndex start index of the CLUSTER BY clause.
 * @param stopIndex stop index of the CLUSTER BY clause.
 * @param parenStartIndex start index of the left parenthesis in the CLUSTER BY clause.
 * @param parenStopIndex stop index of the right parenthesis in the CLUSTER BY clause.
 * @param ctx parser rule context of the CLUSTER BY clause.
 */
case class ClusterByPlan(
    clusterBySpec: ClusterBySpec,
    startIndex: Int,
    stopIndex: Int,
    parenStartIndex: Int,
    parenStopIndex: Int,
    ctx: ParserRuleContext)
  extends LeafNode {
  override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this
  override def output: Seq[Attribute] = Seq.empty
}
/**
 * Parser utils for parsing a [[ClusterByPlan]] and converting it to table properties.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param clusterByPlan the ClusterByPlan to parse.
 * @param delegate the delegate parser.
 */
case class ClusterByParserUtils(clusterByPlan: ClusterByPlan, delegate: ParserInterface) {
  // Update partitioning to include clustering columns as transforms.
  private def updatePartitioning(partitioning: Seq[Transform]): Seq[Transform] = {
    // Validate that no bucketing is specified.
    if (partitioning.exists(t => t.isInstanceOf[BucketTransform])) {
      ParserUtils.operationNotAllowed(
        "Clustering and bucketing cannot both be specified. " +
          "Please remove CLUSTERED BY INTO BUCKETS if you " +
          "want to create a Delta table with clustering",
        clusterByPlan.ctx)
    }
    Seq(ClusterByTransform(clusterByPlan.clusterBySpec.columnNames))
  }
  /**
   * Parse the [[ClusterByPlan]] by replacing CLUSTER BY with PARTITIONED BY,
   * leveraging the Spark SQL parser to perform the validation. After parsing, store the
   * clustering columns in the logical plan's partitioning transforms.
   *
   * @param sqlText original SQL text.
   * @return the logical plan after parsing.
   */
  def parsePlan(sqlText: String): LogicalPlan = {
    val colText =
      sqlText.substring(clusterByPlan.parenStartIndex, clusterByPlan.parenStopIndex + 1)
    // Replace CLUSTER BY with PARTITIONED BY to let SparkSqlParser do the validation for us.
    // This serves as a short-term workaround until Spark incorporates CREATE TABLE ... CLUSTER BY
    // syntax.
    val partitionedByText = "PARTITIONED BY " + colText
    val newSqlText =
      sqlText.substring(0, clusterByPlan.startIndex) +
        partitionedByText +
        sqlText.substring(clusterByPlan.stopIndex + 1)
    try {
      delegate.parsePlan(newSqlText) match {
        case create: CreateTable =>
          create.copy(partitioning = updatePartitioning(create.partitioning))
        case ctas: CreateTableAsSelect =>
          ctas.copy(partitioning = updatePartitioning(ctas.partitioning))
        case replace: ReplaceTable =>
          replace.copy(partitioning = updatePartitioning(replace.partitioning))
        case rtas: ReplaceTableAsSelect =>
          rtas.copy(partitioning = updatePartitioning(rtas.partitioning))
        case plan => plan
      }
    } catch {
      case e: ParseException if e.errorClass.contains("DUPLICATE_CLAUSES") =>
        // Since we replaced CLUSTER BY with PARTITIONED BY, duplicate clauses mean we
        // encountered CLUSTER BY together with PARTITIONED BY.
        ParserUtils.operationNotAllowed(
          "Clustering and partitioning cannot both be specified. " +
            "Please remove PARTITIONED BY if you want to create a Delta table with clustering",
          clusterByPlan.ctx)
    }
  }
}
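To make the rewrite concrete, here is a hypothetical trace of the string surgery for a single statement (the statement and the way the offsets are computed here are invented for illustration; in the real flow they come from ClusterByPlan):

// Hypothetical input statement.
val sqlText = "CREATE TABLE t (a INT, b STRING) USING delta CLUSTER BY (a, b)"
// Offsets ClusterByPlan would carry for this statement.
val startIndex = sqlText.indexOf("CLUSTER BY")          // start of the clause
val parenStartIndex = sqlText.indexOf("(", startIndex)  // '(' of the column list
val stopIndex = sqlText.lastIndexOf(")")                // clause ends at the ')'
val parenStopIndex = stopIndex

val colText = sqlText.substring(parenStartIndex, parenStopIndex + 1) // "(a, b)"
val newSqlText =
  sqlText.substring(0, startIndex) + "PARTITIONED BY " + colText +
    sqlText.substring(stopIndex + 1)
// newSqlText: "CREATE TABLE t (a INT, b STRING) USING delta PARTITIONED BY (a, b)"

If the original statement already contained a PARTITIONED BY clause, the rewritten text now has two of them, so the delegate parser fails with DUPLICATE_CLAUSES and the catch block above converts that into the clustering-and-partitioning error.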
...c/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala (new file, 59 additions)
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.skipping.clustering.temp

import org.apache.spark.sql.connector.expressions.{Expression, NamedReference, Transform}

/**
 * Minimal version of Spark's ClusterByTransform. We'll remove this when we integrate with OSS
 * Spark's CLUSTER BY implementation.
 *
 * This class represents a transform for `ClusterBySpec`. It is used to bundle
 * ClusterBySpec in CreateTable's partitioning transforms to pass it down to the analyzer/Delta.
 */
final case class ClusterByTransform(
    columnNames: Seq[NamedReference]) extends Transform {

  override val name: String = "temp_cluster_by"

  override def arguments: Array[Expression] = columnNames.toArray

  override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
}
/**
 * Convenience extractor for ClusterByTransform.
 */
object ClusterByTransform {
  def unapply(transform: Transform): Option[Seq[NamedReference]] =
    transform match {
      case NamedTransform("temp_cluster_by", arguments) =>
        Some(arguments.map(_.asInstanceOf[NamedReference]))
      case _ =>
        None
    }
}

/**
 * Copied from OSS Spark. We'll remove this when we integrate with OSS Spark's CLUSTER BY.
 * Convenience extractor for any Transform.
 */
private object NamedTransform {
  def unapply(transform: Transform): Some[(String, Seq[Expression])] = {
    Some((transform.name, transform.arguments))
  }
}
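For intuition, a hypothetical snippet (not part of the diff) showing how downstream code can recover the clustering columns from a table's partitioning transforms via the extractor:

import org.apache.spark.sql.connector.expressions.{FieldReference, Transform}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform

// Hypothetical partitioning as produced by ClusterByParserUtils.parsePlan.
val partitioning: Seq[Transform] =
  Seq(ClusterByTransform(Seq(FieldReference(Seq("a")), FieldReference(Seq("b")))))

// The extractor matches on the "temp_cluster_by" transform name.
val clusteringColumns = partitioning.collect {
  case ClusterByTransform(columnNames) => columnNames.map(_.fieldNames.mkString("."))
}.flatten
// clusteringColumns: Seq("a", "b")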
Review thread:

Reviewer: Sorry, not obvious to me. How does Spark return a ClusterByPlan? Can a user type CREATE TABLE ... CLUSTER BY using Spark 3.5, which Delta depends on?

Author: The ClusterByPlan is injected by Delta in L416 below. We are using this workaround since Spark 3.5 doesn't support CLUSTER BY yet. The flow looks like this:

1. A user types CREATE TABLE ... CLUSTER BY.
2. CLUSTER BY gets matched with clusterBySpec in DeltaSqlBase.g4, and visitClusterBy is called.
3. visitClusterBy creates a ClusterByPlan, which gets detected in parsePlan and triggers ClusterByParserUtils.parsePlan.
4. ClusterByParserUtils.parsePlan replaces CLUSTER BY with PARTITIONED BY and calls SparkSqlParser (delegate.parsePlan) for validation.
5. The clustering columns are then stored in the logical plan's partitioning transforms.
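A minimal end-to-end sketch of that flow (hypothetical usage; it assumes a build of delta-spark containing this change is on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// DeltaSqlParser intercepts the CLUSTER BY clause, validates the statement by
// rewriting it to PARTITIONED BY, and records the clustering columns as a
// ClusterByTransform in the resulting plan's partitioning.
spark.sql("CREATE TABLE t (a INT, b STRING) USING delta CLUSTER BY (a)")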