Skip to content

Commit

Permalink
txn: set session id of committer once txn is activated (#52388)
Browse files Browse the repository at this point in the history
ref #50215
  • Loading branch information
ekexium committed Apr 8, 2024
1 parent 33f5d05 commit 0ce42ed
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 15 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7080,13 +7080,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "b6bc257c94e1029fd3b2efb125a4c674a315ed6b19f54f6308ba1674df561f3a",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240403052240-5a4905d2f553",
sha256 = "c1de006faa131696f05eca8490a1455999db0802271bf058ee3fbdb436a2b67d",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240408080359-7c70c5401687",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240403052240-5a4905d2f553.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240403052240-5a4905d2f553.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240403052240-5a4905d2f553.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240403052240-5a4905d2f553.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240408080359-7c70c5401687.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240408080359-7c70c5401687.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240408080359-7c70c5401687.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240408080359-7c70c5401687.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20240403052240-5a4905d2f553
github.com/tikv/client-go/v2 v2.0.8-0.20240408080359-7c70c5401687
github.com/tikv/pd/client v0.0.0-20240322051414-fb9e2d561b6e
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20240403052240-5a4905d2f553 h1:An/o2WMQBfjeW7GsWbkYhkr8OzUcrvCPEq2dYRYv+Os=
github.com/tikv/client-go/v2 v2.0.8-0.20240403052240-5a4905d2f553/go.mod h1:+vXk4Aex17GnI8gfSMPxrL0SQLbBYgP3Db4FvHiImwM=
github.com/tikv/client-go/v2 v2.0.8-0.20240408080359-7c70c5401687 h1:Tru0REq+ym9CeowIgPZHKQBpYNlO+Np8naSpIAVv32E=
github.com/tikv/client-go/v2 v2.0.8-0.20240408080359-7c70c5401687/go.mod h1:+vXk4Aex17GnI8gfSMPxrL0SQLbBYgP3Db4FvHiImwM=
github.com/tikv/pd/client v0.0.0-20240322051414-fb9e2d561b6e h1:u2OoEvmh3qyjIiAKXUPRiFCOSwznByMINDx2fsorjAo=
github.com/tikv/pd/client v0.0.0-20240322051414-fb9e2d561b6e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const (
TiKVClientReadTimeout
// SizeLimits sets the size limits of membuf
SizeLimits
// SessionID marks the connection id, for logging and tracing.
SessionID
)

// TxnSizeLimits is the argument type for `SizeLimits` option
Expand Down
32 changes: 26 additions & 6 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if p.onTxnActiveFunc != nil {
p.onTxnActiveFunc(txn, p.enterNewTxnType)
}

p.txn = txn
return txn, nil
}
Expand Down Expand Up @@ -454,7 +455,10 @@ func (p *baseTxnContextProvider) SetOptionsOnTxnActive(txn kv.Transaction) {
txn.SetOption(kv.ReplicaRead, readReplicaType)
}

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
if interceptor := temptable.SessionSnapshotInterceptor(
p.sctx,
p.infoSchema,
); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}

Expand Down Expand Up @@ -499,15 +503,21 @@ func (p *baseTxnContextProvider) SetOptionsOnTxnActive(txn kv.Transaction) {
// of any previously committed transactions.
// Additionally, it's required to guarantee linearizability for snapshot read-only transactions though
// it does take effects on read-only transactions now.
txn.SetOption(kv.GuaranteeLinearizability,
txn.SetOption(
kv.GuaranteeLinearizability,
!sessVars.IsAutocommit() ||
sessVars.SnapshotTS > 0 ||
p.enterNewTxnType == sessiontxn.EnterNewTxnDefault ||
p.enterNewTxnType == sessiontxn.EnterNewTxnWithBeginStmt)
p.enterNewTxnType == sessiontxn.EnterNewTxnWithBeginStmt,
)
}

txn.SetOption(kv.SessionID, p.sctx.GetSessionVars().ConnectionID)
}

func (p *baseTxnContextProvider) SetOptionsBeforeCommit(txn kv.Transaction, commitTSChecker func(uint64) bool) error {
func (p *baseTxnContextProvider) SetOptionsBeforeCommit(
txn kv.Transaction, commitTSChecker func(uint64) bool,
) error {
sessVars := p.sctx.GetSessionVars()
// Pipelined dml txn already flushed mutations into stores, so we don't need to set options for them.
// Instead, some invariants must be checked to avoid anomalies though are unreachable in designed usages.
Expand Down Expand Up @@ -554,7 +564,15 @@ func (p *baseTxnContextProvider) SetOptionsBeforeCommit(txn kv.Transaction, comm
// TODO: refactor SetOption usage to avoid race risk, should detect it in test.
// The pipelined txn will may be flushed in background, not touch the options to avoid races.
// to avoid session set overlap the txn set.
txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(p.sctx), p.GetTxnInfoSchema().SchemaMetaVersion(), physicalTableIDs, needCheckSchema))
txn.SetOption(
kv.SchemaChecker,
domain.NewSchemaChecker(
domain.GetDomain(p.sctx),
p.GetTxnInfoSchema().SchemaMetaVersion(),
physicalTableIDs,
needCheckSchema,
),
)

if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
Expand Down Expand Up @@ -711,7 +729,9 @@ func (p *basePessimisticTxnContextProvider) cancelFairLockingIfNeeded(ctx contex

type temporaryTableKVFilter map[int64]tableutil.TempTable

func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(
key, value []byte, flags tikvstore.KeyFlags,
) (bool, error) {
tid := tablecodec.DecodeTableID(key)
if _, ok := m[tid]; ok {
return true, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ func (txn *tikvTxn) SetOption(opt int, val any) {
case kv.SizeLimits:
limits := val.(kv.TxnSizeLimits)
txn.KVTxn.GetUnionStore().SetEntrySizeLimit(limits.Entry, limits.Total)
case kv.SessionID:
txn.KVTxn.SetSessionID(val.(uint64))
}
}

Expand Down

0 comments on commit 0ce42ed

Please sign in to comment.