diff --git a/executor/adapter.go b/executor/adapter.go index 45099b784a193..b30bf9fab90f8 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -227,7 +227,7 @@ func (a ExecStmt) GetStmtNode() ast.StmtNode { } // PointGet short path for point exec directly from plan, keep only necessary steps -func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*recordSet, error) { +func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context())) span1.LogKV("sql", a.OriginText()) @@ -238,7 +238,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true) // stale read should not reach here staleread.AssertStmtStaleness(a.Ctx, false) - sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is) + sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, a.InfoSchema) }) ctx = a.observeStmtBeginForTopSQL(ctx) @@ -262,7 +262,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec } } if a.PsStmt.Executor == nil { - b := newExecutorBuilder(a.Ctx, is, a.Ti) + b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti) newExecutor := b.build(a.Plan) if b.err != nil { return nil, b.err @@ -315,6 +315,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true) sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema) staleread.AssertStmtStaleness(a.Ctx, ret.IsStaleness) + sessiontxn.AssertTxnManagerReadTS(a.Ctx, ret.LastSnapshotTS) }) a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema() diff --git a/executor/prepared.go b/executor/prepared.go index b606c47d28d5f..01387db0d30a2 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -32,7 +32,6 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util" @@ -332,15 +331,12 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error { } // CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement. -func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context, - execStmt *ast.ExecuteStmt, is infoschema.InfoSchema, snapshotTS uint64, replicaReadScope string, args []types.Datum) (*ExecStmt, bool, bool, error) { +func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context, execStmt *ast.ExecuteStmt) (*ExecStmt, bool, bool, error) { startTime := time.Now() defer func() { sctx.GetSessionVars().DurationCompile = time.Since(startTime) }() - isStaleness := snapshotTS != 0 - sctx.GetSessionVars().StmtCtx.IsStaleness = isStaleness - execStmt.BinaryArgs = args + is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema() execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is) if err != nil { return nil, false, false, err @@ -348,11 +344,6 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context, failpoint.Inject("assertTxnManagerInCompile", func() { sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true) - sessiontxn.AssertTxnManagerInfoSchema(sctx, is) - staleread.AssertStmtStaleness(sctx, snapshotTS != 0) - if snapshotTS != 0 { - sessiontxn.AssertTxnManagerReadTS(sctx, snapshotTS) - } }) stmt := &ExecStmt{ diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index b4b018e8ac3ee..7ee60a7383f36 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -23,8 +23,6 @@ import ( "time" "github.com/pingcap/tidb/executor" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -158,10 +156,9 @@ func TestPrepared(t *testing.T) { require.NoError(t, err) tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows()) - execStmt := &ast.ExecuteStmt{ExecID: stmtID} + execStmt := &ast.ExecuteStmt{ExecID: stmtID, BinaryArgs: []types.Datum{types.NewDatum(1)}} // Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text. - stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt, - tk.Session().GetInfoSchema().(infoschema.InfoSchema), 0, kv.GlobalReplicaScope, []types.Datum{types.NewDatum(1)}) + stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt) require.NoError(t, err) require.Equal(t, query, stmt.OriginText()) diff --git a/session/bench_test.go b/session/bench_test.go index 75be9443cf7e6..eb484a3e9d415 100644 --- a/session/bench_test.go +++ b/session/bench_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/store/mockstore" @@ -1810,12 +1809,11 @@ func BenchmarkCompileExecutePreparedStmt(b *testing.B) { } args := []types.Datum{types.NewDatum(3401544)} - is := se.GetInfoSchema() b.ResetTimer() - stmtExec := &ast.ExecuteStmt{ExecID: stmtID} + stmtExec := &ast.ExecuteStmt{ExecID: stmtID, BinaryArgs: args} for i := 0; i < b.N; i++ { - _, _, _, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema), 0, kv.GlobalTxnScope, args) + _, _, _, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec) if err != nil { b.Fatal(err) } diff --git a/session/session.go b/session/session.go index 1724facaeef2f..c034967d60d4c 100644 --- a/session/session.go +++ b/session/session.go @@ -2235,19 +2235,20 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields return prepareExec.ID, prepareExec.ParamCount, prepareExec.Fields, nil } -func (s *session) preparedStmtExec(ctx context.Context, - is infoschema.InfoSchema, snapshotTS uint64, - execStmt *ast.ExecuteStmt, prepareStmt *plannercore.CachedPrepareStmt, replicaReadScope string, args []types.Datum) (sqlexec.RecordSet, error) { - +func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStmt, prepareStmt *plannercore.CachedPrepareStmt) (sqlexec.RecordSet, error) { failpoint.Inject("assertTxnManagerInPreparedStmtExec", func() { sessiontxn.RecordAssert(s, "assertTxnManagerInPreparedStmtExec", true) - sessiontxn.AssertTxnManagerInfoSchema(s, is) - if snapshotTS != 0 { - sessiontxn.AssertTxnManagerReadTS(s, snapshotTS) + if prepareStmt.SnapshotTSEvaluator != nil { + staleread.AssertStmtStaleness(s, true) + ts, err := prepareStmt.SnapshotTSEvaluator(s) + if err != nil { + panic(err) + } + sessiontxn.AssertTxnManagerReadTS(s, ts) } }) - st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is, snapshotTS, replicaReadScope, args) + st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt) if err != nil { return nil, err } @@ -2267,18 +2268,17 @@ func (s *session) preparedStmtExec(ctx context.Context, // cachedPointPlanExec is a short path currently ONLY for cached "point select plan" execution func (s *session) cachedPointPlanExec(ctx context.Context, - is infoschema.InfoSchema, execAst *ast.ExecuteStmt, prepareStmt *plannercore.CachedPrepareStmt, replicaReadScope string, args []types.Datum) (sqlexec.RecordSet, bool, error) { + execAst *ast.ExecuteStmt, prepareStmt *plannercore.CachedPrepareStmt) (sqlexec.RecordSet, bool, error) { prepared := prepareStmt.PreparedAst failpoint.Inject("assertTxnManagerInCachedPlanExec", func() { sessiontxn.RecordAssert(s, "assertTxnManagerInCachedPlanExec", true) - sessiontxn.AssertTxnManagerInfoSchema(s, is) // stale read should not reach here staleread.AssertStmtStaleness(s, false) }) - execAst.BinaryArgs = args + is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema() execPlan, err := planner.OptimizeExecStmt(ctx, s, execAst, is) if err != nil { return nil, false, err @@ -2324,7 +2324,7 @@ func (s *session) cachedPointPlanExec(ctx context.Context, var resultSet sqlexec.RecordSet switch execPlan.(type) { case *plannercore.PointGetPlan: - resultSet, err = stmt.PointGet(ctx, is) + resultSet, err = stmt.PointGet(ctx) s.txn.changeToInvalid() case *plannercore.Update: stmtCtx.Priority = kv.PriorityHigh @@ -2341,9 +2341,9 @@ func (s *session) cachedPointPlanExec(ctx context.Context, // IsCachedExecOk check if we can execute using plan cached in prepared structure // Be careful with the short path, current precondition is ths cached plan satisfying // IsPointGetWithPKOrUniqueKeyByAutoCommit -func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore.CachedPrepareStmt, isStaleness bool) (bool, error) { +func (s *session) IsCachedExecOk(preparedStmt *plannercore.CachedPrepareStmt) (bool, error) { prepared := preparedStmt.PreparedAst - if prepared.CachedPlan == nil || isStaleness { + if prepared.CachedPlan == nil || staleread.IsStmtStaleness(s) { return false, nil } // check auto commit @@ -2396,22 +2396,25 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ return nil, errors.Errorf("invalid CachedPrepareStmt type") } - var snapshotTS uint64 - replicaReadScope := oracle.GlobalTxnScope + execStmt := &ast.ExecuteStmt{ExecID: stmtID, BinaryArgs: args} + if err := executor.ResetContextOfStmt(s, execStmt); err != nil { + return nil, err + } staleReadProcessor := staleread.NewStaleReadProcessor(s) if err = staleReadProcessor.OnExecutePreparedStmt(preparedStmt.SnapshotTSEvaluator); err != nil { return nil, err } - txnManager := sessiontxn.GetTxnManager(s) if staleReadProcessor.IsStaleness() { - snapshotTS = staleReadProcessor.GetStalenessReadTS() - is := staleReadProcessor.GetStalenessInfoSchema() - replicaReadScope = config.GetTxnScopeFromConfig() - err = txnManager.EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{ - Type: sessiontxn.EnterNewTxnWithReplaceProvider, - Provider: staleread.NewStalenessTxnContextProvider(s, snapshotTS, is), + s.sessionVars.StmtCtx.IsStaleness = true + err = sessiontxn.GetTxnManager(s).EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{ + Type: sessiontxn.EnterNewTxnWithReplaceProvider, + Provider: staleread.NewStalenessTxnContextProvider( + s, + staleReadProcessor.GetStalenessReadTS(), + staleReadProcessor.GetStalenessInfoSchema(), + ), }) if err != nil { @@ -2419,20 +2422,14 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ } } - staleness := snapshotTS > 0 executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL) - ok, err = s.IsCachedExecOk(ctx, preparedStmt, staleness) + cacheExecOk, err := s.IsCachedExecOk(preparedStmt) if err != nil { return nil, err } s.txn.onStmtStart(preparedStmt.SQLDigest.String()) defer s.txn.onStmtEnd() - execStmt := &ast.ExecuteStmt{ExecID: stmtID} - if err := executor.ResetContextOfStmt(s, execStmt); err != nil { - return nil, err - } - if err = s.onTxnManagerStmtStartOrRetry(ctx, execStmt); err != nil { return nil, err } @@ -2440,8 +2437,8 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ // even the txn is valid, still need to set session variable for coprocessor usage. s.sessionVars.RequestSourceType = preparedStmt.PreparedAst.StmtType - if ok { - rs, ok, err := s.cachedPointPlanExec(ctx, txnManager.GetTxnInfoSchema(), execStmt, preparedStmt, replicaReadScope, args) + if cacheExecOk { + rs, ok, err := s.cachedPointPlanExec(ctx, execStmt, preparedStmt) if err != nil { return nil, err } @@ -2449,7 +2446,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ return rs, nil } } - return s.preparedStmtExec(ctx, txnManager.GetTxnInfoSchema(), snapshotTS, execStmt, preparedStmt, replicaReadScope, args) + return s.preparedStmtExec(ctx, execStmt, preparedStmt) } func (s *session) DropPreparedStmt(stmtID uint32) error { diff --git a/sessiontxn/txn_context_test.go b/sessiontxn/txn_context_test.go index 98aa0d7d2cd96..0a4ee7a24bede 100644 --- a/sessiontxn/txn_context_test.go +++ b/sessiontxn/txn_context_test.go @@ -587,13 +587,13 @@ func TestTxnContextForPrepareExecute(t *testing.T) { } func TestTxnContextForStaleReadInPrepare(t *testing.T) { - store, do, deferFunc := setupTxnContextTest(t) + store, _, deferFunc := setupTxnContextTest(t) defer deferFunc() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") se := tk.Session() - is1 := do.InfoSchema() + is1 := se.GetDomainInfoSchema() tk.MustExec("do sleep(0.1)") tk.MustExec("set @a=now(6)") tk.MustExec("prepare s1 from 'select * from t1 where id=1'") @@ -660,6 +660,32 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) { doWithCheckPath(t, se, normalPathRecords, func() { tk.MustExec("execute s3") }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + + // stale read should not use plan cache + is2 := se.GetDomainInfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=''") + tk.MustExec("do sleep(0.1)") + tk.MustExec("set @b=now(6)") + tk.MustExec("do sleep(0.1)") + tk.MustExec("update t1 set v=v+1 where id=1") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 12")) + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=@b") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11")) + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=''") } func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) {