[Spark] Delta SQL parser change to support CLUSTER BY #2328
Conversation
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY` and leverage the Spark SQL parser to perform validation. After parsing, clustering columns are stored in the TableSpec's properties. When we integrate with OSS Spark's CLUSTER BY implementation ([PR](apache/spark#42577)), we'll remove the workaround in this PR.
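For context, a hypothetical example of the syntax this enables (assuming an active `SparkSession` named `spark` with the Delta SQL extension configured; the table and column names are made up for illustration):

```scala
// Illustrative only: Liquid clustering columns are declared with CLUSTER BY
// instead of PARTITIONED BY.
spark.sql("""
  CREATE TABLE events (event_id BIGINT, event_time TIMESTAMP, country STRING)
  USING DELTA
  CLUSTER BY (event_id, country)
""")
```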
```scala
@@ -76,6 +77,8 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case clusterByPlan: ClusterByPlan =>
```
Sorry, not obvious to me. How does Spark return a `ClusterByPlan`? Can a user type `CREATE TABLE ... CLUSTER BY` using the Spark 3.5 that Delta depends on?
The `ClusterByPlan` is injected by Delta in L416 below. We are using this workaround since Spark 3.5 doesn't support `CLUSTER BY` yet.
The flow looks like this (a sketch follows the list):
- The user types `CREATE TABLE ... CLUSTER BY`. `CLUSTER BY` gets matched with `clusterBySpec` in `DeltaSqlBase.g4` and `visitClusterBy` is called.
- `visitClusterBy` creates a `ClusterByPlan`, which gets detected in `parsePlan` and triggers a call to `ClusterByParserUtils.parsePlan`.
- `ClusterByParserUtils.parsePlan` replaces `CLUSTER BY` with `PARTITIONED BY` and calls the Spark SQL parser (`delegate.parsePlan`) for validation.
- After successful parsing, the clustering columns are saved in the table's `partitioning` transforms.
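A minimal sketch of the rewrite-and-validate step, using a hypothetical object name `ClusterByRewriteSketch` and a plain string substitution; the real `ClusterByParserUtils` locates the clause via the token positions carried by the `ClusterByPlan` sentinel rather than a regex:

```scala
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: swap the clause Spark 3.5 doesn't understand for one it does,
// then let the Spark SQL parser validate the rest of the statement.
object ClusterByRewriteSketch {
  def parseWithSparkValidation(sqlText: String, delegate: ParserInterface): LogicalPlan = {
    val rewritten = sqlText.replaceFirst("(?i)CLUSTER BY", "PARTITIONED BY")
    delegate.parsePlan(rewritten)
  }
}
```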
Looks reasonable to me as a temporary solution until we integrate with OSS Spark's `CLUSTER BY`.
```scala
   * The plan will be used as a sentinel for DeltaSqlParser to process it further.
   */
  override def visitClusterBy(ctx: ClusterByContext): LogicalPlan = withOrigin(ctx) {
    val clusterBySpecCtx = ctx.clusterBySpec.asScala.head
```
Is `head` always safe?
Yes. This function is invoked only when we've matched `clusterBySpec+`, which means one or more `clusterBySpec`.
```scala
  }

  // ClassTag is added to avoid the "same type after erasure" issue with the case class.
  def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = {
```
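For readers unfamiliar with the erasure issue being worked around here, a minimal self-contained sketch (the `NamedReference`/`FieldReference` stubs below stand in for Spark's connector expression types and are not the real classes):

```scala
import scala.reflect.ClassTag

// Stand-ins defined here only so the sketch compiles on its own.
trait NamedReference
case class FieldReference(parts: Seq[String]) extends NamedReference

case class ClusterBySpec(columnNames: Seq[NamedReference])

object ClusterBySpec {
  // Seq[NamedReference] and Seq[Seq[String]] both erase to Seq, so without the
  // ClassTag this overload would have the same erased JVM signature as the
  // synthetic apply(Seq[NamedReference]) generated for the case class. The extra
  // implicit ClassTag parameter gives it a distinct signature so both coexist.
  def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec =
    ClusterBySpec(columnNames.map(parts => FieldReference(parts): NamedReference))
}
```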
Looks like this is only used by the parser, so it will be removed when we integrate with OSS? If so, can you add a comment that this will be removed (or just put this functionality inside the caller to make it easy to remember)? Note that I don't see this function in OSS Spark either.
Moved to its own package `clustering.temp` and file `ClusterBySpec.scala`.
```scala
   *
   * @param columnNames the names of the columns used for clustering.
   */
  case class ClusterBySpec(columnNames: Seq[NamedReference]) {
```
Maybe rename this to `TempClusterBySpec` to make it obvious?
Moved to its own package `clustering.temp` and file `ClusterBySpec.scala`.
Which Delta project/connector is this regarding?
Description
Resolves #2593
This PR is part of #1874.
This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace `CLUSTER BY` with `PARTITIONED BY` and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in the logical plan's partitioning transforms. When we integrate with Apache Spark's CLUSTER BY implementation ([PR](apache/spark#42577)), we'll remove the workaround in this PR.
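As an illustration of that last point, a rough sketch (not the PR's literal code; the helper name `clusteringColumns` is hypothetical) of how clustering columns could be read back from the partitioning transforms Spark produces for the rewritten `PARTITIONED BY` clause, using only the public `Transform` API:

```scala
import org.apache.spark.sql.connector.expressions.Transform

// Sketch only: each clustering column is expected to surface as an identity
// transform over a single field reference after the PARTITIONED BY rewrite.
def clusteringColumns(partitioning: Seq[Transform]): Seq[Seq[String]] =
  partitioning.collect {
    case t if t.name == "identity" => t.references.head.fieldNames.toSeq
  }
```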
How was this patch tested?
New unit tests.
Does this PR introduce any user-facing changes?
Yes, it introduces parser support for CLUSTER BY syntax.