
[Spark] Delta SQL parser change to support CLUSTER BY #2328

Closed
wants to merge 2 commits

Conversation

zedtang
Collaborator

@zedtang zedtang commented Nov 22, 2023

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Resolves #2593

This PR is part of #1874.

This PR changes the Delta SQL parser to support CLUSTER BY for Liquid clustering. Since Delta imports Spark as a library and can't change the source code directly, we instead replace CLUSTER BY with PARTITIONED BY, and leverage the Spark SQL parser to perform validation. After parsing, clustering columns will be stored in the logical plan's partitioning transforms.

When we integrate with Apache Spark's CLUSTER BY implementation ([PR](apache/spark#42577)), we'll remove the workaround in this PR.
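The core of the workaround can be sketched as a small textual substitution before delegating to the Spark SQL parser. This is a minimal, hypothetical sketch — the actual PR operates on the ANTLR parse context rather than raw SQL text, and the object name here is illustrative:

```scala
// Hypothetical sketch of the CLUSTER BY -> PARTITIONED BY rewrite.
// The real implementation works on parser contexts, not raw strings.
object ClusterByRewrite {
  // Case-insensitive match on the CLUSTER BY keyword pair.
  private val ClusterBy = "(?i)CLUSTER\\s+BY".r

  // Rewrite CLUSTER BY into PARTITIONED BY so the stock Spark SQL
  // parser (which does not yet know CLUSTER BY) can validate the rest
  // of the statement.
  def rewrite(sqlText: String): String =
    ClusterBy.replaceAllIn(sqlText, "PARTITIONED BY")
}
```

For example, `ClusterByRewrite.rewrite("CREATE TABLE t (a INT) USING delta CLUSTER BY (a)")` yields the `PARTITIONED BY` form that Spark 3.5 already understands.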

How was this patch tested?

New unit tests.

Does this PR introduce any user-facing changes?

Yes, it introduces parser support for CLUSTER BY syntax.

@@ -76,6 +77,8 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case clusterByPlan: ClusterByPlan =>
Collaborator
Sorry, this isn't obvious to me. How does Spark return a ClusterByPlan? Can a user type CREATE TABLE ... CLUSTER BY using Spark 3.5, which Delta depends on?

Collaborator Author

@zedtang zedtang Nov 27, 2023
The ClusterByPlan is injected by Delta in L416 below. We are using this workaround since Spark 3.5 doesn't support CLUSTER BY yet.

The flow looks like this:

  1. User types CREATE TABLE ... CLUSTER BY.
  2. CLUSTER BY gets matched with clusterBySpec in DeltaSqlBase.g4 and visitClusterBy is called.
  3. visitClusterBy creates a ClusterByPlan, which is detected in parsePlan and routed to ClusterByParserUtils.parsePlan.
  4. ClusterByParserUtils.parsePlan replaces CLUSTER BY with PARTITIONED BY and calls the Spark SQL parser (delegate.parsePlan) for validation.
  5. After successful parsing, the clustering columns are saved in the table's partitioning transforms.
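The sentinel dispatch in the flow above can be sketched roughly as follows. This is a simplified, hypothetical model — the real ClusterByPlan carries the parse context rather than raw SQL, and the type names here are stand-ins:

```scala
// Simplified stand-ins for the plans involved (illustrative only).
sealed trait LogicalPlan
case class ClusterByPlan(sqlText: String) extends LogicalPlan    // sentinel built by visitClusterBy
case class SparkParsedPlan(sqlText: String) extends LogicalPlan  // stand-in for Spark's parse result

// parsePlan detects the sentinel and reroutes it through the delegate
// (Spark) parser after rewriting CLUSTER BY to PARTITIONED BY; any
// other plan is passed through unchanged.
def parsePlan(plan: LogicalPlan, delegate: String => LogicalPlan): LogicalPlan =
  plan match {
    case ClusterByPlan(sql) =>
      delegate(sql.replaceAll("(?i)CLUSTER\\s+BY", "PARTITIONED BY"))
    case other => other
  }
```

The sentinel pattern keeps the Delta grammar change small: only visitClusterBy knows about the new syntax, while validation stays entirely in the unmodified Spark parser.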

Contributor

@imback82 imback82 left a comment

Looks reasonable to me as a temporary solution until we integrate with OSS Spark's CLUSTER BY.

* The plan will be used as a sentinel for DeltaSqlParser to process it further.
*/
override def visitClusterBy(ctx: ClusterByContext): LogicalPlan = withOrigin(ctx) {
val clusterBySpecCtx = ctx.clusterBySpec.asScala.head
Contributor
is head always safe?

Collaborator Author
Yes. This function is only invoked after the grammar has matched clusterBySpec+, which means one or more clusterBySpec occurrences.

}

// ClassTag is added to avoid the "same type after erasure" issue with the case class.
def apply[_: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec = {
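The erasure issue the comment refers to: the case-class companion already has a synthetic apply taking Seq[NamedReference], and after type erasure both overloads would become apply(Seq) and collide. The extra ClassTag context bound adds an implicit parameter, which changes the erased signature. A hedged, self-contained sketch — FieldReference here stands in for Spark's NamedReference, and the type parameter is named T for portability (the PR itself writes `apply[_: ClassTag]`):

```scala
import scala.reflect.ClassTag

// Stand-in for Spark's NamedReference (illustrative only).
case class FieldReference(parts: Seq[String])

case class ClusterBySpec(columnNames: Seq[FieldReference])

object ClusterBySpec {
  // Without the ClassTag context bound, this overload would erase to
  // apply(Seq) and collide with the companion's apply(Seq[FieldReference]).
  // The implicit ClassTag parameter gives it a distinct erased signature.
  def apply[T: ClassTag](columnNames: Seq[Seq[String]]): ClusterBySpec =
    new ClusterBySpec(columnNames.map(parts => FieldReference(parts)))
}
```

With this in place, `ClusterBySpec(Seq(Seq("a", "b")))` resolves to the ClassTag overload, while `ClusterBySpec(Seq(FieldReference(Seq("a"))))` still hits the case-class apply.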
Contributor
Looks like this is only used by the parser, so it will be removed when we integrate with OSS? If so, can you add a comment that this will be removed (or just put this functionality inside the caller to make it easy to remember)? Note that I don't see this function in OSS Spark either.

Collaborator Author
Moved to its own package clustering.temp and file ClusterBySpec.scala.

*
* @param columnNames the names of the columns used for clustering.
*/
case class ClusterBySpec(columnNames: Seq[NamedReference]) {
Contributor
Maybe name this to TempClusterBySpec to make it obvious?

Collaborator Author
Moved to its own package clustering.temp and file ClusterBySpec.scala.

@zedtang zedtang deleted the delta-parser-change branch December 14, 2023 01:00
andreaschat-db pushed a commit to andreaschat-db/delta that referenced this pull request Jan 5, 2024

Closes delta-io#2328

GitOrigin-RevId: 19262070edbcaead765e7f9eefe96b6e63a7f884
Development

Successfully merging this pull request may close these issues.

[Feature Request] Delta SQL parser support for CLUSTER BY syntax