[SPARK-49737][SQL] Disable bucketing on collated columns in complex types

### What changes were proposed in this pull request?

This PR disables bucketing on columns of complex types (structs, arrays, and maps) that contain collated strings.
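For illustration, after this change a bucketing spec over any such column is rejected at analysis time, matching the existing behavior for top-level collated string columns. A hypothetical example (the table name and column are made up; the error condition comes from the tests below):

```scala
// Bucketing by a column whose type merely *contains* a collated string
// now fails, just like bucketing by a top-level collated string column.
spark.sql("""
  CREATE TABLE tbl (
    id INT,
    array_col ARRAY<STRING COLLATE UNICODE>
  ) USING parquet
  CLUSTERED BY (array_col) INTO 4 BUCKETS""")
// => AnalysisException with error condition INVALID_BUCKET_COLUMN_DATA_TYPE
```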

### Why are the changes needed?

#45260 introduced logic to disable bucketing on collated columns, but did not cover complex types that contain collated strings.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48186 from stefankandic/fixBucketing.

Authored-by: Stefan Kandic <stefan.kandic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
stefankandic authored and MaxGekk committed Sep 20, 2024
1 parent 22a7edc commit f3785fa
Showing 2 changed files with 20 additions and 11 deletions.
8 changes: 3 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala

@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources

 import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.util.SchemaUtils

 object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
@@ -53,10 +54,7 @@ object BucketingUtils {
     bucketIdGenerator(mutableInternalRow).getInt(0)
   }

-  def canBucketOn(dataType: DataType): Boolean = dataType match {
-    case st: StringType => st.supportsBinaryOrdering
-    case other => true
-  }
+  def canBucketOn(dataType: DataType): Boolean = !SchemaUtils.hasNonUTF8BinaryCollation(dataType)

   def bucketIdToString(id: Int): String = f"_$id%05d"
 }
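`canBucketOn` now delegates to `SchemaUtils.hasNonUTF8BinaryCollation`, which recurses into nested types instead of only matching a top-level `StringType`. A minimal sketch of an equivalent recursive check, assuming the standard Spark type constructors (the actual helper may differ in details):

```scala
import org.apache.spark.sql.types._

// Sketch: a type cannot be bucketed on if it is, or contains,
// a string type whose collation lacks binary (UTF8_BINARY) ordering.
def hasNonBinaryCollatedString(dt: DataType): Boolean = dt match {
  case st: StringType => !st.supportsBinaryOrdering
  case ArrayType(elementType, _) => hasNonBinaryCollatedString(elementType)
  case MapType(keyType, valueType, _) =>
    hasNonBinaryCollatedString(keyType) || hasNonBinaryCollatedString(valueType)
  case StructType(fields) => fields.exists(f => hasNonBinaryCollatedString(f.dataType))
  case _ => false
}
```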
23 changes: 17 additions & 6 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -162,9 +162,14 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     withTable(tableName) {
       sql(
         s"""
-           |CREATE TABLE $tableName
-           |(id INT, c1 STRING COLLATE UNICODE, c2 string)
-           |USING parquet
+           |CREATE TABLE $tableName (
+           |  id INT,
+           |  c1 STRING COLLATE UNICODE,
+           |  c2 STRING,
+           |  struct_col STRUCT<col1: STRING COLLATE UNICODE, col2: STRING>,
+           |  array_col ARRAY<STRING COLLATE UNICODE>,
+           |  map_col MAP<STRING COLLATE UNICODE, STRING>
+           |) USING parquet
            |CLUSTERED BY (${bucketColumns.mkString(",")})
            |INTO 4 BUCKETS""".stripMargin
       )
@@ -175,14 +180,20 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     createTable("c2")
     createTable("id", "c2")

-    Seq(Seq("c1"), Seq("c1", "id"), Seq("c1", "c2")).foreach { bucketColumns =>
+    val failBucketingColumns = Seq(
+      Seq("c1"), Seq("c1", "id"), Seq("c1", "c2"),
+      Seq("struct_col"), Seq("array_col"), Seq("map_col")
+    )
+
+    failBucketingColumns.foreach { bucketColumns =>
       checkError(
         exception = intercept[AnalysisException] {
           createTable(bucketColumns: _*)
         },
         condition = "INVALID_BUCKET_COLUMN_DATA_TYPE",
-        parameters = Map("type" -> "\"STRING COLLATE UNICODE\"")
-      );
+        parameters = Map("type" -> ".*STRING COLLATE UNICODE.*"),
+        matchPVals = true
+      )
     }
   }
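A note on the assertion change above: with `matchPVals = true`, `checkError` matches expected parameter values as regular expressions rather than literally, so the single pattern `.*STRING COLLATE UNICODE.*` can cover both the plain column type and the nested type strings. A small illustration (the concrete type renderings are presumed, not copied from test output):

```scala
// Presumed "type" parameter values the one pattern must cover:
//   c1         -> "\"STRING COLLATE UNICODE\""
//   array_col  -> "\"ARRAY<STRING COLLATE UNICODE>\""
val pattern = ".*STRING COLLATE UNICODE.*".r
assert(pattern.findFirstIn("\"ARRAY<STRING COLLATE UNICODE>\"").isDefined)
```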
