From 9438064f03c4c8680dcc3677b0a9aa32a09d1b1b Mon Sep 17 00:00:00 2001
From: tangenta
Date: Mon, 6 Dec 2021 14:53:57 +0800
Subject: [PATCH] cherry pick #30401 to release-5.1

Signed-off-by: ti-srebot
---
 ddl/ddl.go             |  4 ++++
 ddl/ddl_worker.go      | 11 ++++++++++-
 ddl/ddl_worker_test.go | 26 ++++++++++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/ddl/ddl.go b/ddl/ddl.go
index 9eb05b86741ed..42874cd178884 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -538,6 +538,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	d.limitJobCh <- task
 	// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
 	err := <-task.err
+	if err != nil {
+		// The transaction of enqueuing job is failed.
+		return errors.Trace(err)
+	}
 
 	ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
 
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index f692b2673ccdd..2186b8046ed2c 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -285,6 +285,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
 				return errors.Trace(err)
 			}
 		}
+		failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
+			if val.(bool) {
+				failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
+			}
+		})
 		return nil
 	})
 	var jobs string
@@ -294,7 +299,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(),
 			metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
 	}
-	logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err))
+	} else {
+		logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
+	}
 }
 
 // getHistoryDDLJob gets a DDL job with job's ID from history queue.
diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go
index 72fef7c96c19e..a56b18b3120c4 100644
--- a/ddl/ddl_worker_test.go
+++ b/ddl/ddl_worker_test.go
@@ -476,6 +476,32 @@ func (s *testDDLSuite) TestColumnError(c *C) {
 	doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionDropColumns, []interface{}{[]model.CIStr{model.NewCIStr("c5"), model.NewCIStr("c6")}, make([]bool, 2)}, ctx, d)
 }
 
+func (s *testDDLSerialSuite) TestAddBatchJobError(c *C) {
+	store := testCreateStore(c, "test_add_batch_job_error")
+	defer func() {
+		err := store.Close()
+		c.Assert(err, IsNil)
+	}()
+	d, err := testNewDDLAndStart(
+		context.Background(),
+		WithStore(store),
+		WithLease(testLease),
+	)
+	c.Assert(err, IsNil)
+	defer func() {
+		err := d.Stop()
+		c.Assert(err, IsNil)
+	}()
+	ctx := testNewContext(d)
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`), IsNil)
+	// Test the job runner should not hang forever.
+	job := &model.Job{SchemaID: 1, TableID: 1}
+	err = d.doDDLJob(ctx, job)
+	c.Assert(err, NotNil)
+	c.Assert(err.Error(), Equals, "mockAddBatchDDLJobsErr")
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr"), IsNil)
+}
+
 func testCheckOwner(c *C, d *ddl, expectedVal bool) {
 	c.Assert(d.isOwner(), Equals, expectedVal)
 }
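
Reviewer note (not part of the patch): the new test depends on the pingcap/failpoint error-injection mechanism. The sketch below shows that mechanism in isolation, outside TiDB, as a minimal standalone program. Every identifier in it (enqueueJobs, the mockEnqueueErr failpoint, the example/ddl path) is invented for illustration and does not exist in the TiDB code base. TiDB's source uses the failpoint.Inject marker form seen in the hunk above, which the failpoint code-rewrite step expands into an equivalent failpoint.Eval call before building; Eval is called directly here so the sketch runs without that rewrite.

// A minimal, self-contained sketch of the failpoint pattern the patch relies on.
// All names here are hypothetical; this is an illustration, not TiDB code.
package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// enqueueJobs plays the role of addBatchDDLJobs: it normally succeeds, but a
// caller can force it to fail by enabling the "mockEnqueueErr" failpoint.
func enqueueJobs() error {
	// When the failpoint is disabled, Eval returns an error and this branch
	// is skipped, so the injection has no effect in normal operation.
	if val, err := failpoint.Eval("example/ddl/mockEnqueueErr"); err == nil {
		if on, ok := val.(bool); ok && on {
			return errors.New("mockEnqueueErr")
		}
	}
	return nil
}

func main() {
	// With the failpoint disabled, enqueueJobs succeeds.
	fmt.Println("before enable:", enqueueJobs())

	// Enable the failpoint with the expression `return(true)`, mirroring the
	// failpoint.Enable call in the test added by the patch.
	if err := failpoint.Enable("example/ddl/mockEnqueueErr", `return(true)`); err != nil {
		panic(err)
	}
	defer func() {
		if err := failpoint.Disable("example/ddl/mockEnqueueErr"); err != nil {
			panic(err)
		}
	}()

	// Now the injected error surfaces to the caller.
	fmt.Println("after enable:", enqueueJobs())
}

The behaviour mirrored from the patch is that last step: once addBatchDDLJobs can return the injected error, doDDLJob must hand the error back to its caller instead of waiting forever on a job that was never enqueued, which is exactly what the new error check in ddl/ddl.go and the TestAddBatchJobError case verify.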