[Spark] Delta SQL parser change to support CLUSTER BY #2328

Closed · wants to merge 2 commits
7 changes: 7 additions & 0 deletions spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -97,6 +97,7 @@ statement
| cloneTableHeader SHALLOW CLONE source=qualifiedName clause=temporalClause?
(TBLPROPERTIES tableProps=propertyList)?
(LOCATION location=stringLit)? #clone
| .*? clusterBySpec+ .*? #clusterBy
| .*? #passThrough
;

@@ -118,6 +119,10 @@ zorderSpec
| ZORDER BY interleave+=qualifiedName (COMMA interleave+=qualifiedName)*
;

clusterBySpec
: CLUSTER BY LEFT_PAREN interleave+=qualifiedName (COMMA interleave+=qualifiedName)* RIGHT_PAREN
;

temporalClause
: FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING)
| FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=STRING
@@ -224,6 +229,7 @@ nonReserved
| NO | STATISTICS
| CLONE | SHALLOW
| FEATURE | TRUNCATE
| CLUSTER
;

// Define how the keywords above should appear in a user's SQL statement.
@@ -234,6 +240,7 @@ AS: 'AS';
BY: 'BY';
CHECK: 'CHECK';
CLONE: 'CLONE';
CLUSTER: 'CLUSTER';
COMMA: ',';
COMMENT: 'COMMENT';
CONSTRAINT: 'CONSTRAINT';
32 changes: 31 additions & 1 deletion spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -43,6 +43,7 @@ import java.util.Locale
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByParserUtils, ClusterByPlan, ClusterBySpec}

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands._
@@ -58,7 +59,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
import org.apache.spark.sql.catalyst.parser.ParserUtils.{checkDuplicateClauses, string, withOrigin}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddConstraint, AlterTableDropConstraint, AlterTableDropFeature, CloneTableStatement, LogicalPlan, RestoreTableStatement}
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
@@ -76,6 +77,8 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case clusterByPlan: ClusterByPlan =>
Comment (Collaborator):
Sorry, it's not obvious to me. How does Spark return a ClusterByPlan? Can a user type CREATE TABLE ... CLUSTER BY using Spark 3.5, which Delta depends on?

Reply (@zedtang, Collaborator Author, Nov 27, 2023):
The ClusterByPlan is injected by Delta in L416 below. We are using this workaround since Spark 3.5 doesn't support CLUSTER BY yet.

The flow looks like this:

  1. The user types CREATE TABLE ... CLUSTER BY.
  2. CLUSTER BY gets matched by clusterBySpec in DeltaSqlBase.g4 and visitClusterBy is called.
  3. visitClusterBy creates a ClusterByPlan, which gets detected in parsePlan and triggers ClusterByParserUtils.parsePlan.
  4. ClusterByParserUtils.parsePlan replaces CLUSTER BY with PARTITIONED BY and calls the SparkSqlParser (delegate.parsePlan) for validation (a sketch of this rewrite follows below).
  5. After successful parsing, the clustering columns are saved in the table's partitioning transforms.
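
As a rough illustration of steps 1 through 5, here is an editor's sketch, not part of this diff; the table name t and the columns a and b are made up:

// Editor's illustration only; not code from this PR.
object ClusterByRewriteSketch {
  // Step 1: what the user types.
  val original =
    "CREATE TABLE t (a INT, b INT) USING delta CLUSTER BY (a, b)"

  // Steps 2-4: visitClusterBy records the offsets of the CLUSTER BY clause and of its
  // parenthesized column list; ClusterByParserUtils.parsePlan splices the text so the
  // delegate Spark parser sees a statement it already understands:
  val rewritten =
    "CREATE TABLE t (a INT, b INT) USING delta PARTITIONED BY (a, b)"

  // Step 5: after delegate.parsePlan succeeds, the resulting CreateTable's partitioning
  // is replaced with a single ClusterByTransform carrying columns a and b.
}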

ClusterByParserUtils(clusterByPlan, delegate).parsePlan(sqlText)
case plan: LogicalPlan => plan
case _ => delegate.parsePlan(sqlText)
}
@@ -406,6 +409,33 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
RestoreTableStatement(timeTravelTableRelation.asInstanceOf[TimeTravel])
}

/**
* Captures any CLUSTER BY clause and creates a [[ClusterByPlan]] logical plan.
* The plan will be used as a sentinel for DeltaSqlParser to process it further.
*/
override def visitClusterBy(ctx: ClusterByContext): LogicalPlan = withOrigin(ctx) {
val clusterBySpecCtx = ctx.clusterBySpec.asScala.head
Comment (Contributor):
Is head always safe?

Reply (Collaborator Author):
Yes. This function is only invoked when we've matched clusterBySpec+, which means one or more clusterBySpec.

checkDuplicateClauses(ctx.clusterBySpec, "CLUSTER BY", clusterBySpecCtx)
val columnNames =
clusterBySpecCtx.interleave.asScala
.map(_.identifier.asScala.map(_.getText).toSeq)
.map(_.asInstanceOf[Seq[String]]).toSeq
// get CLUSTER BY clause positions.
val startIndex = clusterBySpecCtx.getStart.getStartIndex
val stopIndex = clusterBySpecCtx.getStop.getStopIndex

// get CLUSTER BY parenthesis positions.
val parenStartIndex = clusterBySpecCtx.LEFT_PAREN().getSymbol.getStartIndex
val parenStopIndex = clusterBySpecCtx.RIGHT_PAREN().getSymbol.getStopIndex
ClusterByPlan(
ClusterBySpec(columnNames),
startIndex,
stopIndex,
parenStartIndex,
parenStopIndex,
clusterBySpecCtx)
}

/**
* Time travel the table to the given version or timestamp.
*/
@@ -16,8 +16,8 @@

package org.apache.spark.sql.delta.skipping.clustering

import org.apache.spark.sql.delta.{ClusteringTableFeature, DeltaConfigs}
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.ClusteringTableFeature
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf

@@ -27,6 +27,11 @@ import org.apache.spark.sql.internal.SQLConf
* Clustered table utility functions.
*/
trait ClusteredTableUtilsBase extends DeltaLogging {
// Clustering columns property key. The column names are logical and separated by commas.
// This will be removed when we integrate with OSS Spark and use
// [[CatalogTable.PROP_CLUSTERING_COLUMNS]] directly.
val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns"

/**
* Returns whether the protocol version supports the Liquid table feature.
*/
@@ -0,0 +1,155 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.skipping.clustering.temp

import scala.reflect.ClassTag

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.antlr.v4.runtime.ParserRuleContext

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, LeafNode, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedReference, Transform}

/**
* A container for clustering information. Copied from OSS Spark.
*
* This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
* @see https://github.com/apache/spark/pull/42577
*
* @param columnNames the names of the columns used for clustering.
*/
case class ClusterBySpec(columnNames: Seq[NamedReference]) {
override def toString: String = toJson

def toJson: String =
ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
}

object ClusterBySpec {
private val mapper = {
val ret = new ObjectMapper() with ClassTagExtensions
ret.setSerializationInclusion(Include.NON_ABSENT)
ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
ret.registerModule(DefaultScalaModule)
ret
}

// ClassTag is added to avoid the "same type after erasure" issue with the case class.
def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = {
ClusterBySpec(columnNames.map(FieldReference(_)))
}

// Convert from table property back to ClusterBySpec.
def fromProperty(columns: String): ClusterBySpec = {
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
}
}
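
For reference, a minimal round-trip sketch (editor's addition, not part of the diff) showing how a ClusterBySpec could be serialized to and restored from the table property string; the column names a and nested.b are made up:

import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec

object ClusterBySpecRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Build a spec from logical column name parts (a nested column has multiple parts).
    val spec = ClusterBySpec(Seq(Seq("a"), Seq("nested", "b")))

    // toJson renders the field-name parts as a JSON list, e.g. [["a"],["nested","b"]].
    val property: String = spec.toJson

    // fromProperty parses that JSON back into FieldReferences.
    val restored = ClusterBySpec.fromProperty(property)
    assert(restored.columnNames.map(_.fieldNames.toSeq) == Seq(Seq("a"), Seq("nested", "b")))
  }
}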

/**
* A [[LogicalPlan]] representing a CLUSTER BY clause.
*
* This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
* @see https://github.com/apache/spark/pull/42577
*
* @param clusterBySpec: clusterBySpec which contains the clustering columns.
* @param startIndex: start index of CLUSTER BY clause.
* @param stopIndex: stop index of CLUSTER BY clause.
* @param parenStartIndex: start index of the left parenthesis in CLUSTER BY clause.
* @param parenStopIndex: stop index of the right parenthesis in CLUSTER BY clause.
* @param ctx: parser rule context of the CLUSTER BY clause.
*/
case class ClusterByPlan(
clusterBySpec: ClusterBySpec,
startIndex: Int,
stopIndex: Int,
parenStartIndex: Int,
parenStopIndex: Int,
ctx: ParserRuleContext)
extends LeafNode {
override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this
override def output: Seq[Attribute] = Seq.empty
}

/**
* Parser utils for parsing a [[ClusterByPlan]] and converting it to table properties.
*
* This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation.
* @see https://github.com/apache/spark/pull/42577
*
* @param clusterByPlan: the ClusterByPlan to parse.
* @param delegate: delegate parser.
*/
case class ClusterByParserUtils(clusterByPlan: ClusterByPlan, delegate: ParserInterface) {
// Update partitioning to include clustering columns as transforms.
private def updatePartitioning(partitioning: Seq[Transform]): Seq[Transform] = {
// Validate no bucketing is specified.
if (partitioning.exists(t => t.isInstanceOf[BucketTransform])) {
ParserUtils.operationNotAllowed(
"Clustering and bucketing cannot both be specified. " +
"Please remove CLUSTERED BY INTO BUCKETS if you " +
"want to create a Delta table with clustering",
clusterByPlan.ctx)
}
Seq(ClusterByTransform(clusterByPlan.clusterBySpec.columnNames))
}

/**
* Parse the [[ClusterByPlan]] by replacing CLUSTER BY with PARTITIONED BY and
* leverage Spark SQL parser to perform the validation. After parsing, store the
* clustering columns in the logical plan's partitioning transforms.
*
* @param sqlText: original SQL text.
* @return the logical plan after parsing.
*/
def parsePlan(sqlText: String): LogicalPlan = {
val colText =
sqlText.substring(clusterByPlan.parenStartIndex, clusterByPlan.parenStopIndex + 1)
// Replace CLUSTER BY with PARTITIONED BY to let SparkSqlParser do the validation for us.
// This serves as a short-term workaround until Spark incorporates CREATE TABLE ... CLUSTER BY
// syntax.
val partitionedByText = "PARTITIONED BY " + colText
val newSqlText =
sqlText.substring(0, clusterByPlan.startIndex) +
partitionedByText +
sqlText.substring(clusterByPlan.stopIndex + 1)
try {
delegate.parsePlan(newSqlText) match {
case create: CreateTable =>
create.copy(partitioning = updatePartitioning(create.partitioning))
case ctas: CreateTableAsSelect =>
ctas.copy(partitioning = updatePartitioning(ctas.partitioning))
case replace: ReplaceTable =>
replace.copy(partitioning = updatePartitioning(replace.partitioning))
case rtas: ReplaceTableAsSelect =>
rtas.copy(partitioning = updatePartitioning(rtas.partitioning))
case plan => plan
}
} catch {
case e: ParseException if (e.errorClass.contains("DUPLICATE_CLAUSES")) =>
// Since we replace CLUSTER BY with PARTITIONED BY, a duplicate-clauses error means the
// statement contained both CLUSTER BY and PARTITIONED BY.
ParserUtils.operationNotAllowed(
"Clustering and partitioning cannot both be specified. " +
"Please remove PARTITIONED BY if you want to create a Delta table with clustering",
clusterByPlan.ctx)
}
}
}
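
To put the pieces together, here is a hypothetical end-to-end use of the new parser path (editor's sketch under assumptions, not from this PR; it assumes a plain SparkSqlParser as the delegate and a made-up table t):

import io.delta.sql.parser.DeltaSqlParser
import org.apache.spark.sql.catalyst.plans.logical.CreateTable
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform
import org.apache.spark.sql.execution.SparkSqlParser

object ClusterByParsingSketch {
  def main(args: Array[String]): Unit = {
    // DeltaSqlParser wraps a delegate Spark parser and intercepts Delta-specific syntax.
    val parser = new DeltaSqlParser(new SparkSqlParser())

    // CLUSTER BY is matched by the Delta grammar, validated via the PARTITIONED BY rewrite,
    // and the clustering columns end up as a ClusterByTransform in the plan's partitioning.
    parser.parsePlan("CREATE TABLE t (a INT, b STRING) USING delta CLUSTER BY (a)") match {
      case ct: CreateTable =>
        ct.partitioning.foreach {
          case ClusterByTransform(cols) => println(s"clustering columns: ${cols.map(_.describe())}")
          case other => println(s"other transform: $other")
        }
      case plan => println(plan)
    }
  }
}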
@@ -0,0 +1,59 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.skipping.clustering.temp

import org.apache.spark.sql.connector.expressions.{Expression, NamedReference, Transform}

/**
* Minimal version of Spark's ClusterByTransform. We'll remove this when we integrate with OSS
* Spark's CLUSTER BY implementation.
*
* This class represents a transform for `ClusterBySpec`. This is used to bundle
* ClusterBySpec in CreateTable's partitioning transforms to pass it down to analyzer/delta.
*/
final case class ClusterByTransform(
columnNames: Seq[NamedReference]) extends Transform {

override val name: String = "temp_cluster_by"

override def arguments: Array[Expression] = columnNames.toArray

override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
}

/**
* Convenience extractor for ClusterByTransform.
*/
object ClusterByTransform {
def unapply(transform: Transform): Option[Seq[NamedReference]] =
transform match {
case NamedTransform("temp_cluster_by", arguments) =>
Some(arguments.map(_.asInstanceOf[NamedReference]))
case _ =>
None
}
}

/**
* Copied from OSS Spark. We'll remove this when we integrate with OSS Spark's CLUSTER BY.
* Convenience extractor for any Transform.
*/
private object NamedTransform {
def unapply(transform: Transform): Some[(String, Seq[Expression])] = {
Some((transform.name, transform.arguments))
}
}
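
Finally, a sketch (editor's addition, under assumptions) of how downstream code could turn a ClusterByTransform found in a table's partitioning into the clusteringColumns table property defined in ClusteredTableUtilsBase; the helper name is hypothetical:

import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec, ClusterByTransform}

object ClusteringPropertySketch {
  // Hypothetical helper: pull clustering columns out of the partitioning transforms and
  // render them as the "clusteringColumns" property value (a JSON list of name parts).
  def clusteringProperty(partitioning: Seq[Transform]): Option[(String, String)] =
    partitioning.collectFirst {
      case ClusterByTransform(columnNames) =>
        "clusteringColumns" -> ClusterBySpec(columnNames).toJson
    }
}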