From f3785fadec3089fa60d85fa3c98ae9c6ada807a4 Mon Sep 17 00:00:00 2001 From: Stefan Kandic Date: Fri, 20 Sep 2024 19:12:05 +0200 Subject: [PATCH] [SPARK-49737][SQL] Disable bucketing on collated columns in complex types ### What changes were proposed in this pull request? To disable bucketing on collated string types in complex types (structs, arrays and maps). ### Why are the changes needed? #45260 introduces the logic to disable bucketing for collated columns, but forgot to address complex types which have collated strings inside. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48186 from stefankandic/fixBucketing. Authored-by: Stefan Kandic Signed-off-by: Max Gekk --- .../datasources/BucketingUtils.scala | 8 +++---- .../org/apache/spark/sql/CollationSuite.scala | 23 ++++++++++++++----- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala index 4fa1e0c1f2c58..fd47feef25d57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.util.SchemaUtils object BucketingUtils { // The file name of bucketed data should have 3 parts: @@ -53,10 +54,7 @@ object BucketingUtils { bucketIdGenerator(mutableInternalRow).getInt(0) } - def canBucketOn(dataType: 
DataType): Boolean = dataType match { - case st: StringType => st.supportsBinaryOrdering - case other => true - } + def canBucketOn(dataType: DataType): Boolean = !SchemaUtils.hasNonUTF8BinaryCollation(dataType) def bucketIdToString(id: Int): String = f"_$id%05d" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index 73fd897e91f53..632b9305feb57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -162,9 +162,14 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { withTable(tableName) { sql( s""" - |CREATE TABLE $tableName - |(id INT, c1 STRING COLLATE UNICODE, c2 string) - |USING parquet + |CREATE TABLE $tableName ( + | id INT, + | c1 STRING COLLATE UNICODE, + | c2 STRING, + | struct_col STRUCT<f1: STRING COLLATE UNICODE>, + | array_col ARRAY<STRING COLLATE UNICODE>, + | map_col MAP<STRING COLLATE UNICODE, STRING> + |) USING parquet |CLUSTERED BY (${bucketColumns.mkString(",")}) |INTO 4 BUCKETS""".stripMargin ) @@ -175,14 +180,20 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { createTable("c2") createTable("id", "c2") - Seq(Seq("c1"), Seq("c1", "id"), Seq("c1", "c2")).foreach { bucketColumns => + val failBucketingColumns = Seq( + Seq("c1"), Seq("c1", "id"), Seq("c1", "c2"), + Seq("struct_col"), Seq("array_col"), Seq("map_col") + ) + + failBucketingColumns.foreach { bucketColumns => checkError( exception = intercept[AnalysisException] { createTable(bucketColumns: _*) }, condition = "INVALID_BUCKET_COLUMN_DATA_TYPE", - parameters = Map("type" -> "\"STRING COLLATE UNICODE\"") - ); + parameters = Map("type" -> ".*STRING COLLATE UNICODE.*"), + matchPVals = true + ) } }