[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577
Conversation
@cloud-fan this PR is ready for review. I left my questions in the PR. TIA!
@@ -253,7 +270,8 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None) {
+    viewOriginalText: Option[String] = None,
+    clusterBySpec: Option[ClusterBySpec] = None) {
We haven't added any field to CatalogTable in the last 5 years; should we avoid adding this new field? Alternatively, we could store the clustering columns in the table properties and make that property reserved.
Since we have to store it in the table properties anyway at the Hive layer, I think it's simpler to just do it here up front.
Done. I guess there's no need to make it a reserved property like the properties in TableCatalog?
)

val filteredProperties = properties.filterNot {
  case (key, _) => excludedTableProperties.contains(key)
}
val comment = properties.get("comment")
val clusterBySpec = properties.get("clusteringColumns").map(ClusterBySpec(_))
Should we make this property reserved?
 * table catalog.
 */
class CreateTableClusterBySuite extends v1.CreateTableClusterBySuiteBase with CommandSuiteBase {
  // Hive doesn't support nested column names with space and dot.
Actually, the exception stack trace looks like the following:
org.apache.spark.SparkException: [CANNOT_RECOGNIZE_HIVE_TYPE] Cannot recognize hive type string: "STRUCT<COL4.1:INT>", column: `col3`. The specified data type for the field cannot be recognized by Spark SQL. Please check the data type of the specified field and ensure that it is a valid Spark SQL data type. Refer to the Spark SQL documentation for a list of valid data types and their format. If the data type is correct, please ensure that you are using a supported version of Spark SQL. SQLSTATE: 429BB
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1637)
at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1067)
at org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1082)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
COL4.1 in "STRUCT<COL4.1:INT>" is not quoted, so the parser fails. We probably need to fix DataType.catalogString.
...
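The quoting problem described above can be illustrated with a small, self-contained sketch. This is a hypothetical helper, not Spark's actual DataType.catalogString: backtick-quoting each part of a field name keeps a literal dot inside a name distinguishable from a nested-field separator.

```scala
// Hypothetical sketch, not Spark code: quote each part of a multi-part field
// name so that a name containing a dot (e.g. "col4.1") survives a round trip
// through a type string like STRUCT<...> without being misparsed as nesting.
object FieldNameQuoting {
  // Wrap a single name part in backticks, doubling any embedded backticks.
  def quotePart(part: String): String =
    "`" + part.replace("`", "``") + "`"

  // Render a multi-part field name with each part individually quoted.
  def quoted(fieldNames: Seq[String]): String =
    fieldNames.map(quotePart).mkString(".")
}
```

With quoting like this, the struct field above would be emitted as STRUCT<`col4.1`:INT> rather than STRUCT<COL4.1:INT>, so a parser would no longer split on the inner dot.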
+1, cc @beliefer can you help to take a look?
}

object ClusterBySpec {
  def apply(columns: String): ClusterBySpec = columns match {
this looks weird, where do we use it?
This is used for parsing the property value back to ClusterBySpec. I renamed it to fromProperty.
object ClusterBySpec {
  def fromProperty(columns: String): ClusterBySpec = columns match {
    case "" => ClusterBySpec(Seq.empty[UnresolvedAttribute])
    case _ => ClusterBySpec(columns.split(",").map(_.trim).map(UnresolvedAttribute.quotedString))
Hmm, what if the column name contains `,`? Shall we use JSON format?
done
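The pitfall raised above is easy to reproduce in isolation. This tiny hypothetical sketch mirrors the original split-on-comma encoding and shows how a quoted column name containing a comma gets mangled:

```scala
// Hypothetical sketch of the original comma-separated property encoding.
// A single quoted column name containing a comma is wrongly split in two.
object CommaSplitPitfall {
  def naiveSplit(property: String): Seq[String] =
    property.split(",").map(_.trim).toSeq
}
```

For the two columns `` `a,b` `` and `c`, naiveSplit of the joined property value yields three fragments instead of two, which is why the final version stores the columns as JSON instead.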
 *
 * @param columnNames the names of the columns used for clustering.
 */
case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) {
It's weird to use UnresolvedAttribute here as we are not going to resolve it in the analyzer. How about just Seq[Seq[String]] or Seq[FieldReference]?
I went with Seq[NamedReference] to be consistent with *Transform (including ClusterByTransform). Also, some helper functions such as FieldReference.unapply return NamedReference, so it's easier to work with NamedReference.
  override val name: String = "cluster_by"

  override def references: Array[NamedReference] = {
    arguments.collect { case named: NamedReference => named }
It's columnNames: Seq[NamedReference], so we can just return columnNames here.
+1
@@ -297,6 +297,7 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical

     val normalizedPartCols = normalizePartitionColumns(schema, table)
     val normalizedBucketSpec = normalizeBucketSpec(schema, table)
+    val normalizedClusterBySpec = normalizeClusterBySpec(schema, table)
Instead of normalizing it here, shall we normalize it before we convert the cluster spec to table properties?
done
protected val nestedClusteringColumns: Seq[String] =
  Seq("col2.col3", "col2.`col4 1`", "col3.`col4.1`")

def validateClusterBy(tableIdent: TableIdentifier, clusteringColumns: Seq[String]): Unit
Better not to use the v1 TableIdentifier in a base test suite that works for both v1 and v2.
Seq[String] should be good.
Or just String as the dot-separated qualified table name.
done
checkError(
  exception = intercept[AnalysisException](
    sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER BY (unknown)")),
  errorClass = "COLUMN_NOT_DEFINED_IN_TABLE",
Shouldn't this be general behavior? The analyzer should check the existence of clustering columns; I think that's already the case for partition and bucket columns.
Oh, I see: https://github.com/apache/spark/pull/42577/files#diff-d96524deae4f3cbf30881f842ab6091aea1de9d3e5c6810f4f35d3dc697dd3d3R58. It's better if we can unify the error between v1 and v2.
Now that the logic is the same as in PreprocessTableCreation, the error message is unified: https://github.com/apache/spark/pull/42577/files#diff-f2a04f920c41d18a7d387216f86405bfdc6fb09c44ebe1bb09312ba7dde55333R216
}

def fromProperty(columns: String): ClusterBySpec = {
  ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
Alternatively I could have serialized FieldReference, but this approach is more generic.
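A minimal sketch of the round trip the property encoding has to support. This is a hypothetical helper: the PR itself uses a Jackson mapper, while this hand-rolled parser handles only quote-free name parts and does no escaping.

```scala
// Hypothetical sketch of encoding clustering columns (multi-part names) as a
// JSON-style property value, so names containing commas or dots round-trip.
// Illustration only: no escape handling, unlike the Jackson mapper in the PR.
object ClusterByPropertyCodec {
  // Encode, e.g., Seq(Seq("a,b"), Seq("c")) as [["a,b"],["c"]].
  def toProperty(cols: Seq[Seq[String]]): String =
    cols.map(_.map(p => "\"" + p + "\"").mkString("[", ",", "]"))
      .mkString("[", ",", "]")

  // Parse back exactly the format produced by toProperty above.
  def fromProperty(s: String): Seq[Seq[String]] = {
    val inner = s.stripPrefix("[").stripSuffix("]")
    if (inner.isEmpty) Seq.empty
    else inner.split("""\],\[""").toSeq.map { group =>
      group.stripPrefix("[").stripSuffix("]")
        .split("\",\"").toSeq
        .map(_.stripPrefix("\"").stripSuffix("\""))
    }
  }
}
```

Because each name part is individually quoted, a column name containing a comma survives the round trip, which is exactly the case the earlier split-on-comma encoding got wrong.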
      clusterBySpec: ClusterBySpec,
      resolver: Resolver): ClusterBySpec = {
    val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
      val position = SchemaUtils.findColumnPosition(
This checks column existence, but we only hit it on the v1 path (when converting to a v1 command). Where do we check column existence for the pure v2 path?
It's happening here for the v2 path:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
Lines 273 to 279 in 6abc4a1

case transform: RewritableTransform =>
  val rewritten = transform.references().map { ref =>
    // Throws an exception if the reference cannot be resolved
    val position = SchemaUtils.findColumnPosition(ref.fieldNames(), schema, resolver)
    FieldReference(SchemaUtils.getColumnName(position, schema))
  }
  transform.withReferences(rewritten)
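Conceptually, both the v1 and v2 paths do the same thing: resolve each referenced column against the table schema using the session's resolver, replace it with the schema's canonical spelling, and fail if it doesn't exist. A simplified, flat-schema sketch (hypothetical; Spark's SchemaUtils.findColumnPosition also handles nested fields):

```scala
// Hypothetical flat-schema sketch of clustering-column normalization.
object NormalizeSketch {
  // A resolver decides whether two column names match (e.g. case-insensitively).
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Return the schema's canonical spelling of `col`, or fail if undefined.
  def normalize(schemaFields: Seq[String], col: String, resolver: Resolver): String =
    schemaFields.find(f => resolver(f, col)).getOrElse(
      sys.error(s"Column '$col' is not defined in the table"))
}
```

Normalizing to the schema's spelling is what makes, say, CLUSTER BY (data) match a schema column declared as Data, while an unknown column surfaces as the COLUMN_NOT_DEFINED_IN_TABLE style of error discussed above.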
}

test("test clustering columns with comma") {
  assume(!catalogVersion.contains("Hive")) // Hive catalog doesn't support column names with dots.
We can override def excluded in the hive suite to exclude this test case.
done.
Thanks, merging to master!
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY`, and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in TableSpec's properties. When we integrate with OSS Spark's CLUSTER BY implementation([PR](apache/spark#42577)), we'll remove the workaround in this PR.
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY`, and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in the logical plan's partitioning transforms. When we integrate with Apache Spark's CLUSTER BY implementation([PR](apache/spark#42577)), we'll remove the workaround in this PR. Closes #2328 GitOrigin-RevId: 19262070edbcaead765e7f9eefe96b6e63a7f884
… change clustering columns ### What changes were proposed in this pull request? Introduce ALTER TABLE ... CLUSTER BY SQL syntax to change the clustering columns: ```sql ALTER TABLE tbl CLUSTER BY (a, b); -- update clustering columns to a and b ALTER TABLE tbl CLUSTER BY NONE; -- remove clustering columns ``` This change updates the clustering columns for catalogs to utilize. Clustering columns are maintained in: * CatalogTable's `PROP_CLUSTERING_COLUMNS` for session catalog * Table's `partitioning` transform array for V2 catalog which is consistent with CREATE TABLE CLUSTER BY( #42577). ### Why are the changes needed? Provides a way to update the clustering columns. ### Does this PR introduce _any_ user-facing change? Yes, it introduces new SQL syntax and a new keyword NONE. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #47156 from zedtang/alter-table-cluster-by. Lead-authored-by: Jiaheng Tang <jiaheng.tang@databricks.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This proposes to introduce the CLUSTER BY SQL clause to the CREATE/REPLACE TABLE SQL syntax. This doesn't introduce a default implementation for clustering; it's up to the catalog/datasource implementation to utilize the clustering information (e.g., Delta, Iceberg, etc.).
Why are the changes needed?
To introduce the concept of clustering to datasources.
Does this PR introduce any user-facing change?
Yes, this introduces a new SQL keyword.
How was this patch tested?
Added extensive unit tests.
Was this patch authored or co-authored using generative AI tooling?
No