[SPARK-48760][SQL] Introduce ALTER TABLE ... CLUSTER BY SQL syntax to change clustering columns #47156
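
This PR adds SQL syntax to change or remove a table's clustering columns after the table has been created. A minimal usage sketch (the `spark` session and the catalog, table, and column names below are illustrative, not taken from the PR):

// Change the clustering columns of an existing table.
spark.sql("ALTER TABLE testcat.ns.tbl CLUSTER BY (a, b.c)")
// Remove clustering entirely.
spark.sql("ALTER TABLE testcat.ns.tbl CLUSTER BY NONE")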

docs/sql-ref-ansi-compliance.md (1 addition, 0 deletions)

@@ -597,6 +597,7 @@ Below is a list of all the keywords in Spark SQL.
|NANOSECONDS|non-reserved|non-reserved|non-reserved|
|NATURAL|reserved|strict-non-reserved|reserved|
|NO|non-reserved|non-reserved|reserved|
|NONE|non-reserved|non-reserved|reserved|
|NOT|reserved|non-reserved|reserved|
|NULL|reserved|non-reserved|reserved|
|NULLS|non-reserved|non-reserved|non-reserved|

@@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';

@@ -174,6 +174,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
@@ -1572,6 +1574,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
@@ -1920,6 +1923,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS

@@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.types.DataType;

/**
@@ -248,6 +249,17 @@ static TableChange deleteColumn(String[] fieldNames, Boolean ifExists) {
return new DeleteColumn(fieldNames, ifExists);
}

/**
* Create a TableChange for changing the clustering columns of a table.
*
* @param clusteringColumns clustering columns to change to. Each clustering column is a
* reference made up of one or more field names (nested fields are supported).
* @return a TableChange for this assignment
*/
static TableChange clusterBy(NamedReference[] clusteringColumns) {
return new ClusterBy(clusteringColumns);
}

/**
* A TableChange to set a table property.
* <p>
@@ -752,4 +764,22 @@ public int hashCode() {
}
}

/** A TableChange to alter clustering columns for a table. */
final class ClusterBy implements TableChange {
private final NamedReference[] clusteringColumns;

private ClusterBy(NamedReference[] clusteringColumns) {
this.clusteringColumns = clusteringColumns;
}

public NamedReference[] clusteringColumns() { return clusteringColumns; }

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterBy that = (ClusterBy) o;
return Arrays.equals(clusteringColumns, that.clusteringColumns());
}

// Keep hashCode consistent with equals: both derive from the column array.
@Override
public int hashCode() {
return Arrays.hashCode(clusteringColumns);
}
}
}
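
A connector can react to the new change type in its alterTable implementation. A rough Scala sketch (the method and handling logic are hypothetical, not part of this PR; an empty column array corresponds to CLUSTER BY NONE):

import org.apache.spark.sql.connector.catalog.TableChange

// Hypothetical connector-side handling of the new change type.
def applyClusterBy(change: TableChange): Unit = change match {
  case cb: TableChange.ClusterBy if cb.clusteringColumns().isEmpty =>
    println("clustering removed (CLUSTER BY NONE)")
  case cb: TableChange.ClusterBy =>
    println(s"cluster by: ${cb.clusteringColumns().map(_.describe()).mkString(", ")}")
  case _ => // other changes are handled elsewhere
}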

@@ -231,6 +231,14 @@ object ClusterBySpec {
case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
}
}

def extractClusterByTransform(
schema: StructType,
clusterBySpec: ClusterBySpec,
resolver: Resolver): ClusterByTransform = {
val normalizedClusterBySpec = normalizeClusterBySpec(schema, clusterBySpec, resolver)
ClusterByTransform(normalizedClusterBySpec.columnNames)
}
}

/**

@@ -4594,6 +4594,25 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
ifExists)
}

/**
* Parse an [[AlterTableClusterBy]] command.
*
* For example:
* {{{
* ALTER TABLE table1 CLUSTER BY (a.b.c)
* ALTER TABLE table1 CLUSTER BY NONE
* }}}
*/
override def visitAlterClusterBy(ctx: AlterClusterByContext): LogicalPlan = withOrigin(ctx) {
val table = createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... CLUSTER BY")
if (ctx.NONE() != null) {
AlterTableClusterBy(table, None)
} else {
assert(ctx.clusterBySpec() != null)
AlterTableClusterBy(table, Some(visitClusterBySpec(ctx.clusterBySpec())))
}
}

/**
* Parse [[SetViewProperties]] or [[SetTableProperties]] commands.
*

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -244,3 +245,19 @@ case class AlterColumn(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the following commands:
* - ALTER TABLE ... CLUSTER BY (col1, col2, ...)
* - ALTER TABLE ... CLUSTER BY NONE
*/
case class AlterTableClusterBy(
table: LogicalPlan, clusterBySpec: Option[ClusterBySpec]) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
Seq(TableChange.clusterBy(clusterBySpec
.map(_.columnNames.toArray) // CLUSTER BY (col1, col2, ...)
.getOrElse(Array.empty))) // CLUSTER BY NONE
}

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
}
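
Note how the two forms are encoded: there is no separate change type for CLUSTER BY NONE; the command simply emits a ClusterBy change whose column array is empty. Illustratively (table and spec are placeholders):

AlterTableClusterBy(table, Some(spec)).changes // Seq(TableChange.clusterBy(<spec columns>))
AlterTableClusterBy(table, None).changes       // Seq(TableChange.clusterBy(Array.empty))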

@@ -26,13 +26,14 @@ import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -134,6 +135,61 @@ private[sql] object CatalogV2Util {
Collections.unmodifiableMap(newProperties)
}

/**
* Apply ClusterBy changes to a table property map and return the result.
*/
def applyClusterByChanges(
properties: Map[String, String],
schema: StructType,
changes: Seq[TableChange]): Map[String, String] = {
applyClusterByChanges(properties.asJava, schema, changes).asScala.toMap
}

/**
* Apply ClusterBy changes to a Java table property map and return the result.
*/
def applyClusterByChanges(
properties: util.Map[String, String],
schema: StructType,
changes: Seq[TableChange]): util.Map[String, String] = {
val newProperties = new util.HashMap[String, String](properties)

changes.foreach {
case clusterBy: ClusterBy =>
val clusterByProp =
ClusterBySpec.toProperty(
schema,
ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq),
conf.resolver)
newProperties.put(clusterByProp._1, clusterByProp._2)

case _ =>
// ignore other changes
}

Collections.unmodifiableMap(newProperties)
}

/**
* Apply ClusterBy changes to the partitioning transforms and return the result.
*/
def applyClusterByChanges(
partitioning: Array[Transform],
schema: StructType,
changes: Seq[TableChange]): Array[Transform] = {

val newPartitioning = partitioning.filterNot(_.isInstanceOf[ClusterByTransform]).toBuffer
changes.foreach {
case clusterBy: ClusterBy =>
newPartitioning += ClusterBySpec.extractClusterByTransform(
schema, ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq), conf.resolver)

case _ =>
// ignore other changes
}
newPartitioning.toArray
}

/**
* Apply schema changes to a schema and return the result.
*/
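
Taken together, the applyClusterByChanges overloads above keep the two representations of clustering in sync: the table property used by session-catalog (V1) tables and the ClusterByTransform entry in a V2 table's partitioning. A rough usage sketch from a catalog's alterTable path (variable names are illustrative):

// Illustrative: recompute both representations from the same change list.
val newProperties = CatalogV2Util.applyClusterByChanges(table.properties, schema, changes)
val newPartitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, schema, changes)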

@@ -21,6 +21,7 @@ import java.util.Locale

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -235,6 +236,23 @@
}
}

test("alter table cluster by") {
comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY (`a.b`, c.d, none)"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
Some(ClusterBySpec(Seq(
FieldReference(Seq("a.b")),
FieldReference(Seq("c", "d")),
FieldReference(Seq("none")))))))

comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY NONE"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
None))
}
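
Because NONE is added only as a non-reserved keyword, the first case above still parses none inside a column list as an ordinary column reference; only the bare CLUSTER BY NONE form clears the clustering columns.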

test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"

@@ -133,13 +133,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
val finalPartitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, schema, changes)

// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
throw new IllegalArgumentException(s"Cannot drop all fields")
}

val newTable = new InMemoryTable(table.name, schema, table.partitioning, properties)
val newTable = new InMemoryTable(table.name, schema, finalPartitioning, properties)
.withData(table.data)

tables.put(ident, newTable)

@@ -103,6 +103,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
builder.build())
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

case AlterTableClusterBy(ResolvedTable(catalog, ident, table: V1Table, _), clusterBySpecOpt)
if isSessionCatalog(catalog) =>
val prop = clusterBySpecOpt.map { clusterBySpec =>
Map(ClusterBySpec.toProperty(table.schema, clusterBySpec, conf.resolver))
}.getOrElse {
Map(ClusterBySpec.toProperty(table.schema, ClusterBySpec(Nil), conf.resolver))
}
AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)
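
For V1 tables in the session catalog, the AlterTableClusterBy case above is thus rewritten into a property update rather than a connector TableChange, and CLUSTER BY NONE overwrites the clustering property with an empty spec instead of removing it. Illustratively:

// Illustrative: the NONE branch stores an empty ClusterBySpec as a property.
val prop = ClusterBySpec.toProperty(table.schema, ClusterBySpec(Nil), conf.resolver)
AlterTableSetPropertiesCommand(table.catalogTable.identifier, Map(prop), isView = false)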

case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")


@@ -284,10 +284,11 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalogTable.storage
}

val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
try {
catalog.alterTable(
catalogTable.copy(
properties = properties, schema = schema, owner = owner, comment = comment,
properties = finalProperties, schema = schema, owner = owner, comment = comment,
storage = storage))
} catch {
case _: NoSuchTableException =>

@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL true
NO false
NONE false
NOT true
NULL true
NULLS false

@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL false
NO false
NONE false
NOT false
NULL false
NULLS false