diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 67e2d9edc896c..c7271b87c3e87 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1036,7 +1036,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) var allJobs []*model.Job err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error { - allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx()) + allJobs, err = ddl.GetAllDDLJobs(context.Background(), se.GetSessionCtx()) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index a6183834d5d5d..7bad21458d0d1 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -675,7 +675,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return errors.Trace(err) } job := reorgInfo.Job - opCtx := NewLocalOperatorCtx(ctx, job.ID) + opCtx, cancel := NewLocalOperatorCtx(ctx, job.ID) + defer cancel() + idxCnt := len(reorgInfo.elements) indexIDs := make([]int64, 0, idxCnt) indexInfos := make([]*model.IndexInfo, 0, idxCnt) @@ -705,11 +707,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return errors.Trace(err) } defer ingest.LitBackCtxMgr.Unregister(job.ID) - sctx, err := sessPool.Get() - if err != nil { - return errors.Trace(err) - } - defer sessPool.Put(sctx) cpMgr, err := ingest.NewCheckpointManager( ctx, @@ -737,6 +734,11 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)), } + sctx, err := sessPool.Get() + if err != nil { + return errors.Trace(err) + } + defer sessPool.Put(sctx) avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t) engines, err := bcCtx.Register(indexIDs, uniques, t) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index fa9c833178130..f46a451395ca8 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -83,33 +83,33 @@ type OperatorCtx struct { } // NewDistTaskOperatorCtx is used for adding index with dist framework. -func NewDistTaskOperatorCtx(ctx context.Context, taskID, subtaskID int64) *OperatorCtx { +func NewDistTaskOperatorCtx( + ctx context.Context, + taskID, subtaskID int64, +) (*OperatorCtx, context.CancelFunc) { opCtx, cancel := context.WithCancel(ctx) - opCtx = logutil.WithFields(opCtx, zap.Int64("task-id", taskID), zap.Int64("subtask-id", subtaskID)) + opCtx = logutil.WithFields(opCtx, + zap.Int64("task-id", taskID), + zap.Int64("subtask-id", subtaskID)) return &OperatorCtx{ Context: opCtx, cancel: cancel, - } + }, cancel } // NewLocalOperatorCtx is used for adding index with local ingest mode. -func NewLocalOperatorCtx(ctx context.Context, jobID int64) *OperatorCtx { +func NewLocalOperatorCtx(ctx context.Context, jobID int64) (*OperatorCtx, context.CancelFunc) { opCtx, cancel := context.WithCancel(ctx) opCtx = logutil.WithFields(opCtx, zap.Int64("jobID", jobID)) return &OperatorCtx{ Context: opCtx, cancel: cancel, - } + }, cancel } func (ctx *OperatorCtx) onError(err error) { tracedErr := errors.Trace(err) - ctx.cancel() ctx.err.CompareAndSwap(nil, &tracedErr) -} - -// Cancel cancels the pipeline. 
-func (ctx *OperatorCtx) Cancel() { ctx.cancel() } @@ -769,7 +769,7 @@ func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(Index return } w.rowCntListener.Written(rs.Added) - flushed, imported, err := w.backendCtx.Flush(ingest.FlushModeAuto) + flushed, imported, err := w.backendCtx.Flush(w.ctx, ingest.FlushModeAuto) if err != nil { w.ctx.onError(err) return @@ -949,7 +949,7 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) - flushed, imported, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport) + flushed, imported, err := s.backendCtx.Flush(s.ctx, ingest.FlushModeForceFlushAndImport) if s.cpMgr != nil { // Try to advance watermark even if there is an error. s.cpMgr.AdvanceWatermark(flushed, imported) diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index ed9166a7627f4..8e848b19b654c 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -105,8 +105,8 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - opCtx := NewDistTaskOperatorCtx(ctx, subtask.TaskID, subtask.ID) - defer opCtx.Cancel() + opCtx, cancel := NewDistTaskOperatorCtx(ctx, subtask.TaskID, subtask.ID) + defer cancel() r.curRowCount.Store(0) if len(r.cloudStorageURI) > 0 { diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index b66894be51d13..c7d08e2c718f8 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -297,7 +297,7 @@ func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, } } - jobs, err := GetAllDDLJobs(se) + jobs, err := GetAllDDLJobs(ctx, se) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/db_change_test.go b/pkg/ddl/db_change_test.go index 740b94135d01b..ade943bd20290 100644 --- a/pkg/ddl/db_change_test.go +++ b/pkg/ddl/db_change_test.go @@ -1297,11 +1297,12 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage) (*testkit return } var qLen int + ctx := context.Background() for { sess := testkit.NewTestKit(t, store).Session() - err := sessiontxn.NewTxn(context.Background(), sess) + err := sessiontxn.NewTxn(ctx, sess) require.NoError(t, err) - jobs, err := ddl.GetAllDDLJobs(sess) + jobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) qLen = len(jobs) if qLen == 2 { @@ -1321,11 +1322,12 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage) (*testkit // Make sure the sql1 is put into the DDLJobQueue. 
go func() { var qLen int + ctx := context.Background() for { sess := testkit.NewTestKit(t, store).Session() - err := sessiontxn.NewTxn(context.Background(), sess) + err := sessiontxn.NewTxn(ctx, sess) require.NoError(t, err) - jobs, err := ddl.GetAllDDLJobs(sess) + jobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) qLen = len(jobs) if qLen == 1 { diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index b6a5134f8f447..666a18b2cfa6e 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1229,7 +1229,8 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) { func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) { var generalJob, reorgJob *model.Job - jobs, err := getJobsBySQL(sess, JobTable, "not reorg order by job_id limit 1") + ctx := context.Background() + jobs, err := getJobsBySQL(ctx, sess, JobTable, "not reorg order by job_id limit 1") if err != nil { return nil, nil, errors.Trace(err) } @@ -1237,7 +1238,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) { if len(jobs) != 0 { generalJob = jobs[0] } - jobs, err = getJobsBySQL(sess, JobTable, "reorg order by job_id limit 1") + jobs, err = getJobsBySQL(ctx, sess, JobTable, "reorg order by job_id limit 1") if err != nil { return nil, nil, errors.Trace(err) } @@ -1309,6 +1310,7 @@ func resumePausedJob(_ *sess.Session, job *model.Job, // processJobs command on the Job according to the process func processJobs( + ctx context.Context, process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), sessCtx sessionctx.Context, ids []int64, @@ -1336,11 +1338,11 @@ func processJobs( idsStr = append(idsStr, strconv.FormatInt(id, 10)) } - err = ns.Begin(context.Background()) + err = ns.Begin(ctx) if err != nil { return nil, err } - jobs, err := getJobsBySQL(ns, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", "))) + jobs, err := getJobsBySQL(ctx, ns, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", "))) if err != nil { ns.Rollback() return nil, err @@ -1362,7 +1364,7 @@ func processJobs( continue } - err = updateDDLJob2Table(ns, job, false) + err = updateDDLJob2Table(ctx, ns, job, false) if err != nil { jobErrs[i] = err continue @@ -1376,7 +1378,7 @@ func processJobs( }) // There may be some conflict during the update, try it again - if err = ns.Commit(context.Background()); err != nil { + if err = ns.Commit(ctx); err != nil { continue } @@ -1391,43 +1393,50 @@ func processJobs( } // CancelJobs cancels the DDL jobs according to user command. -func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) { - return processJobs(cancelRunningJob, se, ids, model.AdminCommandByEndUser) +func CancelJobs(ctx context.Context, se sessionctx.Context, ids []int64) (errs []error, err error) { + return processJobs(ctx, cancelRunningJob, se, ids, model.AdminCommandByEndUser) } // PauseJobs pause all the DDL jobs according to user command. -func PauseJobs(se sessionctx.Context, ids []int64) ([]error, error) { - return processJobs(pauseRunningJob, se, ids, model.AdminCommandByEndUser) +func PauseJobs(ctx context.Context, se sessionctx.Context, ids []int64) ([]error, error) { + return processJobs(ctx, pauseRunningJob, se, ids, model.AdminCommandByEndUser) } // ResumeJobs resume all the DDL jobs according to user command. 
-func ResumeJobs(se sessionctx.Context, ids []int64) ([]error, error) { - return processJobs(resumePausedJob, se, ids, model.AdminCommandByEndUser) +func ResumeJobs(ctx context.Context, se sessionctx.Context, ids []int64) ([]error, error) { + return processJobs(ctx, resumePausedJob, se, ids, model.AdminCommandByEndUser) } // CancelJobsBySystem cancels Jobs because of internal reasons. func CancelJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { - return processJobs(cancelRunningJob, se, ids, model.AdminCommandBySystem) + ctx := context.Background() + return processJobs(ctx, cancelRunningJob, se, ids, model.AdminCommandBySystem) } // PauseJobsBySystem pauses Jobs because of internal reasons. func PauseJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { - return processJobs(pauseRunningJob, se, ids, model.AdminCommandBySystem) + ctx := context.Background() + return processJobs(ctx, pauseRunningJob, se, ids, model.AdminCommandBySystem) } // ResumeJobsBySystem resumes Jobs that are paused by TiDB itself. func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { - return processJobs(resumePausedJob, se, ids, model.AdminCommandBySystem) + ctx := context.Background() + return processJobs(ctx, resumePausedJob, se, ids, model.AdminCommandBySystem) } // pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage. -func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), - se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) { +func processAllJobs( + ctx context.Context, + process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), + se sessionctx.Context, + byWho model.AdminCommandOperator, +) (map[int64]error, error) { var err error var jobErrs = make(map[int64]error) ns := sess.NewSession(se) - err = ns.Begin(context.Background()) + err = ns.Begin(ctx) if err != nil { return nil, err } @@ -1437,7 +1446,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp var limit = 100 for { var jobs []*model.Job - jobs, err = getJobsBySQL(ns, JobTable, + jobs, err = getJobsBySQL(ctx, ns, JobTable, fmt.Sprintf("job_id >= %s order by job_id asc limit %s", strconv.FormatInt(jobID, 10), strconv.FormatInt(int64(limit), 10))) @@ -1453,7 +1462,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp continue } - err = updateDDLJob2Table(ns, job, false) + err = updateDDLJob2Table(ctx, ns, job, false) if err != nil { jobErrs[job.ID] = err continue @@ -1473,7 +1482,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp jobID = jobIDMax + 1 } - err = ns.Commit(context.Background()) + err = ns.Commit(ctx) if err != nil { return nil, err } @@ -1482,23 +1491,23 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp // PauseAllJobsBySystem pauses all running Jobs because of internal reasons. func PauseAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) { - return processAllJobs(pauseRunningJob, se, model.AdminCommandBySystem) + return processAllJobs(context.Background(), pauseRunningJob, se, model.AdminCommandBySystem) } // ResumeAllJobsBySystem resumes all paused Jobs because of internal reasons. 
func ResumeAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) { - return processAllJobs(resumePausedJob, se, model.AdminCommandBySystem) + return processAllJobs(context.Background(), resumePausedJob, se, model.AdminCommandBySystem) } // GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID. -func GetAllDDLJobs(se sessionctx.Context) ([]*model.Job, error) { - return getJobsBySQL(sess.NewSession(se), JobTable, "1 order by job_id") +func GetAllDDLJobs(ctx context.Context, se sessionctx.Context) ([]*model.Job, error) { + return getJobsBySQL(ctx, sess.NewSession(se), JobTable, "1 order by job_id") } // IterAllDDLJobs will iterates running DDL jobs first, return directly if `finishFn` return true or error, // then iterates history DDL jobs until the `finishFn` return true or error. func IterAllDDLJobs(ctx sessionctx.Context, txn kv.Transaction, finishFn func([]*model.Job) (bool, error)) error { - jobs, err := GetAllDDLJobs(ctx) + jobs, err := GetAllDDLJobs(context.Background(), ctx) if err != nil { return err } diff --git a/pkg/ddl/executor_test.go b/pkg/ddl/executor_test.go index 6899262783363..356bcc3bc59de 100644 --- a/pkg/ddl/executor_test.go +++ b/pkg/ddl/executor_test.go @@ -49,6 +49,7 @@ func TestGetDDLJobs(t *testing.T) { cnt := 10 jobs := make([]*model.Job, cnt) + ctx := context.Background() var currJobs2 []*model.Job for i := 0; i < cnt; i++ { jobs[i] = &model.Job{ @@ -59,7 +60,7 @@ func TestGetDDLJobs(t *testing.T) { err := addDDLJobs(sess, txn, jobs[i]) require.NoError(t, err) - currJobs, err := ddl.GetAllDDLJobs(sess) + currJobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) require.Len(t, currJobs, i+1) @@ -77,7 +78,7 @@ func TestGetDDLJobs(t *testing.T) { require.Len(t, currJobs2, i+1) } - currJobs, err := ddl.GetAllDDLJobs(sess) + currJobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) for i, job := range jobs { @@ -93,6 +94,7 @@ func TestGetDDLJobs(t *testing.T) { func TestGetDDLJobsIsSort(t *testing.T) { store := testkit.CreateMockStore(t) + ctx := context.Background() sess := testkit.NewTestKit(t, store).Session() _, err := sess.Execute(context.Background(), "begin") @@ -110,7 +112,7 @@ func TestGetDDLJobsIsSort(t *testing.T) { // insert add index jobs to AddIndexJobListKey queue enQueueDDLJobs(t, sess, txn, model.ActionAddIndex, 5, 10) - currJobs, err := ddl.GetAllDDLJobs(sess) + currJobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) require.Len(t, currJobs, 15) diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index a2383003072aa..e815a3c6d1b9a 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -45,7 +45,8 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, for i := 0; i < 10; i++ { srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize) } - opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1) + opCtx, cancel := ddl.NewLocalOperatorCtx(context.Background(), 1) + defer cancel() src := testutil.NewOperatorTestSource(ddl.TableScanTask{ID: 1, Start: startKey, End: endKey}) scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 2aa8ae82bf9bd..7fa56b2febbff 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -159,7 +159,7 @@ func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Tab } // Flush implements FlushController. 
-func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) { +func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error) { shouldFlush, shouldImport := bc.checkFlush(mode) if !shouldFlush { return false, false, nil @@ -202,7 +202,7 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err erro }) for indexID, ei := range bc.engines { - if err = bc.unsafeImportAndReset(ei); err != nil { + if err = bc.unsafeImportAndReset(ctx, ei); err != nil { if common.ErrFoundDuplicateKeys.Equal(err) { idxInfo := model.FindIndexInfoByID(bc.tbl.Meta().Indices, indexID) if idxInfo == nil { @@ -239,7 +239,7 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err erro const distributedLockLease = 10 // Seconds -func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { +func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInfo) error { logger := log.FromContext(bc.ctx).With( zap.Stringer("engineUUID", ei.uuid), ) @@ -251,8 +251,8 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio) regionSplitKeys := int64(lightning.SplitRegionKeys) - if err := closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil { - logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID), + if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { + logger.Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID), zap.String("usage info", bc.diskRoot.UsageInfo())) return err } @@ -267,12 +267,12 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { resetFn = bc.backend.ResetEngine } - err := resetFn(bc.ctx, ei.uuid) + err := resetFn(ctx, ei.uuid) failpoint.Inject("mockResetEngineFailed", func() { err = fmt.Errorf("mock reset engine failed") }) if err != nil { - logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID)) + logger.Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID)) err1 := closedEngine.Cleanup(bc.ctx) if err1 != nil { logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err1), diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 0575dd27af966..9faf8bb1d8e7b 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -90,7 +90,7 @@ type taskCheckpoint struct { type FlushController interface { // Flush checks if al engines need to be flushed and imported based on given // FlushMode. It's concurrent safe. - Flush(mode FlushMode) (flushed, imported bool, err error) + Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error) } // NewCheckpointManager creates a new checkpoint manager. 
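Reviewer note: the FlushController change above means the caller's context, not the backend's long-lived bc.ctx, now governs engine import and reset. A minimal sketch of the updated contract follows; the noopController type and everything outside the interface shape are illustrative stand-ins, not part of this patch.

// Sketch only: FlushMode and the FlushController shape come from the diff
// above; the controller implementation is a hypothetical stand-in.
package main

import (
	"context"
	"errors"
	"fmt"
)

type FlushMode int

const (
	FlushModeAuto FlushMode = iota
	FlushModeForceFlushAndImport
)

// FlushController mirrors the updated interface in pkg/ddl/ingest/checkpoint.go.
type FlushController interface {
	Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error)
}

// noopController stands in for litBackendCtx; it only demonstrates that
// cancelling the caller's context now aborts the flush/import path.
type noopController struct{}

func (noopController) Flush(ctx context.Context, _ FlushMode) (bool, bool, error) {
	if err := ctx.Err(); err != nil {
		return false, false, err // import/reset would stop here on cancellation
	}
	return true, true, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate "admin cancel ddl jobs" cancelling the operator context

	var fc FlushController = noopController{}
	_, _, err := fc.Flush(ctx, FlushModeForceFlushAndImport)
	fmt.Println(errors.Is(err, context.Canceled)) // true
}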
diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index 9c282baecf811..d495808937cbc 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -404,17 +404,20 @@ func TestAddIndexIngestPartitionCheckpoint(t *testing.T) { jobID = job.ID }) rowCnt := atomic.Int32{} - failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() { - rowCnt.Add(1) - if rowCnt.Load() == 10 { - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - updateSQL := fmt.Sprintf("update mysql.tidb_ddl_job set processing = 0 where job_id = %d", jobID) - tk2.MustExec(updateSQL) - updateSQL = fmt.Sprintf("update mysql.tidb_ddl_job set processing = 1 where job_id = %d", jobID) - tk2.MustExec(updateSQL) - } - }) + testfailpoint.EnableCall( + t, + "github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", + func() { + rowCnt.Add(1) + if rowCnt.Load() == 10 { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + updateSQL := fmt.Sprintf("update mysql.tidb_ddl_job set processing = 0 where job_id = %d", jobID) + tk2.MustExec(updateSQL) + updateSQL = fmt.Sprintf("update mysql.tidb_ddl_job set processing = 1 where job_id = %d", jobID) + tk2.MustExec(updateSQL) + } + }) tk.MustExec("alter table t add index idx(b);") // It should resume to correct partition. diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index a176f5d5ce32c..59dcb167fdfec 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -134,7 +134,7 @@ func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) } // Flush implements BackendCtx.Flush interface. -func (*MockBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) { +func (*MockBackendCtx) Flush(context.Context, FlushMode) (flushed, imported bool, err error) { return false, false, nil } diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go index 3ea533cfb4d9f..83b695356e350 100644 --- a/pkg/ddl/job_scheduler.go +++ b/pkg/ddl/job_scheduler.go @@ -750,13 +750,18 @@ func job2TableIDs(jobW *JobWrapper) string { } } -func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) error { +func updateDDLJob2Table( + ctx context.Context, + se *sess.Session, + job *model.Job, + updateRawArgs bool, +) error { b, err := job.Encode(updateRawArgs) if err != nil { return err } sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID) - _, err = se.Execute(context.Background(), sql, "update_job") + _, err = se.Execute(ctx, sql, "update_job") return errors.Trace(err) } @@ -874,8 +879,12 @@ func cleanDDLReorgHandles(se *sess.Session, job *model.Job) error { }) } -func getJobsBySQL(se *sess.Session, tbl, condition string) ([]*model.Job, error) { - rows, err := se.Execute(context.Background(), fmt.Sprintf("select job_meta from mysql.%s where %s", tbl, condition), "get_job") +func getJobsBySQL( + ctx context.Context, + se *sess.Session, + tbl, condition string, +) ([]*model.Job, error) { + rows, err := se.Execute(ctx, fmt.Sprintf("select job_meta from mysql.%s where %s", tbl, condition), "get_job") if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/ddl/job_scheduler_testkit_test.go b/pkg/ddl/job_scheduler_testkit_test.go index d1057a5fb0d85..c7fe1d9f9669d 100644 --- a/pkg/ddl/job_scheduler_testkit_test.go +++ b/pkg/ddl/job_scheduler_testkit_test.go @@ -37,6 +37,7 @@ import ( // then all the records of job A must before or after job B, no cross record between these 2 jobs. 
func TestDDLScheduling(t *testing.T) { store, _ := testkit.CreateMockStoreAndDomain(t) + ctx := context.Background() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -74,7 +75,7 @@ func TestDDLScheduling(t *testing.T) { }) for { time.Sleep(time.Millisecond * 100) - jobs, err := ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session()) + jobs, err := ddl.GetAllDDLJobs(ctx, testkit.NewTestKit(t, store).Session()) require.NoError(t, err) if len(jobs) == i+1 { break diff --git a/pkg/ddl/job_submitter_test.go b/pkg/ddl/job_submitter_test.go index 36a1cc5e316db..42b81545b9fb9 100644 --- a/pkg/ddl/job_submitter_test.go +++ b/pkg/ddl/job_submitter_test.go @@ -93,7 +93,7 @@ func TestGenIDAndInsertJobsWithRetry(t *testing.T) { wg.Wait() jobCount := threads * iterations - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) require.Len(t, gotJobs, jobCount) currGID := getGlobalID(ctx, t, store) @@ -352,7 +352,7 @@ func TestCombinedIDAllocation(t *testing.T) { require.NoError(t, submitter.GenGIDAndInsertJobsWithRetry(ctx, sess.NewSession(tk.Session()), []*ddl.JobWrapper{c.jobW})) require.Equal(t, currentGlobalID+int64(c.requiredIDCount), getGlobalID(ctx, t, store), fmt.Sprintf("case-%d", i)) } - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) require.Len(t, gotJobs, len(cases)) }) @@ -370,7 +370,7 @@ func TestCombinedIDAllocation(t *testing.T) { require.NoError(t, submitter.GenGIDAndInsertJobsWithRetry(ctx, sess.NewSession(tk.Session()), jobWs)) require.Equal(t, currentGlobalID+int64(totalRequiredCnt), getGlobalID(ctx, t, store)) - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) require.Len(t, gotJobs, len(cases)) }) @@ -406,7 +406,7 @@ func TestCombinedIDAllocation(t *testing.T) { checkPartitionInfo(pInfo) } } - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) require.Len(t, gotJobs, allocIDCaseCount) for _, j := range gotJobs { diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index d99dd8b9cde85..fb1ab0e0269a1 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -267,7 +267,7 @@ func (w *worker) updateDDLJob(job *model.Job, updateRawArgs bool) error { w.jobLogger(job).Info("meet something wrong before update DDL job, shouldn't update raw args", zap.String("job", job.String())) } - return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs)) + return errors.Trace(updateDDLJob2Table(w.ctx, w.sess, job, updateRawArgs)) } // registerMDLInfo registers metadata lock info. 
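Reviewer note: the same context-plumbing pattern recurs throughout ddl.go and job_scheduler.go above: helpers such as getJobsBySQL and updateDDLJob2Table stop creating context.Background() internally and accept the caller's context instead. A hedged before/after sketch of that pattern; the executor type and the SQL string are stand-ins, not TiDB's real session API.

package main

import (
	"context"
	"fmt"
	"time"
)

// executor is a stand-in for sess.Session.Execute; only the context handling matters here.
type executor func(ctx context.Context, sql string) ([]string, error)

// Before: the helper hides a background context, so the caller's cancellation never reaches the query.
func getJobsBySQLOld(exec executor, condition string) ([]string, error) {
	return exec(context.Background(), "select job_meta from mysql.tidb_ddl_job where "+condition)
}

// After: the helper accepts ctx first, matching the new getJobsBySQL signature in the diff.
func getJobsBySQLNew(ctx context.Context, exec executor, condition string) ([]string, error) {
	return exec(ctx, "select job_meta from mysql.tidb_ddl_job where "+condition)
}

func main() {
	exec := executor(func(ctx context.Context, _ string) ([]string, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err() // cancellation now interrupts the "query"
		case <-time.After(50 * time.Millisecond):
			return []string{"job-1"}, nil
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()

	_, errOld := getJobsBySQLOld(exec, "1 order by job_id") // background ctx: deadline ignored
	_, errNew := getJobsBySQLNew(ctx, exec, "1 order by job_id")
	fmt.Println(errOld, errNew) // <nil> context deadline exceeded
}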
diff --git a/pkg/ddl/job_worker_test.go b/pkg/ddl/job_worker_test.go index e4bf2eb8d4e8b..a865b859fd638 100644 --- a/pkg/ddl/job_worker_test.go +++ b/pkg/ddl/job_worker_test.go @@ -15,6 +15,7 @@ package ddl_test import ( + "context" "strconv" "sync" "sync/atomic" @@ -76,6 +77,7 @@ func TestAddBatchJobError(t *testing.T) { func TestParallelDDL(t *testing.T) { store := testkit.CreateMockStoreWithSchemaLease(t, testLease) + ctx := context.Background() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -107,7 +109,7 @@ func TestParallelDDL(t *testing.T) { for { tk1 := testkit.NewTestKit(t, store) tk1.MustExec("begin") - jobs, err := ddl.GetAllDDLJobs(tk1.Session()) + jobs, err := ddl.GetAllDDLJobs(ctx, tk1.Session()) require.NoError(t, err) tk1.MustExec("rollback") var qLen1, qLen2 int diff --git a/pkg/ddl/multi_schema_change_test.go b/pkg/ddl/multi_schema_change_test.go index ca4d2fe3c4bd5..aa61da5243c1b 100644 --- a/pkg/ddl/multi_schema_change_test.go +++ b/pkg/ddl/multi_schema_change_test.go @@ -15,6 +15,7 @@ package ddl_test import ( + "context" "strconv" "testing" @@ -809,7 +810,7 @@ func (c *cancelOnceHook) OnJobUpdated(job *model.Job) { return } c.triggered = true - errs, err := ddl.CancelJobs(c.s, []int64{job.ID}) + errs, err := ddl.CancelJobs(context.Background(), c.s, []int64{job.ID}) if len(errs) > 0 && errs[0] != nil { c.cancelErr = errs[0] return diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index d2011a22eb1f5..ef8b8c91f908f 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2740,7 +2740,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job args.PartitionID = partDef.ID job.FillArgs(args) defID = partDef.ID - err = updateDDLJob2Table(w.sess, job, true) + err = updateDDLJob2Table(jobCtx.ctx, w.sess, job, true) if err != nil { return ver, errors.Trace(err) } @@ -2784,7 +2784,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job // might be used later, ignore the lint warning. 
//nolint: ineffassign defID = partDef.ID - err = updateDDLJob2Table(w.sess, job, true) + err = updateDDLJob2Table(jobCtx.ctx, w.sess, job, true) if err != nil { return ver, errors.Trace(err) } diff --git a/pkg/ddl/table_modify_test.go b/pkg/ddl/table_modify_test.go index 7547d0551bd9b..8787085e4e17d 100644 --- a/pkg/ddl/table_modify_test.go +++ b/pkg/ddl/table_modify_test.go @@ -128,6 +128,7 @@ func TestConcurrentLockTables(t *testing.T) { func testParallelExecSQL(t *testing.T, store kv.Storage, sql1, sql2 string, se1, se2 sessiontypes.Session, f func(t *testing.T, err1, err2 error)) { times := 0 + ctx := context.Background() testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) { if times != 0 { return @@ -135,9 +136,9 @@ func testParallelExecSQL(t *testing.T, store kv.Storage, sql1, sql2 string, se1, var qLen int for { sess := testkit.NewTestKit(t, store).Session() - err := sessiontxn.NewTxn(context.Background(), sess) + err := sessiontxn.NewTxn(ctx, sess) require.NoError(t, err) - jobs, err := ddl.GetAllDDLJobs(sess) + jobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) qLen = len(jobs) if qLen == 2 { @@ -158,9 +159,9 @@ func testParallelExecSQL(t *testing.T, store kv.Storage, sql1, sql2 string, se1, var qLen int for { sess := testkit.NewTestKit(t, store).Session() - err := sessiontxn.NewTxn(context.Background(), sess) + err := sessiontxn.NewTxn(ctx, sess) require.NoError(t, err) - jobs, err := ddl.GetAllDDLJobs(sess) + jobs, err := ddl.GetAllDDLJobs(ctx, sess) require.NoError(t, err) qLen = len(jobs) if qLen == 1 { diff --git a/pkg/ddl/tests/adminpause/pause_negative_test.go b/pkg/ddl/tests/adminpause/pause_negative_test.go index 2eb0d3a190a6f..06274690c3e34 100644 --- a/pkg/ddl/tests/adminpause/pause_negative_test.go +++ b/pkg/ddl/tests/adminpause/pause_negative_test.go @@ -96,6 +96,7 @@ func TestPauseOnWriteConflict(t *testing.T) { func TestPauseFailedOnCommit(t *testing.T) { store := testkit.CreateMockStoreWithSchemaLease(t, dbTestLease) + ctx := context.Background() tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) @@ -119,7 +120,7 @@ func TestPauseFailedOnCommit(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockCommitFailedOnDDLCommand")) }() jobID.Store(job.ID) - jobErrs, pauseErr = ddl.PauseJobs(tk2.Session(), []int64{jobID.Load()}) + jobErrs, pauseErr = ddl.PauseJobs(ctx, tk2.Session(), []int64{jobID.Load()}) } adminMutex.Unlock() }) diff --git a/pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go b/pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go index ce2277a17ebe5..62515f380fedf 100644 --- a/pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go +++ b/pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go @@ -15,6 +15,7 @@ package fastcreatetable import ( + "context" "testing" "time" @@ -96,6 +97,7 @@ func TestDDL(t *testing.T) { func TestMergedJob(t *testing.T) { store := testkit.CreateMockStore(t) + ctx := context.Background() var wg util.WaitGroupWrapper tk := testkit.NewTestKit(t, store) @@ -114,7 +116,7 @@ func TestMergedJob(t *testing.T) { tk1.MustExec("create table t(a int)") }) require.Eventually(t, func() bool { - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) return len(gotJobs) == 1 }, 10*time.Second, 100*time.Millisecond) @@ -136,7 +138,7 @@ func TestMergedJob(t *testing.T) { tk1.MustExecToErr("create table t1(a int)") }) require.Eventually(t, 
func() bool { - gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session()) require.NoError(t, err) return len(gotJobs) == 2 && gotJobs[1].Type == model.ActionCreateTables }, 10*time.Second, 100*time.Millisecond) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 7baca17bda632..888cfa48ee4b8 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -225,19 +225,19 @@ type CommandDDLJobsExec struct { jobIDs []int64 errs []error - execute func(se sessionctx.Context, ids []int64) (errs []error, err error) + execute func(ctx context.Context, se sessionctx.Context, ids []int64) (errs []error, err error) } // Open implements the Executor for all Cancel/Pause/Resume command on DDL jobs // just with different processes. And, it should not be called directly by the // Executor. -func (e *CommandDDLJobsExec) Open(context.Context) error { +func (e *CommandDDLJobsExec) Open(ctx context.Context) error { // We want to use a global transaction to execute the admin command, so we don't use e.Ctx() here. newSess, err := e.GetSysSession() if err != nil { return err } - e.errs, err = e.execute(newSess, e.jobIDs) + e.errs, err = e.execute(ctx, newSess, e.jobIDs) e.ReleaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess) return err } @@ -448,7 +448,7 @@ func (e *DDLJobRetriever) initial(txn kv.Transaction, sess sessionctx.Context) e // For instance, in the case of the SQL like `create table t(id int)`, // the tableInfo for 't' will not be available in the infoschema until the job is completed. // As a result, we cannot retrieve its table_id. - e.runningJobs, err = ddl.GetAllDDLJobs(sess) + e.runningJobs, err = ddl.GetAllDDLJobs(context.Background(), sess) if err != nil { return err } @@ -640,7 +640,7 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error { session.GetSessionVars().SetInTxn(true) m := meta.NewMeta(txn) - jobs, err = ddl.GetAllDDLJobs(session) + jobs, err = ddl.GetAllDDLJobs(ctx, session) if err != nil { return err } @@ -728,7 +728,7 @@ func (e *ShowDDLJobQueriesWithRangeExec) Open(ctx context.Context) error { session.GetSessionVars().SetInTxn(true) m := meta.NewMeta(txn) - jobs, err = ddl.GetAllDDLJobs(session) + jobs, err = ddl.GetAllDDLJobs(ctx, session) if err != nil { return err } diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 50f2f3b915d04..87f753de2ecd8 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1098,6 +1098,7 @@ func (local *Backend) startWorker( if job.region != nil && job.region.Region != nil { peers = job.region.Region.GetPeers() } + failpoint.InjectCall("beforeExecuteRegionJob", ctx) metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Inc() err := local.executeJob(ctx, job) metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Dec() diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 8598e49c16945..b76506d020f31 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -15,11 +15,13 @@ package addindextest_test import ( + "context" "fmt" "strings" "sync" "sync/atomic" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" @@ -106,6 +108,38 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) { require.True(t, strings.Contains(rows[1][3].(string) /* job_type */, 
"ingest")) require.Equal(t, rows[0][7].(string) /* row_count */, "3") require.Equal(t, rows[1][7].(string) /* row_count */, "3") + + // TODO(lance6716): enable below test + t.Skip("DDL will be canceled timeout soon") + + tk.MustExec("set @@global.tidb_enable_dist_task = 0;") + // TODO(lance6716): dist_task also need this + + // test cancel is timely + enter := make(chan struct{}) + testfailpoint.EnableCall( + t, + "github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob", + func(ctx context.Context) { + close(enter) + select { + case <-time.After(time.Second * 50): + case <-ctx.Done(): + } + }) + wg.Add(1) + go func() { + defer wg.Done() + err := tk2.ExecToErr("alter table t add index idx_ba(b, a);") + require.ErrorContains(t, err, "Cancelled DDL job") + }() + <-enter + jobID := tk.MustQuery("admin show ddl jobs 1;").Rows()[0][0].(string) + now := time.Now() + tk.MustExec("admin cancel ddl jobs " + jobID) + wg.Wait() + // cancel should be timely + require.Less(t, time.Since(now).Seconds(), 30.0) } func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) { diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 692b181d424d7..8c3b22322b2fe 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -59,7 +59,7 @@ func TestBackfillOperators(t *testing.T) { var opTasks []ddl.TableScanTask { ctx := context.Background() - opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) pTbl := tbl.(table.PhysicalTable) src := ddl.NewTableScanTaskSource(opCtx, store, pTbl, startKey, endKey, nil) sink := testutil.NewOperatorTestSink[ddl.TableScanTask]() @@ -78,7 +78,7 @@ func TestBackfillOperators(t *testing.T) { require.Equal(t, startKey, tasks[0].Start) require.Equal(t, endKey, tasks[9].End) - opCtx.Cancel() + cancel() require.NoError(t, opCtx.OperatorErr()) opTasks = tasks @@ -94,7 +94,7 @@ func TestBackfillOperators(t *testing.T) { } ctx := context.Background() - opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) src := testutil.NewOperatorTestSource(opTasks...) scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() @@ -120,14 +120,14 @@ func TestBackfillOperators(t *testing.T) { } require.Equal(t, 10, cnt) - opCtx.Cancel() + cancel() require.NoError(t, opCtx.OperatorErr()) } // Test IndexIngestOperator. 
{ ctx := context.Background() - opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) var keys, values [][]byte onWrite := func(key, val []byte) { keys = append(keys, key) @@ -166,7 +166,7 @@ func TestBackfillOperators(t *testing.T) { require.Len(t, values, 10) require.Equal(t, 10, cnt) - opCtx.Cancel() + cancel() require.NoError(t, opCtx.OperatorErr()) } } @@ -179,7 +179,8 @@ func TestBackfillOperatorPipeline(t *testing.T) { sessPool := newSessPoolForTest(t, store) ctx := context.Background() - opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + defer cancel() mockBackendCtx := &ingest.MockBackendCtx{} mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(func(key, val []byte) {}) @@ -206,7 +207,6 @@ func TestBackfillOperatorPipeline(t *testing.T) { err = pipeline.Close() require.NoError(t, err) - opCtx.Cancel() require.NoError(t, opCtx.OperatorErr()) } @@ -278,7 +278,8 @@ func TestBackfillOperatorPipelineException(t *testing.T) { } else { require.NoError(t, failpoint.Enable(tc.failPointPath, `return`)) } - opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + defer cancel() pipeline, err := ddl.NewAddIndexIngestPipeline( opCtx, store, sessPool, @@ -301,14 +302,12 @@ func TestBackfillOperatorPipelineException(t *testing.T) { err = pipeline.Close() comment := fmt.Sprintf("case: %s", tc.failPointPath) require.ErrorContains(t, err, tc.closeErrMsg, comment) - opCtx.Cancel() if tc.operatorErrMsg == "" { require.NoError(t, opCtx.OperatorErr()) } else { require.Error(t, opCtx.OperatorErr()) require.Equal(t, tc.operatorErrMsg, opCtx.OperatorErr().Error()) } - cancel() }) } }
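Summary note: taken together, the core change is that NewDistTaskOperatorCtx and NewLocalOperatorCtx now follow the standard context.WithCancel contract: the cancel func is returned to the caller (who defers it), onError only records the first error, and the old Cancel method is removed. A simplified sketch of that shape, assuming a cut-down stand-in for OperatorCtx rather than the real type:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// opCtx is a simplified stand-in for ddl.OperatorCtx.
type opCtx struct {
	context.Context
	err atomic.Pointer[error]
}

// newLocalOperatorCtx mirrors the new constructor shape: the cancel func is
// handed back to the caller instead of being exposed via a Cancel method.
func newLocalOperatorCtx(ctx context.Context, jobID int64) (*opCtx, context.CancelFunc) {
	child, cancel := context.WithCancel(ctx)
	_ = jobID // the real code attaches jobID to the logger here
	return &opCtx{Context: child}, cancel
}

// onError only records the first error; it no longer cancels the context,
// leaving cancellation entirely to the caller-held cancel func.
func (c *opCtx) onError(err error) { c.err.CompareAndSwap(nil, &err) }

func (c *opCtx) operatorErr() error {
	if p := c.err.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	ctx, cancel := newLocalOperatorCtx(context.Background(), 1)
	defer cancel() // callers now own cancellation, as in runAddIndexInLocalIngestMode

	ctx.onError(fmt.Errorf("mock flush error"))
	fmt.Println(ctx.Err(), ctx.operatorErr()) // <nil> mock flush error: error recorded, ctx not cancelled
}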