diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 920b3392854c9..54f9fd4395489 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -597,6 +597,7 @@ Below is a list of all the keywords in Spark SQL.
 |NANOSECONDS|non-reserved|non-reserved|non-reserved|
 |NATURAL|reserved|strict-non-reserved|reserved|
 |NO|non-reserved|non-reserved|reserved|
+|NONE|non-reserved|non-reserved|reserved|
 |NOT|reserved|non-reserved|reserved|
 |NULL|reserved|non-reserved|reserved|
 |NULLS|non-reserved|non-reserved|non-reserved|
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 85a4633e80502..bde298c23e786 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
 NANOSECONDS: 'NANOSECONDS';
 NATURAL: 'NATURAL';
 NO: 'NO';
+NONE: 'NONE';
 NOT: 'NOT';
 NULL: 'NULL';
 NULLS: 'NULLS';
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 54eff14b6d4df..2f5bf8bbfec14 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -174,6 +174,8 @@ statement
     | ALTER TABLE identifierReference (partitionSpec)? SET locationSpec  #setTableLocation
     | ALTER TABLE identifierReference RECOVER PARTITIONS                 #recoverPartitions
+    | ALTER TABLE identifierReference
+        (clusterBySpec | CLUSTER BY NONE)                                #alterClusterBy
     | DROP TABLE (IF EXISTS)? identifierReference PURGE?                 #dropTable
     | DROP VIEW (IF EXISTS)? identifierReference                         #dropView
     | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
@@ -1572,6 +1574,7 @@ ansiNonReserved
     | NANOSECOND
     | NANOSECONDS
     | NO
+    | NONE
     | NULLS
     | NUMERIC
     | OF
@@ -1920,6 +1923,7 @@ nonReserved
     | NANOSECOND
     | NANOSECONDS
     | NO
+    | NONE
     | NOT
     | NULL
     | NULLS
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index ebecb6f507e6a..117f1748e209b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -22,6 +22,7 @@
 import javax.annotation.Nullable;

 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.types.DataType;

 /**
@@ -248,6 +249,17 @@ static TableChange deleteColumn(String[] fieldNames, Boolean ifExists) {
     return new DeleteColumn(fieldNames, ifExists);
   }

+  /**
+   * Create a TableChange for changing the clustering columns of a table.
+   *
+   * @param clusteringColumns the new clustering columns. Each clustering column is referenced
+   *                          by its field names.
+   * @return a TableChange for this alteration
+   */
+  static TableChange clusterBy(NamedReference[] clusteringColumns) {
+    return new ClusterBy(clusteringColumns);
+  }
+
   /**
    * A TableChange to set a table property.
    *
@@ -752,4 +764,27 @@ public int hashCode() {
}
}
+ /** A TableChange to alter clustering columns for a table. */
+ final class ClusterBy implements TableChange {
+ private final NamedReference[] clusteringColumns;
+
+ private ClusterBy(NamedReference[] clusteringColumns) {
+ this.clusteringColumns = clusteringColumns;
+ }
+
+ public NamedReference[] clusteringColumns() { return clusteringColumns; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterBy that = (ClusterBy) o;
+ return Arrays.equals(clusteringColumns, that.clusteringColumns());
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(clusteringColumns);
+    }
+  }
}
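
For orientation, a minimal sketch (not part of the patch) of how a connector's alterTable
implementation might consume the new change. Expressions.column is the existing helper for
building a NamedReference; the column names here are made up:

    import org.apache.spark.sql.connector.catalog.TableChange
    import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}

    // What Spark builds for ALTER TABLE t CLUSTER BY (a, b).
    val change: TableChange = TableChange.clusterBy(
      Array[NamedReference](Expressions.column("a"), Expressions.column("b")))

    // A connector can pattern-match on the new nested class.
    change match {
      case cb: TableChange.ClusterBy =>
        // An empty array here means ALTER TABLE t CLUSTER BY NONE.
        println(cb.clusteringColumns().map(_.describe).mkString(", "))
      case _ => // other kinds of TableChange
    }
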
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d55b9c972697e..c281b0df8a6da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -231,6 +231,14 @@ object ClusterBySpec {
case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
}
}
+
+ def extractClusterByTransform(
+ schema: StructType,
+ clusterBySpec: ClusterBySpec,
+ resolver: Resolver): ClusterByTransform = {
+ val normalizedClusterBySpec = normalizeClusterBySpec(schema, clusterBySpec, resolver)
+ ClusterByTransform(normalizedClusterBySpec.columnNames)
+ }
}
/**
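
A quick sketch of the new helper in use (assuming the case-insensitive resolver from
org.apache.spark.sql.catalyst.analysis; the schema and spec are made up):

    import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
    import org.apache.spark.sql.connector.expressions.FieldReference
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val schema = new StructType().add("Col1", IntegerType)
    val spec = ClusterBySpec(Seq(FieldReference(Seq("col1"))))
    // The spec is first normalized against the schema ("col1" resolves to "Col1"),
    // and the result is a ClusterByTransform carrying the normalized references.
    val transform = ClusterBySpec.extractClusterByTransform(schema, spec, caseInsensitiveResolution)
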
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index dc43bd1636594..7f93e993c6fa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4594,6 +4594,25 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
ifExists)
}
+ /**
+   * Parse an [[AlterTableClusterBy]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table1 CLUSTER BY (a.b.c)
+ * ALTER TABLE table1 CLUSTER BY NONE
+ * }}}
+ */
+ override def visitAlterClusterBy(ctx: AlterClusterByContext): LogicalPlan = withOrigin(ctx) {
+ val table = createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... CLUSTER BY")
+ if (ctx.NONE() != null) {
+ AlterTableClusterBy(table, None)
+ } else {
+ assert(ctx.clusterBySpec() != null)
+ AlterTableClusterBy(table, Some(visitClusterBySpec(ctx.clusterBySpec())))
+ }
+ }
+
/**
* Parse [[SetViewProperties]] or [[SetTableProperties]] commands.
*
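
The two SQL forms map onto the new rule as follows (a sketch using the catalyst parser; the
table name is arbitrary):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // clusterBySpec branch: the plan carries Some(ClusterBySpec(...)) with the parsed references.
    CatalystSqlParser.parsePlan("ALTER TABLE t CLUSTER BY (a, b.c)")
    // CLUSTER BY NONE branch: the plan carries None.
    CatalystSqlParser.parsePlan("ALTER TABLE t CLUSTER BY NONE")
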
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 9c66e68d686d5..2f5d4b9c86e25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -244,3 +245,19 @@ case class AlterColumn(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
+
+/**
+ * The logical plan of the following commands:
+ * - ALTER TABLE ... CLUSTER BY (col1, col2, ...)
+ * - ALTER TABLE ... CLUSTER BY NONE
+ */
+case class AlterTableClusterBy(
+ table: LogicalPlan, clusterBySpec: Option[ClusterBySpec]) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ Seq(TableChange.clusterBy(clusterBySpec
+ .map(_.columnNames.toArray) // CLUSTER BY (col1, col2, ...)
+      .getOrElse(Array.empty))) // CLUSTER BY NONE
+ }
+
+ protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
+}
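
Note the encoding choice: CLUSTER BY NONE becomes a ClusterBy change with an empty column array
rather than a separate change type. A small sketch (the UnresolvedTable construction mirrors the
parser tests below):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
    import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
    import org.apache.spark.sql.catalyst.plans.logical.AlterTableClusterBy
    import org.apache.spark.sql.connector.expressions.FieldReference

    val table = UnresolvedTable(Seq("t"), "ALTER TABLE ... CLUSTER BY")
    // CLUSTER BY (a): one ClusterBy change carrying the column reference.
    AlterTableClusterBy(table, Some(ClusterBySpec(Seq(FieldReference(Seq("a")))))).changes
    // CLUSTER BY NONE: the same change kind with an empty array.
    AlterTableClusterBy(table, None).changes
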
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index f36310e8ad899..c5888d72c2b23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -26,13 +26,14 @@ import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.connector.expressions.LiteralValue
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -134,6 +135,61 @@ private[sql] object CatalogV2Util {
Collections.unmodifiableMap(newProperties)
}
+ /**
+   * Apply ClusterBy changes to a table property map and return the result.
+ */
+ def applyClusterByChanges(
+ properties: Map[String, String],
+ schema: StructType,
+ changes: Seq[TableChange]): Map[String, String] = {
+ applyClusterByChanges(properties.asJava, schema, changes).asScala.toMap
+ }
+
+ /**
+ * Apply ClusterBy changes to a Java map and return the result.
+ */
+ def applyClusterByChanges(
+ properties: util.Map[String, String],
+ schema: StructType,
+ changes: Seq[TableChange]): util.Map[String, String] = {
+ val newProperties = new util.HashMap[String, String](properties)
+
+ changes.foreach {
+ case clusterBy: ClusterBy =>
+ val clusterByProp =
+ ClusterBySpec.toProperty(
+ schema,
+ ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq),
+ conf.resolver)
+ newProperties.put(clusterByProp._1, clusterByProp._2)
+
+ case _ =>
+        // ignore non-ClusterBy changes
+ }
+
+ Collections.unmodifiableMap(newProperties)
+ }
+
+ /**
+ * Apply ClusterBy changes to the partitioning transforms and return the result.
+ */
+ def applyClusterByChanges(
+ partitioning: Array[Transform],
+ schema: StructType,
+ changes: Seq[TableChange]): Array[Transform] = {
+
+ val newPartitioning = partitioning.filterNot(_.isInstanceOf[ClusterByTransform]).toBuffer
+ changes.foreach {
+ case clusterBy: ClusterBy =>
+ newPartitioning += ClusterBySpec.extractClusterByTransform(
+ schema, ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq), conf.resolver)
+
+ case _ =>
+ // ignore other changes
+ }
+ newPartitioning.toArray
+ }
+
/**
* Apply schema changes to a schema and return the result.
*/
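
CatalogV2Util is internal (private[sql]), but the semantics of the partitioning overload can be
sketched as follows: any existing ClusterByTransform is dropped and replaced by one built from
the incoming change (schema and columns here are made up):

    import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableChange}
    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val schema = new StructType().add("a", IntegerType)
    val partitioning: Array[Transform] = Array.empty
    val changes = Seq(TableChange.clusterBy(Array(Expressions.column("a"))))
    // Returns the partitioning plus a single ClusterByTransform for column "a";
    // with clusterBy(Array.empty) the transform would carry no columns (CLUSTER BY NONE).
    val updated = CatalogV2Util.applyClusterByChanges(partitioning, schema, changes)
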
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 8612a6e9c50ff..16bc751aab88a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -235,6 +236,23 @@ class DDLParserSuite extends AnalysisTest {
}
}
+ test("alter table cluster by") {
+ comparePlans(
+ parsePlan("ALTER TABLE table_name CLUSTER BY (`a.b`, c.d, none)"),
+ AlterTableClusterBy(
+ UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
+ Some(ClusterBySpec(Seq(
+ FieldReference(Seq("a.b")),
+ FieldReference(Seq("c", "d")),
+ FieldReference(Seq("none")))))))
+
+ comparePlans(
+ parsePlan("ALTER TABLE table_name CLUSTER BY NONE"),
+ AlterTableClusterBy(
+ UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
+ None))
+ }
+
test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index d511477ef5d33..654fa0719cf82 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -133,13 +133,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
+ val finalPartitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, schema, changes)
// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
throw new IllegalArgumentException(s"Cannot drop all fields")
}
- val newTable = new InMemoryTable(table.name, schema, table.partitioning, properties)
+ val newTable = new InMemoryTable(table.name, schema, finalPartitioning, properties)
.withData(table.data)
tables.put(ident, newTable)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 169aad2f234d6..d8fa48a72cf81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -103,6 +103,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
builder.build())
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
+ case AlterTableClusterBy(ResolvedTable(catalog, ident, table: V1Table, _), clusterBySpecOpt)
+ if isSessionCatalog(catalog) =>
+ val prop = clusterBySpecOpt.map { clusterBySpec =>
+ Map(ClusterBySpec.toProperty(table.schema, clusterBySpec, conf.resolver))
+ }.getOrElse {
+ Map(ClusterBySpec.toProperty(table.schema, ClusterBySpec(Nil), conf.resolver))
+ }
+ AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)
+
case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")
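
For V1 tables there is no catalog-level clustering concept, so the change is rewritten into a
table property via AlterTableSetPropertiesCommand. A sketch of what gets stored (the key/value
format is whatever ClusterBySpec.toProperty produces; schema and columns are made up):

    import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
    import org.apache.spark.sql.connector.expressions.FieldReference
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val schema = new StructType().add("a", IntegerType)
    // CLUSTER BY (a): a (key, value) pair recording the normalized clustering columns.
    val prop = ClusterBySpec.toProperty(
      schema, ClusterBySpec(Seq(FieldReference(Seq("a")))), caseInsensitiveResolution)
    // CLUSTER BY NONE: the same key with an empty column list, clearing the clustering.
    val cleared = ClusterBySpec.toProperty(schema, ClusterBySpec(Nil), caseInsensitiveResolution)
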
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 3d6de985a62f5..e619c59a7540c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -284,10 +284,11 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalogTable.storage
}
+ val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
try {
catalog.alterTable(
catalogTable.copy(
- properties = properties, schema = schema, owner = owner, comment = comment,
+ properties = finalProperties, schema = schema, owner = owner, comment = comment,
storage = storage))
} catch {
case _: NoSuchTableException =>
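
End to end, the feature is exercised as plain SQL. A sketch (assumes a local SparkSession and
that CREATE TABLE ... CLUSTER BY is already supported, as the suites below do):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    spark.sql("CREATE TABLE t (a INT, b STRING) USING parquet CLUSTER BY (a)")
    spark.sql("ALTER TABLE t CLUSTER BY (a, b)") // replace the clustering columns
    spark.sql("ALTER TABLE t CLUSTER BY NONE")   // remove clustering entirely
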
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index cabbfa520d77a..e03e0f0e3d638 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL true
NO false
+NONE false
NOT true
NULL true
NULLS false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index e304509aa6d75..e5a371925b1dc 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL false
NO false
+NONE false
NOT false
NULL false
NULLS false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableClusterBySuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableClusterBySuiteBase.scala
new file mode 100644
index 0000000000000..8961019f3f8d1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableClusterBySuiteBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+
+/**
+ * This base suite contains unified tests for the `ALTER TABLE ... CLUSTER BY` command
+ * that check V1 and V2 table catalogs. The tests that cannot run for all supported catalogs are
+ * located in more specific test suites:
+ *
+ * - V2 table catalog tests: `org.apache.spark.sql.execution.command.v2.AlterTableClusterBySuite`
+ * - V1 table catalog tests:
+ * `org.apache.spark.sql.execution.command.v1.AlterTableClusterBySuiteBase`
+ * - V1 In-Memory catalog: `org.apache.spark.sql.execution.command.v1.AlterTableClusterBySuite`
+ * - V1 Hive External catalog:
+ * `org.apache.spark.sql.hive.execution.command.AlterTableClusterBySuite`
+ */
+trait AlterTableClusterBySuiteBase extends QueryTest with DDLCommandTestUtils {
+ override val command = "ALTER TABLE CLUSTER BY"
+
+ protected val nestedColumnSchema: String =
+ "col1 INT, col2 STRUCT