Delta SQL parser change to support CLUSTER BY
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change its source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY` and leverage the Spark SQL parser to perform validation. After parsing, the clustering columns are stored in the logical plan's partitioning transforms. When we integrate with Apache Spark's CLUSTER BY implementation ([PR](apache/spark#42577)), we'll remove the workaround in this PR.

Closes delta-io#2328

GitOrigin-RevId: 19262070edbcaead765e7f9eefe96b6e63a7f884
1 parent ab83957 · commit 5e2cd96
Showing 6 changed files with 398 additions and 4 deletions.
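As a sketch of the workaround described above (the statement is hypothetical), the parser rewrites the clause textually before delegating to Spark:

// Hypothetical user statement using Liquid clustering:
val original = "CREATE TABLE events (id BIGINT, ts TIMESTAMP) USING delta CLUSTER BY (id)"
// What the delegated Spark SQL parser actually sees after the substitution:
val rewritten = "CREATE TABLE events (id BIGINT, ts TIMESTAMP) USING delta PARTITIONED BY (id)"
// Once parsing succeeds, the resulting CreateTable's partitioning is replaced with
// Seq(ClusterByTransform(Seq(FieldReference(Seq("id"))))), as the diff below shows.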
155 changes: 155 additions & 0 deletions
spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.skipping.clustering.temp

import scala.reflect.ClassTag

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.antlr.v4.runtime.ParserRuleContext

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, LeafNode, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedReference, Transform}

/**
 * A container for clustering information. Copied from OSS Spark.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param columnNames the names of the columns used for clustering.
 */
case class ClusterBySpec(columnNames: Seq[NamedReference]) {
  override def toString: String = toJson

  def toJson: String =
    ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
}

object ClusterBySpec {
  private val mapper = {
    val ret = new ObjectMapper() with ClassTagExtensions
    ret.setSerializationInclusion(Include.NON_ABSENT)
    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    ret.registerModule(DefaultScalaModule)
    ret
  }

  // ClassTag is added to avoid the "same type after erasure" issue with the case class.
  def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = {
    ClusterBySpec(columnNames.map(FieldReference(_)))
  }

  // Convert a table property value back into a ClusterBySpec.
  def fromProperty(columns: String): ClusterBySpec = {
    ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
  }
}
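// Illustration, not part of the original file: with hypothetical top-level column `a`
// and nested column `b.c`,
//   ClusterBySpec(Seq(FieldReference(Seq("a")), FieldReference(Seq("b", "c")))).toJson
// produces the table-property string [["a"],["b","c"]], and
// ClusterBySpec.fromProperty("""[["a"],["b","c"]]""") rebuilds an equal spec.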

/**
 * A [[LogicalPlan]] representing a CLUSTER BY clause.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param clusterBySpec the ClusterBySpec containing the clustering columns.
 * @param startIndex start index of the CLUSTER BY clause.
 * @param stopIndex stop index of the CLUSTER BY clause.
 * @param parenStartIndex start index of the left parenthesis in the CLUSTER BY clause.
 * @param parenStopIndex stop index of the right parenthesis in the CLUSTER BY clause.
 * @param ctx parser rule context of the CLUSTER BY clause.
 */
case class ClusterByPlan(
    clusterBySpec: ClusterBySpec,
    startIndex: Int,
    stopIndex: Int,
    parenStartIndex: Int,
    parenStopIndex: Int,
    ctx: ParserRuleContext)
  extends LeafNode {
  override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this
  override def output: Seq[Attribute] = Seq.empty
}

/**
 * Parser utils for parsing a [[ClusterByPlan]] and converting it to table properties.
 *
 * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
 * @see https://github.com/apache/spark/pull/42577
 *
 * @param clusterByPlan the ClusterByPlan to parse.
 * @param delegate the delegate parser.
 */
case class ClusterByParserUtils(clusterByPlan: ClusterByPlan, delegate: ParserInterface) {
  // Update partitioning to include clustering columns as transforms.
  private def updatePartitioning(partitioning: Seq[Transform]): Seq[Transform] = {
    // Validate that no bucketing is specified.
    if (partitioning.exists(t => t.isInstanceOf[BucketTransform])) {
      ParserUtils.operationNotAllowed(
        "Clustering and bucketing cannot both be specified. " +
          "Please remove CLUSTERED BY INTO BUCKETS if you " +
          "want to create a Delta table with clustering",
        clusterByPlan.ctx)
    }
    Seq(ClusterByTransform(clusterByPlan.clusterBySpec.columnNames))
  }

  /**
   * Parse the [[ClusterByPlan]] by replacing CLUSTER BY with PARTITIONED BY and
   * leveraging the Spark SQL parser to perform the validation. After parsing, the
   * clustering columns are stored in the logical plan's partitioning transforms.
   *
   * @param sqlText original SQL text.
   * @return the logical plan after parsing.
   */
  def parsePlan(sqlText: String): LogicalPlan = {
    val colText =
      sqlText.substring(clusterByPlan.parenStartIndex, clusterByPlan.parenStopIndex + 1)
    // Replace CLUSTER BY with PARTITIONED BY to let SparkSqlParser do the validation for us.
    // This serves as a short-term workaround until Spark incorporates CREATE TABLE ... CLUSTER BY
    // syntax.
    val partitionedByText = "PARTITIONED BY " + colText
    val newSqlText =
      sqlText.substring(0, clusterByPlan.startIndex) +
        partitionedByText +
        sqlText.substring(clusterByPlan.stopIndex + 1)
    try {
      delegate.parsePlan(newSqlText) match {
        case create: CreateTable =>
          create.copy(partitioning = updatePartitioning(create.partitioning))
        case ctas: CreateTableAsSelect =>
          ctas.copy(partitioning = updatePartitioning(ctas.partitioning))
        case replace: ReplaceTable =>
          replace.copy(partitioning = updatePartitioning(replace.partitioning))
        case rtas: ReplaceTableAsSelect =>
          rtas.copy(partitioning = updatePartitioning(rtas.partitioning))
        case plan => plan
      }
    } catch {
      case e: ParseException if e.errorClass.contains("DUPLICATE_CLAUSES") =>
        // Since we replace CLUSTER BY with PARTITIONED BY, duplicate clauses mean the
        // statement contained both CLUSTER BY and PARTITIONED BY.
        ParserUtils.operationNotAllowed(
          "Clustering and partitioning cannot both be specified. " +
            "Please remove PARTITIONED BY if you want to create a Delta table with clustering",
          clusterByPlan.ctx)
    }
  }
}
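To make the index bookkeeping concrete, here is a minimal end-to-end sketch. The SQL text and hand-computed offsets are hypothetical (in the real parser they come from ANTLR token positions), and `new ParserRuleContext()` stands in for the captured parser context:

import org.antlr.v4.runtime.ParserRuleContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByParserUtils, ClusterByPlan, ClusterBySpec}

object ClusterByParseExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cluster-by-example").master("local[1]").getOrCreate()
    val sqlText = "CREATE TABLE t (a INT, b INT) USING delta CLUSTER BY (a, b)"
    val plan = ClusterByPlan(
      ClusterBySpec(Seq(FieldReference(Seq("a")), FieldReference(Seq("b")))),
      startIndex = 42,      // offset of 'C' in CLUSTER BY
      stopIndex = 58,       // offset of the closing ')'
      parenStartIndex = 53, // offset of '(' before the column list
      parenStopIndex = 58,  // offset of ')' after the column list
      ctx = new ParserRuleContext())
    // Internally rewritten to: CREATE TABLE t (a INT, b INT) USING delta PARTITIONED BY (a, b)
    val parsed = ClusterByParserUtils(plan, spark.sessionState.sqlParser).parsePlan(sqlText)
    println(parsed) // a CreateTable whose partitioning holds the temp_cluster_by transform
    spark.stop()
  }
}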
59 changes: 59 additions & 0 deletions
...c/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.skipping.clustering.temp

import org.apache.spark.sql.connector.expressions.{Expression, NamedReference, Transform}

/**
 * Minimal version of Spark's ClusterByTransform. We'll remove this when we integrate with OSS
 * Spark's CLUSTER BY implementation.
 *
 * This class represents a transform for `ClusterBySpec`. This is used to bundle
 * ClusterBySpec in CreateTable's partitioning transforms to pass it down to the analyzer/Delta.
 */
final case class ClusterByTransform(
    columnNames: Seq[NamedReference]) extends Transform {

  override val name: String = "temp_cluster_by"

  override def arguments: Array[Expression] = columnNames.toArray

  override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
}

/**
 * Convenience extractor for ClusterByTransform.
 */
object ClusterByTransform {
  def unapply(transform: Transform): Option[Seq[NamedReference]] =
    transform match {
      case NamedTransform("temp_cluster_by", arguments) =>
        Some(arguments.map(_.asInstanceOf[NamedReference]))
      case _ =>
        None
    }
}

/**
 * Copied from OSS Spark. We'll remove this when we integrate with OSS Spark's CLUSTER BY.
 * Convenience extractor for any Transform.
 */
private object NamedTransform {
  def unapply(transform: Transform): Some[(String, Seq[Expression])] = {
    Some((transform.name, transform.arguments))
  }
}
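A short sketch of the extractor in use, e.g. in a spark-shell with the Delta jars on the classpath (the transform value here is constructed purely for illustration):

import org.apache.spark.sql.connector.expressions.{FieldReference, Transform}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform

val transform: Transform = ClusterByTransform(Seq(FieldReference(Seq("id"))))
transform match {
  case ClusterByTransform(columns) =>
    // Prints: clustering columns: id
    println(s"clustering columns: ${columns.map(_.describe).mkString(", ")}")
  case _ =>
    println("not a clustering transform")
}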