[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577

Closed · wants to merge 19 commits (changes shown from 18 commits)
common/utils/src/main/resources/error/error-classes.json (12 additions, 0 deletions)
@@ -2908,6 +2908,18 @@
],
"sqlState" : "42601"
},
"SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED" : {
"message" : [
"Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS."
],
"sqlState" : "42908"
},
"SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED" : {
"message" : [
"Cannot specify both CLUSTER BY and PARTITIONED BY."
],
"sqlState" : "42908"
},
"SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
"message" : [
"A CREATE TABLE without explicit column list cannot specify PARTITIONED BY.",
docs/sql-error-conditions.md (12 additions, 0 deletions)
@@ -1817,6 +1817,18 @@
A CREATE TABLE without explicit column list cannot specify bucketing information.
Please use the form with explicit column list and specify bucketing information.
Alternatively, allow bucketing information to be inferred by omitting the clause.

### SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED

[SQLSTATE: 42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS.

### SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED

[SQLSTATE: 42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Cannot specify both CLUSTER BY and PARTITIONED BY.

### SPECIFY_PARTITION_IS_NOT_ALLOWED

[SQLSTATE: 42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
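For illustration only (not part of this diff): statements along these lines should trip the two new error conditions. The SparkSession value `spark`, the table names, and the column names are assumptions.

```scala
// Hypothetical statements rejected by the new error conditions.
// Assumes a running SparkSession named `spark`; names are made up.

// SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
spark.sql(
  """CREATE TABLE events (id INT, ts TIMESTAMP, region STRING)
    |USING parquet
    |PARTITIONED BY (region)
    |CLUSTER BY (id)""".stripMargin)

// SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED
spark.sql(
  """CREATE TABLE events (id INT, ts TIMESTAMP)
    |USING parquet
    |CLUSTERED BY (id) INTO 4 BUCKETS
    |CLUSTER BY (id)""".stripMargin)
```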
@@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;

clusterBySpec
: CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
;

bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
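As a sketch of what the new clusterBySpec rule is meant to accept (illustrative statements, not from this PR; `spark` and all table/column names are assumed):

```scala
// CLUSTER BY takes a parenthesized list of multipart identifiers,
// so nested columns such as info.name are syntactically allowed.
spark.sql(
  """CREATE TABLE customers (id BIGINT, info STRUCT<name: STRING, age: INT>)
    |USING parquet
    |CLUSTER BY (id, info.name)""".stripMargin)

// createTableClauses is shared with REPLACE TABLE, so CLUSTER BY applies there too.
spark.sql(
  """REPLACE TABLE customers (id BIGINT, name STRING)
    |USING parquet
    |CLUSTER BY (id)""".stripMargin)
```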
@@ -679,4 +679,12 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx
)
}

def clusterByWithPartitionedBy(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", ctx)
}

def clusterByWithBucketing(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED", ctx)
}
}
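A small hedged sketch of how these helpers surface to callers: the ParseException carries the new error class, so calling code could inspect it roughly like this (the statement and names are made up, and `spark` is an assumed SparkSession):

```scala
import org.apache.spark.sql.catalyst.parser.ParseException

try {
  // Combines PARTITIONED BY and CLUSTER BY, so parsing should fail.
  spark.sql("CREATE TABLE t (id INT) USING parquet PARTITIONED BY (id) CLUSTER BY (id)")
} catch {
  case e: ParseException =>
    // Expected error class: SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
    println(e.getErrorClass)
}
```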
@@ -24,25 +24,29 @@ import java.util.Date
import scala.collection.mutable
import scala.util.control.NonFatal

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.apache.commons.lang3.StringUtils
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Resolver, UnresolvedLeafNode}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}


/**
@@ -170,6 +174,55 @@ case class CatalogTablePartition(
}
}

/**
* A container for clustering information.
*
* @param columnNames the names of the columns used for clustering.
*/
case class ClusterBySpec(columnNames: Seq[NamedReference]) {
override def toString: String = toJson

def toJson: String = ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
}

object ClusterBySpec {
private val mapper = {
val ret = new ObjectMapper() with ClassTagExtensions
ret.setSerializationInclusion(Include.NON_ABSENT)
ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
ret.registerModule(DefaultScalaModule)
ret
}

def fromProperty(columns: String): ClusterBySpec = {
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
}

Review comment (Contributor Author), on fromProperty: Alternatively I could have serialized FieldReference, but this approach is more generic.

def toProperty(
schema: StructType,
clusterBySpec: ClusterBySpec,
resolver: Resolver): (String, String) = {
CatalogTable.PROP_CLUSTERING_COLUMNS ->
normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
}

private def normalizeClusterBySpec(
schema: StructType,
clusterBySpec: ClusterBySpec,
resolver: Resolver): ClusterBySpec = {
val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
val position = SchemaUtils.findColumnPosition(
columnName.fieldNames(), schema, resolver)
FieldReference(SchemaUtils.getColumnName(position, schema))
}

SchemaUtils.checkColumnNameDuplication(
normalizedColumns.map(_.toString),
resolver)

ClusterBySpec(normalizedColumns)
}
}

Review comment (Contributor), on the SchemaUtils.findColumnPosition call above: This checks column existence, but we only hit it on the v1 path (convert to v1 command). Where do we check column existence for the pure v2 path?

Reply (Contributor Author): It's happening here for the v2 path:

    case transform: RewritableTransform =>
      val rewritten = transform.references().map { ref =>
        // Throws an exception if the reference cannot be resolved
        val position = SchemaUtils.findColumnPosition(ref.fieldNames(), schema, resolver)
        FieldReference(SchemaUtils.getColumnName(position, schema))
      }
      transform.withReferences(rewritten)

/**
* A container for bucketing information.
@@ -462,6 +515,10 @@ case class CatalogTable(
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "")
}

lazy val clusterBySpec: Option[ClusterBySpec] = {
properties.get(PROP_CLUSTERING_COLUMNS).map(ClusterBySpec.fromProperty)
}
}

object CatalogTable {
@@ -499,6 +556,8 @@ object CatalogTable {

val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"

val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns"

def splitLargeTableProp(
key: String,
value: String,
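A minimal sketch, not part of the change, of how ClusterBySpec is intended to round-trip through the new clusteringColumns table property. It assumes these catalyst-internal APIs are reachable from the calling code and uses made-up column names:

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType().add("id", IntegerType).add("name", StringType)

// Column names as written by the user, possibly with different casing.
val spec = ClusterBySpec(Seq(FieldReference(Seq("ID")), FieldReference(Seq("name"))))

// toProperty normalizes the names against the schema and serializes them to JSON;
// the key is CatalogTable.PROP_CLUSTERING_COLUMNS ("clusteringColumns") and the
// value should come out roughly as [["id"],["name"]] under a case-insensitive resolver.
val (key, json) = ClusterBySpec.toProperty(schema, spec, caseInsensitiveResolution)

// fromProperty parses the JSON back into a ClusterBySpec with normalized names.
val roundTripped = ClusterBySpec.fromProperty(json)
```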
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkArithmeticException, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, Last, PercentileCont, PercentileDisc}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -3241,6 +3241,15 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
})
}

/**
* Create a [[ClusterBySpec]].
*/
override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec = withOrigin(ctx) {
val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
.map(typedVisit[Seq[String]]).map(FieldReference(_)).toSeq
ClusterBySpec(columnNames)
}

/**
* Convert a property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -3341,14 +3350,15 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* - location
* - comment
* - serde
* - clusterBySpec
*
* Note: Partition transforms are based on existing table schema definition. It can be simple
* column names, or functions like `year(date_col)`. Partition columns are column names with data
* types like `i INT`, which should be appended to the existing table schema.
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
OptionList, Option[String], Option[String], Option[SerdeInfo])
OptionList, Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec])

/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -3809,6 +3819,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

if (ctx.skewSpec.size > 0) {
@@ -3827,8 +3838,19 @@
val comment = visitCommentSpecList(ctx.commentSpec())
val serdeInfo =
getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
val clusterBySpec = ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)

if (clusterBySpec.isDefined) {
if (partCols.nonEmpty || partTransforms.nonEmpty) {
throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
}
if (bucketSpec.isDefined) {
throw QueryParsingErrors.clusterByWithBucketing(ctx)
}
}

(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
serdeInfo)
serdeInfo, clusterBySpec)
}

protected def getSerdeInfo(
@@ -3881,6 +3903,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* [OPTIONS table_property_list]
* [ROW FORMAT row_format]
* [STORED AS file_format]
* [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3902,7 +3925,7 @@
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
comment, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())

if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
@@ -3915,7 +3938,10 @@
}

val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)

val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external)

@@ -3958,6 +3984,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (partition_fields)]
* [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3973,8 +4000,8 @@
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
val orCreate = ctx.replaceTableHeader().CREATE() != null
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo,
clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
val columns = Option(ctx.createOrReplaceTableColTypeList())
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
@@ -3984,7 +4011,10 @@
}

val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)

val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external = false)

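A rough sanity sketch of the AstBuilder changes (not a test from this PR): parsing a CREATE TABLE with CLUSTER BY should produce a plan whose partitioning contains a single ClusterByTransform. The statement and names below are made up:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.CreateTable
import org.apache.spark.sql.connector.expressions.ClusterByTransform

val plan = CatalystSqlParser.parsePlan(
  "CREATE TABLE t (id INT, name STRING) USING parquet CLUSTER BY (id, name)")

plan match {
  case create: CreateTable =>
    // The CLUSTER BY columns arrive as one ClusterByTransform in `partitioning`.
    val clustering = create.partitioning.collect { case c: ClusterByTransform => c }
    assert(clustering.size == 1)
  case other =>
    sys.error(s"Unexpected plan: $other")
}
```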
@@ -19,13 +19,14 @@ package org.apache.spark.sql.connector.catalog

import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, QuotingUtils}
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.connector.expressions.{BucketTransform, ClusterByTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.StructType

@@ -53,10 +54,15 @@ private[sql] object CatalogV2Implicits {
}
}

implicit class ClusterByHelper(spec: ClusterBySpec) {
def asTransform: Transform = clusterBy(spec.columnNames.toArray)
}

implicit class TransformHelper(transforms: Seq[Transform]) {
def convertTransforms: (Seq[String], Option[BucketSpec]) = {
def convertTransforms: (Seq[String], Option[BucketSpec], Option[ClusterBySpec]) = {
val identityCols = new mutable.ArrayBuffer[String]
var bucketSpec = Option.empty[BucketSpec]
var clusterBySpec = Option.empty[ClusterBySpec]

transforms.map {
case IdentityTransform(FieldReference(Seq(col))) =>
@@ -73,11 +79,23 @@ private[sql] object CatalogV2Implicits {
sortCol.map(_.fieldNames.mkString("."))))
}

case ClusterByTransform(columnNames) =>
if (clusterBySpec.nonEmpty) {
// AstBuilder guarantees that it only passes down one ClusterByTransform.
throw SparkException.internalError("Cannot have multiple cluster by transforms.")
}
clusterBySpec = Some(ClusterBySpec(columnNames))

case transform =>
throw QueryExecutionErrors.unsupportedPartitionTransformError(transform)
}

(identityCols.toSeq, bucketSpec)
// Parser guarantees that partition and clustering cannot co-exist.
assert(!(identityCols.toSeq.nonEmpty && clusterBySpec.nonEmpty))
// Parser guarantees that bucketing and clustering cannot co-exist.
assert(!(bucketSpec.nonEmpty && clusterBySpec.nonEmpty))

(identityCols.toSeq, bucketSpec, clusterBySpec)
}
}
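For illustration, a sketch of the new three-element shape returned by convertTransforms. Since CatalogV2Implicits is private[sql], this only compiles from code inside the org.apache.spark.sql package; the column names are made up:

```scala
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.expressions.{FieldReference, Transform}

// A CLUSTER BY (id, name) spec, turned into a transform via the new ClusterByHelper.
val spec = ClusterBySpec(Seq(FieldReference(Seq("id")), FieldReference(Seq("name"))))
val transforms: Seq[Transform] = Seq(spec.asTransform)

// convertTransforms now returns a third element carrying the clustering columns.
// Identity columns and the bucket spec stay empty here, matching the parser's
// rule that CLUSTER BY cannot be combined with PARTITIONED BY or CLUSTERED BY.
val (identityCols, maybeBucketSpec, maybeClusterBySpec) = transforms.convertTransforms
assert(identityCols.isEmpty && maybeBucketSpec.isEmpty && maybeClusterBySpec.nonEmpty)
```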

Expand Down
Loading