-
Notifications
You must be signed in to change notification settings - Fork 5.8k
/
repeatable_read.go
250 lines (206 loc) · 8.34 KB
/
repeatable_read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package isolation
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/util/logutil"
tikverr "github.com/tikv/client-go/v2/error"
"go.uber.org/zap"
)
// PessimisticRRTxnContextProvider provides txn context for isolation level repeatable-read
type PessimisticRRTxnContextProvider struct {
	baseTxnContextProvider
	// Used for ForUpdateRead statement.
	// Cached per-statement for-update timestamp; 0 means no TS has been
	// fetched for the current statement yet (see getForUpdateTs).
	forUpdateTS uint64
	// It may decide whether to update forUpdateTs when calling provider's getForUpdateTs
	// See more details in the comments of optimizeWithPlan
	followingOperatorIsPointGetForUpdate bool
}
// NewPessimisticRRTxnContextProvider returns a new PessimisticRRTxnContextProvider
func NewPessimisticRRTxnContextProvider(sctx sessionctx.Context, causalConsistencyOnly bool) *PessimisticRRTxnContextProvider {
	// Wire up the shared base provider: mark the txn context as pessimistic
	// repeatable-read on init, and flag the kv txn as pessimistic when active.
	base := baseTxnContextProvider{
		sctx:                  sctx,
		causalConsistencyOnly: causalConsistencyOnly,
		onInitializeTxnCtx: func(txnCtx *variable.TransactionContext) {
			txnCtx.IsPessimistic = true
			txnCtx.Isolation = ast.RepeatableRead
		},
		onTxnActive: func(txn kv.Transaction) {
			txn.SetOption(kv.Pessimistic, true)
		},
	}
	p := &PessimisticRRTxnContextProvider{baseTxnContextProvider: base}
	// Reads use the txn start TS; for-update reads go through getForUpdateTs.
	p.getStmtReadTSFunc = p.getTxnStartTS
	p.getStmtForUpdateTSFunc = p.getForUpdateTs
	return p
}
// getForUpdateTs returns the timestamp used by for-update reads in the current
// statement, fetching a fresh TSO and caching it on first use.
func (p *PessimisticRRTxnContextProvider) getForUpdateTs() (uint64, error) {
	// Reuse the TS already fetched for this statement, if any.
	if cached := p.forUpdateTS; cached != 0 {
		return cached, nil
	}
	// For a point-get-for-update we skip fetching a new TS and reuse the one
	// stored in the transaction context (see AdviseOptimizeWithPlan).
	if p.followingOperatorIsPointGetForUpdate {
		return p.sctx.GetSessionVars().TxnCtx.GetForUpdateTS(), nil
	}
	txn, err := p.activeTxn()
	if err != nil {
		return 0, err
	}
	txnCtx := p.sctx.GetSessionVars().TxnCtx
	future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)
	ts, err := future.Wait()
	if err != nil {
		return 0, err
	}
	// Propagate the new TS to the txn context and the snapshot, then cache it.
	txnCtx.SetForUpdateTS(ts)
	txn.SetOption(kv.SnapshotTS, ts)
	p.forUpdateTS = ts
	return ts, nil
}
// updateForUpdateTS acquires the latest TSO and update the TransactionContext and kv.Transaction with it.
func (p *PessimisticRRTxnContextProvider) updateForUpdateTS() error {
	sctx := p.sctx
	txn, err := sctx.Txn(false)
	if err != nil {
		return err
	}
	if !txn.Valid() {
		return errors.Trace(kv.ErrInvalidTxn)
	}
	txnCtx := sctx.GetSessionVars().TxnCtx
	// The ForUpdateTS only backs the snapshot used for reading data in DML, so
	// a local TSO is good enough here and avoids the cost of a global TSO.
	version, err := sctx.GetStore().CurrentVersion(txnCtx.TxnScope)
	if err != nil {
		return err
	}
	txnCtx.SetForUpdateTS(version.Ver)
	txn.SetOption(kv.SnapshotTS, txnCtx.GetForUpdateTS())
	return nil
}
// OnStmtStart is the hook that should be called when a new statement started
func (p *PessimisticRRTxnContextProvider) OnStmtStart(ctx context.Context) error {
	err := p.baseTxnContextProvider.OnStmtStart(ctx)
	if err != nil {
		return err
	}
	// Reset per-statement state: the cached for-update TS and the
	// point-get-for-update optimization hint.
	p.forUpdateTS = 0
	p.followingOperatorIsPointGetForUpdate = false
	return nil
}
// OnStmtRetry is the hook that should be called when a statement is retried internally.
func (p *PessimisticRRTxnContextProvider) OnStmtRetry(ctx context.Context) error {
	if err := p.baseTxnContextProvider.OnStmtRetry(ctx); err != nil {
		return err
	}
	// If TxnCtx.forUpdateTS was advanced in OnStmtErrorForNextAction, adopt it;
	// otherwise clear the cache so the next for-update read fetches a fresh TS.
	if ts := p.sctx.GetSessionVars().TxnCtx.GetForUpdateTS(); ts > p.forUpdateTS {
		p.forUpdateTS = ts
	} else {
		p.forUpdateTS = 0
	}
	p.followingOperatorIsPointGetForUpdate = false
	return nil
}
// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
func (p *PessimisticRRTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
	// Only errors raised while acquiring pessimistic locks get special
	// handling here; everything else is left to the caller's default policy.
	if point == sessiontxn.StmtErrAfterPessimisticLock {
		return p.handleAfterPessimisticLockError(err)
	}
	return sessiontxn.NoIdea()
}
// AdviseOptimizeWithPlan optimizes for update point get related execution.
// Use case: In for update point get related operations, we do not fetch ts from PD but use the last ts we fetched.
// We expect that the data that the point get acquires has not been changed.
// Benefit: Save the cost of acquiring ts from PD.
// Drawbacks: If the data has been changed since the ts we used, we need to retry.
func (p *PessimisticRRTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) error {
	// The optimization is unsafe under tidb_snapshot or stale read.
	if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
		return nil
	}
	plan, ok := val.(plannercore.Plan)
	if !ok {
		return nil
	}
	// Unwrap prepared-statement execution to inspect the underlying plan.
	if execute, ok := plan.(*plannercore.Execute); ok {
		plan = execute.Plan
	}
	// The hint applies only when the operator reduces to a point get:
	// SELECT ... FOR UPDATE, UPDATE, or DELETE driven by a PointGetPlan.
	optimize := false
	switch v := plan.(type) {
	case *plannercore.PhysicalLock:
		_, optimize = v.Children()[0].(*plannercore.PointGetPlan)
	case *plannercore.Update:
		_, optimize = v.SelectPlan.(*plannercore.PointGetPlan)
	case *plannercore.Delete:
		_, optimize = v.SelectPlan.(*plannercore.PointGetPlan)
	}
	p.followingOperatorIsPointGetForUpdate = optimize
	return nil
}
// handleAfterPessimisticLockError decides whether a statement that failed while
// acquiring pessimistic locks should be retried or should surface the error.
func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) {
	txnCtx := p.sctx.GetSessionVars().TxnCtx
	deadlock, isDeadlock := errors.Cause(lockErr).(*tikverr.ErrDeadlock)
	switch {
	case isDeadlock:
		// Non-retryable deadlocks are reported to the client as-is.
		if !deadlock.IsRetryable {
			return sessiontxn.ErrorAction(lockErr)
		}
		logutil.Logger(p.ctx).Info("single statement deadlock, retry statement",
			zap.Uint64("txn", txnCtx.StartTS),
			zap.Uint64("lockTS", deadlock.LockTs),
			zap.Stringer("lockKey", kv.Key(deadlock.LockKey)),
			zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
	case terror.ErrorEqual(kv.ErrWriteConflict, lockErr):
		logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
			zap.Uint64("txn", txnCtx.StartTS),
			zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
			zap.String("err", lockErr.Error()))
		// Always update forUpdateTS by getting a new timestamp from PD.
		// If we use the conflict commitTS as the new forUpdateTS and async commit
		// is used, the commitTS of this transaction may exceed the max timestamp
		// that PD allocates. Then, the change may be invisible to a new transaction,
		// which means linearizability is broken.
	default:
		// This branch: if err is not nil, always update forUpdateTS to avoid problem described below.
		// For nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn
		// the select for updateTs must be updated, otherwise there maybe rollback problem.
		// begin
		// select for update key1 (here encounters ErrLocked or other errors (or max_execution_time like util),
		// key1 lock has not gotten and async rollback key1 is raised)
		// select for update key1 again (this time lock is acquired successfully (maybe lock was released by others))
		// the async rollback operation rollbacks the lock just acquired
		if err := p.updateForUpdateTS(); err != nil {
			logutil.Logger(p.ctx).Warn("UpdateForUpdateTS failed", zap.Error(err))
		}
		return sessiontxn.ErrorAction(lockErr)
	}
	// Retryable deadlock or write conflict: refresh the for-update TS before
	// signalling the caller that the statement may be retried.
	if err := p.updateForUpdateTS(); err != nil {
		return sessiontxn.ErrorAction(lockErr)
	}
	return sessiontxn.RetryReady()
}