Skip to content

Commit

Permalink
executor: HashJoinExec checks the buildError even if the probeSide is…
Browse files Browse the repository at this point in the history
… empty (#30471) (#30794)

close #30289
  • Loading branch information
ti-srebot authored Dec 20, 2021
1 parent e4c8abb commit 20d3d60
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8279,3 +8279,17 @@ func (s *testSuite) TestIssue26532(c *C) {
tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "<nil>"))
tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "<nil>"))
}

func (s *testSerialSuite) TestIssue30289(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
fpName := "github.com/pingcap/tidb/executor/issue30289"
c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}
13 changes: 12 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
e.finished.Store(true)
return
}
emptyBuild, buildErr := e.wait4BuildSide()
if buildErr != nil {
Expand Down Expand Up @@ -260,6 +264,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
return
Expand Down
28 changes: 21 additions & 7 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,39 @@ func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
if r.inputHolderCh != nil {
close(r.inputHolderCh)
}
if r.inputCh != nil {
close(r.inputCh)
}
}
close(w.outputHolderCh)
if w.outputHolderCh != nil {
close(w.outputHolderCh)
}
}
if e.outputCh != nil {
close(e.outputCh)
}
close(e.outputCh)
}
close(e.finishCh)
if e.finishCh != nil {
close(e.finishCh)
}
for _, w := range e.workers {
for _, r := range w.receivers {
for range r.inputCh {
if r.inputCh != nil {
for range r.inputCh {
}
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
if e.outputCh != nil {
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
}
e.executed = false

Expand Down

0 comments on commit 20d3d60

Please sign in to comment.