Skip to content

Commit

Permalink
txn: set txn options in txn provider which avoid data race (#52304)
Browse files Browse the repository at this point in the history
ref #50215
  • Loading branch information
you06 committed Apr 8, 2024
1 parent 3e2f2c5 commit c39d79f
Show file tree
Hide file tree
Showing 23 changed files with 272 additions and 195 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
if err != nil {
return nil, err
}
sessForJob.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
injectModifyJobArgFailPoint(job)
}

se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)

if tasks[0].job.LocalMode {
for _, task := range tasks {
Expand Down Expand Up @@ -609,7 +609,7 @@ func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client) {
sctx, _ := pool.Get()
defer pool.Put(sctx)
se := sess.NewSession(sctx)
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), sql, "delete-mdl-info")
if err != nil {
logutil.BgLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,10 @@ func (sdr *sessionDelRangeExecWrapper) AppendParamsList(jobID, elemID int64, sta

func (sdr *sessionDelRangeExecWrapper) ConsumeDeleteRange(ctx context.Context, sql string) error {
// set session disk full opt
sdr.sctx.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
sdr.sctx.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := sdr.sctx.GetSQLExecutor().ExecuteInternal(ctx, sql, sdr.paramsList...)
// clear session disk full opt
sdr.sctx.ClearDiskFullOpt()
sdr.sctx.GetSessionVars().ClearDiskFullOpt()
sdr.paramsList = nil
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
}

func (*ddl) markJobProcessing(se *sess.Session, job *model.Job) error {
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), fmt.Sprintf(
"update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID),
"mark_job_processing")
Expand Down Expand Up @@ -552,7 +552,7 @@ func insertDDLJobs2Table(se *sess.Session, updateRawArgs bool, jobs ...*model.Jo
}
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", job.ID, job.MayNeedReorg(), strconv.Quote(job2SchemaIDs(job)), strconv.Quote(job2TableIDs(job)), util.WrapKey2String(b), job.Type, !job.NotStarted())
}
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
_, err := se.Execute(ctx, sql.String(), "insert_job")
logutil.BgLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("category", "ddl"), zap.String("sql", sql.String()))
Expand Down
3 changes: 0 additions & 3 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ go_library(
"//pkg/statistics/handle/usage",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/store/driver/error",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/store/mockstore",
"//pkg/table",
Expand Down Expand Up @@ -110,7 +109,6 @@ go_library(
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"//pkg/util/syncutil",
"//pkg/util/tableutil",
"//pkg/util/timeutil",
"//pkg/util/topsql",
"//pkg/util/topsql/state",
Expand All @@ -124,7 +122,6 @@ go_library(
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
Expand Down
130 changes: 10 additions & 120 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
storeerr "github.com/pingcap/tidb/pkg/store/driver/error"
"github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
tbctx "github.com/pingcap/tidb/pkg/table/context"
Expand All @@ -113,15 +112,12 @@ import (
"github.com/pingcap/tidb/pkg/util/sli"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/pingcap/tipb/go-binlog"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -202,9 +198,6 @@ type session struct {
// indexUsageCollector collects index usage information.
idxUsageCollector *indexusage.SessionIndexUsageCollector

// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
Expand Down Expand Up @@ -497,7 +490,7 @@ func (s *session) doCommit(ctx context.Context) error {
defer func() {
s.txn.changeToInvalid()
s.sessionVars.SetInTxn(false)
s.ClearDiskFullOpt()
s.sessionVars.ClearDiskFullOpt()
}()
// check if the transaction is read-only
if s.txn.IsReadOnly() {
Expand Down Expand Up @@ -528,93 +521,9 @@ func (s *session) doCommit(ctx context.Context) error {
}
})

if s.sessionVars.BinlogClient != nil {
prewriteValue := binloginfo.GetPrewriteValue(s, false)
if prewriteValue != nil {
prewriteData, err := prewriteValue.Marshal()
if err != nil {
return errors.Trace(err)
}
info := &binloginfo.BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient,
}
s.txn.SetOption(kv.BinlogInfo, info)
}
}

sessVars := s.GetSessionVars()
// Get the related table or partition IDs.
relatedPhysicalTables := sessVars.TxnCtx.TableDeltaMap
// Get accessed temporary tables in the transaction.
temporaryTables := sessVars.TxnCtx.TemporaryTables
physicalTableIDs := make([]int64, 0, len(relatedPhysicalTables))
for id := range relatedPhysicalTables {
// Schema change on global temporary tables doesn't affect transactions.
if _, ok := temporaryTables[id]; ok {
continue
}
physicalTableIDs = append(physicalTableIDs, id)
}
needCheckSchema := true
// Set this option for 2 phase commit to validate schema lease.
if s.GetSessionVars().TxnCtx != nil {
needCheckSchema = !s.GetSessionVars().TxnCtx.EnableMDL
}
if s.txn.IsPipelined() && !s.GetSessionVars().TxnCtx.EnableMDL {
return errors.New("cannot commit pipelined transaction without Metadata Lock: MDL is OFF")
}

s.txn.SetOption(kv.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info })
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
// TODO: refactor SetOption usage to avoid race risk, should detect it in test.
// The pipelined txn will may be flushed in background, not touch the options to avoid races.
if !s.txn.IsPipelined() {
// to avoid session set overlap the txn set.
if s.GetDiskFullOpt() != kvrpcpb.DiskFullOpt_NotAllowedOnFull {
s.txn.SetDiskFullOpt(s.GetDiskFullOpt())
}
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs, needCheckSchema))
s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
s.txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
}
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
// because the property is naturally holds:
// We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO.
// An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS
// of any previously committed transactions.
s.txn.SetOption(kv.GuaranteeLinearizability,
sessVars.TxnCtx.IsExplicit && sessVars.GuaranteeLinearizability)
}
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
}
}

var txnSource uint64
if val := s.txn.GetOption(kv.TxnSource); val != nil {
txnSource, _ = val.(uint64)
}
// If the transaction is started by CDC, we need to set the CDCWriteSource option.
if sessVars.CDCWriteSource != 0 {
err := kv.SetCDCWriteSource(&txnSource, sessVars.CDCWriteSource)
if err != nil {
return errors.Trace(err)
}

s.txn.SetOption(kv.TxnSource, txnSource)
}

var commitTSChecker func(uint64) bool
if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
c := cachedTableRenewLease{tables: tables}
now := time.Now()
Expand All @@ -624,7 +533,10 @@ func (s *session) doCommit(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.txn.SetOption(kv.CommitTSUpperBoundCheck, c.commitTSCheck)
commitTSChecker = c.commitTSCheck
}
if err = sessiontxn.GetTxnManager(s).SetOptionsBeforeCommit(s.txn.Transaction, commitTSChecker); err != nil {
return err
}

err = s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn)
Expand Down Expand Up @@ -831,19 +743,6 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac
return nil
}

type temporaryTableKVFilter map[int64]tableutil.TempTable

func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
tid := tablecodec.DecodeTableID(key)
if _, ok := m[tid]; ok {
return true, nil
}

// This is the default filter for all tables.
defaultFilter := txn.TiDBKVFilter{}
return defaultFilter.IsUnnecessaryKeyValue(key, value, flags)
}

// errIsNoisy is used to filter DUPLCATE KEY errors.
// These can observed by users in INFORMATION_SCHEMA.CLIENT_ERRORS_SUMMARY_GLOBAL instead.
//
Expand Down Expand Up @@ -1592,18 +1491,6 @@ func (s *session) getOomAlarmVariablesInfo() util.OOMAlarmVariablesInfo {
}
}

func (s *session) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
s.diskFullOpt = level
}

func (s *session) GetDiskFullOpt() kvrpcpb.DiskFullOpt {
return s.diskFullOpt
}

func (s *session) ClearDiskFullOpt() {
s.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull
}

func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) {
origin := s.sessionVars.InRestrictedSQL
s.sessionVars.InRestrictedSQL = true
Expand Down Expand Up @@ -2609,7 +2496,7 @@ func (s *session) Close() {
if s.stmtStats != nil {
s.stmtStats.SetFinished()
}
s.ClearDiskFullOpt()
s.sessionVars.ClearDiskFullOpt()
if s.sessionPlanCache != nil {
s.sessionPlanCache.Close()
}
Expand Down Expand Up @@ -4329,6 +4216,9 @@ func (s *session) DecodeSessionStates(ctx context.Context,
func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNode ast.StmtNode) {
if !s.isInternal() {
if txn, _ := s.Txn(false); txn != nil && txn.Valid() {
if txn.IsPipelined() {
stmtLabel = "pdml"
}
txn.SetOption(kv.RequestSourceType, stmtLabel)
}
s.sessionVars.RequestSourceType = stmtLabel
Expand Down
5 changes: 5 additions & 0 deletions pkg/session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,8 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) (s
return nil, errors.Errorf("Invalid txn mode '%s'", txnMode)
}
}

// SetOptionsBeforeCommit sets options before commit.
func (m *txnManager) SetOptionsBeforeCommit(txn kv.Transaction, commitTSChecker func(uint64) bool) error {
return m.ctxProvider.SetOptionsBeforeCommit(txn, commitTSChecker)
}
1 change: 0 additions & 1 deletion pkg/session/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ go_library(
"//pkg/sessionctx/sessionstates",
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
],
)
6 changes: 0 additions & 6 deletions pkg/session/types/sesson_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/tls"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -81,11 +80,6 @@ type Session interface {
FieldList(tableName string) (fields []*ast.ResultField, err error)
SetPort(port string)

// set cur session operations allowed when tikv disk full happens.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
GetDiskFullOpt() kvrpcpb.DiskFullOpt
ClearDiskFullOpt()

// SetExtensions sets the `*extension.SessionExtensions` object
SetExtensions(extensions *extension.SessionExtensions)
}
1 change: 0 additions & 1 deletion pkg/sessionctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//pkg/util/sqlexec",
"//pkg/util/topsql/stmtstats",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_tikv_client_go_v2//oracle",
],
Expand Down
5 changes: 0 additions & 5 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/extension"
Expand Down Expand Up @@ -69,10 +68,6 @@ type Context interface {
SessionStatesHandler
contextutil.ValueStoreContext
tablelock.TableLockContext
// SetDiskFullOpt set the disk full opt when tikv disk full happened.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// ClearDiskFullOpt clear the disk full opt.
ClearDiskFullOpt()
// RollbackTxn rolls back the current transaction.
RollbackTxn(ctx context.Context)
// CommitTxn commits the current transaction.
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//pkg/util/topsql/state",
"//pkg/util/versioninfo",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
Loading

0 comments on commit c39d79f

Please sign in to comment.