diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 4346daac74a92..56830615128c1 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1872,8 +1872,22 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC } else { if prop.MPPPartitionTp == property.HashType { var matches []int - if matches = prop.IsSubsetOf(lkeys); len(matches) == 0 { + if p.JoinType == InnerJoin { + if matches = prop.IsSubsetOf(lkeys); len(matches) == 0 { + matches = prop.IsSubsetOf(rkeys) + } + } else if p.JoinType == RightOuterJoin { + // for right out join, only the right keys can possibly matches the prop, because + // the left keys will generate NULL values randomly + // todo maybe we can add a null-sensitive flag in the key columns to indicate whether the column is + // null-sensitive(used in aggregation) or null-insensitive(used in join) matches = prop.IsSubsetOf(rkeys) + } else { + // for left out join, only the left keys can possibly matches the prop, because + // the right keys will generate NULL values randomly + // for semi/anti semi/left out semi/anti left out semi join, only left keys are returned, + // so just check the left keys + matches = prop.IsSubsetOf(lkeys) } if len(matches) == 0 { return nil diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index f3b98aee546a0..a0c31fd7d6769 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -680,6 +680,55 @@ func (s *testIntegrationSerialSuite) TestMPPShuffledJoin(c *C) { } } +func (s *testIntegrationSerialSuite) TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(id int, v1 decimal(20,2), v2 decimal(20,2))") + tk.MustExec("create table t2(id int, v1 decimal(10,2), v2 decimal(10,2))") + tk.MustExec("create table t3(id int, v1 decimal(10,2), v2 decimal(10,2))") + tk.MustExec("insert into t1 values(1,1,1),(2,2,2)") + tk.MustExec("insert into t2 values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8)") + tk.MustExec("insert into t3 values(1,1,1)") + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + tk.MustExec("analyze table t3") + + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t1" || tblInfo.Name.L == "t2" || tblInfo.Name.L == "t3" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_enforce_mpp = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0") + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0") + + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func (s *testIntegrationSerialSuite) TestJoinNotSupportedByTiFlash(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/task.go b/planner/core/task.go index 764ee130a46a9..755eb9e4e5c1a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -750,7 +750,25 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task { lCost := lTask.cost() rCost := rTask.cost() - outerTask := tasks[1-p.InnerChildIdx].(*mppTask) + // outer task is the task that will pass its MPPPartitionType to the join result + // for broadcast inner join, it should be the non-broadcast side, since broadcast side is always the build side, so + // just use the probe side is ok. + // for hash inner join, both side is ok, by default, we use the probe side + // for outer join, it should always be the outer side of the join + // for semi join, it should be the left side(the same as left out join) + outerTaskIndex := 1 - p.InnerChildIdx + if p.JoinType != InnerJoin { + if p.JoinType == RightOuterJoin { + outerTaskIndex = 1 + } else { + outerTaskIndex = 0 + } + } + // can not use the task from tasks because it maybe updated. + outerTask := lTask + if outerTaskIndex == 1 { + outerTask = rTask + } task := &mppTask{ cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()), p: p, diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index a613a9238f378..642ee13516172 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -79,6 +79,14 @@ "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)" ] }, + { + "name": "TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError", + "cases": [ + "explain format = 'brief' select v from t3 as a left join (select t1.v1, t1.v2, t1.v1 + t1.v2 as v from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2) b on a.v1 = b.v1 and a.v2 = b.v2", + "explain format = 'brief' select count(*), t2.v1, t2.v2 from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2 group by t2.v1, t2.v2", + "explain format = 'brief' select count(*), t2.v1, t2.v2 from t3 left join t2 on t3.v1 = t2.v1 and t3.v2 = t2.v2 group by t2.v1, t2.v2" + ] + }, { "name": "TestJoinNotSupportedByTiFlash", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 72d8dfe3965b7..34ab40db8f53f 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -756,6 +756,76 @@ } ] }, + { + "Name": "TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError", + "Cases": [ + { + "SQL": "explain format = 'brief' select v from t3 as a left join (select t1.v1, t1.v2, t1.v1 + t1.v2 as v from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2) b on a.v1 = b.v1 and a.v2 = b.v2", + "Plan": [ + "TableReader 1.00 root data:ExchangeSender", + "└─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough", + " └─Projection 1.00 cop[tiflash] Column#13", + " └─HashJoin 1.00 cop[tiflash] left outer join, equal:[eq(test.t3.v1, test.t1.v1) eq(test.t3.v2, test.t1.v2)]", + " ├─ExchangeReceiver(Build) 1.00 cop[tiflash] ", + " │ └─ExchangeSender 1.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#23, Column#24", + " │ └─Projection 1.00 cop[tiflash] test.t3.v1, test.t3.v2, cast(test.t3.v1, decimal(20,2))->Column#23, cast(test.t3.v2, decimal(20,2))->Column#24", + " │ └─TableFullScan 1.00 cop[tiflash] table:a keep order:false", + " └─Projection(Probe) 2.00 cop[tiflash] test.t1.v1, test.t1.v2, plus(test.t1.v1, test.t1.v2)->Column#13", + " └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.t1.v1, test.t2.v1) eq(test.t1.v2, test.t2.v2)]", + " ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ", + " │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t1.v1, test.t1.v2", + " │ └─Selection 2.00 cop[tiflash] not(isnull(test.t1.v1)), not(isnull(test.t1.v2))", + " │ └─TableFullScan 2.00 cop[tiflash] table:t1 keep order:false", + " └─ExchangeReceiver(Probe) 8.00 cop[tiflash] ", + " └─ExchangeSender 8.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#15, Column#16", + " └─Projection 8.00 cop[tiflash] test.t2.v1, test.t2.v2, cast(test.t2.v1, decimal(20,2))->Column#15, cast(test.t2.v2, decimal(20,2))->Column#16", + " └─Selection 8.00 cop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))", + " └─TableFullScan 8.00 cop[tiflash] table:t2 keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*), t2.v1, t2.v2 from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2 group by t2.v1, t2.v2", + "Plan": [ + "TableReader 2.00 root data:ExchangeSender", + "└─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Projection 2.00 batchCop[tiflash] Column#9, test.t2.v1, test.t2.v2", + " └─HashAgg 2.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:sum(Column#22)->Column#9, funcs:firstrow(test.t2.v1)->test.t2.v1, funcs:firstrow(test.t2.v2)->test.t2.v2", + " └─ExchangeReceiver 2.00 batchCop[tiflash] ", + " └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t2.v1, test.t2.v2", + " └─HashAgg 2.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:count(1)->Column#22", + " └─HashJoin 2.00 batchCop[tiflash] left outer join, equal:[eq(test.t1.v1, test.t2.v1) eq(test.t1.v2, test.t2.v2)]", + " ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ", + " │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t1.v1, test.t1.v2", + " │ └─TableFullScan 2.00 batchCop[tiflash] table:t1 keep order:false", + " └─ExchangeReceiver(Probe) 8.00 batchCop[tiflash] ", + " └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#14, Column#15", + " └─Projection 8.00 batchCop[tiflash] test.t2.v1, test.t2.v2, cast(test.t2.v1, decimal(20,2))->Column#14, cast(test.t2.v2, decimal(20,2))->Column#15", + " └─Selection 8.00 batchCop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))", + " └─TableFullScan 8.00 batchCop[tiflash] table:t2 keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*), t2.v1, t2.v2 from t3 left join t2 on t3.v1 = t2.v1 and t3.v2 = t2.v2 group by t2.v1, t2.v2", + "Plan": [ + "TableReader 1.00 root data:ExchangeSender", + "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Projection 1.00 batchCop[tiflash] Column#9, test.t2.v1, test.t2.v2", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:sum(Column#16)->Column#9, funcs:firstrow(test.t2.v1)->test.t2.v1, funcs:firstrow(test.t2.v2)->test.t2.v2", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t2.v1, test.t2.v2", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:count(1)->Column#16", + " └─HashJoin 1.00 batchCop[tiflash] left outer join, equal:[eq(test.t3.v1, test.t2.v1) eq(test.t3.v2, test.t2.v2)]", + " ├─ExchangeReceiver(Build) 1.00 batchCop[tiflash] ", + " │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t3.v1, test.t3.v2", + " │ └─TableFullScan 1.00 batchCop[tiflash] table:t3 keep order:false", + " └─ExchangeReceiver(Probe) 8.00 batchCop[tiflash] ", + " └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t2.v1, test.t2.v2", + " └─Selection 8.00 batchCop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))", + " └─TableFullScan 8.00 batchCop[tiflash] table:t2 keep order:false" + ] + } + ] + }, { "Name": "TestJoinNotSupportedByTiFlash", "Cases": [