Skip to content

Commit

Permalink
planner: ByItem should filter NULL out (pingcap#10488)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored and db-storage committed May 29, 2019
1 parent ede530e commit 045b9a8
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 33 deletions.
2 changes: 2 additions & 0 deletions executor/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func (s *testSuite4) TestWindowFunctions(c *C) {
result = tk.MustQuery("SELECT CUME_DIST() OVER (ORDER BY null);")
result.Check(testkit.Rows("1"))

tk.MustQuery("select lead(a) over(partition by null) from t").Check(testkit.Rows("1", "2", "2", "<nil>"))

tk.MustExec("create table issue10494(a INT, b CHAR(1), c DATETIME, d BLOB)")
tk.MustExec("insert into issue10494 VALUES (1,'x','2010-01-01','blob'), (2, 'y', '2011-01-01', ''), (3, 'y', '2012-01-01', ''), (4, 't', '2012-01-01', 'blob'), (5, null, '2013-01-01', null)")
tk.MustQuery("SELECT a, b, c, SUM(a) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM issue10494 order by a;").Check(
Expand Down
87 changes: 54 additions & 33 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,6 @@ func getWindowName(name string) string {
func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, []property.Item, []property.Item, []expression.Expression, error) {
b.optFlag |= flagEliminateProjection

var items []*ast.ByItem
if expr.Spec.Name.L != "" {
ref, ok := b.windowSpecs[expr.Spec.Name.L]
if !ok {
Expand All @@ -2828,44 +2827,32 @@ func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFu
}
}

lenPartition := 0
var partitionItems, orderItems []*ast.ByItem
if spec.PartitionBy != nil {
items = append(items, spec.PartitionBy.Items...)
lenPartition = len(spec.PartitionBy.Items)
partitionItems = spec.PartitionBy.Items
}
if spec.OrderBy != nil {
items = append(items, spec.OrderBy.Items...)
orderItems = spec.OrderBy.Items
}
projLen := len(p.Schema().Columns) + len(items) + len(expr.Args)

projLen := len(p.Schema().Columns) + len(partitionItems) + len(orderItems) + len(expr.Args)
proj := LogicalProjection{Exprs: make([]expression.Expression, 0, projLen)}.Init(b.ctx)
schema := expression.NewSchema(make([]*expression.Column, 0, projLen)...)
proj.SetSchema(expression.NewSchema(make([]*expression.Column, 0, projLen)...))
for _, col := range p.Schema().Columns {
proj.Exprs = append(proj.Exprs, col)
schema.Append(col)
proj.schema.Append(col)
}

transformer := &itemTransformer{}
propertyItems := make([]property.Item, 0, len(items))
for _, item := range items {
newExpr, _ := item.Expr.Accept(transformer)
item.Expr = newExpr.(ast.ExprNode)
it, np, err := b.rewrite(item.Expr, p, aggMap, true)
if err != nil {
return nil, nil, nil, nil, err
}
p = np
if col, ok := it.(*expression.Column); ok {
propertyItems = append(propertyItems, property.Item{Col: col, Desc: item.Desc})
continue
}
proj.Exprs = append(proj.Exprs, it)
col := &expression.Column{
ColName: model.NewCIStr(fmt.Sprintf("%d_proj_window_%d", p.ID(), schema.Len())),
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
RetType: it.GetType(),
}
schema.Append(col)
propertyItems = append(propertyItems, property.Item{Col: col, Desc: item.Desc})
propertyItems := make([]property.Item, 0, len(partitionItems)+len(orderItems))
var err error
p, propertyItems, err = b.buildByItemsForWindow(p, proj, partitionItems, propertyItems, aggMap)
if err != nil {
return nil, nil, nil, nil, err
}
lenPartition := len(propertyItems)
p, propertyItems, err = b.buildByItemsForWindow(p, proj, orderItems, propertyItems, aggMap)
if err != nil {
return nil, nil, nil, nil, err
}

newArgList := make([]expression.Expression, 0, len(expr.Args))
Expand All @@ -2882,19 +2869,53 @@ func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFu
}
proj.Exprs = append(proj.Exprs, newArg)
col := &expression.Column{
ColName: model.NewCIStr(fmt.Sprintf("%d_proj_window_%d", p.ID(), schema.Len())),
ColName: model.NewCIStr(fmt.Sprintf("%d_proj_window_%d", p.ID(), proj.schema.Len())),
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
RetType: newArg.GetType(),
}
schema.Append(col)
proj.schema.Append(col)
newArgList = append(newArgList, col)
}

proj.SetSchema(schema)
proj.SetChildren(p)
return proj, propertyItems[:lenPartition], propertyItems[lenPartition:], newArgList, nil
}

func (b *PlanBuilder) buildByItemsForWindow(
p LogicalPlan,
proj *LogicalProjection,
items []*ast.ByItem,
retItems []property.Item,
aggMap map[*ast.AggregateFuncExpr]int,
) (LogicalPlan, []property.Item, error) {
transformer := &itemTransformer{}
for _, item := range items {
newExpr, _ := item.Expr.Accept(transformer)
item.Expr = newExpr.(ast.ExprNode)
it, np, err := b.rewrite(item.Expr, p, aggMap, true)
if err != nil {
return nil, nil, err
}
p = np
if it.GetType().Tp == mysql.TypeNull {
continue
}
if col, ok := it.(*expression.Column); ok {
retItems = append(retItems, property.Item{Col: col, Desc: item.Desc})
continue
}
proj.Exprs = append(proj.Exprs, it)
col := &expression.Column{
ColName: model.NewCIStr(fmt.Sprintf("%d_proj_window_%d", p.ID(), proj.schema.Len())),
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
RetType: it.GetType(),
}
proj.schema.Append(col)
retItems = append(retItems, property.Item{Col: col, Desc: item.Desc})
}
return p, retItems, nil
}

// buildWindowFunctionFrameBound builds the bounds of window function frames.
// For type `Rows`, the bound expr must be an unsigned integer.
// For type `Range`, the bound expr must be temporal or numeric types.
Expand Down
9 changes: 9 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type testPlanSuite struct {
func (s *testPlanSuite) SetUpSuite(c *C) {
s.is = infoschema.MockInfoSchema([]*model.TableInfo{core.MockTable()})
s.Parser = parser.New()
s.Parser.EnableWindowFunc(true)
}

func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
Expand Down Expand Up @@ -201,6 +202,14 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
sql: "select * from ((SELECT 1 a,6 b) UNION (SELECT 2,5) UNION (SELECT 2, 4) ORDER BY 1) t order by 1, 2",
best: "UnionAll{Dual->Projection->Dual->Projection->Dual->Projection}->HashAgg->Sort->Sort",
},
{
sql: "select * from (select *, NULL as xxx from t) t order by xxx",
best: "TableReader(Table(t))->Projection",
},
{
sql: "select lead(a, 1) over (partition by null) as c from t",
best: "TableReader(Table(t))->Window(lead(test.t.a, 1) over())->Projection",
},
}
for i, tt := range tests {
comment := Commentf("case:%v sql:%s", i, tt.sql)
Expand Down
2 changes: 2 additions & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column) error {
continue
}
ls.ByItems = append(ls.ByItems[:i], ls.ByItems[i+1:]...)
} else if ls.ByItems[i].Expr.GetType().Tp == mysql.TypeNull {
ls.ByItems = append(ls.ByItems[:i], ls.ByItems[i+1:]...)
} else {
parentUsedCols = append(parentUsedCols, cols...)
}
Expand Down

0 comments on commit 045b9a8

Please sign in to comment.