
Commit

pkg/executor: fix the hang issue in indexHashJoin (#49218) (#49409)
close #49033
ti-chi-bot committed Dec 20, 2023
1 parent 27ee7b8 commit 0b35635
Showing 3 changed files with 97 additions and 42 deletions.
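Taken together, the change does three things: Next (and runInOrder/the new runUnordered) now waits on the executor's own cancellable worker context, e.ctxWithCancel, instead of the caller's context, so the cancellation issued by a panicking worker actually wakes the reader instead of leaving it blocked; finishJoinWorkers records the recovered panic in the new panicErr field before cancelling; and getResultFromChannel reports panicErr in preference to the generic context error. Below is a minimal, self-contained sketch of that pattern, assuming nothing from TiDB; the names join, finishWorker, worker and next are invented for the illustration.

package main

import (
	"context"
	"fmt"
)

type join struct {
	cancel   context.CancelFunc
	panicErr error // last error recovered from a worker panic
	resultCh chan int
}

// finishWorker plays the role of finishJoinWorkers: turn a recovered panic
// into an error, remember it, then cancel the shared worker context so that
// a blocked reader wakes up.
func (j *join) finishWorker(r interface{}) {
	if r != nil {
		j.panicErr = fmt.Errorf("panic in worker: %v", r)
		j.cancel()
	}
}

func (j *join) worker() {
	defer func() { j.finishWorker(recover()) }()
	panic("boom") // simulate an unexpected failure inside a worker
}

// next plays the role of getResultFromChannel: on cancellation, surface the
// recorded panic error instead of the generic context error.
func (j *join) next(ctx context.Context) (int, error) {
	select {
	case v := <-j.resultCh:
		return v, nil
	case <-ctx.Done():
		if j.panicErr != nil {
			return 0, j.panicErr
		}
		return 0, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	j := &join{cancel: cancel, resultCh: make(chan int)}
	go j.worker()
	_, err := j.next(ctx)
	fmt.Println(err) // panic in worker: boom (not "context canceled")
}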
3 changes: 3 additions & 0 deletions executor/builder.go
@@ -4431,6 +4431,9 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
}
}
if len(kvRanges) != 0 && memTracker != nil {
failpoint.Inject("testIssue49033", func() {
panic("testIssue49033")
})
memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges)))
}
if len(tmpDatumRanges) != 0 && memTracker != nil {
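The hunk above adds a test-only hook: failpoint.Inject marks a failpoint named testIssue49033 inside buildKvRangesForIndexJoin, and when the new test enables it, the closure panics while the join is building index-lookup key ranges — the kind of unexpected panic that used to leave the executor hanging. In TiDB builds, failpoint.Inject is a marker that the failpoint-ctl tool rewrites into a guard around failpoint.Eval before compilation, so it costs nothing unless enabled. The sketch below is a rough, standalone illustration of that mechanism using failpoint.Eval directly; buildRanges and the failpoint path example/testIssue49033 are made up for the example.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// buildRanges stands in for buildKvRangesForIndexJoin: the guarded hook
// panics only when the named failpoint has been enabled by a test.
func buildRanges() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	if _, ferr := failpoint.Eval("example/testIssue49033"); ferr == nil {
		panic("testIssue49033")
	}
	return nil
}

func main() {
	fmt.Println(buildRanges()) // <nil>: the failpoint is off by default

	if err := failpoint.Enable("example/testIssue49033", "return"); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable("example/testIssue49033") }()

	fmt.Println(buildRanges()) // testIssue49033: the hook fires and panics
}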
102 changes: 60 additions & 42 deletions executor/index_lookup_hash_join.go
@@ -73,6 +73,10 @@ type IndexNestedLoopHashJoin struct {

stats *indexLookUpJoinRuntimeStats
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
ctxWithCancel context.Context
}

type indexHashJoinOuterWorker struct {
@@ -144,7 +148,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.stats.concurrency = concurrency
}
workerCtx, cancelFunc := context.WithCancel(ctx)
e.cancelFunc = cancelFunc
e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc
innerCh := make(chan *indexHashJoinTask, concurrency)
if e.keepOuterOrder {
e.taskCh = make(chan *indexHashJoinTask, concurrency)
@@ -157,7 +161,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
e.workerWg.Add(1)
ow := e.newOuterWorker(innerCh)
go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers)
go util.WithRecovery(func() { ow.run(e.ctxWithCancel) }, e.finishJoinWorkers)

for i := 0; i < concurrency; i++ {
if !e.keepOuterOrder {
@@ -174,7 +178,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.workerWg.Add(concurrency)
for i := 0; i < concurrency; i++ {
workerID := i
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers)
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(e.ctxWithCancel, cancelFunc) }, e.finishJoinWorkers)
}
go e.wait4JoinWorkers()
}
@@ -189,6 +193,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
@@ -214,59 +219,39 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
return e.runInOrder(e.ctxWithCancel, req)
}
// unordered run
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-e.resultCh:
if !ok {
return nil
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
return e.runUnordered(e.ctxWithCancel, req)
}

func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
var (
result *indexHashJoinResult
ok bool
)
for {
if e.isDryUpTasks(ctx) {
return nil
return e.panicErr
}
if e.curTask.err != nil {
return e.curTask.err
}
select {
case result, ok = <-e.curTask.resultCh:
if !ok {
e.curTask = nil
continue
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
result, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
if err != nil {
return err
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
if result == nil {
e.curTask = nil
continue
}
return e.handleResult(req, result)
}
}

func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
result, err := e.getResultFromChannel(ctx, e.resultCh)
if err != nil {
return err
}
return e.handleResult(req, result)
}

// isDryUpTasks indicates whether all the tasks have been processed.
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
if e.curTask != nil {
@@ -284,6 +269,38 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
return false
}

func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-resultCh:
if !ok {
return nil, nil
}
if result.err != nil {
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
if err == nil {
err = ctx.Err()
}
return nil, err
}
return result, nil
}

func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
if result == nil {
return nil
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
}
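A note on the design choice in getResultFromChannel: once the worker context has been cancelled, ctx.Err() can only report the generic cancellation (or deadline) error, so returning it directly would hide the panic that caused the cancellation. Checking e.panicErr first preserves the root cause. A tiny standalone illustration of that context behaviour (plain Go, nothing TiDB-specific):

package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // e.g. a worker cancels after recovering from a panic
	<-ctx.Done()
	// The context records only that it was cancelled, not why:
	fmt.Println(ctx.Err()) // context canceled
}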

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.cancelFunc != nil {
@@ -305,6 +322,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
e.ctxWithCancel = nil
return e.baseExecutor.Close()
}

34 changes: 34 additions & 0 deletions executor/join_test.go
@@ -2952,3 +2952,37 @@ func TestOuterJoin(t *testing.T) {
),
)
}

func TestIssue49033(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t, s;")
tk.MustExec("create table t(a int, index(a));")
tk.MustExec("create table s(a int, index(a));")
tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16), (17), (18), (19), (20), (21), (22), (23), (24), (25), (26), (27), (28), (29), (30), (31), (32), (33), (34), (35), (36), (37), (38), (39), (40), (41), (42), (43), (44), (45), (46), (47), (48), (49), (50), (51), (52), (53), (54), (55), (56), (57), (58), (59), (60), (61), (62), (63), (64), (65), (66), (67), (68), (69), (70), (71), (72), (73), (74), (75), (76), (77), (78), (79), (80), (81), (82), (83), (84), (85), (86), (87), (88), (89), (90), (91), (92), (93), (94), (95), (96), (97), (98), (99), (100), (101), (102), (103), (104), (105), (106), (107), (108), (109), (110), (111), (112), (113), (114), (115), (116), (117), (118), (119), (120), (121), (122), (123), (124), (125), (126), (127), (128);")
tk.MustExec("insert into s values(1), (128);")
tk.MustExec("set @@tidb_max_chunk_size=32;")
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")
tk.MustExec("set @@tidb_index_join_batch_size=32;")
tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a;")
tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIssue49033", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIssue49033"))
}()

rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;")
require.NoError(t, err)
_, err = session.GetRows4Test(context.Background(), nil, rs)
require.EqualError(t, err, "testIssue49033")
require.NoError(t, rs.Close())

rs, err = tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a")
require.NoError(t, err)
_, err = session.GetRows4Test(context.Background(), nil, rs)
require.EqualError(t, err, "testIssue49033")
require.NoError(t, rs.Close())
}
