From 68aaa2fa7ec1cc4def8daaa7726e3ff8f8a5b395 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 13 Jan 2021 12:19:58 +0800 Subject: [PATCH] *: fix auto-id allocate tracing (#22371) --- ddl/column.go | 3 ++- ddl/column_change_test.go | 8 +++---- ddl/db_change_test.go | 6 ++--- ddl/db_integration_test.go | 6 ++--- ddl/db_partition_test.go | 4 ++-- ddl/db_test.go | 6 ++--- ddl/ddl.go | 2 +- ddl/ddl_worker.go | 6 ++--- ddl/ddl_worker_test.go | 22 +++++++++--------- ddl/delete_range.go | 2 +- ddl/index.go | 4 ++-- ddl/index_change_test.go | 2 +- ddl/reorg.go | 2 +- ddl/reorg_test.go | 6 ++--- ddl/schema_test.go | 4 ++-- ddl/serial_test.go | 2 +- ddl/stat.go | 4 +++- ddl/table_test.go | 6 ++--- executor/admin.go | 4 ++-- executor/ddl_test.go | 2 +- executor/executor_test.go | 2 +- infoschema/infoschema_test.go | 7 +++--- kv/txn.go | 6 ++--- kv/txn_test.go | 9 +++---- meta/autoid/autoid.go | 40 ++++++++++++++++++-------------- meta/autoid/autoid_test.go | 18 +++++++------- session/session.go | 4 ++-- store/store_test.go | 6 ++--- store/tikv/gcworker/gc_worker.go | 2 +- structure/structure_test.go | 2 +- table/table.go | 4 ---- table/tables/tables.go | 2 +- 32 files changed, 105 insertions(+), 98 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 2dc842d0578bd..65be2d26b18b3 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -15,6 +15,7 @@ package ddl import ( "bytes" + "context" "fmt" "math/bits" "strings" @@ -1312,7 +1313,7 @@ func (w *updateColumnWorker) cleanRowMap() { // BackfillDataInTxn will backfill the table record in a transaction, lock corresponding rowKey, if the value of rowKey is changed. func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { oprStartTime := time.Now() - errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { + errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, w.priority) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index e8d5f3b0e74f5..f3265eb76ddb4 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -49,7 +49,7 @@ func (s *testColumnChangeSuite) SetUpSuite(c *C) { Name: model.NewCIStr("test_column_change"), ID: 1, } - err := kv.RunInNewTxn(s.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) return errors.Trace(t.CreateDatabase(s.dbInfo)) }) @@ -187,7 +187,7 @@ func (s *testColumnChangeSuite) TestModifyAutoRandColumnWithMetaKeyChanged(c *C) tc.onJobRunBefore = func(job *model.Job) { if atomic.LoadInt32(&errCount) > 0 && job.Type == model.ActionModifyColumn { atomic.AddInt32(&errCount, -1) - genAutoRandErr = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + genAutoRandErr = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) _, err1 := t.GenAutoRandomID(s.dbInfo.ID, tableID, 1) return err1 @@ -210,7 +210,7 @@ func (s *testColumnChangeSuite) TestModifyAutoRandColumnWithMetaKeyChanged(c *C) c.Assert(genAutoRandErr, IsNil) testCheckJobDone(c, d, job, true) var newTbInfo *model.TableInfo - err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err error newTbInfo, err = t.GetTable(s.dbInfo.ID, tableID) @@ -413,7 +413,7 @@ func (s *testColumnChangeSuite) checkAddPublic(sctx sessionctx.Context, d *ddl, func getCurrentTable(d *ddl, schemaID, tableID int64) (table.Table, error) { var tblInfo *model.TableInfo - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err error tblInfo, err = t.GetTable(schemaID, tableID) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index d87fc6406d26f..8e0b07e2afe53 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1192,7 +1192,7 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess } var qLen int for { - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { jobs, err1 := admin.GetDDLJobs(txn) if err1 != nil { return err1 @@ -1224,7 +1224,7 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess go func() { var qLen int for { - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { jobs, err3 := admin.GetDDLJobs(txn) if err3 != nil { return err3 @@ -1638,7 +1638,7 @@ func (s *serialTestStateChangeSuite) TestModifyColumnTypeArgs(c *C) { ID, err := strconv.Atoi(jobID) c.Assert(err, IsNil) var historyJob *model.Job - err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) historyJob, err = t.GetHistoryDDLJob(int64(ID)) if err != nil { diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 3b6c31ccd2eb0..f4422f93efde9 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -690,7 +690,7 @@ func (s *testIntegrationSuite2) TestUpdateMultipleTable(c *C) { } t1Info.Columns = append(t1Info.Columns, newColumn) - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) _, err = m.GenSchemaVersion() c.Assert(err, IsNil) @@ -706,7 +706,7 @@ func (s *testIntegrationSuite2) TestUpdateMultipleTable(c *C) { newColumn.State = model.StatePublic - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) _, err = m.GenSchemaVersion() c.Assert(err, IsNil) @@ -1323,7 +1323,7 @@ func checkGetMaxTableRowID(ctx *testMaxTableRowIDContext, store kv.Storage, expe func getHistoryDDLJob(store kv.Storage, id int64) (*model.Job, error) { var job *model.Job - err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error job, err1 = t.GetHistoryDDLJob(id) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 30e8c6822f510..2471c4c4593e1 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2106,7 +2106,7 @@ func (s *testIntegrationSuite4) TestAddPartitionTooManyPartitions(c *C) { func checkPartitionDelRangeDone(c *C, s *testIntegrationSuite, partitionPrefix kv.Key) bool { hasOldPartitionData := true for i := 0; i < waitForCleanDataRound; i++ { - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { it, err := txn.Iter(partitionPrefix, nil) if err != nil { return err @@ -2914,7 +2914,7 @@ func (s *testIntegrationSuite5) TestDropSchemaWithPartitionTable(c *C) { row := rows[0] c.Assert(row.GetString(3), Equals, "drop schema") jobID := row.GetInt64(0) - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) historyJob, err := t.GetHistoryDDLJob(jobID) c.Assert(err, IsNil) diff --git a/ddl/db_test.go b/ddl/db_test.go index a6f45efe30793..34f8999662a36 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -3136,7 +3136,7 @@ func (s *testSerialDBSuite) TestTruncateTable(c *C) { tablePrefix := tablecodec.EncodeTablePrefix(oldTblID) hasOldTableData := true for i := 0; i < waitForCleanDataRound; i++ { - err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { it, err1 := txn.Iter(tablePrefix, nil) if err1 != nil { return err1 @@ -5721,7 +5721,7 @@ func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 ses } var qLen int for { - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { jobs, err1 := admin.GetDDLJobs(txn) if err1 != nil { return err1 @@ -5751,7 +5751,7 @@ func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 ses go func() { var qLen int for { - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { jobs, err3 := admin.GetDDLJobs(txn) if err3 != nil { return err3 diff --git a/ddl/ddl.go b/ddl/ddl.go index 0a9266802a898..56e814a3f7be1 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -412,7 +412,7 @@ func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.In func (d *ddl) genGlobalIDs(count int) ([]int64, error) { var ret []int64 - err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, true, func(ctx context.Context, txn kv.Transaction) error { failpoint.Inject("mockGenGlobalIDFail", func(val failpoint.Value) { if val.(bool) { failpoint.Return(errors.New("gofail genGlobalIDs error")) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index eb765851ee7f4..2562d12bb3b60 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -224,7 +224,7 @@ func (d *ddl) limitDDLJobs() { // addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL queue. func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { startTime := time.Now() - err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) ids, err := t.GenGlobalIDs(len(tasks)) if err != nil { @@ -265,7 +265,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { var job *model.Job - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error job, err1 = t.GetHistoryDDLJob(id) @@ -430,7 +430,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { runJobErr error ) waitTime := 2 * d.lease - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { // We are not owner, return and retry checking later. if !d.isOwner() { return nil diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index ff3a2d812e9dd..7c74ae1ec1dcf 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -142,7 +142,7 @@ func (s *testDDLSuite) TestTableError(c *C) { // Table ID or schema ID is wrong, so getting table is failed. tblInfo := testTableInfo(c, d, "t", 3) testCreateTable(c, ctx, d, dbInfo, tblInfo) - err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { job.SchemaID = -1 job.TableID = -1 t := meta.NewMeta(txn) @@ -339,7 +339,7 @@ func testCheckOwner(c *C, d *ddl, expectedVal bool) { } func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) { - kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) historyJob, err := t.GetHistoryDDLJob(job.ID) c.Assert(err, IsNil) @@ -355,7 +355,7 @@ func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) { } func testCheckJobCancelled(c *C, d *ddl, job *model.Job, state *model.SchemaState) { - kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) historyJob, err := t.GetHistoryDDLJob(job.ID) c.Assert(err, IsNil) @@ -1164,7 +1164,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn} job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema} job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []interface{}{int64(111), "old db name"}} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := t.EnQueueDDLJob(job1) c.Assert(err, IsNil) @@ -1183,7 +1183,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { return nil }) job4 := &model.Job{ID: 4, TableID: 1, Type: model.ActionAddIndex} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := buildJobDependence(t, job4) c.Assert(err, IsNil) @@ -1191,7 +1191,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { return nil }) job5 := &model.Job{ID: 5, TableID: 2, Type: model.ActionAddIndex} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := buildJobDependence(t, job5) c.Assert(err, IsNil) @@ -1199,7 +1199,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { return nil }) job8 := &model.Job{ID: 8, TableID: 3, Type: model.ActionAddIndex} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := buildJobDependence(t, job8) c.Assert(err, IsNil) @@ -1207,7 +1207,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { return nil }) job10 := &model.Job{ID: 10, SchemaID: 111, TableID: 3, Type: model.ActionAddIndex} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := buildJobDependence(t, job10) c.Assert(err, IsNil) @@ -1215,7 +1215,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) { return nil }) job12 := &model.Job{ID: 12, SchemaID: 112, TableID: 2, Type: model.ActionAddIndex} - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := buildJobDependence(t, job12) c.Assert(err, IsNil) @@ -1308,7 +1308,7 @@ func (s *testDDLSuite) TestParallelDDL(c *C) { qLen1 := int64(0) qLen2 := int64(0) for { - checkErr = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + checkErr = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) qLen1, err = m.DDLJobQueueLen() if err != nil { @@ -1378,7 +1378,7 @@ func (s *testDDLSuite) TestParallelDDL(c *C) { // check results. isChecked := false for !isChecked { - kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) lastJob, err := m.GetHistoryDDLJob(job11.ID) c.Assert(err, IsNil) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 7ff2ae4017369..805f600df9cae 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -196,7 +196,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { for { finish := true dr.keys = dr.keys[:0] - err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error { iter, err := txn.Iter(oldStartKey, r.EndKey) if err != nil { return errors.Trace(err) diff --git a/ddl/index.go b/ddl/index.go index 9650f9bf63c42..2ba3dd5d0a62a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1151,7 +1151,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC }) oprStartTime := time.Now() - errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { + errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, w.priority) @@ -1363,7 +1363,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t }) oprStartTime := time.Now() - errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { + errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, w.priority) diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 457dbc0dc237c..be3a8ee3812c3 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -41,7 +41,7 @@ func (s *testIndexChangeSuite) SetUpSuite(c *C) { Name: model.NewCIStr("test_index_change"), ID: 1, } - err := kv.RunInNewTxn(s.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) return errors.Trace(t.CreateDatabase(s.dbInfo)) }) diff --git a/ddl/reorg.go b/ddl/reorg.go index 1fcba343445d1..9c72040a0221c 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -710,7 +710,7 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key) error { return nil } - err := kv.RunInNewTxn(r.d.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), r.d.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) return errors.Trace(t.UpdateDDLReorgHandle(r.Job, startKey, r.EndKey, r.PhysicalTableID, r.currElement)) }) diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 7e44e5cad02b1..aeacb5f68e62c 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -140,7 +140,7 @@ func (s *testDDLSuite) TestReorg(c *C) { EndKey: s.NewHandle().Int(0).Common(101, "string").Encoded(), PhysicalTableID: 456, } - err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error _, err1 = getReorgInfo(d.ddlCtx, t, job, mockTbl, []*meta.Element{element}) @@ -152,7 +152,7 @@ func (s *testDDLSuite) TestReorg(c *C) { job.SnapshotVer = uint64(1) err = info.UpdateReorgMeta(info.StartKey) c.Assert(err, IsNil) - err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) info1, err1 := getReorgInfo(d.ddlCtx, t, job, mockTbl, []*meta.Element{element}) c.Assert(err1, IsNil) @@ -230,7 +230,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { testDropSchema(c, ctx, d1, dbInfo) - err = kv.RunInNewTxn(d1.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), d1.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) db, err1 := t.GetDatabase(dbInfo.ID) c.Assert(err1, IsNil) diff --git a/ddl/schema_test.go b/ddl/schema_test.go index feecd3cd453e9..45a00ef9971ab 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -96,7 +96,7 @@ func testCheckSchemaState(c *C, d *ddl, dbInfo *model.DBInfo, state model.Schema isDropped := true for { - kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) info, err := t.GetDatabase(dbInfo.ID) c.Assert(err, IsNil) @@ -227,7 +227,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) { func testGetSchemaInfoWithError(d *ddl, schemaID int64) (*model.DBInfo, error) { var dbInfo *model.DBInfo - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error dbInfo, err1 = t.GetDatabase(schemaID) diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 4abfb57f70c3d..2a28ec960847d 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -893,7 +893,7 @@ func (s *testSerialSuite) TestCanceledJobTakeTime(c *C) { once := sync.Once{} hook.OnJobUpdatedExported = func(job *model.Job) { once.Do(func() { - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) return t.DropTableOrView(job.SchemaID, job.TableID, true) }) diff --git a/ddl/stat.go b/ddl/stat.go index a7453da510407..6f21b99f89a8e 100644 --- a/ddl/stat.go +++ b/ddl/stat.go @@ -14,6 +14,8 @@ package ddl import ( + "context" + "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" @@ -49,7 +51,7 @@ func (d *ddl) Stats(vars *variable.SessionVars) (map[string]interface{}, error) m[serverID] = d.uuid var ddlInfo *admin.DDLInfo - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { var err1 error ddlInfo, err1 = admin.GetDDLInfo(txn) if err1 != nil { diff --git a/ddl/table_test.go b/ddl/table_test.go index 3b431e402da7b..ea51ac6bc4254 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -245,7 +245,7 @@ func testLockTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblI } func checkTableLockedTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, serverID string, sessionID uint64, lockTp model.TableLockType) { - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) info, err := t.GetTable(dbInfo.ID, tblInfo.ID) c.Assert(err, IsNil) @@ -298,7 +298,7 @@ func testTruncateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInf } func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) info, err := t.GetTable(dbInfo.ID, tblInfo.ID) c.Assert(err, IsNil) @@ -323,7 +323,7 @@ func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table { func testGetTableWithError(d *ddl, schemaID, tableID int64) (table.Table, error) { var tblInfo *model.TableInfo - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error tblInfo, err1 = t.GetTable(schemaID, tableID) diff --git a/executor/admin.go b/executor/admin.go index 85b21894f2759..db66a98e06407 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -320,7 +320,7 @@ func (e *RecoverIndexExec) backfillIndex(ctx context.Context) (int64, int64, err result backfillResult ) for { - errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error { + errInTxn := kv.RunInNewTxn(context.Background(), e.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { var err error result, err = e.backfillIndexInTxn(ctx, txn, currentHandle) return err @@ -691,7 +691,7 @@ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *CleanupIndexExec) cleanTableIndex(ctx context.Context) error { for { - errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error { + errInTxn := kv.RunInNewTxn(context.Background(), e.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { err := e.fetchIndex(ctx, txn) if err != nil { return err diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 1526813d13844..95a1b11dd586a 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -804,7 +804,7 @@ func (s *testSuite8) TestShardRowIDBits(c *C) { tblInfo.ShardRowIDBits = 5 tblInfo.MaxShardRowIDBits = 5 - kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) _, err = m.GenSchemaVersion() c.Assert(err, IsNil) diff --git a/executor/executor_test.go b/executor/executor_test.go index 8ca647e5d7cb0..c25a7ca21a241 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -525,7 +525,7 @@ func (s *testSuiteP2) TestAdminShowDDLJobs(c *C) { jobID, err := strconv.Atoi(row[0].(string)) c.Assert(err, IsNil) - err = kv.RunInNewTxn(s.store, true, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) job, err := t.GetHistoryDDLJob(int64(jobID)) c.Assert(err, IsNil) diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index cc194617518e9..b0690e78e3f67 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -14,6 +14,7 @@ package infoschema_test import ( + "context" "sync" "testing" @@ -105,7 +106,7 @@ func (*testSuite) TestT(c *C) { } dbInfos := []*model.DBInfo{dbInfo} - err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error { meta.NewMeta(txn).CreateDatabase(dbInfo) return errors.Trace(err) }) @@ -195,7 +196,7 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) c.Assert(tb, NotNil) - err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error { meta.NewMeta(txn).CreateTableOrView(dbID, tblInfo) return errors.Trace(err) }) @@ -331,7 +332,7 @@ func (*testSuite) TestInfoTables(c *C) { func genGlobalID(store kv.Storage) (int64, error) { var globalID int64 - err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error { var err error globalID, err = meta.NewMeta(txn).GenGlobalID() return errors.Trace(err) diff --git a/kv/txn.go b/kv/txn.go index 9bc693232b202..00dad8a7d3b19 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -26,7 +26,7 @@ import ( ) // RunInNewTxn will run the f in a new transaction environment. -func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error { +func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx context.Context, txn Transaction) error) error { var ( err error originalTxnTS uint64 @@ -44,7 +44,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e originalTxnTS = txn.StartTS() } - err = f(txn) + err = f(ctx, txn) if err != nil { err1 := txn.Rollback() terror.Log(err1) @@ -58,7 +58,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e return err } - err = txn.Commit(context.Background()) + err = txn.Commit(ctx) if err == nil { break } diff --git a/kv/txn_test.go b/kv/txn_test.go index b81e5f024235d..38e85ad354bfd 100644 --- a/kv/txn_test.go +++ b/kv/txn_test.go @@ -14,6 +14,7 @@ package kv import ( + "context" "errors" "time" @@ -51,17 +52,17 @@ func (s *testTxnSuite) TestRetryExceedCountError(c *C) { }(maxRetryCnt) maxRetryCnt = 5 - err := RunInNewTxn(&mockStorage{}, true, func(txn Transaction) error { + err := RunInNewTxn(context.Background(), &mockStorage{}, true, func(ctx context.Context, txn Transaction) error { return nil }) c.Assert(err, NotNil) - err = RunInNewTxn(&mockStorage{}, true, func(txn Transaction) error { + err = RunInNewTxn(context.Background(), &mockStorage{}, true, func(ctx context.Context, txn Transaction) error { return ErrTxnRetryable }) c.Assert(err, NotNil) - err = RunInNewTxn(&mockStorage{}, true, func(txn Transaction) error { + err = RunInNewTxn(context.Background(), &mockStorage{}, true, func(ctx context.Context, txn Transaction) error { return errors.New("do not retry") }) c.Assert(err, NotNil) @@ -71,7 +72,7 @@ func (s *testTxnSuite) TestRetryExceedCountError(c *C) { cfg.SetGetError(err1) cfg.SetCommitError(err1) storage := NewInjectedStore(newMockStorage(), &cfg) - err = RunInNewTxn(storage, true, func(txn Transaction) error { + err = RunInNewTxn(context.Background(), storage, true, func(ctx context.Context, txn Transaction) error { return nil }) c.Assert(err, NotNil) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index fa9d32130ffb7..5115e1b4e5b3e 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -187,7 +187,7 @@ func (alloc *allocator) End() int64 { func (alloc *allocator) NextGlobalAutoID(tableID int64) (int64, error) { var autoID int64 startTime := time.Now() - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { var err1 error m := meta.NewMeta(txn) autoID, err1 = getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) @@ -215,7 +215,7 @@ func (alloc *allocator) rebase4Unsigned(tableID int64, requiredBase uint64, allo } var newBase, newEnd uint64 startTime := time.Now() - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) currentEnd, err1 := getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) if err1 != nil { @@ -261,7 +261,7 @@ func (alloc *allocator) rebase4Signed(tableID, requiredBase int64, allocIDs bool } var newBase, newEnd int64 startTime := time.Now() - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) currentEnd, err1 := getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) if err1 != nil { @@ -298,7 +298,7 @@ func (alloc *allocator) rebase4Signed(tableID, requiredBase int64, allocIDs bool func (alloc *allocator) rebase4Sequence(tableID, requiredBase int64) (int64, bool, error) { startTime := time.Now() alreadySatisfied := false - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) currentEnd, err := getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) if err != nil { @@ -460,12 +460,6 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T // increment and offset at this time now. To simplify the rule is like (ID - offset) % increment = 0, // so the first autoID should be 9, then add increment to it to get 13. func (alloc *allocator) Alloc(ctx context.Context, tableID int64, n uint64, increment, offset int64) (int64, int64, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("autoid.Alloc", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } - if tableID == 0 { return 0, 0, errInvalidTableID.GenWithStackByArgs("Invalid tableID") } @@ -480,9 +474,9 @@ func (alloc *allocator) Alloc(ctx context.Context, tableID int64, n uint64, incr alloc.mu.Lock() defer alloc.mu.Unlock() if alloc.isUnsigned { - return alloc.alloc4Unsigned(tableID, n, increment, offset) + return alloc.alloc4Unsigned(ctx, tableID, n, increment, offset) } - return alloc.alloc4Signed(tableID, n, increment, offset) + return alloc.alloc4Signed(ctx, tableID, n, increment, offset) } func (alloc *allocator) AllocSeqCache(tableID int64) (int64, int64, int64, error) { @@ -632,7 +626,7 @@ func SeekToFirstAutoIDUnSigned(base, increment, offset uint64) uint64 { return nr } -func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset int64) (int64, int64, error) { +func (alloc *allocator) alloc4Signed(ctx context.Context, tableID int64, n uint64, increment, offset int64) (int64, int64, error) { // Check offset rebase if necessary. if offset-1 > alloc.base { if err := alloc.rebase4Signed(tableID, offset-1, true); err != nil { @@ -656,7 +650,13 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset consumeDur := startTime.Sub(alloc.lastAllocTime) nextStep = NextStep(alloc.step, consumeDur) } - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("alloc.alloc4Signed", opentracing.ChildOf(span.Context())) + defer span1.Finish() + opentracing.ContextWithSpan(ctx, span1) + } + m := meta.NewMeta(txn) var err1 error newBase, err1 = getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) @@ -701,7 +701,7 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset return min, alloc.base, nil } -func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offset int64) (int64, int64, error) { +func (alloc *allocator) alloc4Unsigned(ctx context.Context, tableID int64, n uint64, increment, offset int64) (int64, int64, error) { // Check offset rebase if necessary. if uint64(offset-1) > uint64(alloc.base) { if err := alloc.rebase4Unsigned(tableID, uint64(offset-1), true); err != nil { @@ -725,7 +725,13 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse consumeDur := startTime.Sub(alloc.lastAllocTime) nextStep = NextStep(alloc.step, consumeDur) } - err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("alloc.alloc4Unsigned", opentracing.ChildOf(span.Context())) + defer span1.Finish() + opentracing.ContextWithSpan(ctx, span1) + } + m := meta.NewMeta(txn) var err1 error newBase, err1 = getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType) @@ -789,7 +795,7 @@ func (alloc *allocator) alloc4Sequence(tableID int64) (min int64, max int64, rou var newBase, newEnd int64 startTime := time.Now() - err = kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) var ( err1 error diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 11a6a9bb98a59..febfa5e2952e3 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -52,7 +52,7 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) defer store.Close() - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) @@ -256,7 +256,7 @@ func (*testSuite) TestUnsignedAutoid(c *C) { c.Assert(err, IsNil) defer store.Close() - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) @@ -420,7 +420,7 @@ func (*testSuite) TestConcurrentAlloc(c *C) { dbID := int64(2) tblID := int64(100) - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: dbID, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) @@ -503,7 +503,7 @@ func (*testSuite) TestRollbackAlloc(c *C) { defer store.Close() dbID := int64(1) tblID := int64(2) - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: dbID, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) @@ -548,7 +548,7 @@ func BenchmarkAllocator_Alloc(b *testing.B) { defer store.Close() dbID := int64(1) tblID := int64(2) - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: dbID, Name: model.NewCIStr("a")}) if err != nil { @@ -580,7 +580,7 @@ func BenchmarkAllocator_SequenceAlloc(b *testing.B) { defer store.Close() var seq *model.SequenceInfo var sequenceBase int64 - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) if err != nil { @@ -634,7 +634,7 @@ func (*testSuite) TestSequenceAutoid(c *C) { var seq *model.SequenceInfo var sequenceBase int64 - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) @@ -755,7 +755,7 @@ func (*testSuite) TestConcurrentAllocSequence(c *C) { var seq *model.SequenceInfo var sequenceBase int64 - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err1 := m.CreateDatabase(&model.DBInfo{ID: 2, Name: model.NewCIStr("a")}) c.Assert(err1, IsNil) @@ -843,7 +843,7 @@ func (*testSuite) TestAllocComputationIssue(c *C) { c.Assert(err, IsNil) defer store.Close() - err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) diff --git a/session/session.go b/session/session.go index c476631d8bdc6..0c28697a906c0 100644 --- a/session/session.go +++ b/session/session.go @@ -2321,7 +2321,7 @@ func getStoreBootstrapVersion(store kv.Storage) int64 { var ver int64 // check in kv store - err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { var err error t := meta.NewMeta(txn) ver, err = t.GetBootstrapVersion() @@ -2344,7 +2344,7 @@ func getStoreBootstrapVersion(store kv.Storage) int64 { func finishBootstrap(store kv.Storage) { setStoreBootstrapped(store.UUID()) - err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) err := t.FinishBootstrap(currentBootstrapVersion) return err diff --git a/store/store_test.go b/store/store_test.go index 55bfdcb4780a4..627a214badee7 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -583,7 +583,7 @@ func (s *testKVSuite) TestIsolationInc(c *C) { defer wg.Done() for j := 0; j < 100; j++ { var id int64 - err := kv.RunInNewTxn(s.s, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.s, true, func(ctx context.Context, txn kv.Transaction) error { var err1 error id, err1 = kv.IncInt64(txn, []byte("key"), 1) return err1 @@ -625,7 +625,7 @@ func (s *testKVSuite) TestIsolationMultiInc(c *C) { go func() { defer wg.Done() for j := 0; j < incCnt; j++ { - err := kv.RunInNewTxn(s.s, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.s, true, func(ctx context.Context, txn kv.Transaction) error { for _, key := range keys { _, err1 := kv.IncInt64(txn, key, 1) if err1 != nil { @@ -642,7 +642,7 @@ func (s *testKVSuite) TestIsolationMultiInc(c *C) { wg.Wait() - err := kv.RunInNewTxn(s.s, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), s.s, false, func(ctx context.Context, txn kv.Transaction) error { for _, key := range keys { id, err1 := kv.GetInt64(context.TODO(), txn, key) if err1 != nil { diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index ee464abebfc9e..22b3e27e6b9f6 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -1805,7 +1805,7 @@ func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (pid int64, err erro } }) if historyJob == nil { - err = kv.RunInNewTxn(w.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), w.store, false, func(ctx context.Context, txn kv.Transaction) error { var err1 error t := meta.NewMeta(txn) historyJob, err1 = t.GetHistoryDDLJob(dr.JobID) diff --git a/structure/structure_test.go b/structure/structure_test.go index 487826be70a25..accacaf42bce2 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -385,7 +385,7 @@ func (s *testTxStructureSuite) TestHash(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { t := structure.NewStructure(txn, txn, []byte{0x00}) err = t.Set(key, []byte("abc")) c.Assert(err, IsNil) diff --git a/table/table.go b/table/table.go index 448f4b2620ed4..60af2eb36bb8f 100644 --- a/table/table.go +++ b/table/table.go @@ -230,10 +230,6 @@ func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Conte // AllocBatchAutoIncrementValue allocates batch auto_increment value for rows, returning firstID, increment and err. // The caller can derive the autoID by adding increment to firstID for N-1 times. func AllocBatchAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Context, N int) (firstID int64, increment int64, err error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("table.AllocBatchAutoIncrementValue", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } increment = int64(sctx.GetSessionVars().AutoIncrementIncrement) offset := int64(sctx.GetSessionVars().AutoIncrementOffset) min, max, err := t.Allocators(sctx).Get(autoid.RowIDAllocType).Alloc(ctx, t.Meta().ID, uint64(N), increment, offset) diff --git a/table/tables/tables.go b/table/tables/tables.go index d62f6262ee35e..a4de0bd79bf59 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -1092,7 +1092,7 @@ func writeSequenceUpdateValueBinlog(ctx sessionctx.Context, db, sequence string, sequenceFullName := stringutil.Escape(db, sqlMode) + "." + stringutil.Escape(sequence, sqlMode) sql := "select setval(" + sequenceFullName + ", " + strconv.FormatInt(end, 10) + ")" - err := kv.RunInNewTxn(ctx.GetStore(), true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(context.Background(), ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) mockJobID, err := m.GenGlobalID() if err != nil {