Skip to content

Commit

Permalink
planner: export elements in cteClass for late pkg move. (#55429)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid authored Aug 19, 2024
1 parent 60b96b4 commit 9e9e9c2
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 61 deletions.
14 changes: 7 additions & 7 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,17 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
case *logicalop.LogicalPartitionUnionAll:
c.collectPredicateColumnsForUnionAll(&x.LogicalUnionAll)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.Cte.seedPartLogicalPlan)
if x.Cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.Cte.recursivePartLogicalPlan)
// Visit SeedPartLogicalPlan and RecursivePartLogicalPlan first.
c.collectFromPlan(x.Cte.SeedPartLogicalPlan)
if x.Cte.RecursivePartLogicalPlan != nil {
c.collectFromPlan(x.Cte.RecursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.Cte.seedPartLogicalPlan.Schema().Columns
seedColumns := x.Cte.SeedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.Cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.Cte.recursivePartLogicalPlan.Schema().Columns
if x.Cte.RecursivePartLogicalPlan != nil {
recursiveColumns = x.Cte.RecursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2483,7 +2483,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo
if storeTp != kv.TiFlash {
return false
}
if c.Cte.recursivePartLogicalPlan != nil || !c.Cte.seedPartLogicalPlan.CanPushToCop(storeTp) {
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2914,7 +2914,7 @@ func findBestTask4LogicalCTE(p *LogicalCTE, prop *property.PhysicalProperty, cou
return base.InvalidTask, 1, nil
}
// The physical plan has been build when derive stats.
pcte := PhysicalCTE{SeedPlan: p.Cte.seedPartPhysicalPlan, RecurPlan: p.Cte.recursivePartPhysicalPlan, CTE: p.Cte, cteAsName: p.CteAsName, cteName: p.CteName}.Init(p.SCtx(), p.StatsInfo())
pcte := PhysicalCTE{SeedPlan: p.Cte.SeedPartPhysicalPlan, RecurPlan: p.Cte.RecursivePartPhysicalPlan, CTE: p.Cte, cteAsName: p.CteAsName, cteName: p.CteName}.Init(p.SCtx(), p.StatsInfo())
pcte.SetSchema(p.Schema())
if prop.IsFlashProp() && prop.CTEProducerStatus == property.AllCTECanMpp {
pcte.readerReceiver = PhysicalExchangeReceiver{IsCTEReader: true}.Init(p.SCtx(), p.StatsInfo())
Expand Down
79 changes: 40 additions & 39 deletions pkg/planner/core/logical_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,25 @@ func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
type CTEClass struct {
// The union between seed part and recursive part is DISTINCT or DISTINCT ALL.
IsDistinct bool
// seedPartLogicalPlan and recursivePartLogicalPlan are the logical plans for the seed part and recursive part of this CTE.
seedPartLogicalPlan base.LogicalPlan
recursivePartLogicalPlan base.LogicalPlan
// seedPartPhysicalPlan and recursivePartPhysicalPlan are the physical plans for the seed part and recursive part of this CTE.
seedPartPhysicalPlan base.PhysicalPlan
recursivePartPhysicalPlan base.PhysicalPlan
// SeedPartLogicalPlan and RecursivePartLogicalPlan are the logical plans for the seed part and recursive part of this CTE.
SeedPartLogicalPlan base.LogicalPlan
// RecursivePartLogicalPlan is nil if this CTE is not a recursive CTE.
RecursivePartLogicalPlan base.LogicalPlan
// SeedPartPhysicalPlan and RecursivePartPhysicalPlan are the physical plans for the seed part and recursive part of this CTE.
SeedPartPhysicalPlan base.PhysicalPlan
RecursivePartPhysicalPlan base.PhysicalPlan
// storageID for this CTE.
IDForStorage int
// optFlag is the optFlag for the whole CTE.
optFlag uint64
// OptFlag is the OptFlag for the whole CTE.
OptFlag uint64
HasLimit bool
LimitBeg uint64
LimitEnd uint64
IsInApply bool
// pushDownPredicates may be push-downed by different references.
pushDownPredicates []expression.Expression
// PushDownPredicates may be push-downed by different references.
PushDownPredicates []expression.Expression
ColumnMap map[string]*expression.Column
isOuterMostCTE bool
IsOuterMostCTE bool
}

const emptyCTEClassSize = int64(unsafe.Sizeof(CTEClass{}))
Expand All @@ -83,14 +84,14 @@ func (cc *CTEClass) MemoryUsage() (sum int64) {
}

sum = emptyCTEClassSize
if cc.seedPartPhysicalPlan != nil {
sum += cc.seedPartPhysicalPlan.MemoryUsage()
if cc.SeedPartPhysicalPlan != nil {
sum += cc.SeedPartPhysicalPlan.MemoryUsage()
}
if cc.recursivePartPhysicalPlan != nil {
sum += cc.recursivePartPhysicalPlan.MemoryUsage()
if cc.RecursivePartPhysicalPlan != nil {
sum += cc.RecursivePartPhysicalPlan.MemoryUsage()
}

for _, expr := range cc.pushDownPredicates {
for _, expr := range cc.PushDownPredicates {
sum += expr.MemoryUsage()
}
for key, val := range cc.ColumnMap {
Expand All @@ -105,11 +106,11 @@ func (cc *CTEClass) MemoryUsage() (sum int64) {

// PredicatePushDown implements base.LogicalPlan.<1st> interface.
func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
if p.Cte.recursivePartLogicalPlan != nil {
if p.Cte.RecursivePartLogicalPlan != nil {
// Doesn't support recursive CTE yet.
return predicates, p.Self()
}
if !p.Cte.isOuterMostCTE {
if !p.Cte.IsOuterMostCTE {
return predicates, p.Self()
}
pushedPredicates := make([]expression.Expression, len(predicates))
Expand All @@ -126,15 +127,15 @@ func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *op
}
}
if len(pushedPredicates) == 0 {
p.Cte.pushDownPredicates = append(p.Cte.pushDownPredicates, expression.NewOne())
p.Cte.PushDownPredicates = append(p.Cte.PushDownPredicates, expression.NewOne())
return predicates, p.Self()
}
newPred := make([]expression.Expression, 0, len(predicates))
for i := range pushedPredicates {
newPred = append(newPred, pushedPredicates[i].Clone())
ruleutil.ResolveExprAndReplace(newPred[i], p.Cte.ColumnMap)
}
p.Cte.pushDownPredicates = append(p.Cte.pushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
p.Cte.PushDownPredicates = append(p.Cte.PushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
return predicates, p.Self()
}

Expand Down Expand Up @@ -180,43 +181,43 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
}

var err error
if p.Cte.seedPartPhysicalPlan == nil {
if p.Cte.SeedPartPhysicalPlan == nil {
// Build push-downed predicates.
if len(p.Cte.pushDownPredicates) > 0 {
newCond := expression.ComposeDNFCondition(p.SCtx().GetExprCtx(), p.Cte.pushDownPredicates...)
newSel := logicalop.LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.Cte.seedPartLogicalPlan.QueryBlockOffset())
newSel.SetChildren(p.Cte.seedPartLogicalPlan)
p.Cte.seedPartLogicalPlan = newSel
p.Cte.optFlag |= flagPredicatePushDown
if len(p.Cte.PushDownPredicates) > 0 {
newCond := expression.ComposeDNFCondition(p.SCtx().GetExprCtx(), p.Cte.PushDownPredicates...)
newSel := logicalop.LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.Cte.SeedPartLogicalPlan.QueryBlockOffset())
newSel.SetChildren(p.Cte.SeedPartLogicalPlan)
p.Cte.SeedPartLogicalPlan = newSel
p.Cte.OptFlag |= flagPredicatePushDown
}
p.Cte.seedPartLogicalPlan, p.Cte.seedPartPhysicalPlan, _, err = doOptimize(context.TODO(), p.SCtx(), p.Cte.optFlag, p.Cte.seedPartLogicalPlan)
p.Cte.SeedPartLogicalPlan, p.Cte.SeedPartPhysicalPlan, _, err = doOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.SeedPartLogicalPlan)
if err != nil {
return nil, err
}
}
if p.OnlyUsedAsStorage {
p.SetChildren(p.Cte.seedPartLogicalPlan)
p.SetChildren(p.Cte.SeedPartLogicalPlan)
}
resStat := p.Cte.seedPartPhysicalPlan.StatsInfo()
resStat := p.Cte.SeedPartPhysicalPlan.StatsInfo()
// Changing the pointer so that SeedStat in LogicalCTETable can get the new stat.
*p.SeedStat = *resStat
p.SetStats(&property.StatsInfo{
RowCount: resStat.RowCount,
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += resStat.ColNDVs[p.Cte.seedPartLogicalPlan.Schema().Columns[i].UniqueID]
p.StatsInfo().ColNDVs[col.UniqueID] += resStat.ColNDVs[p.Cte.SeedPartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.Cte.recursivePartLogicalPlan != nil {
if p.Cte.recursivePartPhysicalPlan == nil {
p.Cte.recursivePartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.SCtx(), p.Cte.optFlag, p.Cte.recursivePartLogicalPlan)
if p.Cte.RecursivePartLogicalPlan != nil {
if p.Cte.RecursivePartPhysicalPlan == nil {
p.Cte.RecursivePartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.RecursivePartLogicalPlan)
if err != nil {
return nil, err
}
}
recurStat := p.Cte.recursivePartLogicalPlan.StatsInfo()
recurStat := p.Cte.RecursivePartLogicalPlan.StatsInfo()
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.Cte.recursivePartLogicalPlan.Schema().Columns[i].UniqueID]
p.StatsInfo().ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.Cte.RecursivePartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.Cte.IsDistinct {
p.StatsInfo().RowCount, _ = cardinality.EstimateColsNDVWithMatchedLen(p.Schema().Columns, p.Schema(), p.StatsInfo())
Expand All @@ -238,9 +239,9 @@ func (p *LogicalCTE) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]ba

// ExtractCorrelatedCols implements the base.LogicalPlan.<15th> interface.
func (p *LogicalCTE) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.seedPartLogicalPlan)
if p.Cte.recursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.recursivePartLogicalPlan)...)
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.SeedPartLogicalPlan)
if p.Cte.RecursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.RecursivePartLogicalPlan)...)
}
return corCols
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4208,14 +4208,14 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
if cte.cteClass == nil {
cte.cteClass = &CTEClass{
IsDistinct: cte.isDistinct,
seedPartLogicalPlan: cte.seedLP,
recursivePartLogicalPlan: cte.recurLP,
SeedPartLogicalPlan: cte.seedLP,
RecursivePartLogicalPlan: cte.recurLP,
IDForStorage: cte.storageID,
optFlag: cte.optFlag,
OptFlag: cte.optFlag,
HasLimit: hasLimit,
LimitBeg: limitBeg,
LimitEnd: limitEnd,
pushDownPredicates: make([]expression.Expression, 0),
PushDownPredicates: make([]expression.Expression, 0),
ColumnMap: make(map[string]*expression.Column),
}
}
Expand Down Expand Up @@ -5121,9 +5121,9 @@ func setIsInApplyForCTE(p base.LogicalPlan, apSchema *expression.Schema) {
if len(coreusage.ExtractCorColumnsBySchema4LogicalPlan(p, apSchema)) > 0 {
x.Cte.IsInApply = true
}
setIsInApplyForCTE(x.Cte.seedPartLogicalPlan, apSchema)
if x.Cte.recursivePartLogicalPlan != nil {
setIsInApplyForCTE(x.Cte.recursivePartLogicalPlan, apSchema)
setIsInApplyForCTE(x.Cte.SeedPartLogicalPlan, apSchema)
if x.Cte.RecursivePartLogicalPlan != nil {
setIsInApplyForCTE(x.Cte.RecursivePartLogicalPlan, apSchema)
}
default:
for _, child := range p.Children() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func GetDBTableInfo(visitInfo []visitInfo) []stmtctx.TableEntry {
return tables
}

// GetOptFlag gets the optFlag of the PlanBuilder.
// GetOptFlag gets the OptFlag of the PlanBuilder.
func (b *PlanBuilder) GetOptFlag() uint64 {
if b.isSampling {
// Disable logical optimization to avoid the optimizer
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/core/recheck_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func findCTEs(
cte := cteReader.Cte
if !isRootTree {
// Set it to false since it's referenced by other CTEs.
cte.isOuterMostCTE = false
cte.IsOuterMostCTE = false
}
if visited.Has(cte.IDForStorage) {
return
}
visited.Insert(cte.IDForStorage)
// Set it when we meet it first time.
cte.isOuterMostCTE = isRootTree
findCTEs(cte.seedPartLogicalPlan, visited, false)
if cte.recursivePartLogicalPlan != nil {
findCTEs(cte.recursivePartLogicalPlan, visited, false)
cte.IsOuterMostCTE = isRootTree
findCTEs(cte.SeedPartLogicalPlan, visited, false)
if cte.RecursivePartLogicalPlan != nil {
findCTEs(cte.RecursivePartLogicalPlan, visited, false)
}
return
}
Expand Down

0 comments on commit 9e9e9c2

Please sign in to comment.