-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE #42577
Changes from 18 commits
71faed7
e83d706
51516d0
bb40915
f8b7e6f
4237193
6872414
c4e2e6c
51d3d51
730be5b
354a257
d3538ad
8c9defb
15311bc
08421f2
1262ce1
1be8e4a
328e461
9e52100
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -24,25 +24,29 @@ import java.util.Date | |||||||||||||||
import scala.collection.mutable | ||||||||||||||||
import scala.util.control.NonFatal | ||||||||||||||||
|
||||||||||||||||
import com.fasterxml.jackson.annotation.JsonInclude.Include | ||||||||||||||||
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} | ||||||||||||||||
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} | ||||||||||||||||
import org.apache.commons.lang3.StringUtils | ||||||||||||||||
import org.json4s.JsonAST.{JArray, JString} | ||||||||||||||||
import org.json4s.jackson.JsonMethods._ | ||||||||||||||||
|
||||||||||||||||
import org.apache.spark.internal.Logging | ||||||||||||||||
import org.apache.spark.sql.AnalysisException | ||||||||||||||||
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier} | ||||||||||||||||
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode} | ||||||||||||||||
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Resolver, UnresolvedLeafNode} | ||||||||||||||||
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN | ||||||||||||||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} | ||||||||||||||||
import org.apache.spark.sql.catalyst.plans.logical._ | ||||||||||||||||
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils | ||||||||||||||||
import org.apache.spark.sql.catalyst.types.DataTypeUtils | ||||||||||||||||
import org.apache.spark.sql.catalyst.util._ | ||||||||||||||||
import org.apache.spark.sql.connector.catalog.CatalogManager | ||||||||||||||||
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} | ||||||||||||||||
import org.apache.spark.sql.errors.QueryCompilationErrors | ||||||||||||||||
import org.apache.spark.sql.internal.SQLConf | ||||||||||||||||
import org.apache.spark.sql.types._ | ||||||||||||||||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||||||||||||||||
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} | ||||||||||||||||
|
||||||||||||||||
|
||||||||||||||||
/** | ||||||||||||||||
|
@@ -170,6 +174,55 @@ case class CatalogTablePartition( | |||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
/** | ||||||||||||||||
* A container for clustering information. | ||||||||||||||||
* | ||||||||||||||||
* @param columnNames the names of the columns used for clustering. | ||||||||||||||||
*/ | ||||||||||||||||
case class ClusterBySpec(columnNames: Seq[NamedReference]) { | ||||||||||||||||
override def toString: String = toJson | ||||||||||||||||
|
||||||||||||||||
def toJson: String = ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames)) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
object ClusterBySpec { | ||||||||||||||||
private val mapper = { | ||||||||||||||||
val ret = new ObjectMapper() with ClassTagExtensions | ||||||||||||||||
ret.setSerializationInclusion(Include.NON_ABSENT) | ||||||||||||||||
ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) | ||||||||||||||||
ret.registerModule(DefaultScalaModule) | ||||||||||||||||
ret | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
def fromProperty(columns: String): ClusterBySpec = { | ||||||||||||||||
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_))) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
def toProperty( | ||||||||||||||||
schema: StructType, | ||||||||||||||||
clusterBySpec: ClusterBySpec, | ||||||||||||||||
resolver: Resolver): (String, String) = { | ||||||||||||||||
CatalogTable.PROP_CLUSTERING_COLUMNS -> | ||||||||||||||||
normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
private def normalizeClusterBySpec( | ||||||||||||||||
schema: StructType, | ||||||||||||||||
clusterBySpec: ClusterBySpec, | ||||||||||||||||
resolver: Resolver): ClusterBySpec = { | ||||||||||||||||
val normalizedColumns = clusterBySpec.columnNames.map { columnName => | ||||||||||||||||
val position = SchemaUtils.findColumnPosition( | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This checks column existence but we only hit it for v1 path (cover to v1 command), where do we check column existence for pure v2 path? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's happening here for v2 path: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala Lines 273 to 279 in 6abc4a1
|
||||||||||||||||
columnName.fieldNames(), schema, resolver) | ||||||||||||||||
FieldReference(SchemaUtils.getColumnName(position, schema)) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
SchemaUtils.checkColumnNameDuplication( | ||||||||||||||||
normalizedColumns.map(_.toString), | ||||||||||||||||
resolver) | ||||||||||||||||
|
||||||||||||||||
ClusterBySpec(normalizedColumns) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
/** | ||||||||||||||||
* A container for bucketing information. | ||||||||||||||||
|
@@ -462,6 +515,10 @@ case class CatalogTable( | |||||||||||||||
if (value.isEmpty) key else s"$key: $value" | ||||||||||||||||
}.mkString("", "\n", "") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
lazy val clusterBySpec: Option[ClusterBySpec] = { | ||||||||||||||||
properties.get(PROP_CLUSTERING_COLUMNS).map(ClusterBySpec.fromProperty) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
object CatalogTable { | ||||||||||||||||
|
@@ -499,6 +556,8 @@ object CatalogTable { | |||||||||||||||
|
||||||||||||||||
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan" | ||||||||||||||||
|
||||||||||||||||
val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns" | ||||||||||||||||
|
||||||||||||||||
def splitLargeTableProp( | ||||||||||||||||
key: String, | ||||||||||||||||
value: String, | ||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively I could have serialized
FieldReference
, but this approach is more generic.