-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches #43978
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer | |
|
||
import scala.collection.mutable | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Alias, CommonExpressionDef, CommonExpressionRef, Expression, With} | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EXPRESSION} | ||
|
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] { | |
override def apply(plan: LogicalPlan): LogicalPlan = { | ||
plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { | ||
case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) => | ||
var newChildren = p.children | ||
var newPlan: LogicalPlan = p.transformExpressionsUp { | ||
case With(child, defs) => | ||
val refToExpr = mutable.HashMap.empty[Long, Expression] | ||
val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias]) | ||
val inputPlans = p.children.toArray | ||
var newPlan: LogicalPlan = p.mapExpressions { expr => | ||
rewriteWithExprAndInputPlans(expr, inputPlans) | ||
} | ||
newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq) | ||
if (p.output == newPlan.output) { | ||
newPlan | ||
} else { | ||
Project(p.output, newPlan) | ||
} | ||
} | ||
} | ||
|
||
defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) => | ||
if (CollapseProject.isCheap(child)) { | ||
refToExpr(id) = child | ||
} else { | ||
val childProjectionIndex = newChildren.indexWhere( | ||
c => child.references.subsetOf(c.outputSet) | ||
) | ||
if (childProjectionIndex == -1) { | ||
// When we cannot rewrite the common expressions, force to inline them so that the | ||
// query can still run. This can happen if the join condition contains `With` and | ||
// the common expression references columns from both join sides. | ||
// TODO: things can go wrong if the common expression is nondeterministic. We | ||
// don't fix it for now to match the old buggy behavior when certain | ||
// `RuntimeReplaceable` did not use the `With` expression. | ||
// TODO: we should calculate the ref count and also inline the common expression | ||
// if its ref count is 1. | ||
refToExpr(id) = child | ||
} else { | ||
val alias = Alias(child, s"_common_expr_$index")() | ||
childProjections(childProjectionIndex) += alias | ||
refToExpr(id) = alias.toAttribute | ||
} | ||
} | ||
} | ||
private def rewriteWithExprAndInputPlans( | ||
e: Expression, | ||
inputPlans: Array[LogicalPlan]): Expression = { | ||
if (!e.containsPattern(WITH_EXPRESSION)) return e | ||
e match { | ||
case w: With => | ||
// Rewrite nested With expression in CommonExpressionDef first. | ||
val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans)) | ||
val refToExpr = mutable.HashMap.empty[Long, Expression] | ||
val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias]) | ||
|
||
newChildren = newChildren.zip(childProjections).map { case (child, projections) => | ||
if (projections.nonEmpty) { | ||
Project(child.output ++ projections, child) | ||
} else { | ||
child | ||
} | ||
defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) => | ||
if (CollapseProject.isCheap(child)) { | ||
refToExpr(id) = child | ||
} else { | ||
val childProjectionIndex = inputPlans.indexWhere( | ||
c => child.references.subsetOf(c.outputSet) | ||
) | ||
if (childProjectionIndex == -1) { | ||
// When we cannot rewrite the common expressions, force to inline them so that the | ||
// query can still run. This can happen if the join condition contains `With` and | ||
// the common expression references columns from both join sides. | ||
// TODO: things can go wrong if the common expression is nondeterministic. We | ||
// don't fix it for now to match the old buggy behavior when certain | ||
// `RuntimeReplaceable` did not use the `With` expression. | ||
// TODO: we should calculate the ref count and also inline the common expression | ||
// if its ref count is 1. | ||
refToExpr(id) = child | ||
} else { | ||
val alias = Alias(child, s"_common_expr_$index")() | ||
childProjections(childProjectionIndex) += alias | ||
refToExpr(id) = alias.toAttribute | ||
} | ||
} | ||
} | ||
|
||
for (i <- inputPlans.indices) { | ||
val projectList = childProjections(i) | ||
if (projectList.nonEmpty) { | ||
inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i)) | ||
} | ||
} | ||
|
||
w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) { | ||
case ref: CommonExpressionRef => refToExpr(ref.id) | ||
} | ||
|
||
case c: ConditionalExpression => | ||
val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map( | ||
rewriteWithExprAndInputPlans(_, inputPlans)) | ||
Comment on lines
+110
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is dealing with common expressions only in always evaluated input e.g., predicate of How about common expressions shared between predicate and branches? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about it before. The problem is that it's hard to update the original |
||
val newExpr = c.withNewAlwaysEvaluatedInputs(newAlwaysEvaluatedInputs) | ||
// Use transformUp to handle nested With. | ||
newExpr.transformUpWithPruning(_.containsPattern(WITH_EXPRESSION)) { | ||
case With(child, defs) => | ||
// For With in the conditional branches, they may not be evaluated at all and we can't | ||
// pull the common expressions into a project which will always be evaluated. Inline it. | ||
Comment on lines
+115
to
+117
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, for specific conditional expression, e.g., There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as https://github.com/apache/spark/pull/43978/files#r1403392772 . It's easy to find these common expressions shared on both branches, but it's hard to put them back to |
||
val refToExpr = defs.map(d => d.id -> d.child).toMap | ||
child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) { | ||
case ref: CommonExpressionRef => refToExpr(ref.id) | ||
} | ||
} | ||
|
||
newPlan = newPlan.withNewChildren(newChildren) | ||
if (p.output == newPlan.output) { | ||
newPlan | ||
} else { | ||
Project(p.output, newPlan) | ||
} | ||
case other => other.mapChildren(rewriteWithExprAndInputPlans(_, inputPlans)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have "manual" recursion (instead of `transformExpressionsUp()`), shall we deal with nested `With`s in `w.child` too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the current logic seems to behave correctly if there is an inner `With` in an outer `With`'s `child` and the inner has a definition with a reference to an outer definition. (The previous `transformExpressionsUp()` had issues in that case.) But the rule is not idempotent now, so maybe we should recurse into `w.child` after replacing `CommonExpressionRef`s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good catch! It seems it doesn't matter when we recurse into `w.child`: either before or after replacing `CommonExpressionRef` is fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe before is better, as the expression tree may be much larger after replacing `CommonExpressionRef`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure. E.g. if we have `With(With(x + x, Seq(x = y + y)), Seq(y = a + 1))`, where `x` and `y` are references and `a` is an attribute, and we recurse into `With(x + x, Seq(x = y + y))` before replacing the `y` references with the actual attribute that aliases `a + 1`, then the `childProjectionIndex` calculation for `y + y` won't find the right child, will it? But a UT covering this case would be good. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, correlated nested `With`! I'm not sure if we want to support it or not... But at least we should fail if we don't want to support it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we may need a test for that (either supported or failed if not).