Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add TxnManager to manage txn in session #30574

Merged
merged 13 commits into from
Dec 22, 2021
4 changes: 3 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ func (t *testExecInfo) compileSQL(idx int) (err error) {
compiler := executor.Compiler{Ctx: c.session}
se := c.session
ctx := context.TODO()
se.PrepareTxnCtx(ctx)
if err = se.PrepareTxnCtx(ctx); err != nil {
return err
}
sctx := se.(sessionctx.Context)
if err = executor.ResetContextOfStmt(sctx, c.rawStmt); err != nil {
return errors.Trace(err)
Expand Down
26 changes: 26 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -262,6 +263,12 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
a.PsStmt.Executor = newExecutor
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
Expand Down Expand Up @@ -297,6 +304,16 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

failpoint.Inject("assertTxnManagerInRebuildPlan", func() {
if is, ok := a.Ctx.Value(sessiontxn.AssertTxnInfoSchemaAfterRetryKey).(infoschema.InfoSchema); ok {
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is)
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil)
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
})

a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
Expand Down Expand Up @@ -753,6 +770,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()

failpoint.Inject("assertTxnManagerAfterPessimisticLockErrorRetry", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
})

if err = e.Open(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -807,6 +828,11 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return nil, errors.Trace(b.err)
}

failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
})

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build(b)
Expand Down
13 changes: 12 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
)

var (
Expand Down Expand Up @@ -57,11 +58,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm

ret := &plannercore.PreprocessorReturn{}
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe))
err := plannercore.Preprocess(c.Ctx,
stmtNode,
plannercore.WithPreprocessorReturn(ret),
plannercore.WithExecuteInfoSchemaUpdate(pe),
plannercore.InitTxnContextProvider,
)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema)
})

finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1718,7 +1717,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.MemTracker.SetActionOnExceed(action)
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := planner.GetPreparedStmt(execStmt, vars)
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -31,6 +32,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -340,6 +342,11 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
return nil, false, false, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Expand Down
3 changes: 2 additions & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func TestPrepared(t *testing.T) {
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
tk.Session().PrepareTxnCtx(ctx)
err = tk.Session().PrepareTxnCtx(ctx)
require.NoError(t, err)
_, err = stmt.RebuildPlan(ctx)
require.NoError(t, err)
rs, err = stmt.Exec(ctx)
Expand Down
20 changes: 20 additions & 0 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -213,3 +214,22 @@ type CachedPrepareStmt struct {
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
}

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*CachedPrepareStmt, error) {
var ok bool
execID := stmt.ExecID
if stmt.Name != "" {
if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok {
return nil, ErrStmtNotFound
}
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
preparedObj, ok := preparedPointer.(*CachedPrepareStmt)
if !ok {
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
return preparedObj, nil
}
return nil, ErrStmtNotFound
}
52 changes: 52 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
Expand All @@ -59,6 +60,11 @@ func InTxnRetry(p *preprocessor) {
p.flag |= inTxnRetry
}

// InitTxnContextProvider is a PreprocessOpt that indicates preprocess should init transaction's context
func InitTxnContextProvider(p *preprocessor) {
p.flag |= initTxnContextProvider
}

// WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocessorReturn.
func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt {
return func(p *preprocessor) {
Expand Down Expand Up @@ -117,6 +123,9 @@ func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...Preproce
node.Accept(&v)
// InfoSchema must be non-nil after preprocessing
v.ensureInfoSchema()

v.initTxnContextProviderIfNecessary(node)

return errors.Trace(v.err)
}

Expand All @@ -136,6 +145,8 @@ const (
// inSequenceFunction is set when visiting a sequence function.
// This flag indicates the tableName in these function should be checked as sequence object.
inSequenceFunction
// initTxnContextProvider is set when we should init txn context in preprocess
initTxnContextProvider
)

// Make linter happy.
Expand Down Expand Up @@ -193,6 +204,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
// handle the insert table name imminently
// insert into t with t ..., the insert can not see t here. We should hand it before the CTE statement
p.handleTableName(node.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName))
case *ast.ExecuteStmt:
p.stmtTp = TypeExecute
p.resolveExecuteStmt(node)
case *ast.CreateTableStmt:
p.stmtTp = TypeCreate
p.flag |= inCreateOrDropTable
Expand Down Expand Up @@ -361,6 +375,8 @@ const (
TypeRepair
// TypeShow for ShowStmt
TypeShow
// TypeExecute for ExecuteStmt
TypeExecute
)

func bindableStmtType(node ast.StmtNode) byte {
Expand Down Expand Up @@ -1489,6 +1505,32 @@ func (p *preprocessor) resolveShowStmt(node *ast.ShowStmt) {
}
}

func (p *preprocessor) resolveExecuteStmt(node *ast.ExecuteStmt) {
prepared, err := GetPreparedStmt(node, p.ctx.GetSessionVars())
if err != nil {
p.err = err
return
}

if prepared.SnapshotTSEvaluator != nil {
snapshotTS, err := prepared.SnapshotTSEvaluator(p.ctx)
if err != nil {
p.err = err
return
}

is, err := domain.GetDomain(p.ctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
p.err = err
return
}

p.LastSnapshotTS = snapshotTS
p.initedLastSnapshotTS = true
p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is)
}
}

func (p *preprocessor) resolveCreateTableStmt(node *ast.CreateTableStmt) {
for _, val := range node.Constraints {
if val.Refer != nil && val.Refer.Table.Schema.String() == "" {
Expand Down Expand Up @@ -1689,3 +1731,13 @@ func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
return p.InfoSchema
}

func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) {
if p.err != nil || p.flag&initTxnContextProvider == 0 {
return
}

p.err = sessiontxn.GetTxnManager(p.ctx).SetContextProvider(&sessiontxn.SimpleTxnContextProvider{
InfoSchema: p.ensureInfoSchema(),
})
}
21 changes: 1 addition & 20 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,10 @@ import (
"go.uber.org/zap"
)

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*plannercore.CachedPrepareStmt, error) {
var ok bool
execID := stmt.ExecID
if stmt.Name != "" {
if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok {
return nil, plannercore.ErrStmtNotFound
}
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
if !ok {
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
return preparedObj, nil
}
return nil, plannercore.ErrStmtNotFound
}

// IsReadOnly check whether the ast.Node is a read only statement.
func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool {
if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt {
prepareStmt, err := GetPreparedStmt(execStmt, vars)
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
logutil.BgLogger().Warn("GetPreparedStmt failed", zap.Error(err))
return false
Expand Down
Loading