-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577
Changes from 11 commits
71faed7
e83d706
51516d0
bb40915
f8b7e6f
4237193
6872414
c4e2e6c
51d3d51
730be5b
354a257
d3538ad
8c9defb
15311bc
08421f2
1262ce1
1be8e4a
328e461
9e52100
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._ | |
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier} | ||
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode} | ||
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedAttribute, UnresolvedLeafNode} | ||
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN | ||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
|
@@ -170,6 +170,23 @@ case class CatalogTablePartition( | |
} | ||
} | ||
|
||
/** | ||
* A container for clustering information. | ||
* | ||
* @param columnNames the names of the columns used for clustering. | ||
*/ | ||
case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) { | ||
override def toString: String = columnNames.map(_.name).mkString(",") | ||
|
||
lazy val toDDL: String = if (columnNames.nonEmpty) s"CLUSTER BY ($toString)" else "" | ||
} | ||
|
||
object ClusterBySpec { | ||
def apply(columns: String): ClusterBySpec = columns match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this looks weird, where do we use it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is used for parsing property value back to ClusterBySpec. I renamed this to |
||
case "" => ClusterBySpec(Seq.empty[UnresolvedAttribute]) | ||
case _ => ClusterBySpec(columns.split(",").map(_.trim).map(UnresolvedAttribute.quotedString)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hmm what if the column name contains `,`? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
} | ||
|
||
/** | ||
* A container for bucketing information. | ||
|
@@ -253,7 +270,8 @@ case class CatalogTable( | |
tracksPartitionsInCatalog: Boolean = false, | ||
schemaPreservesCase: Boolean = true, | ||
ignoredProperties: Map[String, String] = Map.empty, | ||
viewOriginalText: Option[String] = None) { | ||
viewOriginalText: Option[String] = None, | ||
clusterBySpec: Option[ClusterBySpec] = None) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We haven't added any field to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we have to store it in the table properties anyway at the Hive layer, I think it's simpler to just do it ahead here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, I guess no need to make it as a reserved property like properties in |
||
|
||
import CatalogTable._ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,9 @@ private[sql] object LogicalExpressions { | |
sortedCols: Array[NamedReference]): SortedBucketTransform = | ||
SortedBucketTransform(literal(numBuckets, IntegerType), references, sortedCols) | ||
|
||
def clusterBy(references: Array[NamedReference]): ClusterByTransform = | ||
ClusterByTransform(references) | ||
|
||
def identity(reference: NamedReference): IdentityTransform = IdentityTransform(reference) | ||
|
||
def years(reference: NamedReference): YearsTransform = YearsTransform(reference) | ||
|
@@ -150,6 +153,41 @@ private[sql] object BucketTransform { | |
} | ||
} | ||
|
||
/** | ||
* This class represents a transform for [[ClusterBySpec]]. This is used to bundle | ||
* ClusterBySpec in CreateTable's partitioning transforms to pass it down to analyzer/delta. | ||
*/ | ||
final case class ClusterByTransform( | ||
columnNames: Seq[NamedReference]) extends RewritableTransform { | ||
|
||
override val name: String = "cluster_by" | ||
|
||
override def references: Array[NamedReference] = { | ||
arguments.collect { case named: NamedReference => named } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
} | ||
|
||
override def arguments: Array[Expression] = columnNames.toArray | ||
|
||
override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})" | ||
|
||
override def withReferences(newReferences: Seq[NamedReference]): Transform = { | ||
this.copy(columnNames = newReferences) | ||
} | ||
} | ||
|
||
/** | ||
* Convenience extractor for ClusterByTransform. | ||
*/ | ||
object ClusterByTransform { | ||
def unapply(transform: Transform): Option[Seq[NamedReference]] = | ||
transform match { | ||
case NamedTransform("cluster_by", arguments) => | ||
Some(arguments.map(_.asInstanceOf[NamedReference])) | ||
case _ => | ||
None | ||
} | ||
} | ||
|
||
private[sql] final case class SortedBucketTransform( | ||
numBuckets: Literal[Int], | ||
columns: Seq[NamedReference], | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's weird to use `UnresolvedAttribute` here as we are not going to resolve it in the analyzer. How about just `Seq[Seq[String]]` or `Seq[FieldReference]`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with `Seq[NamedReference]` to be consistent with the `*Transform` classes (including `ClusterByTransform`). Also, some helper functions such as `FieldReference.unapply` return `NamedReference`, so it's easier to work with `NamedReference`.