
[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577

Closed
wants to merge 19 commits

Conversation

imback82
Contributor

@imback82 imback82 commented Aug 21, 2023

What changes were proposed in this pull request?

This PR proposes to introduce the CLUSTER BY SQL clause to the CREATE/REPLACE TABLE SQL syntax:

CREATE TABLE tbl(a int, b string) CLUSTER BY (a, b)

This doesn't introduce a default implementation for clustering; it is up to the catalog/datasource implementation (e.g., Delta, Iceberg) to utilize the clustering information.
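
As a rough illustration (not part of this PR's diff), a v2 catalog or datasource that receives the table's partitioning transforms could pick out the clustering columns along the lines of the sketch below; it only assumes the cluster_by transform name introduced by this PR.

```scala
import org.apache.spark.sql.connector.expressions.Transform

// Hedged sketch: extract clustering columns from the v2 partitioning transforms
// that Spark passes to a catalog/datasource at table creation time.
def clusteringColumns(partitioning: Array[Transform]): Seq[Seq[String]] =
  partitioning.toSeq
    .filter(_.name == "cluster_by")
    .flatMap(_.references().toSeq.map(_.fieldNames().toSeq))
```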

Why are the changes needed?

To introduce the concept of clustering to datasources.

Does this PR introduce any user-facing change?

Yes, this introduces a new SQL keyword.

How was this patch tested?

Added extensive unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Aug 21, 2023
@imback82 imback82 changed the title [SPARK-XXXXX][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [WIP][SPARK-XXXXX][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE Aug 21, 2023
@imback82 imback82 marked this pull request as draft August 21, 2023 02:39
@imback82 imback82 changed the title [WIP][SPARK-XXXXX][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [WIP][SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE Aug 21, 2023
@github-actions github-actions bot added the DOCS label Sep 9, 2023
@imback82 imback82 marked this pull request as ready for review November 4, 2023 14:49
@imback82 imback82 changed the title [WIP][SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE Nov 4, 2023
Contributor Author

@imback82 imback82 left a comment

@cloud-fan this PR is ready for review. I left my questions in the PR. TIA!

@@ -253,7 +270,8 @@ case class CatalogTable(
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None) {
viewOriginalText: Option[String] = None,
clusterBySpec: Option[ClusterBySpec] = None) {
Contributor Author

We haven't added any field to CatalogTable in the last 5 years; should we avoid adding this new field? Alternatively, we could store the clustering columns in a table property and make that property reserved.

Contributor

Since we have to store it in the table properties anyway at the Hive layer, I think it's simpler to just do that up front here.

Contributor Author

Done. I guess there's no need to make it a reserved property like the properties in TableCatalog?

)

val filteredProperties = properties.filterNot {
case (key, _) => excludedTableProperties.contains(key)
}
val comment = properties.get("comment")
val clusterBySpec = properties.get("clusteringColumns").map(ClusterBySpec(_))
Contributor Author

Should we make this property reserved?

* table catalog.
*/
class CreateTableClusterBySuite extends v1.CreateTableClusterBySuiteBase with CommandSuiteBase {
// Hive doesn't support nested column names with spaces and dots.
Contributor Author

Actually, the exception stack trace looks like the following:

 [CANNOT_RECOGNIZE_HIVE_TYPE] Cannot recognize hive type string: "STRUCT<COL4.1:INT>", column: `col3`. The specified data type for the field cannot be recognized by Spark SQL. Please check the data type of the specified field and ensure that it is a valid Spark SQL data type. Refer to the Spark SQL documentation for a list of valid data types and their format. If the data type is correct, please ensure that you are using a supported version of Spark SQL. SQLSTATE: 429BB
org.apache.spark.SparkException: [CANNOT_RECOGNIZE_HIVE_TYPE] Cannot recognize hive type string: "STRUCT<COL4.1:INT>", column: `col3`. The specified data type for the field cannot be recognized by Spark SQL. Please check the data type of the specified field and ensure that it is a valid Spark SQL data type. Refer to the Spark SQL documentation for a list of valid data types and their format. If the data type is correct, please ensure that you are using a supported version of Spark SQL. SQLSTATE: 429BB
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1637)
	at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1067)
	at org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1082)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)

COL4.1 in "STRUCT<COL4.1:INT>" is not quoted, so the parser fails. We probably need to fix DataType.catalogString...
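
For illustration only (this is not the PR's code), the kind of fix implied here is to quote any field name that is not a plain identifier when building the catalog type string, along the lines of the sketch below.

```scala
// Hedged sketch of a quote-if-needed helper for catalog type strings.
def quoteIfNeeded(part: String): String =
  if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) part
  else "`" + part.replace("`", "``") + "`"

// "COL4.1" contains a dot, so the unquoted STRUCT<COL4.1:INT> cannot be parsed back;
// quoting yields the parseable STRUCT<`COL4.1`:INT>.
val fixed = s"STRUCT<${quoteIfNeeded("COL4.1")}:INT>"
```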

Contributor

+1, cc @beliefer can you help to take a look?

}

object ClusterBySpec {
def apply(columns: String): ClusterBySpec = columns match {
Contributor

this looks weird, where do we use it?

Contributor Author

This is used to parse the property value back into a ClusterBySpec. I renamed it to fromProperty.

object ClusterBySpec {
def fromProperty(columns: String): ClusterBySpec = columns match {
case "" => ClusterBySpec(Seq.empty[UnresolvedAttribute])
case _ => ClusterBySpec(columns.split(",").map(_.trim).map(UnresolvedAttribute.quotedString))
Contributor

Hmm, what if the column name contains a comma? Shall we use JSON format?

Contributor Author

done

*
* @param columnNames the names of the columns used for clustering.
*/
case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) {
Contributor

It's weird to use UnresolvedAttribute here as we are not going to resolve it in the analyzer. How about just Seq[Seq[String]] or Seq[FieldReference]?

Contributor Author

I went with Seq[NamedReference] to be consistent with the *Transform classes (including ClusterByTransform). Also, some helper functions such as FieldReference.unapply return NamedReference, so it's easier to work with NamedReference.
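
A minimal sketch of the shape described above (simplified from the PR; using Expressions.column here is just one way to build a NamedReference):

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}

// Clustering columns held as connector NamedReferences; no analyzer resolution is implied.
case class ClusterBySpec(columnNames: Seq[NamedReference])

// Expressions.column parses a (possibly nested, backtick-quoted) name into a NamedReference.
val spec = ClusterBySpec(Seq(
  Expressions.column("a"),
  Expressions.column("col2.`col4 1`")))
```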

override val name: String = "cluster_by"

override def references: Array[NamedReference] = {
arguments.collect { case named: NamedReference => named }
Contributor

It's columnNames: Seq[NamedReference], so we can just return columnNames here.

Contributor Author

+1
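
A hedged sketch of the resulting transform shape (not the PR's exact code): with columnNames already a Seq[NamedReference], both references and arguments can return it directly.

```scala
import org.apache.spark.sql.connector.expressions.{Expression, NamedReference, Transform}

// Minimal sketch of a cluster_by transform over NamedReference columns.
case class ClusterByTransform(columnNames: Seq[NamedReference]) extends Transform {
  override val name: String = "cluster_by"
  override def arguments: Array[Expression] = columnNames.toArray
  override def references: Array[NamedReference] = columnNames.toArray
  override def describe(): String =
    s"$name(${columnNames.map(_.describe()).mkString(", ")})"
}
```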

@@ -297,6 +297,7 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical

val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)
val normalizedClusterBySpec = normalizeClusterBySpec(schema, table)
Contributor

Instead of normalizing it here, shall we normalize it before we convert the cluster spec to table properties?

Contributor Author

done

protected val nestedClusteringColumns: Seq[String] =
Seq("col2.col3", "col2.`col4 1`", "col3.`col4.1`")

def validateClusterBy(tableIdent: TableIdentifier, clusteringColumns: Seq[String]): Unit
Contributor

Better not to use the v1 TableIdentifier in a base test suite that works for both v1 and v2.

Contributor

Seq[String] should be good.

Contributor

Or just a String holding the dot-separated qualified table name.

Contributor Author

done

checkError(
exception = intercept[AnalysisException](
sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER BY (unknown)")),
errorClass = "COLUMN_NOT_DEFINED_IN_TABLE",
Contributor

Shouldn't this be general behavior? The analyzer should check the existence of clustering columns. I think that's already the case for partition and bucket columns.

Contributor Author

Now that the logic follows the same path as PreprocessTableCreation, the error message is unified: https://github.com/apache/spark/pull/42577/files#diff-f2a04f920c41d18a7d387216f86405bfdc6fb09c44ebe1bb09312ba7dde55333R216

}

def fromProperty(columns: String): ClusterBySpec = {
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
Contributor Author

Alternatively, I could have serialized FieldReference directly, but this approach is more generic.
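
For reference, a hedged sketch of the JSON round-trip this amounts to (the object name and mapper setup below are assumptions; the Seq[Seq[String]] encoding matches the snippet above):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}

// Encode each clustering column as its name parts, so dots, commas, and spaces in
// column names stay unambiguous (unlike a plain comma-separated string).
object ClusterByJson {
  private val mapper = new ObjectMapper() with ClassTagExtensions
  mapper.registerModule(DefaultScalaModule)

  def toProperty(columns: Seq[Seq[String]]): String =
    mapper.writeValueAsString(columns)

  def fromProperty(value: String): Seq[Seq[String]] =
    mapper.readValue[Seq[Seq[String]]](value)
}
```

For example, toProperty(Seq(Seq("a"), Seq("col2", "col4.1"))) produces [["a"],["col2","col4.1"]], which fromProperty parses back losslessly.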

checkError(
exception = intercept[AnalysisException](
sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER BY (unknown)")),
errorClass = "COLUMN_NOT_DEFINED_IN_TABLE",
Contributor Author

Now that the logic follows the same path as PreprocessTableCreation, the error message is unified: https://github.com/apache/spark/pull/42577/files#diff-f2a04f920c41d18a7d387216f86405bfdc6fb09c44ebe1bb09312ba7dde55333R216

clusterBySpec: ClusterBySpec,
resolver: Resolver): ClusterBySpec = {
val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
val position = SchemaUtils.findColumnPosition(
Contributor

This checks column existence, but we only hit it on the v1 path (when converting to a v1 command). Where do we check column existence for the pure v2 path?

Contributor Author

It's happening here for the v2 path:

case transform: RewritableTransform =>
  val rewritten = transform.references().map { ref =>
    // Throws an exception if the reference cannot be resolved
    val position = SchemaUtils.findColumnPosition(ref.fieldNames(), schema, resolver)
    FieldReference(SchemaUtils.getColumnName(position, schema))
  }
  transform.withReferences(rewritten)

}

test("test clustering columns with comma") {
assume(!catalogVersion.contains("Hive")) // Hive catalog doesn't support column names with dots.
Contributor

We can override def excluded in the Hive suite to exclude this test case.

Contributor Author

done.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5ac88b1 Nov 9, 2023
zedtang added a commit to zedtang/delta that referenced this pull request Nov 22, 2023
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid
clustering. Since Delta imports Spark as a library and can't change the
source code directly, we instead replace `CLUSTER BY` with `PARTITIONED
BY`, and leverage the Spark SQL parser to perform validation. After
parsing, clustering columns will be stored in TableSpec's properties.

When we integrate with OSS Spark's CLUSTER BY
implementation([PR](apache/spark#42577)), we'll
remove the workaround in this PR.
allisonport-db pushed a commit to delta-io/delta that referenced this pull request Dec 13, 2023
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY`, and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in the logical plan's partitioning transforms.

When we integrate with Apache Spark's CLUSTER BY implementation([PR](apache/spark#42577)), we'll remove the workaround in this PR.

Closes #2328

GitOrigin-RevId: 19262070edbcaead765e7f9eefe96b6e63a7f884
andreaschat-db pushed a commit to andreaschat-db/delta that referenced this pull request Jan 5, 2024
cloud-fan added a commit that referenced this pull request Jul 3, 2024
… change clustering columns

### What changes were proposed in this pull request?

Introduce ALTER TABLE ... CLUSTER BY SQL syntax to change the clustering columns:
```sql
ALTER TABLE tbl CLUSTER BY (a, b);  -- update clustering columns to a and b
ALTER TABLE tbl CLUSTER BY NONE;  -- remove clustering columns
```

This change updates the clustering columns for catalogs to utilize. Clustering columns are maintained in:
* CatalogTable's `PROP_CLUSTERING_COLUMNS` for session catalog
* Table's `partitioning` transform array for V2 catalog

which is consistent with CREATE TABLE ... CLUSTER BY (#42577).

### Why are the changes needed?

Provides a way to update the clustering columns.

### Does this PR introduce _any_ user-facing change?

Yes, it introduces new SQL syntax and a new keyword NONE.

### How was this patch tested?

New unit tests.
### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47156 from zedtang/alter-table-cluster-by.

Lead-authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ericm-db pushed a commit to ericm-db/spark that referenced this pull request Jul 10, 2024