From 5e2cd968988f8d35b4a1c84e6e14bea671a39a33 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Sat, 2 Dec 2023 03:08:29 -0800 Subject: [PATCH] Delta SQL parser change to support CLUSTER BY This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY`, and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in the logical plan's partitioning transforms. When we integrate with Apache Spark's CLUSTER BY implementation([PR](https://github.com/apache/spark/pull/42577)), we'll remove the workaround in this PR. Closes https://github.com/delta-io/delta/pull/2328 GitOrigin-RevId: 19262070edbcaead765e7f9eefe96b6e63a7f884 --- .../io/delta/sql/parser/DeltaSqlBase.g4 | 7 + .../io/delta/sql/parser/DeltaSqlParser.scala | 32 +++- .../clustering/ClusteredTableUtils.scala | 9 +- .../clustering/temp/ClusterBySpec.scala | 155 ++++++++++++++++++ .../clustering/temp/ClusterByTransform.scala | 59 +++++++ .../sql/parser/DeltaSqlParserSuite.scala | 140 +++++++++++++++- 6 files changed, 398 insertions(+), 4 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala diff --git a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 index 8d0ffdba3ef..8a1c386440b 100644 --- a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 +++ b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 @@ -97,6 +97,7 @@ statement | cloneTableHeader SHALLOW CLONE source=qualifiedName clause=temporalClause? (TBLPROPERTIES tableProps=propertyList)? (LOCATION location=stringLit)? #clone + | .*? clusterBySpec+ .*? #clusterBy | .*? #passThrough ; @@ -118,6 +119,10 @@ zorderSpec | ZORDER BY interleave+=qualifiedName (COMMA interleave+=qualifiedName)* ; +clusterBySpec + : CLUSTER BY LEFT_PAREN interleave+=qualifiedName (COMMA interleave+=qualifiedName)* RIGHT_PAREN + ; + temporalClause : FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING) | FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=STRING @@ -224,6 +229,7 @@ nonReserved | NO | STATISTICS | CLONE | SHALLOW | FEATURE | TRUNCATE + | CLUSTER ; // Define how the keywords above should appear in a user's SQL statement. @@ -234,6 +240,7 @@ AS: 'AS'; BY: 'BY'; CHECK: 'CHECK'; CLONE: 'CLONE'; +CLUSTER: 'CLUSTER'; COMMA: ','; COMMENT: 'COMMENT'; CONSTRAINT: 'CONSTRAINT'; diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index a198f05df18..92558106808 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -43,6 +43,7 @@ import java.util.Locale import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.TimeTravel +import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByParserUtils, ClusterByPlan, ClusterBySpec} import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.commands._ @@ -58,7 +59,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface} -import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin} +import org.apache.spark.sql.catalyst.parser.ParserUtils.{checkDuplicateClauses, string, withOrigin} import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddConstraint, AlterTableDropConstraint, AlterTableDropFeature, CloneTableStatement, LogicalPlan, RestoreTableStatement} import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} @@ -76,6 +77,8 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface { override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => builder.visit(parser.singleStatement()) match { + case clusterByPlan: ClusterByPlan => + ClusterByParserUtils(clusterByPlan, delegate).parsePlan(sqlText) case plan: LogicalPlan => plan case _ => delegate.parsePlan(sqlText) } @@ -406,6 +409,33 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { RestoreTableStatement(timeTravelTableRelation.asInstanceOf[TimeTravel]) } + /** + * Captures any CLUSTER BY clause and creates a [[ClusterByPlan]] logical plan. + * The plan will be used as a sentinel for DeltaSqlParser to process it further. + */ + override def visitClusterBy(ctx: ClusterByContext): LogicalPlan = withOrigin(ctx) { + val clusterBySpecCtx = ctx.clusterBySpec.asScala.head + checkDuplicateClauses(ctx.clusterBySpec, "CLUSTER BY", clusterBySpecCtx) + val columnNames = + clusterBySpecCtx.interleave.asScala + .map(_.identifier.asScala.map(_.getText).toSeq) + .map(_.asInstanceOf[Seq[String]]).toSeq + // get CLUSTER BY clause positions. + val startIndex = clusterBySpecCtx.getStart.getStartIndex + val stopIndex = clusterBySpecCtx.getStop.getStopIndex + + // get CLUSTER BY parenthesis positions. + val parenStartIndex = clusterBySpecCtx.LEFT_PAREN().getSymbol.getStartIndex + val parenStopIndex = clusterBySpecCtx.RIGHT_PAREN().getSymbol.getStopIndex + ClusterByPlan( + ClusterBySpec(columnNames), + startIndex, + stopIndex, + parenStartIndex, + parenStopIndex, + clusterBySpecCtx) + } + /** * Time travel the table to the given version or timestamp. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala index 45bc14ed481..0b503d34e4a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala @@ -16,8 +16,8 @@ package org.apache.spark.sql.delta.skipping.clustering -import org.apache.spark.sql.delta.{ClusteringTableFeature, DeltaConfigs} -import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.ClusteringTableFeature +import org.apache.spark.sql.delta.actions.Protocol import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -27,6 +27,11 @@ import org.apache.spark.sql.internal.SQLConf * Clustered table utility functions. */ trait ClusteredTableUtilsBase extends DeltaLogging { + // Clustering columns property key. The column names are logical and separated by comma. + // This will be removed when we integrate with OSS Spark and use + // [[CatalogTable.PROP_CLUSTERING_COLUMNS]] directly. + val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns" + /** * Returns whether the protocol version supports the Liquid table feature. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala new file mode 100644 index 00000000000..69eb54be8d3 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala @@ -0,0 +1,155 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.skipping.clustering.temp + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} +import org.antlr.v4.runtime.ParserRuleContext + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, LeafNode, LogicalPlan, ReplaceTable, ReplaceTableAsSelect} +import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedReference, Transform} + +/** + * A container for clustering information. Copied from OSS Spark. + * + * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation. + * @see https://github.com/apache/spark/pull/42577 + * + * @param columnNames the names of the columns used for clustering. + */ +case class ClusterBySpec(columnNames: Seq[NamedReference]) { + override def toString: String = toJson + + def toJson: String = + ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames)) +} + +object ClusterBySpec { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.setSerializationInclusion(Include.NON_ABSENT) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret.registerModule(DefaultScalaModule) + ret + } + + // ClassTag is added to avoid the "same type after erasure" issue with the case class. + def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = { + ClusterBySpec(columnNames.map(FieldReference(_))) + } + + // Convert from table property back to ClusterBySpec. + def fromProperty(columns: String): ClusterBySpec = { + ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_))) + } +} + +/** + * A [[LogicalPlan]] representing a CLUSTER BY clause. + * + * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation. + * @see https://github.com/apache/spark/pull/42577 + * + * @param clusterBySpec: clusterBySpec which contains the clustering columns. + * @param startIndex: start index of CLUSTER BY clause. + * @param stopIndex: stop index of CLUSTER BY clause. + * @param parenStartIndex: start index of the left parenthesis in CLUSTER BY clause. + * @param parenStopIndex: stop index of the right parenthesis in CLUSTER BY clause. + * @param ctx: parser rule context of the CLUSTER BY clause. + */ +case class ClusterByPlan( + clusterBySpec: ClusterBySpec, + startIndex: Int, + stopIndex: Int, + parenStartIndex: Int, + parenStopIndex: Int, + ctx: ParserRuleContext) + extends LeafNode { + override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this + override def output: Seq[Attribute] = Seq.empty +} + +/** + * Parser utils for parsing a [[ClusterByPlan]] and converts it to table properties. + * + * This class will be removed when we integrate with OSS Spark's CLUSTER BY implementation. + * @see https://github.com/apache/spark/pull/42577 + * + * @param clusterByPlan: the ClusterByPlan to parse. + * @param delegate: delegate parser. + */ +case class ClusterByParserUtils(clusterByPlan: ClusterByPlan, delegate: ParserInterface) { + // Update partitioning to include clustering columns as transforms. + private def updatePartitioning(partitioning: Seq[Transform]): Seq[Transform] = { + // Validate no bucketing is specified. + if (partitioning.exists(t => t.isInstanceOf[BucketTransform])) { + ParserUtils.operationNotAllowed( + "Clustering and bucketing cannot both be specified. " + + "Please remove CLUSTERED BY INTO BUCKETS if you " + + "want to create a Delta table with clustering", + clusterByPlan.ctx) + } + Seq(ClusterByTransform(clusterByPlan.clusterBySpec.columnNames)) + } + + /** + * Parse the [[ClusterByPlan]] by replacing CLUSTER BY with PARTITIONED BY and + * leverage Spark SQL parser to perform the validation. After parsing, store the + * clustering columns in the logical plan's partitioning transforms. + * + * @param sqlText: original SQL text. + * @return the logical plan after parsing. + */ + def parsePlan(sqlText: String): LogicalPlan = { + val colText = + sqlText.substring(clusterByPlan.parenStartIndex, clusterByPlan.parenStopIndex + 1) + // Replace CLUSTER BY with PARTITIONED BY to let SparkSqlParser do the validation for us. + // This serves as a short-term workaround until Spark incorporates CREATE TABLE ... CLUSTER BY + // syntax. + val partitionedByText = "PARTITIONED BY " + colText + val newSqlText = + sqlText.substring(0, clusterByPlan.startIndex) + + partitionedByText + + sqlText.substring(clusterByPlan.stopIndex + 1) + try { + delegate.parsePlan(newSqlText) match { + case create: CreateTable => + create.copy(partitioning = updatePartitioning(create.partitioning)) + case ctas: CreateTableAsSelect => + ctas.copy(partitioning = updatePartitioning(ctas.partitioning)) + case replace: ReplaceTable => + replace.copy(partitioning = updatePartitioning(replace.partitioning)) + case rtas: ReplaceTableAsSelect => + rtas.copy(partitioning = updatePartitioning(rtas.partitioning)) + case plan => plan + } + } catch { + case e: ParseException if (e.errorClass.contains("DUPLICATE_CLAUSES")) => + // Since we replace CLUSTER BY with PARTITIONED BY, duplicated clauses means we + // encountered CLUSTER BY with PARTITIONED BY. + ParserUtils.operationNotAllowed( + "Clustering and partitioning cannot both be specified. " + + "Please remove PARTITIONED BY if you want to create a Delta table with clustering", + clusterByPlan.ctx) + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala new file mode 100644 index 00000000000..8d16b525667 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterByTransform.scala @@ -0,0 +1,59 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.skipping.clustering.temp + +import org.apache.spark.sql.connector.expressions.{Expression, NamedReference, Transform} + +/** + * Minimal version of Spark's ClusterByTransform. We'll remove this when we integrate with OSS + * Spark's CLUSTER BY implementation. + * + * This class represents a transform for `ClusterBySpec`. This is used to bundle + * ClusterBySpec in CreateTable's partitioning transforms to pass it down to analyzer/delta. + */ +final case class ClusterByTransform( + columnNames: Seq[NamedReference]) extends Transform { + + override val name: String = "temp_cluster_by" + + override def arguments: Array[Expression] = columnNames.toArray + + override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})" +} + +/** + * Convenience extractor for ClusterByTransform. + */ +object ClusterByTransform { + def unapply(transform: Transform): Option[Seq[NamedReference]] = + transform match { + case NamedTransform("temp_cluster_by", arguments) => + Some(arguments.map(_.asInstanceOf[NamedReference])) + case _ => + None + } +} + +/** + * Copied from OSS Spark. We'll remove this when we integrate with OSS Spark's CLUSTER BY. + * Convenience extractor for any Transform. + */ +private object NamedTransform { + def unapply(transform: Transform): Some[(String, Seq[Expression])] = { + Some((transform.name, transform.arguments)) + } +} diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 3e13ab1a7c7..ecdbe0d1856 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -18,6 +18,9 @@ package io.delta.sql.parser import io.delta.tables.execution.VacuumTableCommand +import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils +import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform + import org.apache.spark.sql.delta.CloneTableSQLTestUtils import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable} @@ -28,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRe import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.SQLHelper -import org.apache.spark.sql.catalyst.plans.logical.{AlterTableDropFeature, CloneTableStatement, RestoreTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTableDropFeature, CloneTableStatement, CreateTable, CreateTableAsSelect, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, RestoreTableStatement} +import org.apache.spark.sql.execution.SparkSqlParser class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { @@ -376,4 +380,138 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { new TableIdentifier(tblName, Some(schema), Some(catalog)) } } + + private def clusterByStatement( + createOrReplaceClause: String, + asSelect: Boolean, + schema: String, + clusterByClause: String): String = { + val tableSchema = if (asSelect) { + "" + } else { + s"($schema)" + } + val select = if (asSelect) { + "AS SELECT * FROM tbl2" + } else { + "" + } + s"$createOrReplaceClause TABLE tbl $tableSchema USING DELTA $clusterByClause $select" + } + + private def validateClusterByTransform( + clause: String, + asSelect: Boolean, + plan: LogicalPlan, + expectedColumns: Seq[Seq[String]]): Unit = { + val partitioning = if (clause == "CREATE") { + if (asSelect) { + plan.asInstanceOf[CreateTableAsSelect].partitioning + } else { + plan.asInstanceOf[CreateTable].partitioning + } + } else { + if (asSelect) { + plan.asInstanceOf[ReplaceTableAsSelect].partitioning + } else { + plan.asInstanceOf[ReplaceTable].partitioning + } + } + assert(partitioning.size === 1) + val transform = partitioning.head + val actualColumns = transform match { + case ClusterByTransform(columnNames) => columnNames.map(_.fieldNames.toSeq) + case _ => assert(false, "Should not reach here") + } + assert(actualColumns === expectedColumns) + } + + for (asSelect <- BOOLEAN_DOMAIN) { + Seq("CREATE", "REPLACE").foreach { clause => + test(s"CLUSTER BY - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = clusterByStatement(clause, asSelect, "a int, b string", "CLUSTER BY (a)") + val parsedPlan = parser.parsePlan(sql) + validateClusterByTransform(clause, asSelect, parsedPlan, Seq(Seq("a"))) + } + + test(s"CLUSTER BY nested column - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement(clause, asSelect, "a struct", "CLUSTER BY (a.b, a.c)") + val parsedPlan = parser.parsePlan(sql) + validateClusterByTransform(clause, asSelect, parsedPlan, Seq(Seq("a", "b"), Seq("a", "c"))) + } + + test(s"CLUSTER BY backquoted column - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement(clause, asSelect, "`a.b.c` int", "CLUSTER BY (`a.b.c`)") + val parsedPlan = parser.parsePlan(sql) + validateClusterByTransform(clause, asSelect, parsedPlan, Seq(Seq("a.b.c"))) + } + + test(s"CLUSTER BY comma column - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement(clause, asSelect, "`a,b` int", "CLUSTER BY (`a,b`)") + val parsedPlan = parser.parsePlan(sql) + validateClusterByTransform(clause, asSelect, parsedPlan, Seq(Seq("a,b"))) + } + + test(s"CLUSTER BY duplicated clauses - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement(clause, asSelect, "a int, b string", "CLUSTER BY (a) CLUSTER BY (b)") + checkError(exception = intercept[ParseException] { + parser.parsePlan(sql) + }, errorClass = "DUPLICATE_CLAUSES", parameters = Map("clauseName" -> "CLUSTER BY")) + } + + test("CLUSTER BY set clustering column property is ignored - " + + s"$clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement( + clause, + asSelect, + "a int, b string", + "CLUSTER BY (a) " + + s"TBLPROPERTIES ('${ClusteredTableUtils.PROP_CLUSTERING_COLUMNS}' = 'b')") + val parsedPlan = parser.parsePlan(sql) + validateClusterByTransform(clause, asSelect, parsedPlan, Seq(Seq("a"))) + } + + test(s"CLUSTER BY with PARTITIONED BY - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement( + clause, + asSelect, + "a int, b string", + "CLUSTER BY (a) PARTITIONED BY (b)") + val errorMsg = "Clustering and partitioning cannot both be specified. " + + "Please remove PARTITIONED BY if you want to create a Delta table with clustering" + checkError(exception = intercept[ParseException] { + parser.parsePlan(sql) + }, errorClass = "_LEGACY_ERROR_TEMP_0035", parameters = Map("message" -> errorMsg)) + } + + test(s"CLUSTER BY with bucketing - $clause TABLE asSelect = $asSelect") { + val parser = new DeltaSqlParser(new SparkSqlParser()) + val sql = + clusterByStatement( + clause, + asSelect, + "a int, b string", + "CLUSTER BY (a) CLUSTERED BY (b) INTO 2 BUCKETS") + val errorMsg = "Clustering and bucketing cannot both be specified. " + + "Please remove CLUSTERED BY INTO BUCKETS if you " + + "want to create a Delta table with clustering" + checkError(exception = intercept[ParseException] { + parser.parsePlan(sql) + }, errorClass = "_LEGACY_ERROR_TEMP_0035", parameters = Map("message" -> errorMsg)) + } + } + } }