Skip to content

Commit

Permalink
fix bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 committed May 19, 2016
1 parent ebf166d commit b526229
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 51 deletions.
12 changes: 10 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 PingCAP, Inc.
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -719,6 +719,8 @@ func (s *testSuite) TestSelectHaving(c *C) {
r.Check(testkit.Rows(rowStr))
tk.MustExec("commit")

r = tk.MustQuery("select * from select_having_test group by id having null is not null;")

tk.MustExec("drop table select_having_test")
}

Expand Down Expand Up @@ -926,7 +928,6 @@ func (s *testSuite) TestUnion(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

testSQL := `select 1 union select 0;`
tk.MustExec(testSQL)

Expand Down Expand Up @@ -1156,6 +1157,13 @@ func (s *testSuite) TestNewJoin(c *C) {

result = tk.MustQuery("select * from t1 left join t2 on t1.c1 = t2.c1 right join t3 on t2.c1 = t3.c1 order by t1.c1, t1.c2, t2.c1, t2.c2, t3.c1, t3.c2;")
result.Check(testkit.Rows("<nil> <nil> <nil> <nil> 5 5", "<nil> <nil> <nil> <nil> 9 9", "1 1 1 1 1 1"))

tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int)")
tk.MustExec("insert into t1 values (1), (1), (1)")
result = tk.MustQuery("select * from t1 a join t1 b on a.c1 = b.c1;")
result.Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1"))

plan.UseNewPlanner = false
}

Expand Down
65 changes: 45 additions & 20 deletions executor/new_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,20 @@ type HashJoinExec struct {
otherFilter ast.ExprNode
outter bool
leftSmall bool
matchedRows []*Row
cursor int
}

func joinTwoRow(a *Row, b *Row) *Row {
return &Row{RowKeys: append(a.RowKeys, b.RowKeys...), Data: append(a.Data, b.Data...)}
ret := &Row{
RowKeys: make([]*RowKeyEntry, 0, len(a.RowKeys)+len(b.RowKeys)),
Data: make([]types.Datum, 0, len(a.Data)+len(b.Data)),
}
ret.RowKeys = append(ret.RowKeys, a.RowKeys...)
ret.RowKeys = append(ret.RowKeys, b.RowKeys...)
ret.Data = append(ret.Data, a.Data...)
ret.Data = append(ret.Data, b.Data...)
return ret
}

func (e *HashJoinExec) getHashKey(exprs []ast.ExprNode) ([]byte, error) {
Expand All @@ -62,27 +72,28 @@ func (e *HashJoinExec) getHashKey(exprs []ast.ExprNode) ([]byte, error) {
return result, err
}

// Fields implements Executor Fields interface
// Fields implements Executor Fields interface.
func (e *HashJoinExec) Fields() []*ast.ResultField {
return e.fields
}

// Close implements Executor Close interface
// Close implements Executor Close interface.
func (e *HashJoinExec) Close() error {
e.hashTable = nil
e.matchedRows = nil
return nil
}

func (e *HashJoinExec) prepare() error {
e.hashTable = make(map[string][]*Row)
e.cursor = 0
for {
row, err := e.smallExec.Next()
if err != nil {
return errors.Trace(err)
}
if row == nil {
e.smallExec.Close()
e.prepared = true
break
}
matched := true
Expand All @@ -109,7 +120,7 @@ func (e *HashJoinExec) prepare() error {
return nil
}

func (e *HashJoinExec) constructMatchedRow(bigRow *Row) (matchedRows []*Row, err error) {
func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, err error) {
hashcode, err := e.getHashKey(e.bigHashKey)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -118,15 +129,15 @@ func (e *HashJoinExec) constructMatchedRow(bigRow *Row) (matchedRows []*Row, err
if rows, ok := e.hashTable[string(hashcode)]; ok {
for _, smallRow := range rows {
//TODO: remove result fields in order to reduce memory copy cost.
startKey := 0
if !e.leftSmall {
startKey = len(bigRow.Data)
}
for i, data := range smallRow.Data {
e.fields[i+startKey].Expr.SetValue(data.GetValue())
}
otherMatched := true
if e.otherFilter != nil {
startKey := 0
if !e.leftSmall {
startKey = len(bigRow.Data)
}
for i, data := range smallRow.Data {
e.fields[i+startKey].Expr.SetValue(data.GetValue())
}
otherMatched, err = evaluator.EvalBool(e.ctx, e.otherFilter)
}
if err != nil {
Expand Down Expand Up @@ -163,14 +174,29 @@ func (e *HashJoinExec) fillNullRow(bigRow *Row) (returnRow *Row, err error) {
return returnRow, nil
}

// Next implements Executor Next interface
func (e *HashJoinExec) returnRecord() (ret *Row, ok bool) {
if e.cursor >= len(e.matchedRows) {
return nil, false
}
for i, data := range e.matchedRows[e.cursor].Data {
e.fields[i].Expr.SetValue(data.GetValue())
}
e.cursor++
return e.matchedRows[e.cursor-1], true
}

// Next implements Executor Next interface.
func (e *HashJoinExec) Next() (*Row, error) {
if !e.prepared {
err := e.prepare()
if err != nil {
return nil, err
}
}
row, ok := e.returnRecord()
if ok {
return row, nil
}
for {
bigRow, err := e.bigExec.Next()
if err != nil {
Expand All @@ -189,18 +215,17 @@ func (e *HashJoinExec) Next() (*Row, error) {
}
}
if bigMatched {
matchedRows, err = e.constructMatchedRow(bigRow)
matchedRows, err = e.constructMatchedRows(bigRow)
if err != nil {
return nil, errors.Trace(err)
}
}
for _, row := range matchedRows {
for i, data := range row.Data {
e.fields[i].Expr.SetValue(data.GetValue())
}
e.matchedRows = matchedRows
e.cursor = 0
row, ok := e.returnRecord()
if ok {
return row, nil
}
if e.outter && len(matchedRows) == 0 {
} else if e.outter {
return e.fillNullRow(bigRow)
}
}
Expand Down
2 changes: 1 addition & 1 deletion optimizer/plan/new_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Join struct {
}

// AddChild for parent.
func AddChild(parent Plan, child Plan) {
func addChild(parent Plan, child Plan) {
if child == nil || parent == nil {
return
}
Expand Down
13 changes: 8 additions & 5 deletions optimizer/plan/newplanbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ func (b *planBuilder) buildNewJoin(join *ast.Join) Plan {
}
leftPlan := b.buildNewSinglePathPlan(join.Left)
rightPlan := b.buildNewSinglePathPlan(join.Right)
onCondition := splitWhere(join.On.Expr)
eqCond, leftCond, rightCond, otherCond := extractOnCondition(onCondition, leftPlan, rightPlan)
var eqCond, leftCond, rightCond, otherCond []ast.ExprNode
if join.On != nil {
onCondition := splitWhere(join.On.Expr)
eqCond, leftCond, rightCond, otherCond = extractOnCondition(onCondition, leftPlan, rightPlan)
}
joinPlan := &Join{EqualConditions: eqCond, LeftConditions: leftCond, RightConditions: rightCond, OtherConditions: otherCond}
if join.Tp == ast.LeftJoin {
joinPlan.JoinType = LeftOuterJoin
Expand All @@ -125,16 +128,16 @@ func (b *planBuilder) buildNewJoin(join *ast.Join) Plan {
} else {
joinPlan.JoinType = InnerJoin
}
AddChild(joinPlan, leftPlan)
AddChild(joinPlan, rightPlan)
addChild(joinPlan, leftPlan)
addChild(joinPlan, rightPlan)
joinPlan.SetFields(append(leftPlan.Fields(), rightPlan.Fields()...))
return joinPlan
}

func (b *planBuilder) buildFilter(p Plan, where ast.ExprNode) Plan {
conditions := splitWhere(where)
filter := &Filter{Conditions: conditions}
AddChild(filter, p)
addChild(filter, p)
filter.SetFields(p.Fields())
return filter
}
Expand Down
2 changes: 1 addition & 1 deletion optimizer/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Plan interface {
AddParent(parent Plan)
// AddChild means append a child for plan.
AddChild(children Plan)
// ReplaceParent means replace a parent for another one.
// ReplaceParent means replace a parent with another one.
ReplaceParent(parent, newPar Plan) error
// ReplaceChild means replace a child with another one.
ReplaceChild(children, newChild Plan) error
Expand Down
33 changes: 19 additions & 14 deletions optimizer/plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
// Error codes.
const (
CodeUnsupportedType terror.ErrCode = 1
SystemInternalError terror.ErrCode = 1
SystemInternalError terror.ErrCode = 2
)

// BuildPlan builds a plan from a node.
Expand Down Expand Up @@ -541,12 +541,12 @@ func (b *planBuilder) buildPseudoSelectPlan(p Plan, sel *ast.SelectStmt) Plan {
x.NoLimit = true
}
np := &Sort{ByItems: sel.OrderBy.Items}
AddChild(np, p)
addChild(np, p)
p = np
}
if sel.Limit != nil {
np := &Limit{Offset: sel.Limit.Offset, Count: sel.Limit.Count}
AddChild(np, p)
addChild(np, p)
np.SetLimit(0)
p = np
} else {
Expand All @@ -562,14 +562,14 @@ func (b *planBuilder) buildSelectLock(src Plan, lock ast.SelectLockType) *Select
selectLock := &SelectLock{
Lock: lock,
}
AddChild(selectLock, src)
addChild(selectLock, src)
selectLock.SetFields(src.Fields())
return selectLock
}

func (b *planBuilder) buildSelectFields(src Plan, fields []*ast.ResultField) Plan {
selectFields := &SelectFields{}
AddChild(selectFields, src)
addChild(selectFields, src)
selectFields.SetFields(fields)
return selectFields
}
Expand All @@ -579,7 +579,7 @@ func (b *planBuilder) buildAggregate(src Plan, aggFuncs []*ast.AggregateFuncExpr
aggPlan := &Aggregate{
AggFuncs: aggFuncs,
}
AddChild(aggPlan, src)
addChild(aggPlan, src)
if src != nil {
aggPlan.SetFields(src.Fields())
}
Expand All @@ -593,7 +593,7 @@ func (b *planBuilder) buildHaving(src Plan, having *ast.HavingClause) Plan {
p := &Having{
Conditions: splitWhere(having.Expr),
}
AddChild(p, src)
addChild(p, src)
p.SetFields(src.Fields())
return p
}
Expand All @@ -602,7 +602,7 @@ func (b *planBuilder) buildSort(src Plan, byItems []*ast.ByItem) Plan {
sort := &Sort{
ByItems: byItems,
}
AddChild(sort, src)
addChild(sort, src)
sort.SetFields(src.Fields())
return sort
}
Expand All @@ -616,7 +616,7 @@ func (b *planBuilder) buildLimit(src Plan, limit *ast.Limit) Plan {
s.ExecLimit = li
return s
}
AddChild(li, src)
addChild(li, src)
li.SetFields(src.Fields())
return li
}
Expand Down Expand Up @@ -840,7 +840,11 @@ func (se *subqueryVisitor) Leave(in ast.Node) (out ast.Node, ok bool) {
func (b *planBuilder) buildUnion(union *ast.UnionStmt) Plan {
sels := make([]Plan, len(union.SelectList.Selects))
for i, sel := range union.SelectList.Selects {
sels[i] = b.buildSelect(sel)
if UseNewPlanner {
sels[i] = b.buildNewSelect(sel)
} else {
sels[i] = b.buildSelect(sel)
}
}
var p Plan
p = &Union{
Expand Down Expand Up @@ -876,6 +880,7 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) Plan {
uField.Column.Tp = f.Column.Tp
}
}
addChild(p, sel)
}
for _, v := range unionFields {
v.Expr.SetType(&v.Column.FieldType)
Expand All @@ -896,7 +901,7 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) Plan {

func (b *planBuilder) buildDistinct(src Plan) Plan {
d := &Distinct{}
AddChild(d, src)
addChild(d, src)
d.SetFields(src.Fields())
return d
}
Expand Down Expand Up @@ -1039,7 +1044,7 @@ func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan {
}
if len(conditions) != 0 {
filter := &Filter{Conditions: conditions}
AddChild(filter, p)
addChild(filter, p)
p = filter
}
return p
Expand All @@ -1061,7 +1066,7 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
}
if insert.Select != nil {
insertPlan.SelectPlan = b.build(insert.Select)
AddChild(insertPlan, insertPlan.SelectPlan)
addChild(insertPlan, insertPlan.SelectPlan)
if b.err != nil {
return nil
}
Expand All @@ -1082,7 +1087,7 @@ func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) Plan {
return nil
}
p := &Explain{StmtPlan: targetPlan}
AddChild(p, targetPlan)
addChild(p, targetPlan)
p.SetFields(buildExplainFields())
return p
}
Expand Down
Loading

0 comments on commit b526229

Please sign in to comment.