Skip to content

Commit

Permalink
Revert "coprocessor: Exceed action for copiterator (#17324)" (#18706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Jul 21, 2020
1 parent f5c6e59 commit af685f5
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 101 deletions.
10 changes: 0 additions & 10 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sort"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -140,14 +139,6 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
return err
}

actionExceed := e.memTracker.GetActionOnExceed()
if actionExceed != nil {
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionExceed)
} else {
return errors.Trace(fmt.Errorf("failed to find actionExceed in TableReaderExecutor Open phase"))
}

if len(secondPartRanges) == 0 {
e.resultHandler.open(nil, firstResult)
return nil
Expand Down Expand Up @@ -205,7 +196,6 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
} else {
reqBuilder = builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback)
}

kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down
84 changes: 0 additions & 84 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,8 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
vars: vars,
memTracker: req.MemTracker,
replicaReadSeed: c.replicaReadSeed,
actionOnExceed: &EndCopWorkerAction{},
rpcCancel: NewRPCanceller(),
}
if it.memTracker != nil {
it.memTracker.FallbackOldAndSetNewAction(it.actionOnExceed)
}

it.minCommitTSPushed.data = make(map[uint64]struct{}, 5)
it.tasks = tasks
if it.concurrency > len(tasks) {
Expand All @@ -96,7 +91,6 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
} else {
it.respChan = make(chan *copResponse, it.concurrency)
}
it.actionOnExceed.mu.aliveWorker = it.concurrency
if !it.req.Streaming {
ctx = context.WithValue(ctx, RPCCancellerCtxKey{}, it.rpcCancel)
}
Expand Down Expand Up @@ -409,13 +403,10 @@ type copIterator struct {
closed uint32

minCommitTSPushed

actionOnExceed *EndCopWorkerAction
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
type copIteratorWorker struct {
id string
taskCh <-chan *copTask
wg *sync.WaitGroup
store *tikvStore
Expand All @@ -428,8 +419,6 @@ type copIteratorWorker struct {
memTracker *memory.Tracker

replicaReadSeed uint32

actionOnExceed *EndCopWorkerAction
}

// copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
Expand Down Expand Up @@ -501,14 +490,7 @@ const minLogCopTaskTime = 300 * time.Millisecond
// send the result back.
func (worker *copIteratorWorker) run(ctx context.Context) {
defer worker.wg.Done()

for task := range worker.taskCh {
endWorker, remainWorkers := worker.checkWorkerOOM()
if endWorker {
logutil.BgLogger().Info("end one copIterator worker.",
zap.String("copIteratorWorker id", worker.id), zap.Int("remain alive worker", remainWorkers))
return
}
respCh := worker.respChan
if respCh == nil {
respCh = task.respChan
Expand All @@ -521,39 +503,19 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}
select {
case <-worker.finishCh:
worker.actionOnExceed.mu.Lock()
worker.actionOnExceed.mu.aliveWorker--
worker.actionOnExceed.mu.Unlock()
return
default:
}
}
}

func (worker *copIteratorWorker) checkWorkerOOM() (bool, int) {
endWorker := false
remainWorkers := 0
worker.actionOnExceed.mu.Lock()
defer worker.actionOnExceed.mu.Unlock()
if worker.actionOnExceed.mu.exceeded != 0 {
endWorker = true
worker.actionOnExceed.mu.aliveWorker--
remainWorkers = worker.actionOnExceed.mu.aliveWorker
// reset action
worker.actionOnExceed.mu.exceeded = 0
worker.actionOnExceed.once = sync.Once{}
}
return endWorker, remainWorkers
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context) {
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
worker := &copIteratorWorker{
id: fmt.Sprintf("copIteratorWorker-%d", i),
taskCh: taskCh,
wg: &it.wg,
store: it.store,
Expand All @@ -571,7 +533,6 @@ func (it *copIterator) open(ctx context.Context) {
memTracker: it.memTracker,

replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
}
go worker.run(ctx)
}
Expand Down Expand Up @@ -1208,48 +1169,3 @@ func (it copErrorResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
func (it copErrorResponse) Close() error {
return nil
}

// EndCopWorkerAction implements memory.ActionOnExceed for copIteratorWorker. If
// the memory quota of a query is exceeded, EndCopWorkAction.Action would end one copIteratorWorker.
// If there is only one or zero worker is running, delegate to the fallback action.
type EndCopWorkerAction struct {
once sync.Once
fallbackAction memory.ActionOnExceed
mu struct {
sync.Mutex
// exceeded indicates that datasource have exceeded memQuota.
exceeded uint32

// alive worker indicates how many copIteratorWorker are running
aliveWorker int
}
}

// Action sends a signal to trigger end one copIterator worker.
func (e *EndCopWorkerAction) Action(t *memory.Tracker) {
e.mu.Lock()
defer e.mu.Unlock()
// only one or zero worker is running, delegate to the fallback action
if e.mu.aliveWorker < 2 {
if e.fallbackAction != nil {
e.fallbackAction.Action(t)
}
return
}
// set exceeded as 1
e.once.Do(func() {
e.mu.exceeded = 1
logutil.BgLogger().Info("memory exceeds quota, mark EndCopWorkerAction exceed signal.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()), zap.Int64("maxConsumed", t.MaxConsumed()))
})
}

// SetLogHook implements ActionOnExceed.SetLogHook
func (e *EndCopWorkerAction) SetLogHook(hook func(uint64)) {

}

// SetFallback implements ActionOnExceed.SetFallback
func (e *EndCopWorkerAction) SetFallback(a memory.ActionOnExceed) {
e.fallbackAction = a
}
7 changes: 0 additions & 7 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,6 @@ func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
t.actionMu.Unlock()
}

// GetActionOnExceed return the actionOnExceed
func (t *Tracker) GetActionOnExceed() ActionOnExceed {
t.actionMu.Lock()
defer t.actionMu.Unlock()
return t.actionMu.actionOnExceed
}

// FallbackOldAndSetNewAction sets the action when memory usage exceeds bytesLimit
// and set the original action as its fallback.
func (t *Tracker) FallbackOldAndSetNewAction(a ActionOnExceed) {
Expand Down

0 comments on commit af685f5

Please sign in to comment.