[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577

Closed · wants to merge 19 commits
Changes from 11 commits
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -2908,6 +2908,18 @@
],
"sqlState" : "42601"
},
"SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED" : {
"message" : [
"Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS."
],
"sqlState" : "42908"
},
"SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED" : {
"message" : [
"Cannot specify both CLUSTER BY and PARTITIONED BY."
],
"sqlState" : "42908"
},
"SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
"message" : [
"A CREATE TABLE without explicit column list cannot specify PARTITIONED BY.",
12 changes: 12 additions & 0 deletions docs/sql-error-conditions.md
@@ -1817,6 +1817,18 @@ A CREATE TABLE without explicit column list cannot specify bucketing information
Please use the form with explicit column list and specify bucketing information.
Alternatively, allow bucketing information to be inferred by omitting the clause.

### SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED

[SQLSTATE: 42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS.

### SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED

[SQLSTATE: 42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Cannot specify both CLUSTER BY and PARTITIONED BY.
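
For illustration, statements of the following shape trigger the two conditions above at parse time (a sketch, assuming an active SparkSession named spark; the table and column names are made up):

// Raises SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED.
spark.sql("CREATE TABLE t (a INT, b STRING) USING parquet CLUSTER BY (a) PARTITIONED BY (b)")

// Raises SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED.
spark.sql("CREATE TABLE t (a INT, b STRING) USING parquet CLUSTER BY (a) CLUSTERED BY (b) INTO 4 BUCKETS")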

### SPECIFY_PARTITION_IS_NOT_ALLOWED

[SQLSTATE: 42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;

clusterBySpec
: CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
;

bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
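As a rough usage sketch of the new clusterBySpec rule (assuming an active SparkSession named spark; whether a given catalog or provider ultimately accepts the clustering transform is up to that implementation):

spark.sql("""
  CREATE TABLE events (ts TIMESTAMP, user_id BIGINT, payload STRING)
  USING parquet
  CLUSTER BY (user_id, ts)
""")

spark.sql("""
  CREATE OR REPLACE TABLE events (ts TIMESTAMP, user_id BIGINT, payload STRING)
  USING parquet
  CLUSTER BY (user_id)
""")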
@@ -679,4 +679,12 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx
)
}

def clusterByWithPartitionedBy(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", ctx)
}

def clusterByWithBucketing(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED", ctx)
}
}
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BasePredicate, BoundReference, Expression, Predicate}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -235,6 +235,18 @@ object CatalogUtils {
BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
}

def normalizeClusterBySpec(
tableName: String,
tableCols: Seq[String],
clusterBySpec: ClusterBySpec,
resolver: Resolver): ClusterBySpec = {
val normalizedClusterByCols = clusterBySpec.columnNames.map { colName =>
normalizeColumnName(tableName, tableCols, colName.name, "cluster by", resolver)
}

ClusterBySpec(normalizedClusterByCols.map(UnresolvedAttribute.quotedString(_)))
}
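
A minimal illustration of what this helper is meant to do, assuming the usual contract of normalizeColumnName (resolve the requested name against the table's columns with the given resolver and return the table's own spelling); the table and column names are made up:

import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogUtils, ClusterBySpec}

val spec = ClusterBySpec(Seq(UnresolvedAttribute.quotedString("ID")))
val normalized = CatalogUtils.normalizeClusterBySpec(
  tableName = "t",
  tableCols = Seq("id", "name"),
  clusterBySpec = spec,
  resolver = caseInsensitiveResolution)
// normalized.columnNames.map(_.name) is expected to be Seq("id")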

/**
* Convert URI to String.
* Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
@@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedAttribute, UnresolvedLeafNode}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -170,6 +170,23 @@ case class CatalogTablePartition(
}
}

/**
* A container for clustering information.
*
* @param columnNames the names of the columns used for clustering.
*/
case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) {
Contributor: It's weird to use UnresolvedAttribute here, as we are not going to resolve it in the analyzer. How about just Seq[Seq[String]] or Seq[FieldReference]?

Author: I went with Seq[NamedReference] to be consistent with the *Transform classes (including ClusterByTransform). Also, some helper functions such as FieldReference.unapply return NamedReference, so it's easier to work with NamedReference.

override def toString: String = columnNames.map(_.name).mkString(",")

lazy val toDDL: String = if (columnNames.nonEmpty) s"CLUSTER BY ($toString)" else ""
}

object ClusterBySpec {
def apply(columns: String): ClusterBySpec = columns match {
Contributor: This looks weird; where do we use it?

Author: This is used for parsing the property value back into a ClusterBySpec. I renamed this to fromProperty.

case "" => ClusterBySpec(Seq.empty[UnresolvedAttribute])
case _ => ClusterBySpec(columns.split(",").map(_.trim).map(UnresolvedAttribute.quotedString))
Contributor: Hmm, what if the column name contains a comma? Shall we use JSON format?

Author: Done.

}
}
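
Following the JSON suggestion above, the round trip through a table property might look roughly like this (a sketch only; the names toProperty/fromProperty and the exact encoding are assumptions, not necessarily the code this PR ends up with):

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec

object ClusterBySpecPropertySketch {
  private implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Encode each clustering column as its name parts, so a "," or "." inside a
  // column name cannot break parsing.
  def toProperty(spec: ClusterBySpec): String =
    Serialization.write(spec.columnNames.map(_.nameParts))

  def fromProperty(value: String): ClusterBySpec =
    ClusterBySpec(Serialization.read[Seq[Seq[String]]](value).map(UnresolvedAttribute(_)))
}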

/**
* A container for bucketing information.
@@ -253,7 +270,8 @@ case class CatalogTable(
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None) {
viewOriginalText: Option[String] = None,
clusterBySpec: Option[ClusterBySpec] = None) {
Author: We haven't added any field to CatalogTable in the last 5 years; should we avoid adding this new field? Alternatively, we could store the clustering columns in a table property and make that property reserved.

Contributor: Since we have to store it in the table properties anyway at the Hive layer, I think it's simpler to just do it ahead here.

Author: Done. I guess there is no need to make it a reserved property like the properties in TableCatalog?


import CatalogTable._

@@ -33,7 +33,7 @@ import org.apache.spark.{SparkArithmeticException, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, Last, PercentileCont, PercentileDisc}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -3241,6 +3241,15 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
})
}

/**
* Create a [[ClusterBySpec]].
*/
override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec = withOrigin(ctx) {
val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
.map(typedVisit[Seq[String]]).map(UnresolvedAttribute(_)).toSeq
ClusterBySpec(columnNames)
}

/**
* Convert a property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -3341,14 +3350,15 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* - location
* - comment
* - serde
* - clusterBySpec
*
* Note: Partition transforms are based on existing table schema definition. It can be simple
* column names, or functions like `year(date_col)`. Partition columns are column names with data
* types like `i INT`, which should be appended to the existing table schema.
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
OptionList, Option[String], Option[String], Option[SerdeInfo])
OptionList, Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec])

/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -3809,6 +3819,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

if (ctx.skewSpec.size > 0) {
@@ -3827,8 +3838,9 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
val comment = visitCommentSpecList(ctx.commentSpec())
val serdeInfo =
getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
val clusterBySpec = ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)
(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
serdeInfo)
serdeInfo, clusterBySpec)
}

protected def getSerdeInfo(
@@ -3881,6 +3893,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* [OPTIONS table_property_list]
* [ROW FORMAT row_format]
* [STORED AS file_format]
* [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3902,7 +3915,16 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
comment, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())

if (clusterBySpec.isDefined) {
if (partCols.nonEmpty || partTransforms.nonEmpty) {
throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
}
if (bucketSpec.isDefined) {
throw QueryParsingErrors.clusterByWithBucketing(ctx)
}
}

if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
@@ -3915,7 +3937,10 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
}

val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)

val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external)

@@ -3958,6 +3983,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (partition_fields)]
* [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3973,18 +4000,31 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
val orCreate = ctx.replaceTableHeader().CREATE() != null
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo,
clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
val columns = Option(ctx.createOrReplaceTableColTypeList())
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)

if (clusterBySpec.isDefined) {
if (partCols.nonEmpty || partTransforms.nonEmpty) {
throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
}
if (bucketSpec.isDefined) {
throw QueryParsingErrors.clusterByWithBucketing(ctx)
}
}

val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)

if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"REPLACE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}

val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)

val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external = false)

@@ -19,13 +19,15 @@ package org.apache.spark.sql.connector.catalog

import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, QuotingUtils}
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.connector.expressions.{BucketTransform, ClusterByTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.StructType

@@ -53,10 +55,18 @@ private[sql] object CatalogV2Implicits {
}
}

implicit class ClusterByHelper(spec: ClusterBySpec) {
def asTransform: Transform = {
val references = spec.columnNames.map(col => FieldReference(col.nameParts))
clusterBy(references.toArray)
}
}
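
A small sketch of the round trip these helpers enable (imports shown for clarity; this all lives inside Spark's sql packages, so visibility is not an issue there):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.expressions.Transform

val spec = ClusterBySpec(Seq(UnresolvedAttribute(Seq("a", "b")), UnresolvedAttribute(Seq("c"))))
val transform: Transform = spec.asTransform  // cluster_by(a.b, c)
val (identityCols, bucket, clusterBy) = Seq(transform).convertTransforms
// identityCols is empty, bucket is None, and clusterBy carries the same column names.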

implicit class TransformHelper(transforms: Seq[Transform]) {
def convertTransforms: (Seq[String], Option[BucketSpec]) = {
def convertTransforms: (Seq[String], Option[BucketSpec], Option[ClusterBySpec]) = {
val identityCols = new mutable.ArrayBuffer[String]
var bucketSpec = Option.empty[BucketSpec]
var clusterBySpec = Option.empty[ClusterBySpec]

transforms.map {
case IdentityTransform(FieldReference(Seq(col))) =>
Expand All @@ -73,11 +83,24 @@ private[sql] object CatalogV2Implicits {
sortCol.map(_.fieldNames.mkString("."))))
}

case ClusterByTransform(columnNames) =>
if (clusterBySpec.nonEmpty) {
// AstBuilder guarantees that it only passes down one ClusterByTransform.
throw SparkException.internalError("Cannot have multiple cluster by transforms.")
}
clusterBySpec =
Some(ClusterBySpec(columnNames.map(_.fieldNames).map(UnresolvedAttribute(_))))

case transform =>
throw QueryExecutionErrors.unsupportedPartitionTransformError(transform)
}

(identityCols.toSeq, bucketSpec)
// Parser guarantees that partition and clustering cannot co-exist.
assert(!(identityCols.toSeq.nonEmpty && clusterBySpec.nonEmpty))
// Parser guarantees that bucketing and clustering cannot co-exist.
assert(!(bucketSpec.nonEmpty && clusterBySpec.nonEmpty))

(identityCols.toSeq, bucketSpec, clusterBySpec)
}
}

@@ -52,6 +52,9 @@ private[sql] object LogicalExpressions {
sortedCols: Array[NamedReference]): SortedBucketTransform =
SortedBucketTransform(literal(numBuckets, IntegerType), references, sortedCols)

def clusterBy(references: Array[NamedReference]): ClusterByTransform =
ClusterByTransform(references)

def identity(reference: NamedReference): IdentityTransform = IdentityTransform(reference)

def years(reference: NamedReference): YearsTransform = YearsTransform(reference)
@@ -150,6 +153,41 @@ private[sql] object BucketTransform {
}
}

/**
* This class represents a transform for [[ClusterBySpec]]. This is used to bundle
* ClusterBySpec in CreateTable's partitioning transforms to pass it down to analyzer/delta.
*/
final case class ClusterByTransform(
columnNames: Seq[NamedReference]) extends RewritableTransform {

override val name: String = "cluster_by"

override def references: Array[NamedReference] = {
arguments.collect { case named: NamedReference => named }
Contributor: It's columnNames: Seq[NamedReference], so we can just return columnNames here.

Author: +1

}

override def arguments: Array[Expression] = columnNames.toArray

override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})"

override def withReferences(newReferences: Seq[NamedReference]): Transform = {
this.copy(columnNames = newReferences)
}
}

/**
* Convenience extractor for ClusterByTransform.
*/
object ClusterByTransform {
def unapply(transform: Transform): Option[Seq[NamedReference]] =
transform match {
case NamedTransform("cluster_by", arguments) =>
Some(arguments.map(_.asInstanceOf[NamedReference]))
case _ =>
None
}
}
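
For instance, the extractor lets downstream code pick the clustering columns out of a table's transforms (a small sketch):

import org.apache.spark.sql.connector.expressions.{ClusterByTransform, Transform}

def clusteringColumns(partitioning: Seq[Transform]): Seq[String] =
  partitioning.collect {
    case ClusterByTransform(cols) => cols.map(_.fieldNames.mkString("."))
  }.flatten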

private[sql] final case class SortedBucketTransform(
numBuckets: Literal[Int],
columns: Seq[NamedReference],