Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support hash join in new plan #1234

Merged
merged 10 commits into from
May 19, 2016
Merged
71 changes: 71 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/optimizer/plan"
)

var smallCount = 100
Expand Down Expand Up @@ -34,6 +35,16 @@ func prepareBenchData(se Session, colType string, valueFormat string, valueCount
mustExecute(se, "commit")
}

func prepareJoinBenchData(se Session, colType string, valueFormat string, valueCount int) {
mustExecute(se, "drop table if exists t")
mustExecute(se, fmt.Sprintf("create table t (pk int primary key auto_increment, col %s)", colType))
mustExecute(se, "begin")
for i := 0; i < valueCount; i++ {
mustExecute(se, "insert t (col) values ("+fmt.Sprintf(valueFormat, i)+")")
}
mustExecute(se, "commit")
}

func readResult(rs ast.RecordSet, count int) {
for count > 0 {
x, err := rs.Next()
Expand Down Expand Up @@ -192,3 +203,63 @@ func BenchmarkInsertNoIndex(b *testing.B) {
mustExecute(se, fmt.Sprintf("insert t values (%d, %d)", i, i))
}
}

func BenchmarkJoin(b *testing.B) {
b.StopTimer()
se := prepareBenchSession()
prepareJoinBenchData(se, "int", "%v", smallCount)
b.StartTimer()
for i := 0; i < b.N; i++ {
rs, err := se.Execute("select * from t a join t b on a.col = b.col")
if err != nil {
b.Fatal(err)
}
readResult(rs[0], 100)
}
}

func BenchmarkNewJoin(b *testing.B) {
b.StopTimer()
se := prepareBenchSession()
prepareJoinBenchData(se, "int", "%v", smallCount)
b.StartTimer()
plan.UseNewPlanner = true
for i := 0; i < b.N; i++ {
rs, err := se.Execute("select * from t a join t b on a.col = b.col")
if err != nil {
b.Fatal(err)
}
readResult(rs[0], 100)
}
plan.UseNewPlanner = false
}

func BenchmarkJoinLimit(b *testing.B) {
b.StopTimer()
se := prepareBenchSession()
prepareJoinBenchData(se, "int", "%v", smallCount)
b.StartTimer()
for i := 0; i < b.N; i++ {
rs, err := se.Execute("select * from t a join t b on a.col = b.col limit 1")
if err != nil {
b.Fatal(err)
}
readResult(rs[0], 1)
}
}

func BenchmarkNewJoinLimit(b *testing.B) {
b.StopTimer()
se := prepareBenchSession()
prepareJoinBenchData(se, "int", "%v", smallCount)
b.StartTimer()
plan.UseNewPlanner = true
for i := 0; i < b.N; i++ {
rs, err := se.Execute("select * from t a join t b on a.col = b.col limit 1")
if err != nil {
b.Fatal(err)
}
readResult(rs[0], 1)
}
plan.UseNewPlanner = false
}
2 changes: 1 addition & 1 deletion evaluator/evaluator_binop.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (e *Evaluator) handleAndAnd(o *ast.BinaryOperationExpr) bool {

func (e *Evaluator) handleOrOr(o *ast.BinaryOperationExpr) bool {
leftDatum := o.L.GetDatum()
righDatum := o.R.GetDatum()
if leftDatum.Kind() != types.KindNull {
x, err := leftDatum.ToBool()
if err != nil {
Expand All @@ -103,6 +102,7 @@ func (e *Evaluator) handleOrOr(o *ast.BinaryOperationExpr) bool {
return true
}
}
righDatum := o.R.GetDatum()
if righDatum.Kind() != types.KindNull {
y, err := righDatum.ToBool()
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,85 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildUnion(v)
case *plan.Update:
return b.buildUpdate(v)
case *plan.Join:
return b.buildJoin(v)
default:
b.err = ErrUnknownPlan.Gen("Unknown Plan %T", p)
return nil
}
}

// compose CNF items into a balance deep CNF tree, which benefits a lot for pb decoder/encoder.
func composeCondition(conditions []ast.ExprNode) ast.ExprNode {
length := len(conditions)
if length == 0 {
return nil
} else if length == 1 {
return conditions[0]
} else {
return &ast.BinaryOperationExpr{Op: opcode.AndAnd, L: composeCondition(conditions[:length/2]), R: composeCondition(conditions[length/2:])}
}
}

//TODO: select join algorithm during cbo phase.
func (b *executorBuilder) buildJoin(v *plan.Join) Executor {
e := &HashJoinExec{
otherFilter: composeCondition(v.OtherConditions),
prepared: false,
fields: v.Fields(),
ctx: b.ctx,
}
var leftHashKey, rightHashKey []ast.ExprNode
for _, eqCond := range v.EqualConditions {
binop, ok := eqCond.(*ast.BinaryOperationExpr)
if ok && binop.Op == opcode.EQ {
ln, lOK := binop.L.(*ast.ColumnNameExpr)
rn, rOK := binop.R.(*ast.ColumnNameExpr)
if lOK && rOK {
leftHashKey = append(leftHashKey, ln)
rightHashKey = append(rightHashKey, rn)
continue
}
}
b.err = ErrUnknownPlan.Gen("Invalid Join Equal Condition !!")
}
switch v.JoinType {
case plan.LeftOuterJoin:
e.outter = true
e.leftSmall = false
e.smallFilter = composeCondition(v.RightConditions)
e.bigFilter = composeCondition(v.LeftConditions)
e.smallHashKey = rightHashKey
e.bigHashKey = leftHashKey
case plan.RightOuterJoin:
e.outter = true
e.leftSmall = true
e.smallFilter = composeCondition(v.LeftConditions)
e.bigFilter = composeCondition(v.RightConditions)
e.smallHashKey = leftHashKey
e.bigHashKey = rightHashKey
case plan.InnerJoin:
//TODO: assume right table is the small one before cbo is realized.
e.outter = false
e.leftSmall = false
e.smallFilter = composeCondition(v.RightConditions)
e.bigFilter = composeCondition(v.LeftConditions)
e.smallHashKey = rightHashKey
e.bigHashKey = leftHashKey
default:
b.err = ErrUnknownPlan.Gen("Unknown Join Type !!")
return nil
}
if e.leftSmall {
e.smallExec = b.build(v.GetChildByIndex(0))
e.bigExec = b.build(v.GetChildByIndex(1))
} else {
e.smallExec = b.build(v.GetChildByIndex(1))
e.bigExec = b.build(v.GetChildByIndex(0))
}
return e
}

func (b *executorBuilder) buildFilter(src Executor, conditions []ast.ExprNode) Executor {
if len(conditions) == 0 {
return src
Expand Down
71 changes: 70 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/optimizer/plan"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand Down Expand Up @@ -718,6 +719,8 @@ func (s *testSuite) TestSelectHaving(c *C) {
r.Check(testkit.Rows(rowStr))
tk.MustExec("commit")

r = tk.MustQuery("select * from select_having_test group by id having null is not null;")

tk.MustExec("drop table select_having_test")
}

Expand Down Expand Up @@ -925,7 +928,6 @@ func (s *testSuite) TestUnion(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

testSQL := `select 1 union select 0;`
tk.MustExec(testSQL)

Expand Down Expand Up @@ -1098,6 +1100,73 @@ func (s *testSuite) TestJoin(c *C) {
result.Check(testkit.Rows("<nil> <nil> <nil> <nil> 5 5", "<nil> <nil> <nil> <nil> 9 9", "1 1 1 1 1 1"))
}

func (s *testSuite) TestNewJoin(c *C) {
plan.UseNewPlanner = true
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c int)")
tk.MustExec("insert t values (1)")
cases := []struct {
sql string
result [][]interface{}
}{
{
"select 1 from t as a left join t as b on 0",
testkit.Rows("1"),
},
{
"select 1 from t as a join t as b on 1",
testkit.Rows("1"),
},
}
for _, ca := range cases {
result := tk.MustQuery(ca.sql)
result.Check(ca.result)
}

tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t(c1 int, c2 int)")
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("insert into t1 values(2,3),(4,4)")
result := tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
result.Check(testkit.Rows("1 1 <nil> <nil>"))
result = tk.MustQuery("select * from t1 right outer join t on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
result.Check(testkit.Rows("<nil> <nil> 1 1"))
result = tk.MustQuery("select * from t right outer join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
result.Check(testkit.Rows())
result = tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 where t1.c1 = 3 or false")
result.Check(testkit.Rows())
result = tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 and t.c1 != 1")
result.Check(testkit.Rows("1 1 <nil> <nil>", "2 2 2 3"))

tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("drop table if exists t3")

tk.MustExec("create table t1 (c1 int, c2 int)")
tk.MustExec("create table t2 (c1 int, c2 int)")
tk.MustExec("create table t3 (c1 int, c2 int)")

tk.MustExec("insert into t1 values (1,1), (2,2), (3,3)")
tk.MustExec("insert into t2 values (1,1), (3,3), (5,5)")
tk.MustExec("insert into t3 values (1,1), (5,5), (9,9)")

result = tk.MustQuery("select * from t1 left join t2 on t1.c1 = t2.c1 right join t3 on t2.c1 = t3.c1 order by t1.c1, t1.c2, t2.c1, t2.c2, t3.c1, t3.c2;")
result.Check(testkit.Rows("<nil> <nil> <nil> <nil> 5 5", "<nil> <nil> <nil> <nil> 9 9", "1 1 1 1 1 1"))

tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int)")
tk.MustExec("insert into t1 values (1), (1), (1)")
result = tk.MustQuery("select * from t1 a join t1 b on a.c1 = b.c1;")
result.Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1"))

plan.UseNewPlanner = false
}

func (s *testSuite) TestIndexScan(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
Expand Down
Loading