[SPARK-48760][SQL] Introduce ALTER TABLE ... CLUSTER BY SQL syntax to change clustering columns

### What changes were proposed in this pull request?

Introduce ALTER TABLE ... CLUSTER BY SQL syntax to change the clustering columns:
```sql
ALTER TABLE tbl CLUSTER BY (a, b);  -- update clustering columns to a and b
ALTER TABLE tbl CLUSTER BY NONE;  -- remove clustering columns
```

This change updates the clustering columns so that catalogs can make use of them. Clustering columns are maintained in:
* CatalogTable's `PROP_CLUSTERING_COLUMNS` for session catalog
* Table's `partitioning` transform array for V2 catalog

This is consistent with CREATE TABLE ... CLUSTER BY (apache#42577).
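As a rough illustration (a sketch based on the APIs added in this PR, not code taken from the diff), the V2 path conveys the statement to the catalog as a single `TableChange`:

```scala
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// ALTER TABLE tbl CLUSTER BY (a, b): the new clustering columns as field references.
val setClustering: TableChange =
  TableChange.clusterBy(Array[NamedReference](FieldReference("a"), FieldReference("b")))

// ALTER TABLE tbl CLUSTER BY NONE: an empty array clears the clustering columns.
val clearClustering: TableChange = TableChange.clusterBy(Array.empty[NamedReference])
```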

### Why are the changes needed?

Provides a way to update the clustering columns.

### Does this PR introduce _any_ user-facing change?

Yes, it introduces new SQL syntax and a new keyword NONE.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47156 from zedtang/alter-table-cluster-by.

Lead-authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and ericm-db committed Jul 10, 2024
1 parent f9c6def commit 771f0f8
Showing 20 changed files with 454 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -597,6 +597,7 @@ Below is a list of all the keywords in Spark SQL.
|NANOSECONDS|non-reserved|non-reserved|non-reserved|
|NATURAL|reserved|strict-non-reserved|reserved|
|NO|non-reserved|non-reserved|reserved|
|NONE|non-reserved|non-reserved|reserved|
|NOT|reserved|non-reserved|reserved|
|NULL|reserved|non-reserved|reserved|
|NULLS|non-reserved|non-reserved|non-reserved|
@@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
@@ -174,6 +174,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
@@ -1572,6 +1574,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
@@ -1920,6 +1923,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS
@@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.types.DataType;

/**
@@ -248,6 +249,17 @@ static TableChange deleteColumn(String[] fieldNames, Boolean ifExists) {
return new DeleteColumn(fieldNames, ifExists);
}

/**
* Create a TableChange for changing clustering columns for a table.
*
* @param clusteringColumns clustering columns to change to. Each clustering column represents
* field names.
* @return a TableChange for this assignment
*/
static TableChange clusterBy(NamedReference[] clusteringColumns) {
return new ClusterBy(clusteringColumns);
}

/**
* A TableChange to set a table property.
* <p>
@@ -752,4 +764,22 @@ public int hashCode() {
}
}

/** A TableChange to alter clustering columns for a table. */
final class ClusterBy implements TableChange {
private final NamedReference[] clusteringColumns;

private ClusterBy(NamedReference[] clusteringColumns) {
this.clusteringColumns = clusteringColumns;
}

public NamedReference[] clusteringColumns() { return clusteringColumns; }

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterBy that = (ClusterBy) o;
return Arrays.equals(clusteringColumns, that.clusteringColumns());
}
}
}
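For custom V2 catalogs, the new change arrives through `TableCatalog.alterTable` like any other `TableChange`. A minimal sketch of inspecting it (the helper name is hypothetical, not part of this diff):

```scala
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.ClusterBy

// Returns Some(columns) if a ClusterBy change is present; an empty Seq means CLUSTER BY NONE.
def findClusterBy(changes: Array[TableChange]): Option[Seq[String]] =
  changes.collectFirst {
    case c: ClusterBy => c.clusteringColumns().toSeq.map(_.fieldNames().mkString("."))
  }
```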
@@ -231,6 +231,14 @@ object ClusterBySpec {
case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
}
}

def extractClusterByTransform(
schema: StructType,
clusterBySpec: ClusterBySpec,
resolver: Resolver): ClusterByTransform = {
val normalizedClusterBySpec = normalizeClusterBySpec(schema, clusterBySpec, resolver)
ClusterByTransform(normalizedClusterBySpec.columnNames)
}
}

/**
@@ -4594,6 +4594,25 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
ifExists)
}

/**
* Parse a [[AlterTableClusterBy]] command.
*
* For example:
* {{{
* ALTER TABLE table1 CLUSTER BY (a.b.c)
* ALTER TABLE table1 CLUSTER BY NONE
* }}}
*/
override def visitAlterClusterBy(ctx: AlterClusterByContext): LogicalPlan = withOrigin(ctx) {
val table = createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... CLUSTER BY")
if (ctx.NONE() != null) {
AlterTableClusterBy(table, None)
} else {
assert(ctx.clusterBySpec() != null)
AlterTableClusterBy(table, Some(visitClusterBySpec(ctx.clusterBySpec())))
}
}

/**
* Parse [[SetViewProperties]] or [[SetTableProperties]] commands.
*
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -244,3 +245,19 @@ case class AlterColumn(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the following commands:
* - ALTER TABLE ... CLUSTER BY (col1, col2, ...)
* - ALTER TABLE ... CLUSTER BY NONE
*/
case class AlterTableClusterBy(
table: LogicalPlan, clusterBySpec: Option[ClusterBySpec]) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
Seq(TableChange.clusterBy(clusterBySpec
.map(_.columnNames.toArray) // CLUSTER BY (col1, col2, ...)
.getOrElse(Array.empty)))
}

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
}
@@ -26,13 +26,14 @@ import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -134,6 +135,61 @@ private[sql] object CatalogV2Util {
Collections.unmodifiableMap(newProperties)
}

/**
* Apply ClusterBy changes to a map and return the result.
*/
def applyClusterByChanges(
properties: Map[String, String],
schema: StructType,
changes: Seq[TableChange]): Map[String, String] = {
applyClusterByChanges(properties.asJava, schema, changes).asScala.toMap
}

/**
* Apply ClusterBy changes to a Java map and return the result.
*/
def applyClusterByChanges(
properties: util.Map[String, String],
schema: StructType,
changes: Seq[TableChange]): util.Map[String, String] = {
val newProperties = new util.HashMap[String, String](properties)

changes.foreach {
case clusterBy: ClusterBy =>
val clusterByProp =
ClusterBySpec.toProperty(
schema,
ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq),
conf.resolver)
newProperties.put(clusterByProp._1, clusterByProp._2)

case _ =>
// ignore non-property changes
}

Collections.unmodifiableMap(newProperties)
}

/**
* Apply ClusterBy changes to the partitioning transforms and return the result.
*/
def applyClusterByChanges(
partitioning: Array[Transform],
schema: StructType,
changes: Seq[TableChange]): Array[Transform] = {

val newPartitioning = partitioning.filterNot(_.isInstanceOf[ClusterByTransform]).toBuffer
changes.foreach {
case clusterBy: ClusterBy =>
newPartitioning += ClusterBySpec.extractClusterByTransform(
schema, ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq), conf.resolver)

case _ =>
// ignore other changes
}
newPartitioning.toArray
}

/**
* Apply schema changes to a schema and return the result.
*/
@@ -21,6 +21,7 @@ import java.util.Locale

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -235,6 +236,23 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("alter table cluster by") {
comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY (`a.b`, c.d, none)"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
Some(ClusterBySpec(Seq(
FieldReference(Seq("a.b")),
FieldReference(Seq("c", "d")),
FieldReference(Seq("none")))))))

comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY NONE"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
None))
}

test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
@@ -133,13 +133,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
val finalPartitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, schema, changes)

// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
throw new IllegalArgumentException(s"Cannot drop all fields")
}

val newTable = new InMemoryTable(table.name, schema, table.partitioning, properties)
val newTable = new InMemoryTable(table.name, schema, finalPartitioning, properties)
.withData(table.data)

tables.put(ident, newTable)
@@ -103,6 +103,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
builder.build())
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

case AlterTableClusterBy(ResolvedTable(catalog, ident, table: V1Table, _), clusterBySpecOpt)
if isSessionCatalog(catalog) =>
val prop = clusterBySpecOpt.map { clusterBySpec =>
Map(ClusterBySpec.toProperty(table.schema, clusterBySpec, conf.resolver))
}.getOrElse {
Map(ClusterBySpec.toProperty(table.schema, ClusterBySpec(Nil), conf.resolver))
}
AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)

case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")

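For V1 tables in the session catalog, the rewrite above turns the command into an `AlterTableSetPropertiesCommand` carrying the clustering columns as a single table property; `CLUSTER BY NONE` writes an empty spec rather than dropping the property. A rough sketch of that property construction using internal catalyst helpers (illustrative only; the schema, columns, and resolver choice are assumptions):

```scala
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType).add("b", IntegerType)

// ALTER TABLE ... CLUSTER BY (a, b) -> one (key, value) entry on the CatalogTable.
val prop: (String, String) = ClusterBySpec.toProperty(
  schema, ClusterBySpec(Seq(FieldReference("a"), FieldReference("b"))), caseSensitiveResolution)

// ALTER TABLE ... CLUSTER BY NONE -> the same property with an empty column list.
val noneProp: (String, String) =
  ClusterBySpec.toProperty(schema, ClusterBySpec(Nil), caseSensitiveResolution)
```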
@@ -284,10 +284,11 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalogTable.storage
}

val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
try {
catalog.alterTable(
catalogTable.copy(
properties = properties, schema = schema, owner = owner, comment = comment,
properties = finalProperties, schema = schema, owner = owner, comment = comment,
storage = storage))
} catch {
case _: NoSuchTableException =>
@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL true
NO false
NONE false
NOT true
NULL true
NULLS false
@@ -203,6 +203,7 @@ NANOSECOND false
NANOSECONDS false
NATURAL false
NO false
NONE false
NOT false
NULL false
NULLS false