diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go
index 1eb7e1478b2f9..22b60187f0a77 100644
--- a/bindinfo/bind_test.go
+++ b/bindinfo/bind_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/metrics"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
 	"github.com/pingcap/tidb/util"
@@ -70,6 +71,10 @@ type mockSessionManager struct {
 	PS []*util.ProcessInfo
 }
 
+func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
 	for _, item := range msm.PS {
diff --git a/domain/domain_test.go b/domain/domain_test.go
index 7c9d9ff633bc5..a4432b0fb1fe6 100644
--- a/domain/domain_test.go
+++ b/domain/domain_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/metrics"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/tikv"
@@ -241,6 +242,10 @@ type mockSessionManager struct {
 	PS []*util.ProcessInfo
 }
 
+func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
 	for _, item := range msm.PS {
diff --git a/errors.toml b/errors.toml
index 0ce61654373fb..a54913fa1bd2c 100644
--- a/errors.toml
+++ b/errors.toml
@@ -1281,6 +1281,11 @@ error = '''
 Unknown SEQUENCE: '%-.300s'
 '''
 
+["schema:8003"]
+error = '''
+TiDB admin check table failed.
+'''
+
 ["schema:8020"]
 error = '''
 Table '%s' was locked in %s by %v
diff --git a/executor/adapter.go b/executor/adapter.go
index 5f5229195c3f9..44d00cd1efa1e 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -55,6 +55,7 @@ import (
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/pingcap/tidb/util/stmtsummary"
 	"github.com/pingcap/tidb/util/stringutil"
+	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
 
@@ -377,6 +378,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 	if txn.Valid() {
 		txnStartTS = txn.StartTS()
 	}
+
 	return &recordSet{
 		executor: e,
 		stmt:     a,
@@ -590,6 +592,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
 		}
 		e, err = a.handlePessimisticLockError(ctx, err)
 		if err != nil {
+			// TODO: report deadlock
 			if ErrDeadlock.Equal(err) {
 				metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds())
 			}
diff --git a/executor/admin_test.go b/executor/admin_test.go
index c9cda897a4745..20095eb59a0ba 100644
--- a/executor/admin_test.go
+++ b/executor/admin_test.go
@@ -76,6 +76,22 @@ func (s *testSuite5) TestAdminCheckIndex(c *C) {
 	check()
 }
 
+func (s *testSuite5) TestAdminCheckIndexInTemporaryMode(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists temporary_admin_test;")
+	tk.MustExec("create global temporary table temporary_admin_test (c1 int, c2 int, c3 int default 1, primary key (c1), index (c1), unique key(c2)) ON COMMIT DELETE ROWS;")
+	tk.MustExec("insert temporary_admin_test (c1, c2) values (1, 1), (2, 2), (3, 3);")
+	tk.MustGetErrCode("admin check table temporary_admin_test;", mysql.ErrAdminCheckTable)
+	tk.MustExec("drop table if exists temporary_admin_test;")
+
+	tk.MustExec("drop table if exists non_temporary_admin_test;")
+	tk.MustExec("create table non_temporary_admin_test (c1 int, c2 int, c3 int default 1, primary key (c1), index (c1), unique key(c2));")
+	tk.MustExec("insert non_temporary_admin_test (c1, c2) values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("admin check table non_temporary_admin_test;")
+	tk.MustExec("drop table if exists non_temporary_admin_test;")
+}
+
 func (s *testSuite5) TestAdminRecoverIndex(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
diff --git a/executor/builder.go b/executor/builder.go
index 40282d1030b2c..3324e52f894ff 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1531,7 +1531,9 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
 		strings.ToLower(infoschema.TablePlacementPolicy),
 		strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
 		strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
-		strings.ToLower(infoschema.TableClientErrorsSummaryByHost):
+		strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
+		strings.ToLower(infoschema.TableTiDBTrx),
+		strings.ToLower(infoschema.ClusterTableTiDBTrx):
 		return &MemTableReaderExec{
 			baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
 			table:        v.Table,
diff --git a/executor/ddl.go b/executor/ddl.go
index 81f7221d1e60e..2f10555d21e1e 100644
--- a/executor/ddl.go
+++ b/executor/ddl.go
@@ -311,8 +311,12 @@ func (e *DDLExec) dropTableObject(objects []*ast.TableName, obt objectType, ifEx
 		if isSystemTable(tn.Schema.L, tn.Name.L) {
 			return errors.Errorf("Drop tidb system table '%s.%s' is forbidden", tn.Schema.L, tn.Name.L)
 		}
-
-		if obt == tableObject && config.CheckTableBeforeDrop {
+		tableInfo, err := e.is.TableByName(tn.Schema, tn.Name)
+		if err != nil {
+			return err
+		}
+		tempTableType := tableInfo.Meta().TempTableType
+		if obt == tableObject && config.CheckTableBeforeDrop && tempTableType == model.TempTableNone {
 			logutil.BgLogger().Warn("admin check table before drop",
 				zap.String("database", fullti.Schema.O),
 				zap.String("table", fullti.Name.O),
diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go
index 7cc5a8a69d66e..5591dcefde54d 100644
--- a/executor/executor_pkg_test.go
+++ b/executor/executor_pkg_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/executor/aggfuncs"
 	"github.com/pingcap/tidb/expression"
 	plannerutil "github.com/pingcap/tidb/planner/util"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
@@ -60,6 +61,10 @@ type mockSessionManager struct {
 	serverID uint64
 }
 
+func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 // ShowProcessList implements the SessionManager.ShowProcessList interface.
 func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
diff --git a/executor/explainfor_test.go b/executor/explainfor_test.go
index a113200a925d8..e29a7a3e24cee 100644
--- a/executor/explainfor_test.go
+++ b/executor/explainfor_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/parser/auth"
 	"github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/israce"
@@ -38,6 +39,10 @@ type mockSessionManager1 struct {
 	PS []*util.ProcessInfo
 }
 
+func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo {
+	return nil
+}
+
 // ShowProcessList implements the SessionManager.ShowProcessList interface.
 func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go
index 0ec0c48885ecf..ae338bdd644d2 100644
--- a/executor/infoschema_reader.go
+++ b/executor/infoschema_reader.go
@@ -149,6 +149,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
 		infoschema.TableClientErrorsSummaryByUser,
 		infoschema.TableClientErrorsSummaryByHost:
 		err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O)
+	case infoschema.TableTiDBTrx:
+		e.setDataForTiDBTrx(sctx)
+	case infoschema.ClusterTableTiDBTrx:
+		err = e.setDataForClusterTiDBTrx(sctx)
 	}
 	if err != nil {
 		return nil, err
@@ -2011,6 +2015,40 @@ func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context
 	return nil
 }
 
+func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) {
+	sm := ctx.GetSessionManager()
+	if sm == nil {
+		return
+	}
+
+	loginUser := ctx.GetSessionVars().User
+	var hasProcessPriv bool
+	if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
+		if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
+			hasProcessPriv = true
+		}
+	}
+	infoList := sm.ShowTxnList()
+	for _, info := range infoList {
+		// If you have the PROCESS privilege, you can see all running transactions.
+		// Otherwise, you can see only your own transactions.
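+		// (The same visibility rule applies to information_schema.PROCESSLIST.)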
+		if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
+			continue
+		}
+		e.rows = append(e.rows, info.ToDatum())
+	}
+}
+
+func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) error {
+	e.setDataForTiDBTrx(ctx)
+	rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
+	if err != nil {
+		return err
+	}
+	e.rows = rows
+	return nil
+}
+
 type hugeMemTableRetriever struct {
 	dummyCloser
 	table *model.TableInfo
diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go
index c3e125824873d..e19eb9d9b3064 100644
--- a/executor/infoschema_reader_test.go
+++ b/executor/infoschema_reader_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/server"
 	"github.com/pingcap/tidb/session"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/statistics/handle"
@@ -728,6 +729,10 @@ type mockSessionManager struct {
 	serverID uint64
 }
 
+func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
 	return sm.processInfoMap
 }
diff --git a/executor/point_get_test.go b/executor/point_get_test.go
index 846b6f1628fe1..f66446a6bef83 100644
--- a/executor/point_get_test.go
+++ b/executor/point_get_test.go
@@ -27,7 +27,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/variable"
-	txndriver "github.com/pingcap/tidb/store/driver/txn"
+	storeerr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/tablecodec"
@@ -536,15 +536,15 @@ func (s *testPointGetSuite) TestSelectCheckVisibility(c *C) {
 		c.Assert(expectErr.Equal(err), IsTrue)
 	}
 	// Test point get.
-	checkSelectResultError("select * from t where a='1'", txndriver.ErrGCTooEarly)
+	checkSelectResultError("select * from t where a='1'", storeerr.ErrGCTooEarly)
 	// Test batch point get.
-	checkSelectResultError("select * from t where a in ('1','2')", txndriver.ErrGCTooEarly)
+	checkSelectResultError("select * from t where a in ('1','2')", storeerr.ErrGCTooEarly)
 	// Test Index look up read.
-	checkSelectResultError("select * from t where b > 0 ", txndriver.ErrGCTooEarly)
+	checkSelectResultError("select * from t where b > 0 ", storeerr.ErrGCTooEarly)
 	// Test Index read.
-	checkSelectResultError("select b from t where b > 0 ", txndriver.ErrGCTooEarly)
+	checkSelectResultError("select b from t where b > 0 ", storeerr.ErrGCTooEarly)
 	// Test table read.
-	checkSelectResultError("select * from t", txndriver.ErrGCTooEarly)
+	checkSelectResultError("select * from t", storeerr.ErrGCTooEarly)
 }
 
 func (s *testPointGetSuite) TestReturnValues(c *C) {
diff --git a/executor/prepared_test.go b/executor/prepared_test.go
index 1f8edf79d942e..e0e2c19ee0f22 100644
--- a/executor/prepared_test.go
+++ b/executor/prepared_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/domain"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/israce"
@@ -135,6 +136,10 @@ type mockSessionManager2 struct {
 	killed bool
 }
 
+func (sm *mockSessionManager2) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 func (sm *mockSessionManager2) ShowProcessList() map[uint64]*util.ProcessInfo {
 	pl := make(map[uint64]*util.ProcessInfo)
 	if pi, ok := sm.GetProcessInfo(0); ok {
diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go
index 916f218db1f9d..bb8f05e5eff54 100644
--- a/executor/seqtest/prepared_test.go
+++ b/executor/seqtest/prepared_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/metrics"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/kvcache"
@@ -796,6 +797,10 @@ type mockSessionManager1 struct {
 	Se session.Session
 }
 
+func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 // ShowProcessList implements the SessionManager.ShowProcessList interface.
 func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
diff --git a/expression/integration_test.go b/expression/integration_test.go
index f15dc5822be15..a3d983069cce9 100644
--- a/expression/integration_test.go
+++ b/expression/integration_test.go
@@ -9161,8 +9161,10 @@ func (s *testIntegrationSuite) TestIssue24429(c *C) {
 
 	tk.MustExec("set @@sql_mode = ANSI_QUOTES;")
 	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t;")
 	tk.MustExec("create table t (a int);")
 	tk.MustQuery(`select t."a"=10 from t;`).Check(testkit.Rows())
+	tk.MustExec("drop table if exists t;")
 }
 
 func (s *testIntegrationSuite) TestVitessHash(c *C) {
diff --git a/infoschema/cluster.go b/infoschema/cluster.go
index f113e90a0f587..2d196fe5b0023 100644
--- a/infoschema/cluster.go
+++ b/infoschema/cluster.go
@@ -37,6 +37,8 @@ const (
 	ClusterTableStatementsSummary = "CLUSTER_STATEMENTS_SUMMARY"
 	// ClusterTableStatementsSummaryHistory is the string constant of cluster statement summary history table.
 	ClusterTableStatementsSummaryHistory = "CLUSTER_STATEMENTS_SUMMARY_HISTORY"
+	// ClusterTableTiDBTrx is the string constant of the cluster-level running transactions table.
+	ClusterTableTiDBTrx = "CLUSTER_TIDB_TRX"
 )
 
 // memTableToClusterTables means add memory table to cluster table.
@@ -45,6 +47,7 @@ var memTableToClusterTables = map[string]string{
 	TableProcesslist:              ClusterTableProcesslist,
 	TableStatementsSummary:        ClusterTableStatementsSummary,
 	TableStatementsSummaryHistory: ClusterTableStatementsSummaryHistory,
+	TableTiDBTrx:                  ClusterTableTiDBTrx,
 }
 
 func init() {
diff --git a/infoschema/error.go b/infoschema/error.go
index a0ef7ab9c8760..cb49e48419dec 100644
--- a/infoschema/error.go
+++ b/infoschema/error.go
@@ -69,4 +69,6 @@ var (
 	ErrTableLocked = dbterror.ClassSchema.NewStd(mysql.ErrTableLocked)
 	// ErrWrongObject returns when the table/view/sequence is not the expected object.
 	ErrWrongObject = dbterror.ClassSchema.NewStd(mysql.ErrWrongObject)
+	// ErrAdminCheckTable returns when ADMIN CHECK TABLE is executed on a temporary table.
+	ErrAdminCheckTable = dbterror.ClassSchema.NewStd(mysql.ErrAdminCheckTable)
 )
diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go
index c3892e6527962..6aa0c5526f467 100644
--- a/infoschema/infoschema_test.go
+++ b/infoschema/infoschema_test.go
@@ -332,6 +332,7 @@ func (*testSuite) TestInfoTables(c *C) {
 		"TABLESPACES",
 		"COLLATION_CHARACTER_SET_APPLICABILITY",
 		"PROCESSLIST",
+		"TIDB_TRX",
 	}
 	for _, t := range infoTables {
 		tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t))
diff --git a/infoschema/tables.go b/infoschema/tables.go
index bfca649e89fdd..2d5112ada05c0 100644
--- a/infoschema/tables.go
+++ b/infoschema/tables.go
@@ -31,11 +31,13 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv"
@@ -161,6 +163,8 @@ const (
 	TableClientErrorsSummaryByUser = "CLIENT_ERRORS_SUMMARY_BY_USER"
 	// TableClientErrorsSummaryByHost is the string constant of client errors table.
 	TableClientErrorsSummaryByHost = "CLIENT_ERRORS_SUMMARY_BY_HOST"
+	// TableTiDBTrx is the string constant of the currently running transactions table.
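+	// CLUSTER_TIDB_TRX (see infoschema/cluster.go above) exposes the same rows from every TiDB instance in the cluster.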
+	TableTiDBTrx = "TIDB_TRX"
 )
 
 var tableIDMap = map[string]int64{
@@ -233,22 +237,25 @@ var tableIDMap = map[string]int64{
 	TableClientErrorsSummaryGlobal: autoid.InformationSchemaDBID + 67,
 	TableClientErrorsSummaryByUser: autoid.InformationSchemaDBID + 68,
 	TableClientErrorsSummaryByHost: autoid.InformationSchemaDBID + 69,
+	TableTiDBTrx:                   autoid.InformationSchemaDBID + 70,
+	ClusterTableTiDBTrx:            autoid.InformationSchemaDBID + 71,
 }
 
 type columnInfo struct {
-	name    string
-	tp      byte
-	size    int
-	decimal int
-	flag    uint
-	deflt   interface{}
-	comment string
+	name      string
+	tp        byte
+	size      int
+	decimal   int
+	flag      uint
+	deflt     interface{}
+	comment   string
+	enumElems []string
 }
 
 func buildColumnInfo(col columnInfo) *model.ColumnInfo {
 	mCharset := charset.CharsetBin
 	mCollation := charset.CharsetBin
-	if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob {
+	if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob || col.tp == mysql.TypeEnum {
 		mCharset = charset.CharsetUTF8MB4
 		mCollation = charset.CollationUTF8MB4
 	}
@@ -259,6 +266,7 @@ func buildColumnInfo(col columnInfo) *model.ColumnInfo {
 		Flen:    col.size,
 		Decimal: col.decimal,
 		Flag:    col.flag,
+		Elems:   col.enumElems,
 	}
 	return &model.ColumnInfo{
 		Name: model.NewCIStr(col.name),
@@ -1332,6 +1340,19 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{
 	{name: "LAST_SEEN", tp: mysql.TypeTimestamp, size: 26},
 }
 
+var tableTiDBTrxCols = []columnInfo{
+	{name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag},
+	{name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"},
+	{name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL statement the transaction is currently running"},
+	{name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"},
+	{name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the current lock wait"},
+	{name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"},
+	{name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "Memory used by the MemDB"},
+	{name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"},
+	{name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who opened this session"},
+	{name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"},
+}
+
 // GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
 // The returned description string may be:
 //  - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
@@ -1701,6 +1722,7 @@ var tableNameToColumns = map[string][]columnInfo{
 	TableClientErrorsSummaryGlobal: tableClientErrorsSummaryGlobalCols,
 	TableClientErrorsSummaryByUser: tableClientErrorsSummaryByUserCols,
 	TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols,
+	TableTiDBTrx:                   tableTiDBTrxCols,
 }
 
 func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go
index f30f25ba6abfa..6cc24300c1be4 100644
--- a/infoschema/tables_test.go
+++ b/infoschema/tables_test.go
@@ -28,6 +28,7 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/fn"
+	"github.com/pingcap/parser"
 	"github.com/pingcap/parser/auth"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
@@ -42,6 +43,7 @@ import (
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/server"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/store/helper"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/util"
@@ -121,7 +123,7 @@ func (s *testClusterTableSuite) setUpRPCService(c *C, addr string) (*grpc.Server
 	lis, err := net.Listen("tcp", addr)
 	c.Assert(err, IsNil)
 	// Fix issue 9836
-	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)}
+	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil}
 	sm.processInfoMap[1] = &util.ProcessInfo{
 		ID:   1,
 		User: "root",
@@ -276,7 +278,7 @@ func (s *testTableSuite) TestInfoschemaFieldValue(c *C) {
 	tk1.MustQuery("select distinct(table_schema) from information_schema.tables").Check(testkit.Rows("INFORMATION_SCHEMA"))
 
 	// Fix issue 9836
-	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)}
+	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil}
 	sm.processInfoMap[1] = &util.ProcessInfo{
 		ID:   1,
 		User: "root",
@@ -433,6 +435,11 @@ func (s *testTableSuite) TestCurrentTimestampAsDefault(c *C) {
 
 type mockSessionManager struct {
 	processInfoMap map[uint64]*util.ProcessInfo
+	txnInfo        []*txninfo.TxnInfo
+}
+
+func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	return sm.txnInfo
 }
 
 func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
@@ -459,7 +466,7 @@ func (s *testTableSuite) TestSomeTables(c *C) {
 	c.Assert(err, IsNil)
 	tk := testkit.NewTestKit(c, s.store)
 	tk.Se = se
-	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)}
+	sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil}
 	sm.processInfoMap[1] = &util.ProcessInfo{
 		ID:   1,
 		User: "user-1",
@@ -516,7 +523,7 @@ func (s *testTableSuite) TestSomeTables(c *C) {
 		fmt.Sprintf("3 user-3 127.0.0.1:12345 test Init DB 9223372036 %s %s", "in transaction", "check port"),
 	))
 
-	sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)}
+	sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil}
 	sm.processInfoMap[1] = &util.ProcessInfo{
 		ID:   1,
 		User: "user-1",
@@ -1509,3 +1516,24 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) {
 	err = tk.ExecToErr("FLUSH CLIENT_ERRORS_SUMMARY")
 	c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the RELOAD privilege(s) for this operation")
 }
+
+func (s *testTableSuite) TestTrx(c *C) {
+	tk := s.newTestKitWithRoot(c)
+	_, digest := parser.NormalizeDigest("select * from trx for update;")
+	sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)}
+	sm.txnInfo[0] = &txninfo.TxnInfo{
+		StartTS:          424768545227014155,
+		CurrentSQLDigest: digest,
+		State:            txninfo.TxnRunningNormal,
+		BlockStartTime:   nil,
+		EntriesCount:     1,
+		EntriesSize:      19,
+		ConnectionID:     2,
+		Username:         "root",
+		CurrentDB:        "test",
+	}
+	tk.Se.SetSessionManager(sm)
+	tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(
+		testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal <nil> 1 19 2 root test"),
+	)
+}
diff --git a/kv/kv.go b/kv/kv.go
index a6a23a88df01d..1fad79d641009 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -154,6 +154,7 @@ type Transaction interface {
 	// String implements fmt.Stringer interface.
 	String() string
 	// LockKeys tries to lock the entries with the keys in KV store.
+	// Will block until all keys are locked successfully or an error occurs.
 	LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error
 	// SetOption sets an option with a value, when val is nil, uses the default
 	// value of this option.
diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go
index 98ab7b7898370..9e8eaa9204af9 100644
--- a/planner/core/integration_test.go
+++ b/planner/core/integration_test.go
@@ -3832,3 +3832,29 @@ func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {
 		"  └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
 		"    └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
 }
+
+func (s *testIntegrationSuite) TestEliminateLockForTemporaryTable(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+
+	tk.MustExec("use test;")
+	tk.MustExec("create global temporary table t1 (a int primary key, b int, c int, index i_b(b)) on commit delete rows;")
+	defer func() {
+		tk.MustExec("drop global temporary table if exists t1;")
+	}()
+	tk.MustExec("begin;")
+	tk.MustExec("insert t1 values (8,8,9);")
+
+	var input []string
+	var output []struct {
+		SQL  string
+		Plan []string
+	}
+	s.testData.GetTestCases(c, &input, &output)
+	for i, tt := range input {
+		s.testData.OnRecord(func() {
+			output[i].SQL = tt
+			output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + tt).Rows())
+		})
+		tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...))
+	}
+}
diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 59c228767171a..0c1c4a668d3c8 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/auth"
+	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/expression"
@@ -186,6 +187,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan {
 	plan = InjectExtraProjection(plan)
 	mergeContinuousSelections(plan)
 	plan = eliminateUnionScanAndLock(sctx, plan)
+	plan = eliminateLockForTemporaryTable(plan)
 	plan = enableParallelApply(sctx, plan)
 	return plan
 }
@@ -322,6 +324,29 @@ func eliminateUnionScanAndLock(sctx sessionctx.Context, p PhysicalPlan) Physical
 	})
 }
 
+// eliminateLockForTemporaryTable eliminates the lock for temporary tables.
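+// A temporary table is visible only to the session that created it, so a
+// SELECT ... FOR UPDATE on one has nothing to lock against other sessions.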
+func eliminateLockForTemporaryTable(p PhysicalPlan) PhysicalPlan {
+	iteratePhysicalPlan(p, func(p PhysicalPlan) bool {
+		if len(p.Children()) > 1 {
+			return false
+		}
+		switch x := p.(type) {
+		case *PointGetPlan:
+			if x.TblInfo.TempTableType != model.TempTableNone {
+				x.Lock = false
+				x.LockWaitTime = 0
+			}
+		case *BatchPointGetPlan:
+			if x.TblInfo.TempTableType != model.TempTableNone {
+				x.Lock = false
+				x.LockWaitTime = 0
+			}
+		}
+		return true
+	})
+	return p
+}
+
 func iteratePhysicalPlan(p PhysicalPlan, f func(p PhysicalPlan) bool) {
 	if !f(p) {
 		return
diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go
index f7edbc1648819..fbc0bf9333a29 100644
--- a/planner/core/point_get_plan.go
+++ b/planner/core/point_get_plan.go
@@ -462,7 +462,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
 			if tidbutil.IsMemDB(fp.dbName) {
 				return nil
 			}
-			fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
+			// ignore lock for temporary table.
+			if fp.TblInfo.TempTableType == model.TempTableNone {
+				fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
+			}
 			p = fp
 			return
 		}
@@ -480,7 +483,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
 				p = tableDual.Init(ctx, &property.StatsInfo{}, 0)
 				return
 			}
-			fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
+			// ignore lock for temporary table.
+			if fp.TblInfo.TempTableType == model.TempTableNone {
+				fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
+			}
 			p = fp
 			return
 		}
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index a3719fe4c4b0b..b5caf55e8de03 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -125,6 +125,8 @@ type preprocessor struct {
 
 func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
 	switch node := in.(type) {
+	case *ast.AdminStmt:
+		p.checkAdminCheckTableGrammar(node)
 	case *ast.DeleteStmt:
 		p.stmtTp = TypeDelete
 	case *ast.SelectStmt:
@@ -557,6 +559,31 @@ func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) {
 	}
 }
 
+func (p *preprocessor) checkAdminCheckTableGrammar(stmt *ast.AdminStmt) {
+	for _, table := range stmt.Tables {
+		currentDB := p.ctx.GetSessionVars().CurrentDB
+		if table.Schema.String() != "" {
+			currentDB = table.Schema.L
+		}
+		if currentDB == "" {
+			p.err = errors.Trace(ErrNoDB)
+			return
+		}
+		sName := model.NewCIStr(currentDB)
+		tName := table.Name
+		tableInfo, err := p.is.TableByName(sName, tName)
+		if err != nil {
+			p.err = err
+			return
+		}
+		tempTableType := tableInfo.Meta().TempTableType
+		if stmt.Tp == ast.AdminCheckTable && tempTableType != model.TempTableNone {
+			p.err = infoschema.ErrAdminCheckTable
+			return
+		}
+	}
+}
+
 func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) {
 	tName := stmt.Table.Name.String()
 	if isIncorrectName(tName) {
diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json
index 087b32110e18f..f386f8d7f24e8 100644
--- a/planner/core/testdata/integration_suite_in.json
+++ b/planner/core/testdata/integration_suite_in.json
@@ -294,5 +294,14 @@
       "select sum(1) from s1",
       "select count(1) as cnt from s1 union select count(1) as cnt from s2"
     ]
+  },
+  {
+    "name": "TestEliminateLockForTemporaryTable",
+    "cases": [
+      "select * from t1 where a = 2 for update",
+      "select * from t1 where a in (1,2) for update",
+      "select c + 1 from t1 where a = 2 and c = 2 for update",
+      "select c + 1 from t1 where a in (1,2) and c = 2 for update"
+    ]
   }
 ]
diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json
index 7c735fcb5657c..902ff19276cc0 100644
--- a/planner/core/testdata/integration_suite_out.json
+++ b/planner/core/testdata/integration_suite_out.json
@@ -1564,5 +1564,38 @@
         ]
       }
     ]
+  },
+  {
+    "Name": "TestEliminateLockForTemporaryTable",
+    "Cases": [
+      {
+        "SQL": "select * from t1 where a = 2 for update",
+        "Plan": [
+          "Point_Get 1.00 root table:t1 handle:2"
+        ]
+      },
+      {
+        "SQL": "select * from t1 where a in (1,2) for update",
+        "Plan": [
+          "Batch_Point_Get 2.00 root table:t1 handle:[1 2], keep order:false, desc:false"
+        ]
+      },
+      {
+        "SQL": "select c + 1 from t1 where a = 2 and c = 2 for update",
+        "Plan": [
+          "Projection 0.00 root plus(test.t1.c, 1)->Column#4",
+          "└─Selection 0.00 root eq(test.t1.c, 2)",
+          "  └─Point_Get 1.00 root table:t1 handle:2"
+        ]
+      },
+      {
+        "SQL": "select c + 1 from t1 where a in (1,2) and c = 2 for update",
+        "Plan": [
+          "Projection 0.00 root plus(test.t1.c, 1)->Column#4",
+          "└─Selection 0.00 root eq(test.t1.c, 2)",
+          "  └─Batch_Point_Get 2.00 root table:t1 handle:[1 2], keep order:false, desc:false"
+        ]
+      }
+    ]
   }
 ]
diff --git a/server/conn.go b/server/conn.go
index 29c87bd0dfd86..78cdd1a46c12d 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -76,7 +76,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
-	txndriver "github.com/pingcap/tidb/store/driver/txn"
+	storeerr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/arena"
@@ -1569,7 +1569,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
 			retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
 			if err != nil {
 				_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
-				if allowTiFlashFallback && errors.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) && retryable {
+				if allowTiFlashFallback && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
 					// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
 					// server and fallback to TiKV.
 					warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
@@ -1870,10 +1870,10 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
 	failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
 		switch value.(string) {
 		case "firstNext":
-			failpoint.Return(firstNext, txndriver.ErrTiFlashServerTimeout)
+			failpoint.Return(firstNext, storeerr.ErrTiFlashServerTimeout)
 		case "secondNext":
 			if !firstNext {
-				failpoint.Return(firstNext, txndriver.ErrTiFlashServerTimeout)
+				failpoint.Return(firstNext, storeerr.ErrTiFlashServerTimeout)
 			}
 		}
 	})
diff --git a/server/conn_stmt.go b/server/conn_stmt.go
index 242b0df80fc83..e9f56306d9800 100644
--- a/server/conn_stmt.go
+++ b/server/conn_stmt.go
@@ -50,7 +50,7 @@ import (
 	"github.com/pingcap/tidb/metrics"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
-	txndriver "github.com/pingcap/tidb/store/driver/txn"
+	storeerr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/execdetails"
@@ -198,7 +198,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
 	ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
 	retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
 	_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
-	if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) && retryable {
+	if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
 		// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
 		// server and fallback to TiKV.
 		prevErr := err
diff --git a/server/server.go b/server/server.go
index f7a6021a11221..29f5307895cc2 100644
--- a/server/server.go
+++ b/server/server.go
@@ -37,7 +37,6 @@ import (
 	"math/rand"
 	"net"
 	"net/http"
-	"unsafe"
 
 	// For pprof
 	_ "net/http/pprof"
@@ -46,6 +45,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/blacktear23/go-proxyprotocol"
 	"github.com/pingcap/errors"
@@ -56,6 +56,7 @@ import (
 	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/plugin"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/util"
@@ -557,6 +558,22 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
 	return rs
 }
 
+// ShowTxnList shows all txn info for displaying in `TIDB_TRX`
+func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
+	s.rwlock.RLock()
+	defer s.rwlock.RUnlock()
+	rs := make([]*txninfo.TxnInfo, 0, len(s.clients))
+	for _, client := range s.clients {
+		if client.ctx.Session != nil {
+			info := client.ctx.Session.TxnInfo()
+			if info != nil {
+				rs = append(rs, info)
+			}
+		}
+	}
+	return rs
+}
+
 // GetProcessInfo implements the SessionManager interface.
 func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
 	s.rwlock.RLock()
diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go
index 83f0057384aea..72853d86208a9 100644
--- a/session/pessimistic_test.go
+++ b/session/pessimistic_test.go
@@ -31,7 +31,7 @@ import (
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/variable"
-	txndriver "github.com/pingcap/tidb/store/driver/txn"
+	storeerr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/tablecodec"
@@ -611,7 +611,7 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
 	_, err := tk2.Exec("update test_kill set c = c + 1 where id = 1")
 	wg.Done()
 	c.Assert(err, NotNil)
-	c.Assert(terror.ErrorEqual(err, txndriver.ErrQueryInterrupted), IsTrue)
+	c.Assert(terror.ErrorEqual(err, storeerr.ErrQueryInterrupted), IsTrue)
 	tk.MustExec("rollback")
 }
 
@@ -733,10 +733,10 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
 
 	timeoutErr := <-timeoutErrCh
 	c.Assert(timeoutErr, NotNil)
-	c.Assert(timeoutErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Assert(timeoutErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 	timeoutErr = <-timeoutErrCh
 	c.Assert(timeoutErr, NotNil)
-	c.Assert(timeoutErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Assert(timeoutErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 
 	// tk4 lock c1 = 2
 	tk4.MustExec("begin pessimistic")
@@ -749,7 +749,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
 	_, err := tk2.Exec("delete from tk where c1 = 2")
 	c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
 	c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
-	c.Check(err.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(err.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 
 	tk4.MustExec("commit")
 
@@ -767,7 +767,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
 	_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
 	c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
 	c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
-	c.Check(err.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(err.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 
 	tk2.MustExec("commit")
 	tk3.MustExec("commit")
@@ -841,7 +841,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) {
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil)
 	waitErr := <-done
 	c.Assert(waitErr, NotNil)
-	c.Check(waitErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(waitErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 	c.Check(duration, GreaterEqual, 1000*time.Millisecond)
 	c.Check(duration, LessEqual, 3000*time.Millisecond)
 	tk2.MustExec("rollback")
@@ -1131,11 +1131,11 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {
 	tk1.MustExec("begin pessimistic")
 	err := tk1.ExecToErr("select * from t where k = 2 for update nowait")
-	c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	err = tk1.ExecToErr("select * from t where k = 4 for update nowait")
-	c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	err = tk1.ExecToErr("select * from t where k = 7 for update nowait")
-	c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	tk.MustExec("rollback")
 	tk1.MustExec("rollback")
@@ -1147,9 +1147,9 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {
 	tk1.MustExec("begin pessimistic")
 	err = tk1.ExecToErr("select * from t where k = 2 for update nowait")
-	c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	err = tk1.ExecToErr("select * from t where k = 6 for update nowait")
-	c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	tk.MustExec("rollback")
 	tk1.MustExec("rollback")
 }
@@ -1279,10 +1279,10 @@ func (s *testPessimisticSuite) TestBatchPointGetLockIndex(c *C) {
 	tk2.MustExec("begin pessimistic")
 	err := tk2.ExecToErr("insert into t1 values(2, 2, 2)")
 	c.Assert(err, NotNil)
-	c.Assert(txndriver.ErrLockWaitTimeout.Equal(err), IsTrue)
+	c.Assert(storeerr.ErrLockWaitTimeout.Equal(err), IsTrue)
 	err = tk2.ExecToErr("select * from t1 where c2 = 3 for update nowait")
 	c.Assert(err, NotNil)
-	c.Assert(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
+	c.Assert(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
 	tk.MustExec("rollback")
 	tk2.MustExec("rollback")
 }
@@ -1429,12 +1429,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
 	tk2.MustExec("begin pessimistic")
 	err := tk2.ExecToErr("select * from tu where z = 3 for update nowait")
 	c.Assert(err, NotNil)
-	c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
+	c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
 	tk.MustExec("begin pessimistic")
 	tk.MustExec("insert into tu(x, y) values(2, 2);")
 	err = tk2.ExecToErr("select * from tu where z = 4 for update nowait")
 	c.Assert(err, NotNil)
-	c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
+	c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
 
 	// test batch point get lock
 	tk.MustExec("begin pessimistic")
@@ -1443,12 +1443,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
 	tk2.MustExec("begin pessimistic")
 	err = tk2.ExecToErr("select x from tu where z in (3, 7, 9) for update nowait")
 	c.Assert(err, NotNil)
-	c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
+	c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
 	tk.MustExec("begin pessimistic")
 	tk.MustExec("insert into tu(x, y) values(5, 6);")
 	err = tk2.ExecToErr("select * from tu where z = 11 for update nowait")
 	c.Assert(err, NotNil)
-	c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
+	c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
 
 	tk.MustExec("commit")
 	tk2.MustExec("commit")
@@ -1996,11 +1996,11 @@ func (s *testPessimisticSuite) TestSelectForUpdateWaitSeconds(c *C) {
 	waitErr2 := <-errCh
 	waitErr3 := <-errCh
 	c.Assert(waitErr, NotNil)
-	c.Check(waitErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(waitErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 	c.Assert(waitErr2, NotNil)
-	c.Check(waitErr2.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(waitErr2.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 	c.Assert(waitErr3, NotNil)
-	c.Check(waitErr3.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
+	c.Check(waitErr3.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
 	c.Assert(time.Since(start).Seconds(), Less, 45.0)
 	tk2.MustExec("commit")
 	tk3.MustExec("rollback")
diff --git a/session/session.go b/session/session.go
index 2f842f92e183a..0b4cb309f434b 100644
--- a/session/session.go
+++ b/session/session.go
@@ -41,6 +41,9 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tipb/go-binlog"
+	"go.uber.org/zap"
+
 	"github.com/pingcap/tidb/bindinfo"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl"
@@ -58,6 +61,7 @@ import (
 	"github.com/pingcap/tidb/plugin"
 	"github.com/pingcap/tidb/privilege"
 	"github.com/pingcap/tidb/privilege/privileges"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -81,8 +85,6 @@ import (
 	"github.com/pingcap/tidb/util/sli"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/pingcap/tidb/util/timeutil"
-	"github.com/pingcap/tipb/go-binlog"
-	"go.uber.org/zap"
 )
 
 var (
@@ -145,6 +147,8 @@ type Session interface {
 	Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool
 	AuthWithoutVerification(user *auth.UserIdentity) bool
 	ShowProcess() *util.ProcessInfo
+	// TxnInfo returns information about the transaction currently running in this session.
+	TxnInfo() *txninfo.TxnInfo
 	// PrepareTxnCtx is exported for test.
 	PrepareTxnCtx(context.Context)
 	// FieldList returns fields list of a table.
@@ -183,7 +187,7 @@ func (h *StmtHistory) Count() int {
 
 type session struct {
 	// processInfo is used by ShowProcess(), and should be modified atomically.
 	processInfo atomic.Value
-	txn         TxnState
+	txn         LazyTxn
 
 	mu struct {
 		sync.RWMutex
@@ -442,6 +446,19 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
 	return fields, nil
 }
 
+func (s *session) TxnInfo() *txninfo.TxnInfo {
+	txnInfo := s.txn.Info()
+	if txnInfo == nil {
+		return nil
+	}
+	processInfo := s.ShowProcess()
+	txnInfo.CurrentSQLDigest = processInfo.Digest
+	txnInfo.ConnectionID = processInfo.ID
+	txnInfo.Username = processInfo.User
+	txnInfo.CurrentDB = processInfo.DB
+	return txnInfo
+}
+
 func (s *session) doCommit(ctx context.Context) error {
 	if !s.txn.Valid() {
 		return nil
@@ -524,6 +541,7 @@ func (s *session) doCommit(ctx context.Context) error {
 			if err = memBuffer.Delete(iter.Key()); err != nil {
 				return errors.Trace(err)
 			}
+			s.txn.UpdateEntriesCountAndSize()
 			if err = iter.Next(); err != nil {
 				return errors.Trace(err)
 			}
diff --git a/session/session_test.go b/session/session_test.go
index 5fa7779fc65c3..3baee4f0ef6f1 100644
--- a/session/session_test.go
+++ b/session/session_test.go
@@ -42,6 +42,7 @@ import (
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/privilege/privileges"
 	"github.com/pingcap/tidb/session"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -83,6 +84,7 @@ var _ = SerialSuites(&testSessionSerialSuite{})
 var _ = SerialSuites(&testBackupRestoreSuite{})
 var _ = Suite(&testClusteredSuite{})
 var _ = SerialSuites(&testClusteredSerialSuite{})
+var _ = SerialSuites(&testTxnStateSuite{})
 
 type testSessionSuiteBase struct {
 	cluster cluster.Cluster
@@ -4303,3 +4305,103 @@ func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) {
 	// The global temporary table data is discard after the transaction commit.
 	tk.MustQuery("select * from g_tmp").Check(testkit.Rows())
 }
+
+type testTxnStateSuite struct {
+	testSessionSuiteBase
+}
+
+func (s *testTxnStateSuite) TestBasic(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("insert into t(a) values (1);")
+	info := tk.Se.TxnInfo()
+	c.Assert(info, IsNil)
+	tk.MustExec("begin pessimistic;")
+	tk.MustExec("select * from t for update;")
+	info = tk.Se.TxnInfo()
+	_, expectedDigest := parser.NormalizeDigest("select * from t for update;")
+	c.Assert(info.CurrentSQLDigest, Equals, expectedDigest)
+	c.Assert(info.State, Equals, txninfo.TxnRunningNormal)
+	c.Assert(info.BlockStartTime, IsNil)
+	// EntriesCount and EntriesSize are covered in TestEntriesCountAndSize.
+	c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID)
+	c.Assert(info.Username, Equals, "")
+	c.Assert(info.CurrentDB, Equals, "test")
+	tk.MustExec("commit;")
+	info = tk.Se.TxnInfo()
+	c.Assert(info, IsNil)
+}
+
+func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("begin pessimistic;")
+	tk.MustExec("insert into t(a) values (1);")
+	info := tk.Se.TxnInfo()
+	c.Assert(info.EntriesCount, Equals, uint64(1))
+	c.Assert(info.EntriesSize, Equals, uint64(29))
+	tk.MustExec("insert into t(a) values (2);")
+	info = tk.Se.TxnInfo()
+	c.Assert(info.EntriesCount, Equals, uint64(2))
+	c.Assert(info.EntriesSize, Equals, uint64(58))
+	tk.MustExec("commit;")
+}
+
+func (s *testTxnStateSuite) TestBlocked(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk2 := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("insert into t(a) values (1);")
+	tk.MustExec("begin pessimistic;")
+	tk.MustExec("select * from t where a = 1 for update;")
+	go func() {
+		tk2.MustExec("begin pessimistic")
+		tk2.MustExec("select * from t where a = 1 for update;")
+		tk2.MustExec("commit;")
+	}()
+	time.Sleep(100 * time.Millisecond)
+	c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnLockWaiting)
+	c.Assert(tk2.Se.TxnInfo().BlockStartTime, NotNil)
+	tk.MustExec("commit;")
+}
+
+func (s *testTxnStateSuite) TestCommitting(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk2 := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("insert into t(a) values (1), (2);")
+	tk.MustExec("begin pessimistic;")
+	tk.MustExec("select * from t where a = 1 for update;")
+	ch := make(chan struct{})
+	go func() {
+		tk2.MustExec("begin pessimistic")
+		c.Assert(tk2.Se.TxnInfo(), NotNil)
+		tk2.MustExec("select * from t where a = 2 for update;")
+		failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)")
+		defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit")
+		tk2.MustExec("commit;")
+		ch <- struct{}{}
+	}()
+	time.Sleep(100 * time.Millisecond)
+	c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnCommitting)
+	tk.MustExec("commit;")
+	<-ch
+}
+
+func (s *testTxnStateSuite) TestRollbacking(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("insert into t(a) values (1), (2);")
+	ch := make(chan struct{})
+	go func() {
+		tk.MustExec("begin pessimistic")
+		tk.MustExec("insert into t(a) values (3);")
+		failpoint.Enable("github.com/pingcap/tidb/session/mockSlowRollback", "sleep(200)")
+		defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowRollback")
tk.MustExec("rollback;") + ch <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk.Se.TxnInfo().State, Equals, txninfo.TxnRollingBack) + <-ch +} diff --git a/session/txn.go b/session/txn.go index aebed7ed920b2..133cafb976aae 100644 --- a/session/txn.go +++ b/session/txn.go @@ -20,6 +20,8 @@ import ( "runtime/trace" "strings" "sync/atomic" + "time" + "unsafe" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" @@ -28,6 +30,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" tikvstore "github.com/pingcap/tidb/store/tikv/kv" @@ -39,12 +42,12 @@ import ( "go.uber.org/zap" ) -// TxnState wraps kv.Transaction to provide a new kv.Transaction. +// LazyTxn wraps kv.Transaction to provide a new kv.Transaction. // 1. It holds all statement related modification in the buffer before flush to the txn, // so if execute statement meets error, the txn won't be made dirty. // 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need. -type TxnState struct { - // States of a TxnState should be one of the followings: +type LazyTxn struct { + // States of a LazyTxn should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil // Pending: kv.Transaction == nil && txnFuture != nil // Valid: kv.Transaction != nil && txnFuture == nil @@ -55,23 +58,40 @@ type TxnState struct { stagingHandle kv.StagingHandle mutations map[int64]*binlog.TableMutation writeSLI sli.TxnWriteThroughputSLI + + // following atomic fields are used for filling TxnInfo + // we need these fields because kv.Transaction provides no thread safety promise + // but we hope getting TxnInfo is a thread safe op + + infoStartTS uint64 + // current executing state + State txninfo.TxnRunningState + // last trying to block start time + blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil + // how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() + EntriesCount uint64 + // how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() + EntriesSize uint64 } // GetTableInfo returns the cached index name. -func (txn *TxnState) GetTableInfo(id int64) *model.TableInfo { +func (txn *LazyTxn) GetTableInfo(id int64) *model.TableInfo { return txn.Transaction.GetTableInfo(id) } // CacheTableInfo caches the index name. -func (txn *TxnState) CacheTableInfo(id int64, info *model.TableInfo) { +func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { txn.Transaction.CacheTableInfo(id, info) } -func (txn *TxnState) init() { +func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } -func (txn *TxnState) initStmtBuf() { +func (txn *LazyTxn) initStmtBuf() { if txn.Transaction == nil { return } @@ -81,14 +101,14 @@ func (txn *TxnState) initStmtBuf() { } // countHint is estimated count of mutations. 
-func (txn *TxnState) countHint() int { +func (txn *LazyTxn) countHint() int { if txn.stagingHandle == kv.InvalidStagingHandle { return 0 } return txn.Transaction.GetMemBuffer().Len() - txn.initCnt } -func (txn *TxnState) flushStmtBuf() { +func (txn *LazyTxn) flushStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } @@ -97,17 +117,19 @@ func (txn *TxnState) flushStmtBuf() { txn.initCnt = buf.Len() } -func (txn *TxnState) cleanupStmtBuf() { +func (txn *LazyTxn) cleanupStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) } // Size implements the MemBuffer interface. -func (txn *TxnState) Size() int { +func (txn *LazyTxn) Size() int { if txn.Transaction == nil { return 0 } @@ -115,19 +137,19 @@ func (txn *TxnState) Size() int { } // Valid implements the kv.Transaction interface. -func (txn *TxnState) Valid() bool { +func (txn *LazyTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() } -func (txn *TxnState) pending() bool { +func (txn *LazyTxn) pending() bool { return txn.Transaction == nil && txn.txnFuture != nil } -func (txn *TxnState) validOrPending() bool { +func (txn *LazyTxn) validOrPending() bool { return txn.txnFuture != nil || txn.Valid() } -func (txn *TxnState) String() string { +func (txn *LazyTxn) String() string { if txn.Transaction != nil { return txn.Transaction.String() } @@ -138,7 +160,7 @@ func (txn *TxnState) String() string { } // GoString implements the "%#v" format for fmt.Printf. -func (txn *TxnState) GoString() string { +func (txn *LazyTxn) GoString() string { var s strings.Builder s.WriteString("Txn{") if txn.pending() { @@ -157,18 +179,25 @@ func (txn *TxnState) GoString() string { return s.String() } -func (txn *TxnState) changeInvalidToValid(kvTxn kv.Transaction) { +func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, kvTxn.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) txn.txnFuture = nil } -func (txn *TxnState) changeInvalidToPending(future *txnFuture) { +func (txn *LazyTxn) changeInvalidToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, uint64(0)) + atomic.StoreUint64(&txn.EntriesSize, uint64(0)) } -func (txn *TxnState) changePendingToValid(ctx context.Context) error { +func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { if txn.txnFuture == nil { return errors.New("transaction future is not set") } @@ -183,17 +212,24 @@ func (txn *TxnState) changePendingToValid(ctx context.Context) error { return err } txn.Transaction = t + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, t.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) return nil } -func (txn *TxnState) changeToInvalid() { +func (txn *LazyTxn) changeToInvalid() { if txn.stagingHandle != kv.InvalidStagingHandle { txn.Transaction.GetMemBuffer().Cleanup(txn.stagingHandle) } 
txn.stagingHandle = kv.InvalidStagingHandle txn.Transaction = nil txn.txnFuture = nil + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } var hasMockAutoIncIDRetry = int64(0) @@ -223,7 +259,7 @@ func ResetMockAutoRandIDRetryCount(failTimes int64) { } // Commit overrides the Transaction interface. -func (txn *TxnState) Commit(ctx context.Context) error { +func (txn *LazyTxn) Commit(ctx context.Context) error { defer txn.reset() if len(txn.mutations) != 0 || txn.countHint() != 0 { logutil.BgLogger().Error("the code should never run here", @@ -233,6 +269,10 @@ func (txn *TxnState) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } + atomic.StoreInt32(&txn.State, txninfo.TxnCommitting) + + failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) + // mockCommitError8942 is used for PR #8942. failpoint.Inject("mockCommitError8942", func(val failpoint.Value) { if val.(bool) { @@ -259,17 +299,34 @@ } // Rollback overrides the Transaction interface. -func (txn *TxnState) Rollback() error { +func (txn *LazyTxn) Rollback() error { defer txn.reset() + atomic.StoreInt32(&txn.State, txninfo.TxnRollingBack) + // mockSlowRollback is used to mock a rollback which takes a long time + failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() } -func (txn *TxnState) reset() { +// LockKeys wraps the inner transaction's `LockKeys` to record the lock-waiting status +func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { + originState := atomic.LoadInt32(&txn.State) + atomic.StoreInt32(&txn.State, txninfo.TxnLockWaiting) + t := time.Now() + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(&t)) + err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txn.State, originState) + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + return err +} + +func (txn *LazyTxn) reset() { txn.cleanup() txn.changeToInvalid() } -func (txn *TxnState) cleanup() { +func (txn *LazyTxn) cleanup() { txn.cleanupStmtBuf() txn.initStmtBuf() for key := range txn.mutations { @@ -278,7 +335,7 @@ } // KeysNeedToLock returns the keys need to be locked. -func (txn *TxnState) KeysNeedToLock() ([]kv.Key, error) { +func (txn *LazyTxn) KeysNeedToLock() ([]kv.Key, error) { if txn.stagingHandle == kv.InvalidStagingHandle { return nil, nil } @@ -316,6 +373,32 @@ func keyNeedToLock(k, v []byte, flags tikvstore.KeyFlags) bool { return !isNonUniqueIndex } +// Info dumps the LazyTxn's status as a TxnInfo for displaying in `TIDB_TRX`. +// This function is supposed to be thread-safe. +func (txn *LazyTxn) Info() *txninfo.TxnInfo { + startTs := atomic.LoadUint64(&txn.infoStartTS) + if startTs == 0 { + return nil + } + return &txninfo.TxnInfo{ + StartTS: startTs, + State: atomic.LoadInt32(&txn.State), + BlockStartTime: (*time.Time)(atomic.LoadPointer(&txn.blockStartTime)), + EntriesCount: atomic.LoadUint64(&txn.EntriesCount), + EntriesSize: atomic.LoadUint64(&txn.EntriesSize), + } +} + +// UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize. +// Note this function is not thread-safe, because +// txn.Transaction can be changed during this function's execution if running in parallel.
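// Sketch (not part of the PR) of the save/restore discipline LockKeys uses
// above: remember the previous state, publish "waiting" while blocked, and
// restore the old state once the blocking call returns, whether or not it
// failed. Illustrative names only.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateNormal int32 = iota
	stateLockWaiting
)

type tracked struct{ state int32 }

func (t *tracked) withLockWaiting(blocking func() error) error {
	origin := atomic.LoadInt32(&t.state)
	atomic.StoreInt32(&t.state, stateLockWaiting)
	defer atomic.StoreInt32(&t.state, origin) // restore even if blocking fails
	return blocking()
}

func main() {
	t := &tracked{}
	_ = t.withLockWaiting(func() error {
		fmt.Println("state while blocked:", atomic.LoadInt32(&t.state)) // 1
		return nil
	})
	fmt.Println("state after:", atomic.LoadInt32(&t.state)) // 0
}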
+func (txn *LazyTxn) UpdateEntriesCountAndSize() { + if txn.Valid() { + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + } +} + func getBinlogMutation(ctx sessionctx.Context, tableID int64) *binlog.TableMutation { bin := binloginfo.GetPrewriteValue(ctx, true) for i := range bin.Mutations { diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go new file mode 100644 index 0000000000000..77a2d8c90cd05 --- /dev/null +++ b/session/txninfo/txn_info.go @@ -0,0 +1,96 @@ +// Copyright 2021 PingCAP, Inc. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txninfo + +import ( + "time" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/types" +) + +// TxnRunningState is the current state of a transaction +type TxnRunningState = int32 + +const ( + // TxnRunningNormal means the transaction is running normally + TxnRunningNormal TxnRunningState = iota + // TxnLockWaiting means the transaction is blocked on a lock + TxnLockWaiting + // TxnCommitting means the transaction is (at least trying to) committing + TxnCommitting + // TxnRollingBack means the transaction is rolling back + TxnRollingBack +) + +// TxnRunningStateStrs is the list of display names of the TxnRunningStates +var TxnRunningStateStrs = []string{ + "Normal", "LockWaiting", "Committing", "RollingBack", +} + +// TxnInfo is information about a running transaction +// This is supposed to be the data source of `TIDB_TRX` in infoschema +type TxnInfo struct { + StartTS uint64 + // digest of the SQL statement currently running + CurrentSQLDigest string + // current executing state + State TxnRunningState + // start time of the latest attempt to block (e.g. waiting for a lock) + BlockStartTime *time.Time + // How many entries are in MemDB + EntriesCount uint64 + // memory used by the MemDB + EntriesSize uint64 + + // The following fields will be filled in `session` instead of `LazyTxn` + + // Which session this transaction belongs to + ConnectionID uint64 + // The user who opened this session + Username string + // The schema this transaction works on + CurrentDB string +} + +// ToDatum converts the `TxnInfo` to `Datum` for display in the `TIDB_TRX` table +func (info *TxnInfo) ToDatum() []types.Datum { + humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + var blockStartTime interface{} + if info.BlockStartTime == nil { + blockStartTime = nil + } else { + blockStartTime = types.NewTime(types.FromGoTime(*info.BlockStartTime), mysql.TypeTimestamp, 0) + } + e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) + if err != nil { + panic("this should never happen") + } + state := types.NewMysqlEnumDatum(e) + datums := types.MakeDatums( + info.StartTS, + types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, 0), + info.CurrentSQLDigest, + ) + datums = append(datums, state) + datums = append(datums, types.MakeDatums( + blockStartTime, + info.EntriesCount, + info.EntriesSize, + info.ConnectionID, + info.Username, + info.CurrentDB)...)
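// Sketch (not part of the PR) of why ToDatum passes info.State+1 above:
// MySQL enum values are 1-based, while the Go state constants start at 0.
// parseEnumValue below is a minimal stand-in for types.ParseEnumValue under
// that assumption.
package main

import "fmt"

var stateStrs = []string{"Normal", "LockWaiting", "Committing", "RollingBack"}

// parseEnumValue mimics the 1-based lookup contract assumed above.
func parseEnumValue(elems []string, number uint64) (string, error) {
	if number == 0 || number > uint64(len(elems)) {
		return "", fmt.Errorf("enum value %d out of range", number)
	}
	return elems[number-1], nil
}

func main() {
	const state = 2 // 0-based constant: Committing
	name, err := parseEnumValue(stateStrs, uint64(state+1))
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // Committing
}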
+ return datums +} diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index af224c59fc38a..4bec370a9a4d5 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" - txndriver "github.com/pingcap/tidb/store/driver/txn" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -126,8 +126,10 @@ func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.Key if err != nil { return nil, errors.Trace(err) } - // If the region is not found in cache, it must be out - // of date and already be cleaned up. We should retry and generate new tasks. + // When rpcCtx is nil, it is not only caused by a missed region, but may also + // mean that some TiFlash stores have crashed and cannot be recovered. + // That is not an error we can easily recover from, so we treat it + // the same as an RPC error. if rpcCtx == nil { needRetry = true logutil.BgLogger().Info("retry for TiFlash peer with region missing", zap.Uint64("region id", task.region.GetID())) @@ -147,8 +149,10 @@ func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.Key } } if needRetry { - // Backoff once for each retry. - err = bo.Backoff(tikv.BoRegionMiss, errors.New("Cannot find region with TiFlash peer")) + // As mentioned above, a nil rpcCtx is always attributed to failed stores. + // It's equivalent to long-polling the store but getting no response. Here we'd better use + // a TiFlash error to trigger the TiKV fallback mechanism.
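// Sketch (not part of the PR) of the retry-with-backoff shape of
// buildBatchCopTasks above: retry while the backoffer still has budget, and
// fail once total sleep exceeds the cap. This is illustrative only, not the
// tikv Backoffer API.
package main

import (
	"errors"
	"fmt"
	"time"
)

type backoffer struct {
	slept, max time.Duration
	next       time.Duration
}

func (b *backoffer) backoff(err error) error {
	if b.slept+b.next > b.max {
		return fmt.Errorf("backoff budget exhausted: %w", err)
	}
	time.Sleep(b.next)
	b.slept += b.next
	b.next *= 2 // exponential backoff
	return nil
}

func buildTasks(bo *backoffer, locate func() (bool, error)) error {
	for {
		ok, err := locate()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		// Like the nil-rpcCtx case: treat it as a store failure and back off.
		if err := bo.backoff(errors.New("cannot find region with TiFlash peer")); err != nil {
			return err
		}
	}
}

func main() {
	bo := &backoffer{max: 50 * time.Millisecond, next: 5 * time.Millisecond}
	calls := 0
	err := buildTasks(bo, func() (bool, error) { calls++; return calls >= 3, nil })
	fmt.Println(calls, err) // 3 <nil>
}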
+ err = bo.Backoff(tikv.BoTiFlashRPC, errors.New("Cannot find region with TiFlash peer")) if err != nil { return nil, errors.Trace(err) } @@ -261,7 +265,7 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe return case <-ticker.C: if atomic.LoadUint32(b.vars.Killed) == 1 { - resp = &batchCopResponse{err: txndriver.ErrQueryInterrupted} + resp = &batchCopResponse{err: derr.ErrQueryInterrupted} ok = true return } @@ -387,7 +391,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b } else { logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } - return txndriver.ErrTiFlashServerTimeout + return derr.ErrTiFlashServerTimeout } } } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 5e7eab303e84f..2c1e2d361af76 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -35,7 +35,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" tidbmetrics "github.com/pingcap/tidb/metrics" - txndriver "github.com/pingcap/tidb/store/driver/txn" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" @@ -476,7 +476,7 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes return case <-ticker.C: if atomic.LoadUint32(it.vars.Killed) == 1 { - resp = &copResponse{err: txndriver.ErrQueryInterrupted} + resp = &copResponse{err: derr.ErrQueryInterrupted} ok = true return } @@ -717,7 +717,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *backoffer, task *copTask, ch ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels)) } resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...) 
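// Sketch (not part of the PR) of the convert-at-the-boundary pattern applied
// throughout this file: every error coming back from the tikv client layer is
// mapped to a TiDB-level error right at the call site, before it propagates.
// The sentinel errors below are illustrative stand-ins.
package main

import (
	"errors"
	"fmt"
)

// clientErrTimeout stands in for a tikv-layer sentinel error.
var clientErrTimeout = errors.New("client: server timeout")

// errTiKVServerTimeout stands in for the TiDB-level error it maps to.
var errTiKVServerTimeout = errors.New("tidb: tikv server timeout")

// toTiDBErr is the single translation point, like derr.ToTiDBErr above.
func toTiDBErr(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, clientErrTimeout):
		return errTiKVServerTimeout
	default:
		return err
	}
}

func sendReq() error { return clientErrTimeout }

func main() {
	err := toTiDBErr(sendReq()) // convert immediately at the call site
	fmt.Println(errors.Is(err, errTiKVServerTimeout)) // true
}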
- err = txndriver.ToTiDBErr(err) + err = derr.ToTiDBErr(err) if err != nil { if task.storeType == kv.TiDB { err = worker.handleTiDBSendReqErr(err, task, ch) @@ -874,7 +874,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *backoffer, rpcCtx *tikv.R logutil.BgLogger().Debug("coprocessor encounters", zap.Stringer("lock", lockErr)) msBeforeExpired, err1 := worker.kvclient.ResolveLocks(bo.TiKVBackoffer(), worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)}) - err1 = txndriver.ToTiDBErr(err1) + err1 = derr.ToTiDBErr(err1) if err1 != nil { return nil, errors.Trace(err1) } @@ -982,11 +982,11 @@ type CopRuntimeStats struct { func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error { errCode := errno.ErrUnknown errMsg := err.Error() - if terror.ErrorEqual(err, txndriver.ErrTiKVServerTimeout) { + if terror.ErrorEqual(err, derr.ErrTiKVServerTimeout) { errCode = errno.ErrTiKVServerTimeout errMsg = "TiDB server timeout, address is " + task.storeAddr } - if terror.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) { + if terror.ErrorEqual(err, derr.ErrTiFlashServerTimeout) { errCode = errno.ErrTiFlashServerTimeout errMsg = "TiDB server timeout, address is " + task.storeAddr } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 10784912faa9b..377e439a9392c 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -27,7 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/tidb/kv" - txndriver "github.com/pingcap/tidb/store/driver/txn" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -225,7 +225,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req if sender.GetRPCError() != nil { logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error())) // we return timeout to trigger tikv's fallback - m.sendError(txndriver.ErrTiFlashServerTimeout) + m.sendError(derr.ErrTiFlashServerTimeout) return } } else { @@ -235,7 +235,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req if err != nil { logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error())) // we return timeout to trigger tikv's fallback - m.sendError(txndriver.ErrTiFlashServerTimeout) + m.sendError(derr.ErrTiFlashServerTimeout) return } @@ -255,7 +255,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) { if val.(bool) && !req.IsRoot { time.Sleep(1 * time.Second) - m.sendError(txndriver.ErrTiFlashServerTimeout) + m.sendError(derr.ErrTiFlashServerTimeout) return } }) @@ -318,7 +318,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques if err != nil { logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error())) // we return timeout to trigger tikv's fallback - m.sendError(txndriver.ErrTiFlashServerTimeout) + m.sendError(derr.ErrTiFlashServerTimeout) return } @@ -350,7 +350,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } } - m.sendError(txndriver.ErrTiFlashServerTimeout) + m.sendError(derr.ErrTiFlashServerTimeout) return } } @@ 
-405,7 +405,7 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool, return case <-ticker.C: if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - err = txndriver.ErrQueryInterrupted + err = derr.ErrQueryInterrupted exit = true return } diff --git a/store/copr/store.go b/store/copr/store.go index 2cc10ee7bad38..d3f132f85238f 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" - txndriver "github.com/pingcap/tidb/store/driver/txn" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -39,7 +39,7 @@ func (s *kvStore) GetRegionCache() *tikv.RegionCache { // CheckVisibility checks if it is safe to read using given ts. func (s *kvStore) CheckVisibility(startTime uint64) error { err := s.store.CheckVisibility(startTime) - return txndriver.ToTiDBErr(err) + return derr.ToTiDBErr(err) } // GetTiKVClient gets the client instance. @@ -54,13 +54,13 @@ type tikvClient struct { func (c *tikvClient) Close() error { err := c.c.Close() - return txndriver.ToTiDBErr(err) + return derr.ToTiDBErr(err) } // SendRequest sends Request. func (c *tikvClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { res, err := c.c.SendRequest(ctx, addr, req, timeout) - return res, txndriver.ToTiDBErr(err) + return res, derr.ToTiDBErr(err) } // Store wraps tikv.KVStore and provides coprocessor utilities. @@ -147,14 +147,14 @@ func (b *backoffer) TiKVBackoffer() *tikv.Backoffer { // It returns a retryable error if total sleep time exceeds maxSleep. func (b *backoffer) Backoff(typ tikv.BackoffType, err error) error { e := b.b.Backoff(typ, err) - return txndriver.ToTiDBErr(e) + return derr.ToTiDBErr(e) } // BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message // and never sleep more than maxSleepMs for each sleep. func (b *backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error { e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err) - return txndriver.ToTiDBErr(e) + return derr.ToTiDBErr(e) } // GetBackoffTimes returns a map contains backoff time count by type. diff --git a/store/driver/error/error.go b/store/driver/error/error.go new file mode 100644 index 0000000000000..17da8f7ef2fa3 --- /dev/null +++ b/store/driver/error/error.go @@ -0,0 +1,158 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package error + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/kv" + tikverr "github.com/pingcap/tidb/store/tikv/error" + "github.com/pingcap/tidb/util/dbterror" +) + +// tikv error instance var ( + // ErrTokenLimit is the error that token is up to the limit.
+ ErrTokenLimit = dbterror.ClassTiKV.NewStd(errno.ErrTiKVStoreLimit) + // ErrTiKVServerTimeout is the error when tikv server is timeout. + ErrTiKVServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrTiKVServerTimeout) + ErrTiFlashServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrTiFlashServerTimeout) + // ErrGCTooEarly is the error that GC life time is shorter than transaction duration + ErrGCTooEarly = dbterror.ClassTiKV.NewStd(errno.ErrGCTooEarly) + // ErrTiKVStaleCommand is the error that the command is stale in tikv. + ErrTiKVStaleCommand = dbterror.ClassTiKV.NewStd(errno.ErrTiKVStaleCommand) + // ErrQueryInterrupted is the error when the query is interrupted. + ErrQueryInterrupted = dbterror.ClassTiKV.NewStd(errno.ErrQueryInterrupted) + // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. + ErrTiKVMaxTimestampNotSynced = dbterror.ClassTiKV.NewStd(errno.ErrTiKVMaxTimestampNotSynced) + // ErrLockAcquireFailAndNoWaitSet is the error that acquiring the lock failed while no-wait was set. + ErrLockAcquireFailAndNoWaitSet = dbterror.ClassTiKV.NewStd(errno.ErrLockAcquireFailAndNoWaitSet) + ErrResolveLockTimeout = dbterror.ClassTiKV.NewStd(errno.ErrResolveLockTimeout) + // ErrLockWaitTimeout is the error that waiting for the lock timed out. + ErrLockWaitTimeout = dbterror.ClassTiKV.NewStd(errno.ErrLockWaitTimeout) + // ErrTiKVServerBusy is the error when tikv server is busy. + ErrTiKVServerBusy = dbterror.ClassTiKV.NewStd(errno.ErrTiKVServerBusy) + // ErrTiFlashServerBusy is the error that tiflash server is busy. + ErrTiFlashServerBusy = dbterror.ClassTiKV.NewStd(errno.ErrTiFlashServerBusy) + // ErrPDServerTimeout is the error when pd server is timeout. + ErrPDServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrPDServerTimeout) + // ErrRegionUnavailable is the error when region is not available. + ErrRegionUnavailable = dbterror.ClassTiKV.NewStd(errno.ErrRegionUnavailable) + // ErrUnknown is the unknown error. + ErrUnknown = dbterror.ClassTiKV.NewStd(errno.ErrUnknown) +) + +// Registers error returned from TiKV. +var ( + _ = dbterror.ClassTiKV.NewStd(errno.ErrDataOutOfRange) + _ = dbterror.ClassTiKV.NewStd(errno.ErrTruncatedWrongValue) + _ = dbterror.ClassTiKV.NewStd(errno.ErrDivisionByZero) +) + +// ToTiDBErr checks and converts a tikv error to a tidb error.
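// Sketch (not part of the PR) of the shape of ToTiDBErr below: unwrap to the
// root cause, match typed errors that carry fields (like ErrTxnTooLarge.Size),
// and fall back to the original wrapped error. Stdlib errors is used here as
// a stand-in for github.com/pingcap/errors; the types are illustrative.
package main

import (
	"errors"
	"fmt"
)

type errTxnTooLarge struct{ Size int }

func (e *errTxnTooLarge) Error() string { return fmt.Sprintf("txn too large: %d", e.Size) }

func toTiDBErr(err error) error {
	if err == nil {
		return nil
	}
	var e *errTxnTooLarge
	if errors.As(err, &e) { // typed match, keeps the Size field
		return fmt.Errorf("tidb: transaction too large (%d bytes)", e.Size)
	}
	return err // unknown cause: return as-is (the real code wraps with a stack)
}

func main() {
	err := fmt.Errorf("prewrite failed: %w", &errTxnTooLarge{Size: 1 << 30})
	fmt.Println(toTiDBErr(err)) // tidb: transaction too large (1073741824 bytes)
}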
+func ToTiDBErr(err error) error { + originErr := err + if err == nil { + return nil + } + err = errors.Cause(err) + if tikverr.IsErrNotFound(err) { + return kv.ErrNotExist + } + + if e, ok := err.(*tikverr.ErrWriteConflictInLatch); ok { + return kv.ErrWriteConflictInTiDB.FastGenByArgs(e.StartTS) + } + + if e, ok := err.(*tikverr.ErrTxnTooLarge); ok { + return kv.ErrTxnTooLarge.GenWithStackByArgs(e.Size) + } + + if errors.ErrorEqual(err, tikverr.ErrCannotSetNilValue) { + return kv.ErrCannotSetNilValue + } + + if e, ok := err.(*tikverr.ErrEntryTooLarge); ok { + return kv.ErrEntryTooLarge.GenWithStackByArgs(e.Limit, e.Size) + } + + if errors.ErrorEqual(err, tikverr.ErrInvalidTxn) { + return kv.ErrInvalidTxn + } + + if errors.ErrorEqual(err, tikverr.ErrTiKVServerTimeout) { + return ErrTiKVServerTimeout + } + + if e, ok := err.(*tikverr.ErrPDServerTimeout); ok { + if len(e.Error()) == 0 { + return ErrPDServerTimeout + } + return ErrPDServerTimeout.GenWithStackByArgs(e.Error()) + } + + if errors.ErrorEqual(err, tikverr.ErrTiFlashServerTimeout) { + return ErrTiFlashServerTimeout + } + + if errors.ErrorEqual(err, tikverr.ErrQueryInterrupted) { + return ErrQueryInterrupted + } + + if errors.ErrorEqual(err, tikverr.ErrTiKVServerBusy) { + return ErrTiKVServerBusy + } + + if errors.ErrorEqual(err, tikverr.ErrTiFlashServerBusy) { + return ErrTiFlashServerBusy + } + + if e, ok := err.(*tikverr.ErrGCTooEarly); ok { + return ErrGCTooEarly.GenWithStackByArgs(e.TxnStartTS, e.GCSafePoint) + } + + if errors.ErrorEqual(err, tikverr.ErrTiKVStaleCommand) { + return ErrTiKVStaleCommand + } + + if errors.ErrorEqual(err, tikverr.ErrTiKVMaxTimestampNotSynced) { + return ErrTiKVMaxTimestampNotSynced + } + + if errors.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet) { + return ErrLockAcquireFailAndNoWaitSet + } + + if errors.ErrorEqual(err, tikverr.ErrResolveLockTimeout) { + return ErrResolveLockTimeout + } + + if errors.ErrorEqual(err, tikverr.ErrLockWaitTimeout) { + return ErrLockWaitTimeout + } + + if errors.ErrorEqual(err, tikverr.ErrRegionUnavailable) { + return ErrRegionUnavailable + } + + if e, ok := err.(*tikverr.ErrTokenLimit); ok { + return ErrTokenLimit.GenWithStackByArgs(e.StoreID) + } + + if errors.ErrorEqual(err, tikverr.ErrUnknown) { + return ErrUnknown + } + + return errors.Trace(originErr) +} diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index cb14736844e68..398be99520aa6 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/copr" + derr "github.com/pingcap/tidb/store/driver/error" txn_driver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/gcworker" "github.com/pingcap/tidb/store/tikv" @@ -261,7 +262,7 @@ func (s *tikvStore) StartGCWorker() error { gcWorker, err := gcworker.NewGCWorker(s, s.pdClient) if err != nil { - return txn_driver.ToTiDBErr(err) + return derr.ToTiDBErr(err) } gcWorker.Start() s.gcWorker = gcWorker @@ -286,7 +287,7 @@ func (s *tikvStore) Close() error { } s.coprStore.Close() err := s.KVStore.Close() - return txn_driver.ToTiDBErr(err) + return derr.ToTiDBErr(err) } // GetMemCache return memory manager of the storage @@ -298,7 +299,7 @@ func (s *tikvStore) GetMemCache() kv.MemManager { func (s *tikvStore) Begin() (kv.Transaction, error) { txn, err := s.KVStore.Begin() if err != nil { - return nil, txn_driver.ToTiDBErr(err) + 
return nil, derr.ToTiDBErr(err) } return txn_driver.NewTiKVTxn(txn), err } @@ -307,7 +308,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { - return nil, txn_driver.ToTiDBErr(err) + return nil, derr.ToTiDBErr(err) } return txn_driver.NewTiKVTxn(txn), err } @@ -321,7 +322,7 @@ func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot { // CurrentVersion returns current max committed version with the given txnScope (local or global). func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) { ver, err := s.KVStore.CurrentTimestamp(txnScope) - return kv.NewVersion(ver), txn_driver.ToTiDBErr(err) + return kv.NewVersion(ver), derr.ToTiDBErr(err) } // ShowStatus returns the specified status of the storage diff --git a/store/driver/txn/error.go b/store/driver/txn/error.go index 4c8e770c44ff7..39931357567be 100644 --- a/store/driver/txn/error.go +++ b/store/driver/txn/error.go @@ -25,56 +25,16 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" + derr "github.com/pingcap/tidb/store/driver/error" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/dbterror" "go.uber.org/zap" ) -// tikv error instance -var ( - // ErrTokenLimit is the error that token is up to the limit. - ErrTokenLimit = dbterror.ClassTiKV.NewStd(errno.ErrTiKVStoreLimit) - // ErrTiKVServerTimeout is the error when tikv server is timeout. - ErrTiKVServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrTiKVServerTimeout) - ErrTiFlashServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrTiFlashServerTimeout) - // ErrGCTooEarly is the error that GC life time is shorter than transaction duration - ErrGCTooEarly = dbterror.ClassTiKV.NewStd(errno.ErrGCTooEarly) - // ErrTiKVStaleCommand is the error that the command is stale in tikv. - ErrTiKVStaleCommand = dbterror.ClassTiKV.NewStd(errno.ErrTiKVStaleCommand) - // ErrQueryInterrupted is the error when the query is interrupted. - ErrQueryInterrupted = dbterror.ClassTiKV.NewStd(errno.ErrQueryInterrupted) - // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. - ErrTiKVMaxTimestampNotSynced = dbterror.ClassTiKV.NewStd(errno.ErrTiKVMaxTimestampNotSynced) - // ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted. - ErrLockAcquireFailAndNoWaitSet = dbterror.ClassTiKV.NewStd(errno.ErrLockAcquireFailAndNoWaitSet) - ErrResolveLockTimeout = dbterror.ClassTiKV.NewStd(errno.ErrResolveLockTimeout) - // ErrLockWaitTimeout is the error that wait for the lock is timeout. - ErrLockWaitTimeout = dbterror.ClassTiKV.NewStd(errno.ErrLockWaitTimeout) - // ErrTiKVServerBusy is the error when tikv server is busy. - ErrTiKVServerBusy = dbterror.ClassTiKV.NewStd(errno.ErrTiKVServerBusy) - // ErrTiFlashServerBusy is the error that tiflash server is busy. - ErrTiFlashServerBusy = dbterror.ClassTiKV.NewStd(errno.ErrTiFlashServerBusy) - // ErrPDServerTimeout is the error when pd server is timeout. 
- ErrPDServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrPDServerTimeout) - // ErrRegionUnavailable is the error when region is not available. - ErrRegionUnavailable = dbterror.ClassTiKV.NewStd(errno.ErrRegionUnavailable) - // ErrUnknown is the unknow error. - ErrUnknown = dbterror.ClassTiKV.NewStd(errno.ErrUnknown) -) - -// Registers error returned from TiKV. -var ( - _ = dbterror.ClassTiKV.NewStd(errno.ErrDataOutOfRange) - _ = dbterror.ClassTiKV.NewStd(errno.ErrTruncatedWrongValue) - _ = dbterror.ClassTiKV.NewStd(errno.ErrDivisionByZero) -) - func genKeyExistsError(name string, value string, err error) error { if err != nil { logutil.BgLogger().Info("extractKeyExistsErr meets error", zap.Error(err)) @@ -186,104 +146,7 @@ func extractKeyErr(err error) error { notFoundDetail := prettyLockNotFoundKey(e.Retryable) return kv.ErrTxnRetryable.GenWithStackByArgs(e.Retryable + " " + notFoundDetail) } - return ToTiDBErr(err) -} - -// ToTiDBErr checks and converts a tikv error to a tidb error. -func ToTiDBErr(err error) error { - originErr := err - if err == nil { - return nil - } - err = errors.Cause(err) - if tikverr.IsErrNotFound(err) { - return kv.ErrNotExist - } - - if e, ok := err.(*tikverr.ErrWriteConflictInLatch); ok { - return kv.ErrWriteConflictInTiDB.FastGenByArgs(e.StartTS) - } - - if e, ok := err.(*tikverr.ErrTxnTooLarge); ok { - return kv.ErrTxnTooLarge.GenWithStackByArgs(e.Size) - } - - if errors.ErrorEqual(err, tikverr.ErrCannotSetNilValue) { - return kv.ErrCannotSetNilValue - } - - if e, ok := err.(*tikverr.ErrEntryTooLarge); ok { - return kv.ErrEntryTooLarge.GenWithStackByArgs(e.Limit, e.Size) - } - - if errors.ErrorEqual(err, tikverr.ErrInvalidTxn) { - return kv.ErrInvalidTxn - } - - if errors.ErrorEqual(err, tikverr.ErrTiKVServerTimeout) { - return ErrTiKVServerTimeout - } - - if e, ok := err.(*tikverr.ErrPDServerTimeout); ok { - if len(e.Error()) == 0 { - return ErrPDServerTimeout - } - return ErrPDServerTimeout.GenWithStackByArgs(e.Error()) - } - - if errors.ErrorEqual(err, tikverr.ErrTiFlashServerTimeout) { - return ErrTiFlashServerTimeout - } - - if errors.ErrorEqual(err, tikverr.ErrQueryInterrupted) { - return ErrQueryInterrupted - } - - if errors.ErrorEqual(err, tikverr.ErrTiKVServerBusy) { - return ErrTiKVServerBusy - } - - if errors.ErrorEqual(err, tikverr.ErrTiFlashServerBusy) { - return ErrTiFlashServerBusy - } - - if e, ok := err.(*tikverr.ErrGCTooEarly); ok { - return ErrGCTooEarly.GenWithStackByArgs(e.TxnStartTS, e.GCSafePoint) - } - - if errors.ErrorEqual(err, tikverr.ErrTiKVStaleCommand) { - return ErrTiKVStaleCommand - } - - if errors.ErrorEqual(err, tikverr.ErrTiKVMaxTimestampNotSynced) { - return ErrTiKVMaxTimestampNotSynced - } - - if errors.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet) { - return ErrLockAcquireFailAndNoWaitSet - } - - if errors.ErrorEqual(err, tikverr.ErrResolveLockTimeout) { - return ErrResolveLockTimeout - } - - if errors.ErrorEqual(err, tikverr.ErrLockWaitTimeout) { - return ErrLockWaitTimeout - } - - if errors.ErrorEqual(err, tikverr.ErrRegionUnavailable) { - return ErrRegionUnavailable - } - - if e, ok := err.(*tikverr.ErrTokenLimit); ok { - return ErrTokenLimit.GenWithStackByArgs(e.StoreID) - } - - if errors.ErrorEqual(err, tikverr.ErrUnknown) { - return ErrUnknown - } - - return errors.Trace(originErr) + return derr.ToTiDBErr(err) } func newWriteConflictError(conflict *kvrpcpb.WriteConflict) error { diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index a2d407738e358..ee1d1eeee29d8 100644 --- 
a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -17,7 +17,9 @@ import ( "context" "unsafe" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" ) @@ -48,7 +50,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { scanner, err := s.KVSnapshot.Iter(k, upperBound) if err != nil { - return nil, ToTiDBErr(err) + return nil, derr.ToTiDBErr(err) } return &tikvScanner{scanner.(*tikv.Scanner)}, err } @@ -57,7 +59,7 @@ func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { scanner, err := s.KVSnapshot.IterReverse(k) if err != nil { - return nil, ToTiDBErr(err) + return nil, derr.ToTiDBErr(err) } return &tikvScanner{scanner.(*tikv.Scanner)}, err } @@ -73,10 +75,16 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetNotFillCache(val.(bool)) case tikvstore.SnapshotTS: s.KVSnapshot.SetSnapshotTS(val.(uint64)) + case tikvstore.ReplicaRead: + s.KVSnapshot.SetReplicaRead(val.(tikvstore.ReplicaReadType)) case tikvstore.SampleStep: s.KVSnapshot.SetSampleStep(val.(uint32)) case tikvstore.TaskID: s.KVSnapshot.SetTaskID(val.(uint64)) + case tikvstore.IsStalenessReadOnly: + s.KVSnapshot.SetIsStatenessReadOnly(val.(bool)) + case tikvstore.MatchStoreLabels: + s.KVSnapshot.SetMatchStoreLabels(val.([]*metapb.StoreLabel)) default: s.KVSnapshot.SetOption(opt, val) } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index a3ad26a7d7f8b..12f2c8233ccb1 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -19,9 +19,11 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/binloginfo" + derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" tikverr "github.com/pingcap/tidb/store/tikv/error" tikvstore "github.com/pingcap/tidb/store/tikv/kv" @@ -75,7 +77,7 @@ func (txn *tikvTxn) GetSnapshot() kv.Snapshot { // The Iterator must be Closed after use. func (txn *tikvTxn) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { it, err := txn.KVTxn.Iter(k, upperBound) - return newKVIterator(it), ToTiDBErr(err) + return newKVIterator(it), derr.ToTiDBErr(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. @@ -84,7 +86,7 @@ func (txn *tikvTxn) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { // TODO: Add lower bound limit func (txn *tikvTxn) IterReverse(k kv.Key) (kv.Iterator, error) { it, err := txn.KVTxn.IterReverse(k) - return newKVIterator(it), ToTiDBErr(err) + return newKVIterator(it), derr.ToTiDBErr(err) } // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. 
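// Sketch (not part of the PR) of the direction of the SetOption changes above:
// untyped option keys dispatched through interface{} are being replaced by
// typed setters, so mistakes become compile errors instead of runtime panics.
// All types below are illustrative, not TiDB's.
package main

import "fmt"

type replicaReadType int

const (
	readLeader replicaReadType = iota
	readFollower
)

type snapshot struct {
	replicaRead replicaReadType
	isStaleness bool
}

// Old style: one SetOption(opt int, val interface{}) needs a type assertion
// per case and panics at runtime if a caller passes the wrong value type.
func (s *snapshot) setOption(opt int, val interface{}) {
	switch opt {
	case 1:
		s.replicaRead = val.(replicaReadType)
	case 2:
		s.isStaleness = val.(bool)
	}
}

// New style: one typed setter per option; the compiler checks the argument.
func (s *snapshot) SetReplicaRead(t replicaReadType) { s.replicaRead = t }
func (s *snapshot) SetIsStalenessReadOnly(b bool)    { s.isStaleness = b }

func main() {
	s := &snapshot{}
	s.setOption(1, readFollower) // would panic if we passed, say, a bool
	s.SetReplicaRead(readFollower)
	s.SetIsStalenessReadOnly(true)
	fmt.Println(s.replicaRead, s.isStaleness) // 1 true
}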
@@ -101,17 +103,17 @@ func (txn *tikvTxn) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]b func (txn *tikvTxn) Delete(k kv.Key) error { err := txn.KVTxn.Delete(k) - return ToTiDBErr(err) + return derr.ToTiDBErr(err) } func (txn *tikvTxn) Get(ctx context.Context, k kv.Key) ([]byte, error) { data, err := txn.KVTxn.Get(ctx, k) - return data, ToTiDBErr(err) + return data, derr.ToTiDBErr(err) } func (txn *tikvTxn) Set(k kv.Key, v []byte) error { err := txn.KVTxn.Set(k, v) - return ToTiDBErr(err) + return derr.ToTiDBErr(err) } func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer { @@ -144,6 +146,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.SetPessimistic(val.(bool)) case tikvstore.SnapshotTS: txn.KVTxn.GetSnapshot().SetSnapshotTS(val.(uint64)) + case tikvstore.ReplicaRead: + txn.KVTxn.GetSnapshot().SetReplicaRead(val.(tikvstore.ReplicaReadType)) case tikvstore.TaskID: txn.KVTxn.GetSnapshot().SetTaskID(val.(uint64)) case tikvstore.InfoSchema: @@ -154,10 +158,16 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetSampleStep(val.(uint32)) case tikvstore.CommitHook: txn.SetCommitCallback(val.(func(string, error))) + case tikvstore.EnableAsyncCommit: + txn.SetEnableAsyncCommit(val.(bool)) case tikvstore.Enable1PC: txn.SetEnable1PC(val.(bool)) case tikvstore.TxnScope: txn.SetScope(val.(string)) + case tikvstore.IsStalenessReadOnly: + txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool)) + case tikvstore.MatchStoreLabels: + txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) default: txn.KVTxn.SetOption(opt, val) } diff --git a/store/driver/txn/unionstore_driver.go b/store/driver/txn/unionstore_driver.go index 9db2325a0148f..5a2f56bfe4233 100644 --- a/store/driver/txn/unionstore_driver.go +++ b/store/driver/txn/unionstore_driver.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tidb/kv" + derr "github.com/pingcap/tidb/store/driver/error" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/unionstore" ) @@ -39,17 +40,17 @@ func (m *memBuffer) Delete(k kv.Key) error { func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...tikvstore.FlagsOp) error { err := m.MemDB.DeleteWithFlags(k, ops...) - return ToTiDBErr(err) + return derr.ToTiDBErr(err) } func (m *memBuffer) Get(_ context.Context, key kv.Key) ([]byte, error) { data, err := m.MemDB.Get(key) - return data, ToTiDBErr(err) + return data, derr.ToTiDBErr(err) } func (m *memBuffer) GetFlags(key kv.Key) (tikvstore.KeyFlags, error) { data, err := m.MemDB.GetFlags(key) - return data, ToTiDBErr(err) + return data, derr.ToTiDBErr(err) } func (m *memBuffer) Staging() kv.StagingHandle { @@ -73,12 +74,12 @@ func (m *memBuffer) InspectStage(handle kv.StagingHandle, f func(kv.Key, tikvsto func (m *memBuffer) Set(key kv.Key, value []byte) error { err := m.MemDB.Set(key, value) - return ToTiDBErr(err) + return derr.ToTiDBErr(err) } func (m *memBuffer) SetWithFlags(key kv.Key, value []byte, ops ...kv.FlagsOp) error { err := m.MemDB.SetWithFlags(key, value, ops...) - return ToTiDBErr(err) + return derr.ToTiDBErr(err) } // Iter creates an Iterator positioned on the first entry that k <= entry's key. @@ -87,7 +88,7 @@ func (m *memBuffer) SetWithFlags(key kv.Key, value []byte, ops ...kv.FlagsOp) er // The Iterator must be Closed after use. 
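// Sketch (not part of the PR) of the thin-adapter pattern used throughout
// unionstore_driver.go above: wrap an inner type, delegate each method, and
// translate errors at every delegation point. Illustrative interfaces only.
package main

import (
	"errors"
	"fmt"
)

var errInnerNotFound = errors.New("inner: not found")
var errNotExist = errors.New("kv: key does not exist")

type inner struct{ m map[string][]byte }

func (i *inner) Get(k string) ([]byte, error) {
	v, ok := i.m[k]
	if !ok {
		return nil, errInnerNotFound
	}
	return v, nil
}

// memBuffer adapts inner to the outer interface, converting errors.
type memBuffer struct{ *inner }

func (m *memBuffer) Get(k string) ([]byte, error) {
	v, err := m.inner.Get(k)
	if errors.Is(err, errInnerNotFound) {
		return nil, errNotExist
	}
	return v, err
}

func main() {
	b := &memBuffer{&inner{m: map[string][]byte{"a": []byte("1")}}}
	_, err := b.Get("missing")
	fmt.Println(errors.Is(err, errNotExist)) // true
}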
func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { it, err := m.MemDB.Iter(k, upperBound) - return &tikvIterator{Iterator: it}, ToTiDBErr(err) + return &tikvIterator{Iterator: it}, derr.ToTiDBErr(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. @@ -96,7 +97,7 @@ func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { // TODO: Add lower bound limit func (m *memBuffer) IterReverse(k kv.Key) (kv.Iterator, error) { it, err := m.MemDB.IterReverse(k) - return &tikvIterator{Iterator: it}, ToTiDBErr(err) + return &tikvIterator{Iterator: it}, derr.ToTiDBErr(err) } // SnapshotIter returns a Iterator for a snapshot of MemBuffer. @@ -121,7 +122,7 @@ func (u *tikvUnionStore) GetMemBuffer() kv.MemBuffer { func (u *tikvUnionStore) Get(ctx context.Context, k kv.Key) ([]byte, error) { data, err := u.KVUnionStore.Get(ctx, k) - return data, ToTiDBErr(err) + return data, derr.ToTiDBErr(err) } func (u *tikvUnionStore) HasPresumeKeyNotExists(k kv.Key) bool { @@ -134,7 +135,7 @@ func (u *tikvUnionStore) UnmarkPresumeKeyNotExists(k kv.Key) { func (u *tikvUnionStore) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { it, err := u.KVUnionStore.Iter(k, upperBound) - return newKVIterator(it), ToTiDBErr(err) + return newKVIterator(it), derr.ToTiDBErr(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. @@ -143,7 +144,7 @@ func (u *tikvUnionStore) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) // TODO: Add lower bound limit func (u *tikvUnionStore) IterReverse(k kv.Key) (kv.Iterator, error) { it, err := u.KVUnionStore.IterReverse(k) - return newKVIterator(it), ToTiDBErr(err) + return newKVIterator(it), derr.ToTiDBErr(err) } type tikvGetter struct { @@ -156,7 +157,7 @@ func newKVGetter(getter unionstore.Getter) kv.Getter { func (g *tikvGetter) Get(_ context.Context, k kv.Key) ([]byte, error) { data, err := g.Getter.Get(k) - return data, ToTiDBErr(err) + return data, derr.ToTiDBErr(err) } // tikvIterator wraps unionstore.Iterator as kv.Iterator diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8703b1861c65d..ee94eceec166a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { return false } - enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit) - enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool) asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit // TODO the keys limit need more tests, this value makes the unit test pass by now. // Async commit is not compatible with Binlog because of the non unique timestamp issue. - if c.sessionID > 0 && enableAsyncCommit && + if c.sessionID > 0 && c.txn.enableAsyncCommit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && !c.shouldWriteBinlog() { totalKeySize := uint64(0) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 56bcdf4dd5a82..ae65f15dc18e6 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -565,32 +565,16 @@ func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) { // value of this option. 
Only ReplicaRead is supported for snapshot func (s *KVSnapshot) SetOption(opt int, val interface{}) { switch opt { - case kv.ReplicaRead: - s.mu.Lock() - s.mu.replicaRead = val.(kv.ReplicaReadType) - s.mu.Unlock() case kv.CollectRuntimeStats: s.mu.Lock() s.mu.stats = val.(*SnapshotRuntimeStats) s.mu.Unlock() - case kv.IsStalenessReadOnly: - s.mu.Lock() - s.mu.isStaleness = val.(bool) - s.mu.Unlock() - case kv.MatchStoreLabels: - s.mu.Lock() - s.mu.matchStoreLabels = val.([]*metapb.StoreLabel) - s.mu.Unlock() } } // DelOption deletes an option. func (s *KVSnapshot) DelOption(opt int) { switch opt { - case kv.ReplicaRead: - s.mu.Lock() - s.mu.replicaRead = kv.ReplicaReadLeader - s.mu.Unlock() case kv.CollectRuntimeStats: s.mu.Lock() s.mu.stats = nil @@ -609,6 +593,13 @@ func (s *KVSnapshot) SetKeyOnly(b bool) { s.keyOnly = b } +// SetReplicaRead sets up the replica read type. +func (s *KVSnapshot) SetReplicaRead(readType kv.ReplicaReadType) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.replicaRead = readType +} + // SetIsolationLevel sets the isolation level used to scan data from tikv. func (s *KVSnapshot) SetIsolationLevel(level IsoLevel) { s.isolationLevel = level @@ -632,6 +623,20 @@ func (s *KVSnapshot) SetTaskID(id uint64) { s.mu.taskID = id } +// SetIsStatenessReadOnly sets whether the transaction is a staleness read-only transaction. +func (s *KVSnapshot) SetIsStatenessReadOnly(b bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.isStaleness = b +} + +// SetMatchStoreLabels sets up labels to filter target stores. +func (s *KVSnapshot) SetMatchStoreLabels(labels []*metapb.StoreLabel) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.matchStoreLabels = labels +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt)) diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index d1e635f205efa..5589752043b2b 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -105,7 +105,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe { func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return txn } diff --git a/store/tikv/tests/async_commit_test.go b/store/tikv/tests/async_commit_test.go index 0f4985fa7ab86..381771bfa0836 100644 --- a/store/tikv/tests/async_commit_test.go +++ b/store/tikv/tests/async_commit_test.go @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return tikv.TxnProbe{KVTxn: txn} } @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) { func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) for i, k := range keys { if len(values[i]) > 0 { err = txn.Set(k, values[i]) diff --git a/store/tikv/tests/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go index 1360841bd743a..aca3c59099cf7 100644 --- a/store/tikv/tests/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -152,6 +152,7 @@ func
(s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil) @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { // Prewrite k1 and k2 again without committing them txn, err = s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) err = txn.Set([]byte("k1"), []byte("v3")) c.Assert(err, IsNil) err = txn.Set([]byte("k2"), []byte("v4")) @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil) err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, false) + txn.SetEnableAsyncCommit(false) txn.SetEnable1PC(false) txn.SetOption(kv.GuaranteeLinearizability, false) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 47ccdce12caea..a8c0f70f8da8d 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -82,6 +82,7 @@ type KVTxn struct { syncLog bool priority Priority isPessimistic bool + enableAsyncCommit bool enable1PC bool scope string kvFilter KVFilter @@ -272,6 +273,11 @@ func (txn *KVTxn) SetCommitCallback(f func(string, error)) { txn.commitCallback = f } +// SetEnableAsyncCommit indicates if the transaction will try to use async commit. +func (txn *KVTxn) SetEnableAsyncCommit(b bool) { + txn.enableAsyncCommit = b +} + // SetEnable1PC indicates if the transaction will try to use 1 phase commit. func (txn *KVTxn) SetEnable1PC(b bool) { txn.enable1PC = b diff --git a/tidb-server/main.go b/tidb-server/main.go index f070d2eeec48d..3e2351bf7c352 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -572,7 +572,7 @@ func setGlobalVars() { kvcache.GlobalLRUMemUsageTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker) t, err := time.ParseDuration(cfg.TiKVClient.StoreLivenessTimeout) - if err != nil { + if err != nil || t < 0 { logutil.BgLogger().Fatal("invalid duration value for store-liveness-timeout", zap.String("currentValue", cfg.TiKVClient.StoreLivenessTimeout)) } diff --git a/util/processinfo.go b/util/processinfo.go index 29716d914c3de..ebbf17094b80d 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/execdetails" @@ -161,6 +162,7 @@ func serverStatus2Str(state uint16) string { // kill statement rely on this interface. type SessionManager interface { ShowProcessList() map[uint64]*ProcessInfo + ShowTxnList() []*txninfo.TxnInfo GetProcessInfo(id uint64) (*ProcessInfo, bool) Kill(connectionID uint64, query bool) KillAllConnections()
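// Sketch (not part of the PR): one way a SessionManager might implement the
// new ShowTxnList method required by util/processinfo above — collect TxnInfo
// from every session that currently has a started transaction. The
// server/session registry below is hypothetical; only the txninfo types come
// from the PR, and TxnInfo() is assumed to return nil when no transaction
// has started.
package main

import (
	"fmt"

	"github.com/pingcap/tidb/session/txninfo"
)

type session interface {
	TxnInfo() *txninfo.TxnInfo // nil when no transaction has been started
}

type server struct {
	sessions map[uint64]session // hypothetical registry keyed by connection ID
}

// ShowTxnList satisfies the SessionManager extension added in this PR.
func (s *server) ShowTxnList() []*txninfo.TxnInfo {
	txns := make([]*txninfo.TxnInfo, 0, len(s.sessions))
	for _, se := range s.sessions {
		if info := se.TxnInfo(); info != nil {
			txns = append(txns, info)
		}
	}
	return txns
}

func main() {
	s := &server{sessions: map[uint64]session{}}
	fmt.Println(len(s.ShowTxnList())) // 0
}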