[SPARK-44219][SQL] Adds extra per-rule validations for optimization rewrites. #41763
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
case expr: AggregateExpression =>
  val aggFunction = expr.aggregateFunction
  aggFunction.children.foreach {
    child =>
Code style nit: putting child => on the same line as the opening brace saves one indentation level. It is also the style of the previous code: https://github.com/apache/spark/pull/41763/files#diff-583171e935b2dc349378063a5841c5b98b30a2d57ac3743a9eccfe7bffcb8f2aL432
aggFunction.children.foreach { child =>
  ...
}
msg = s"Non-deterministic expression '${
  toSQLExpr(expr)
}' should not appear in " +
  "grouping expression.",
Suggested change:
- msg = s"Non-deterministic expression '${
-   toSQLExpr(expr)
- }' should not appear in " +
-   "grouping expression.",
+ msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " +
+   "grouping expression.",
def validateNoDanglingReferences(plan: LogicalPlan): Option[String] = {
  plan.collectFirst {
    // DML commands and multi instance relations (like InMemoryRelation caches)
    // have different output semantics than typical queries.
Do we really need to special-case these two? QueryPlan#missingInput should already take care of them:
final def missingInput: AttributeSet = references -- inputSet
Anyway, not a big deal: it's faster to skip the cases that will never hit the missing-attribute issue.
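To make the check under discussion concrete, here is a minimal sketch of the "references minus input set" idea on a toy tree. The `Node` class, its fields, and the sample plans are hypothetical stand-ins, not Spark's `LogicalPlan` or `AttributeSet`; only the shape of `references -- inputSet` is taken from the line quoted above.

```scala
object DanglingRefsSketch {
  // Toy stand-in for a plan node; NOT Spark's LogicalPlan.
  final case class Node(
      name: String,
      references: Set[String], // attributes used by this node's expressions
      output: Set[String],     // attributes this node produces
      children: Seq[Node] = Nil) {

    // Attributes made available by the children.
    def inputSet: Set[String] = children.flatMap(_.output).toSet

    // Same shape as the quoted definition: references -- inputSet.
    def missingInput: Set[String] = references -- inputSet

    // Pre-order search that returns the first node matched by pf.
    def collectFirst[A](pf: PartialFunction[Node, A]): Option[A] =
      pf.lift(this).orElse(
        children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(a) => a })
  }

  // Returns an error message for the first node with unresolved references.
  def validateNoDanglingReferences(plan: Node): Option[String] =
    plan.collectFirst {
      case n if n.missingInput.nonEmpty =>
        val missing = n.missingInput.toSeq.sorted.mkString(", ")
        s"${n.name} has dangling references: $missing"
    }

  val scan: Node = Node("Scan", Set.empty, Set("a", "b"))
  val good: Node = Node("Project", Set("a"), Set("a"), Seq(scan))
  val bad: Node = Node("Project", Set("c"), Set("c"), Seq(scan))
}
```

In this toy model a leaf such as `Scan` references nothing, so its `missingInput` is empty without any special-casing; the explicit skips in the PR are then only a shortcut, which is the point the reviewer concedes.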
case _: Command => None
case _: MultiInstanceRelation => None
case n if canGetOutputAttrs(n) =>
  if ( n.missingInput.nonEmpty) {
Suggested change:
- if ( n.missingInput.nonEmpty) {
+ if (n.missingInput.nonEmpty) {
  None
} catch {
  case _: AnalysisException =>
    Some(s"Aggregate: ${a.toString} is not a valid aggregate expression")
Shall we take the actual error message instead? case e: AnalysisException => Some(e.getMessage)
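A small sketch of what this suggestion amounts to: surfacing the exception's own message rather than a generic one. `CheckFailed` and `checkAggregate` are hypothetical stand-ins for `AnalysisException` and the real aggregate check, and the message text is invented for illustration.

```scala
object ErrorMessageSketch {
  // Hypothetical stand-in for Spark's AnalysisException.
  final class CheckFailed(message: String) extends Exception(message)

  // Hypothetical check: throws with a specific reason when the expression is rejected.
  def checkAggregate(expr: String): Unit =
    if (expr.contains("rand(")) {
      throw new CheckFailed(s"Expression '$expr' is non-deterministic")
    }

  // Returns None when the check passes, otherwise the specific reason it failed.
  def validate(expr: String): Option[String] =
    try {
      checkAggregate(expr)
      None
    } catch {
      case e: CheckFailed => Some(e.getMessage)
    }
}
```

Binding the exception to `e` instead of `_` keeps the specific reason, which a fixed "is not a valid aggregate expression" string would discard.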
.orElse(LogicalPlanIntegrity.validateNoDanglingReferences(currentPlan))
.orElse(LogicalPlanIntegrity.validateGroupByTypes(currentPlan))
.orElse(LogicalPlanIntegrity.validateAggregateExpressions(currentPlan))
.map( err => s"${err}\nPrevious schema:${previousPlan.output.mkString(", ")}" +
Suggested change:
- .map( err => s"${err}\nPrevious schema:${previousPlan.output.mkString(", ")}" +
+ .map(err => s"${err}\nPrevious schema:${previousPlan.output.mkString(", ")}" +
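The chained validators can be sketched in isolation as follows. `Option.orElse` takes its argument by name, so a later validator runs only if every earlier one returned `None`, and the final `map` decorates whichever error came first. `ToyPlan` and both validators are hypothetical simplifications; the rest of the message that the original line builds after the trailing `+` is not reproduced here.

```scala
object ValidationChainSketch {
  // Hypothetical toy plan: it only carries the facts the validators inspect.
  final case class ToyPlan(
      output: Seq[String],
      danglingRefs: Seq[String] = Nil,
      mapGroupingKeys: Seq[String] = Nil)

  def validateNoDanglingReferences(p: ToyPlan): Option[String] =
    if (p.danglingRefs.nonEmpty) Some("Dangling references: " + p.danglingRefs.mkString(", "))
    else None

  def validateGroupByTypes(p: ToyPlan): Option[String] =
    if (p.mapGroupingKeys.nonEmpty) Some("Map-typed grouping keys: " + p.mapGroupingKeys.mkString(", "))
    else None

  // The first failing validator wins; the error is then annotated with the previous schema.
  def validate(previous: ToyPlan, current: ToyPlan): Option[String] = {
    val previousSchema = previous.output.mkString(", ")
    validateNoDanglingReferences(current)
      .orElse(validateGroupByTypes(current))
      .map(err => s"$err\nPrevious schema:$previousSchema")
  }
}
```

Returning `Option[String]` from each check keeps the chain flat: a valid plan yields `None` and no message is ever built.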
Thanks, merging to master!
What changes were proposed in this pull request?
Adds per-rule validation checks for the following:
1. Aggregate expressions in Aggregate plans are valid.
2. Grouping key types in Aggregate plans cannot be of type Map.
3. No dangling references have been generated.
This validation is enabled by default for all tests, and can be enabled selectively with the spark.sql.planChangeValidation=true flag.
Why are the changes needed?
Extra validation for optimizer rewrites.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Closes apache#41763 from YannisSismanis/SC-130139_followup.
Authored-by: Yannis Sismanis <yannis.sismanis@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
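As a rough illustration of the grouping-key type check described in this pull request, the sketch below rejects Map-typed grouping keys on a toy type hierarchy. The `DataType` ADT here is hypothetical and is not Spark's `org.apache.spark.sql.types`; rejecting maps nested inside arrays or structs is also an assumption of this sketch, not something the PR text states.

```scala
object GroupByTypeSketch {
  // Hypothetical toy type hierarchy; NOT Spark's DataType classes.
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  final case class ArrayType(element: DataType) extends DataType
  final case class MapType(key: DataType, value: DataType) extends DataType
  final case class StructType(fields: Seq[DataType]) extends DataType

  // True if the type is a map or contains one (nested handling is an assumption).
  def containsMap(dt: DataType): Boolean = dt match {
    case _: MapType => true
    case ArrayType(element) => containsMap(element)
    case StructType(fields) => fields.exists(containsMap)
    case _ => false
  }

  // Returns an error for the first offending grouping key, or None if all are fine.
  def validateGroupByTypes(groupingKeys: Seq[(String, DataType)]): Option[String] =
    groupingKeys.collectFirst {
      case (name, dt) if containsMap(dt) =>
        s"Grouping key '$name' cannot be of a type that contains a Map"
    }
}
```

For example, grouping by `("id", IntType)` passes, while `("tags", MapType(StringType, IntType))` produces an error message.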