diff --git a/.golangci_br.yml b/.golangci_br.yml index 28bbba74f749f..835a88488e7a3 100644 --- a/.golangci_br.yml +++ b/.golangci_br.yml @@ -26,7 +26,6 @@ linters: - exhaustivestruct - exhaustive - godot - - gosec - errorlint - wrapcheck - gomoddirectives @@ -81,3 +80,11 @@ linters-settings: issues: exclude-rules: + - path: br/tests/ + linters: + - gosec + - errcheck + - path: _test\.go + linters: + - gosec + diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 45fd0ab664f50..658ad77d51d08 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -79,7 +79,7 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error for _, col := range cols { if mysql.HasPriKeyFlag(col.Flag) { incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits)) - autoRandomBits := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63n(1< 0 { - rd := rand.New(rand.NewSource(options.AutoRandomSeed)) + rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec mask := int64(1)<= 0 { tableName := engine[:index] - engineID, err := strconv.Atoi(engine[index+1:]) + engineID, err := strconv.Atoi(engine[index+1:]) // nolint:gosec if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 49358a9aee102..58d8c59966a6b 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -180,7 +180,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 } needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits() err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) if err != nil { return errors.Trace(err) @@ -381,6 +381,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks needChecksum = true needRemoteDupe = true err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error { + // nolint:gosec query := fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status, has_duplicates from %s WHERE table_id = ? FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) if err != nil { @@ -593,7 +594,7 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { // avoid override existing metadata if the meta is already inserted. 
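[Editor's note] For context on the annotations above: gosec's G404 check fires on any use of math/rand, and the `// nolint:gosec` markers added in this patch silence it where the randomness is deliberately deterministic and not security relevant (the auto-random shard is derived from a fixed seed). A minimal sketch of that pattern, with illustrative function and parameter names rather than TiDB's real ones:

package main

import (
	"fmt"
	"math/rand"
)

// autoRandomShard derives a deterministic shard prefix from a caller-supplied
// seed. gosec flags math/rand as weak randomness (G404), but the value only
// spreads writes across ranges and is not security sensitive, so the warning
// is silenced inline, mirroring the annotations in the diff above.
func autoRandomShard(seed int64, shardBits uint64) int64 {
	rd := rand.New(rand.NewSource(seed)) // nolint:gosec
	mask := int64(1)<<shardBits - 1
	return rd.Int63() & mask
}

func main() {
	fmt.Println(autoRandomShard(42, 5)) // same seed, same shard on every run
}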
exist := false err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) + query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -635,7 +636,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t return errors.Annotate(err, "enable pessimistic transaction failed") } return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task metas failed") @@ -695,7 +696,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U paused := false var pausedCfg storedCfgs err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -821,7 +822,7 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool switchBack := true allFinished := finished err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") diff --git a/br/pkg/mock/mock_cluster.go b/br/pkg/mock/mock_cluster.go index d1ece26505d05..387887a7f1b12 100644 --- a/br/pkg/mock/mock_cluster.go +++ b/br/pkg/mock/mock_cluster.go @@ -207,7 +207,7 @@ func waitUntilServerOnline(addr string, statusPort uint) string { // connect http status statusURL := fmt.Sprintf("http://127.0.0.1:%d/status", statusPort) for retry = 0; retry < retryTime; retry++ { - resp, err := http.Get(statusURL) // nolint:noctx + resp, err := http.Get(statusURL) // nolint:noctx,gosec if err == nil { // Ignore errors. _, _ = io.ReadAll(resp.Body) diff --git a/br/pkg/storage/hdfs.go b/br/pkg/storage/hdfs.go index cbcc24088292f..d2b3d996047ce 100644 --- a/br/pkg/storage/hdfs.go +++ b/br/pkg/storage/hdfs.go @@ -49,6 +49,7 @@ func dfsCommand(args ...string) (*exec.Cmd, error) { } cmd = append(cmd, bin, "dfs") cmd = append(cmd, args...) 
+ //nolint:gosec return exec.Command(cmd[0], cmd[1:]...), nil } diff --git a/br/pkg/utils/pprof.go b/br/pkg/utils/pprof.go index 684d974174d7d..efa25389b80d8 100644 --- a/br/pkg/utils/pprof.go +++ b/br/pkg/utils/pprof.go @@ -11,7 +11,7 @@ import ( // #nosec // register HTTP handler for /debug/pprof "net/http" - _ "net/http/pprof" + _ "net/http/pprof" // nolint:gosec "github.com/pingcap/errors" "github.com/pingcap/failpoint" diff --git a/cmd/explaintest/r/explain_indexmerge.result b/cmd/explaintest/r/explain_indexmerge.result index 83bc89a593e7c..40e0cb4a1159d 100644 --- a/cmd/explaintest/r/explain_indexmerge.result +++ b/cmd/explaintest/r/explain_indexmerge.result @@ -6,19 +6,23 @@ create index td on t (d); load stats 's/explain_indexmerge_stats_t.json'; explain format = 'brief' select * from t where a < 50 or b < 50; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] or(lt(test.t.a, 50), lt(test.t.b, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false explain format = 'brief' select * from t where (a < 50 or b < 50) and f > 100; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] gt(test.t.f, 100), or(lt(test.t.a, 50), lt(test.t.b, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +└─Selection(Probe) 98.00 cop[tikv] gt(test.t.f, 100) + └─TableRowIDScan 98.00 cop[tikv] table:t keep order:false explain format = 'brief' select * from t where b < 50 or c < 50; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] or(lt(test.t.b, 50), lt(test.t.c, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tc(c) range:[-inf,50), keep order:false +└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false set session tidb_enable_index_merge = on; explain format = 'brief' select * from t where a < 50 or b < 50; id estRows task access object operator info diff --git a/domain/domain.go b/domain/domain.go index d3a9664b97fe7..3560b865d2cea 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/domainutil" @@ -89,6 +90,7 @@ type Domain struct { cancel context.CancelFunc indexUsageSyncLease time.Duration planReplayer *planReplayer + expiredTimeStamp4PC types.Time serverID uint64 serverIDSession *concurrency.Session @@ -335,6 +337,22 @@ func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) { return meta.NewSnapshotMeta(snapshot), nil } +// ExpiredTimeStamp4PC gets 
expiredTimeStamp4PC from domain. +func (do *Domain) ExpiredTimeStamp4PC() types.Time { + do.m.Lock() + defer do.m.Unlock() + + return do.expiredTimeStamp4PC +} + +// SetExpiredTimeStamp4PC sets the expiredTimeStamp4PC from domain. +func (do *Domain) SetExpiredTimeStamp4PC(time types.Time) { + do.m.Lock() + defer do.m.Unlock() + + do.expiredTimeStamp4PC = time +} + // DDL gets DDL from domain. func (do *Domain) DDL() ddl.DDL { return do.ddl @@ -712,6 +730,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, renewLeaseCh: make(chan func(), 10), + expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) diff --git a/executor/simple.go b/executor/simple.go index ee6932ad1dc91..388269ce8120e 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" @@ -160,7 +161,7 @@ func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { case *ast.ShutdownStmt: err = e.executeShutdown(x) case *ast.AdminStmt: - err = e.executeAdminReloadStatistics(x) + err = e.executeAdmin(x) } e.done = true return err @@ -1659,6 +1660,16 @@ func asyncDelayShutdown(p *os.Process, delay time.Duration) { } } +func (e *SimpleExec) executeAdmin(s *ast.AdminStmt) error { + switch s.Tp { + case ast.AdminReloadStatistics: + return e.executeAdminReloadStatistics(s) + case ast.AdminFlushPlanCache: + return e.executeAdminFlushPlanCache(s) + } + return nil +} + func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error { if s.Tp != ast.AdminReloadStatistics { return errors.New("This AdminStmt is not ADMIN RELOAD STATS_EXTENDED") @@ -1668,3 +1679,25 @@ func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error { } return domain.GetDomain(e.ctx).StatsHandle().ReloadExtendedStatistics() } + +func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error { + if s.Tp != ast.AdminFlushPlanCache { + return errors.New("This AdminStmt is not ADMIN FLUSH PLAN_CACHE") + } + if s.StatementScope == ast.StatementScopeGlobal { + return errors.New("Do not support the 'admin flush global scope.'") + } + if !plannercore.PreparedPlanCacheEnabled() { + e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The plan cache is disable. So there no need to flush the plan cache")) + return nil + } + now := types.NewTime(types.FromGoTime(time.Now().In(e.ctx.GetSessionVars().StmtCtx.TimeZone)), mysql.TypeTimestamp, 3) + e.ctx.GetSessionVars().LastUpdateTime4PC = now + e.ctx.PreparedPlanCache().DeleteAll() + if s.StatementScope == ast.StatementScopeInstance { + // Record the timestamp. When other sessions want to use the plan cache, + // it will check the timestamp first to decide whether the plan cache should be flushed. 
+ domain.GetDomain(e.ctx).SetExpiredTimeStamp4PC(now) + } + return nil +} diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index d42d32e5ea111..8c0911ef43393 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -294,6 +294,16 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont } prepared.SchemaVersion = is.SchemaMetaVersion() } + // If the lastUpdateTime less than expiredTimeStamp4PC, + // it means other sessions have executed 'admin flush instance plan_cache'. + // So we need to clear the current session's plan cache. + // And update lastUpdateTime to the newest one. + expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC() + if prepared.UseCache && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 { + sctx.PreparedPlanCache().DeleteAll() + prepared.CachedPlan = nil + vars.LastUpdateTime4PC = expiredTimeStamp4PC + } err = e.getPhysicalPlan(ctx, sctx, is, preparedObj) if err != nil { return err @@ -401,6 +411,7 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, } stmtCtx.UseCache = prepared.UseCache + var bindSQL string if prepared.UseCache { bindSQL = GetBindSQL4PlanCache(sctx, prepared.Stmt) cacheKey = NewPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 38048d4d30009..30c5a205ab257 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -2203,6 +2203,10 @@ func (s *testIntegrationSuite) TestOptimizeHintOnPartitionTable(c *C) { partition p1 values less than(11), partition p2 values less than(16));`) tk.MustExec(`insert into t values (1,1,"1"), (2,2,"2"), (8,8,"8"), (11,11,"11"), (15,15,"15")`) + tk.MustExec("set @@tidb_enable_index_merge = off") + defer func() { + tk.MustExec("set @@tidb_enable_index_merge = on") + }() // Create virtual tiflash replica info. 
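[Editor's note] The flush mechanism introduced above is a lazy, timestamp-based handshake: ADMIN FLUSH INSTANCE PLAN_CACHE stamps an instance-wide expiry time on the domain, and each session compares its own LastUpdateTime4PC against that stamp before reusing a cached plan, dropping its local cache when it is stale. A condensed sketch of the idea, using illustrative types rather than the real Domain/SessionVars structs:

package main

import (
	"fmt"
	"time"
)

type domain struct{ expiredAt time.Time } // instance-wide flush marker

type session struct {
	lastSeen time.Time         // per-session copy of the marker
	cache    map[string]string // stands in for the prepared plan cache
}

// flushInstance is roughly what ADMIN FLUSH INSTANCE PLAN_CACHE does:
// record "now" so every session can notice the flush lazily.
func (d *domain) flushInstance() { d.expiredAt = time.Now() }

// beforeExecute mirrors the check added to Execute.OptimizePreparedPlan:
// a stale session discards its cache and catches up to the marker.
func (s *session) beforeExecute(d *domain) {
	if d.expiredAt.After(s.lastSeen) {
		s.cache = map[string]string{}
		s.lastSeen = d.expiredAt
	}
}

func main() {
	d := &domain{}
	s := &session{cache: map[string]string{"q1": "cached plan"}}
	d.flushInstance()
	s.beforeExecute(d)
	fmt.Println(len(s.cache)) // 0: the stale cached plan was dropped
}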
dom := domain.GetDomain(tk.Se) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index b830d26da025d..b91db6e0e995a 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1380,6 +1380,8 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, return &AdminResetTelemetryID{}, nil case ast.AdminReloadStatistics: return &Simple{Statement: as}, nil + case ast.AdminFlushPlanCache: + return &Simple{Statement: as}, nil default: return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index d633896e7b718..637fdf6396cf9 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -51,6 +51,208 @@ type testPrepareSuite struct { type testPrepareSerialSuite struct { } +func (s *testPrepareSerialSuite) TestFlushPlanCache(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + err = store.Close() + c.Assert(err, IsNil) + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(true) + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1(id int, a int, b int, key(a))") + tk.MustExec("create table t2(id int, a int, b int, key(a))") + tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk.MustExec("execute stmt1;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk.MustExec("execute stmt2;") + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk.MustExec("execute stmt3;") + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("use test") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("drop table if exists t2") + tk2.MustExec("create table t1(id int, a int, b int, key(a))") + tk2.MustExec("create table t2(id int, a int, b int, key(a))") + tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk2.MustExec("execute stmt1;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk2.MustExec("execute stmt2;") + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk2.MustExec("execute stmt3;") + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("admin flush session plan_cache;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select 
@@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("admin flush instance plan_cache;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + err = tk.ExecToErr("admin flush global plan_cache;") + c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'") +} + +func (s *testPrepareSerialSuite) TestFlushPlanCacheWithoutPCEnable(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + err = store.Close() + c.Assert(err, IsNil) + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(false) + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1(id int, a int, b int, key(a))") + tk.MustExec("create table t2(id int, a int, b int, key(a))") + tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk.MustExec("execute stmt1;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk.MustExec("execute stmt2;") + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk.MustExec("execute stmt3;") + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("use test") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("drop table if exists t2") + tk2.MustExec("create table t1(id int, a int, b int, key(a))") + tk2.MustExec("create table t2(id int, a int, b int, key(a))") + tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk2.MustExec("execute stmt1;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk2.MustExec("execute 
stmt2;") + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk2.MustExec("execute stmt3;") + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("admin flush session plan_cache;") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache")) + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("admin flush instance plan_cache;") + tk2.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache")) + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + err = tk.ExecToErr("admin flush global plan_cache;") + c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'") +} + func (s *testPrepareSerialSuite) TestPrepareCache(c *C) { defer testleak.AfterTest(c)() store, dom, err := newStoreWithBootstrap() diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index f70d1eb7e5550..013d86e227342 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -286,9 +286,9 @@ }, { "SQL": "select /*+ USE_INDEX_MERGE(t1, c_d_e, f_g) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": "IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" }, { "SQL": "select /*+ NO_INDEX_MERGE(), USE_INDEX_MERGE(t, primary, f_g, c_d_e) */ * from t where a < 1 or f > 2", @@ -304,15 +304,15 @@ }, { "SQL": "select /*+ USE_INDEX_MERGE(db2.t) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": 
"IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" }, { "SQL": "select /*+ USE_INDEX_MERGE(db2.t, c_d_e, f_g) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": "IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" } ] }, diff --git a/planner/optimize.go b/planner/optimize.go index 028474a373e0e..73c2b137a2b38 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -152,23 +152,9 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in if !ok { useBinding = false } - - var ( - bindRecord *bindinfo.BindRecord - scope string - err error - ) - if useBinding { - bindRecord, scope, err = getBindRecord(sctx, stmtNode) - if err != nil || bindRecord == nil || len(bindRecord.Bindings) == 0 { - useBinding = false - } - } - if ok { - // add the extra Limit after matching the bind record - stmtNode = plannercore.TryAddExtraLimit(sctx, stmtNode) - node = stmtNode - + bindRecord, scope, match := matchSQLBinding(sctx, stmtNode) + if !match { + useBinding = false } if ok { // add the extra Limit after matching the bind record diff --git a/session/bootstrap.go b/session/bootstrap.go index 53f2e95c164a5..1fdb93b58ff0e 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -538,14 +538,17 @@ const ( version78 = 78 // version79 adds the mysql.table_cache_meta table version79 = 79 - + // version80 fixes the issue https://github.com/pingcap/tidb/issues/25422. + // If the TiDB upgrading from the 4.x to a newer version, we keep the tidb_analyze_version to 1. + version80 = 80 + // version81 insert "tidb_enable_index_merge|off" to mysql.GLOBAL_VARIABLES if there is no tidb_enable_index_merge. + // This will only happens when we upgrade a cluster before 4.0.0 to 4.0.0+. + version81 = 81 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version - -var currentBootstrapVersion int64 = version79 - +var currentBootstrapVersion int64 = version81 var ( bootstrapVersion = []func(Session, int64){ @@ -628,7 +631,8 @@ var ( upgradeToVer77, upgradeToVer78, upgradeToVer79, - + upgradeToVer80, + upgradeToVer81, } ) @@ -1634,6 +1638,49 @@ func upgradeToVer79(s Session, ver int64) { doReentrantDDL(s, CreateTableCacheMetaTable) } +func upgradeToVer80(s Session, ver int64) { + if ver >= version80 { + return + } + // Check if tidb_analyze_version exists in mysql.GLOBAL_VARIABLES. + // If not, insert "tidb_analyze_version | 1" since this is the old behavior before we introduce this variable. + ctx := context.Background() + rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion) + terror.MustNil(err) + req := rs.NewChunk(nil) + err = rs.Next(ctx, req) + terror.MustNil(err) + if req.NumRows() != 0 { + return + } + + mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion, 1) +} + +// For users that upgrade TiDB from a pre-4.0 version, we want to disable index merge by default. 
+// This helps minimize query plan regressions. +func upgradeToVer81(s Session, ver int64) { + if ver >= version81 { + return + } + // Check if tidb_enable_index_merge exists in mysql.GLOBAL_VARIABLES. + // If not, insert "tidb_enable_index_merge | off". + ctx := context.Background() + rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableIndexMerge) + terror.MustNil(err) + req := rs.NewChunk(nil) + err = rs.Next(ctx, req) + terror.MustNil(err) + if req.NumRows() != 0 { + return + } + + mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableIndexMerge, variable.Off) +} func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 6caf7e702c10b..1b44f58667114 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -928,3 +928,146 @@ func TestAnalyzeVersionUpgradeFrom300To500(t *testing.T) { require.Equal(t, 1, row.Len()) require.Equal(t, "1", row.GetString(0)) } + + +func TestIndexMergeInNewCluster(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + // Indicates we are in a new cluster. + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + dom, err := BootstrapSession(store) + require.NoError(t, err) + defer func() { require.NoError(t, store.Close()) }() + defer dom.Close() + se := createSessionAndSetID(t, store) + + // In a new created cluster(above 5.4+), tidb_enable_index_merge is 1 by default. + mustExec(t, se, "use test;") + r := mustExec(t, se, "select @@tidb_enable_index_merge;") + require.NotNil(t, r) + + ctx := context.Background() + chk := r.NewChunk(nil) + err = r.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 1, row.Len()) + require.Equal(t, int64(1), row.GetInt64(0)) +} + +func TestIndexMergeUpgradeFrom300To540(t *testing.T) { + ctx := context.Background() + store, _ := createStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // Upgrade from 3.0.0 to 5.4+. + ver300 := 33 + seV3 := createSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver300)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + mustExec(t, seV3, fmt.Sprintf("update mysql.tidb set variable_value=%d where variable_name='tidb_server_version'", ver300)) + mustExec(t, seV3, fmt.Sprintf("delete from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + mustExec(t, seV3, "commit") + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV3) + require.NoError(t, err) + require.Equal(t, int64(ver300), ver) + + // We are now in 3.0.0, check tidb_enable_index_merge shoudle not exist. 
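[Editor's note] Both upgradeToVer80 and upgradeToVer81 follow the same idiom: probe mysql.GLOBAL_VARIABLES and insert the compatibility default only when the variable has never been written, so an explicit setting made on the old version survives the upgrade. A toy sketch of that idiom (the helper and the map type are invented for illustration):

package main

import "fmt"

type kvStore map[string]string // stands in for mysql.GLOBAL_VARIABLES

// ensureDefault inserts a default only when the variable is absent, so a
// value chosen by the user (or by an earlier bootstrap step) is preserved.
func ensureDefault(vars kvStore, name, def string) {
	if _, ok := vars[name]; ok {
		return // already decided on the old version; keep it
	}
	vars[name] = def
}

func main() {
	pre40 := kvStore{} // variable absent: cluster predates it
	ensureDefault(pre40, "tidb_enable_index_merge", "OFF")
	fmt.Println(pre40["tidb_enable_index_merge"]) // OFF: keep the old behavior

	from40 := kvStore{"tidb_enable_index_merge": "ON"} // user enabled it on 4.x
	ensureDefault(from40, "tidb_enable_index_merge", "OFF")
	fmt.Println(from40["tidb_enable_index_merge"]) // ON: explicit setting kept
}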
+ res := mustExec(t, seV3, fmt.Sprintf("select * from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + chk := res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 0, chk.NumRows()) + + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := createSessionAndSetID(t, store) + ver, err = getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + + // We are now in 5.x, tidb_enable_index_merge should be off. + res = mustExec(t, seCurVer, "select @@tidb_enable_index_merge") + chk = res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 1, row.Len()) + require.Equal(t, int64(0), row.GetInt64(0)) +} + +func TestIndexMergeUpgradeFrom400To540(t *testing.T) { + for i := 0; i < 2; i++ { + ctx := context.Background() + store, _ := createStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // upgrade from 4.0.0 to 5.4+. + ver400 := 46 + seV4 := createSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver400)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + mustExec(t, seV4, fmt.Sprintf("update mysql.tidb set variable_value=%d where variable_name='tidb_server_version'", ver400)) + mustExec(t, seV4, fmt.Sprintf("update mysql.GLOBAL_VARIABLES set variable_value='%s' where variable_name='%s'", variable.Off, variable.TiDBEnableIndexMerge)) + mustExec(t, seV4, "commit") + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV4) + require.NoError(t, err) + require.Equal(t, int64(ver400), ver) + + // We are now in 4.0.0, tidb_enable_index_merge is off. + res := mustExec(t, seV4, fmt.Sprintf("select * from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + chk := res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 2, row.Len()) + require.Equal(t, variable.Off, row.GetString(1)) + + if i == 0 { + // For the first time, We set tidb_enable_index_merge as on. + // And after upgrade to 5.x, tidb_enable_index_merge should remains to be on. + // For the second it should be off. + mustExec(t, seV4, "set global tidb_enable_index_merge = on") + } + + // Upgrade to 5.x. + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := createSessionAndSetID(t, store) + ver, err = getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + + // We are now in 5.x, tidb_enable_index_merge should be on because we enable it in 4.0.0. 
+ res = mustExec(t, seCurVer, "select @@tidb_enable_index_merge") + chk = res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row = chk.GetRow(0) + require.Equal(t, 1, row.Len()) + if i == 0 { + require.Equal(t, int64(1), row.GetInt64(0)) + } else { + require.Equal(t, int64(0), row.GetInt64(0)) + } + } +} + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 3b0c8f33402e9..b205bda1de5ab 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -465,7 +465,8 @@ type SessionVars struct { // preparedStmtID is id of prepared statement. preparedStmtID uint32 // PreparedParams params for prepared statements - PreparedParams PreparedParams + PreparedParams PreparedParams + LastUpdateTime4PC types.Time // ActiveRoles stores active roles for current user ActiveRoles []*auth.RoleIdentity @@ -1172,7 +1173,7 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, - enableIndexMerge: false, + enableIndexMerge: DefTiDBEnableIndexMerge, NoopFuncsMode: TiDBOptOnOffWarn(DefTiDBEnableNoopFuncs), replicaRead: kv.ReplicaReadLeader, AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3491f28bc73dc..396c93bddba1b 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -537,7 +537,7 @@ var defaultSysVars = []*SysVar{ s.SetEnableCascadesPlanner(TiDBOptOn(val)) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMerge, Value: Off, Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMerge, Value: BoolToOnOff(DefTiDBEnableIndexMerge), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { s.SetEnableIndexMerge(TiDBOptOn(val)) return nil }}, diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index e15c9f92b92b8..91b3451f15ed9 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -825,3 +825,12 @@ func TestDefaultCharsetAndCollation(t *testing.T) { require.NoError(t, err) require.Equal(t, val, mysql.DefaultCollationName) } + +func TestIndexMergeSwitcher(t *testing.T) { + vars := NewSessionVars() + vars.GlobalVarsAccessor = NewMockGlobalAccessor4Tests() + val, err := GetSessionOrGlobalSystemVar(vars, TiDBEnableIndexMerge) + require.NoError(t, err) + require.Equal(t, DefTiDBEnableIndexMerge, true) + require.Equal(t, BoolToOnOff(DefTiDBEnableIndexMerge), val) +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ccec9e1b5a8fc..7540000f4fc8b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -779,6 +779,7 @@ const ( DefTiDBRegardNULLAsPoint = true DefEnablePlacementCheck = true DefTimestamp = "0" + DefTiDBEnableIndexMerge = true ) // Process global variables. diff --git a/table/tables/cache.go b/table/tables/cache.go index a31f5d0780854..7e3eb7c40b9a9 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,57 +183,31 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. 
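[Editor's note] The cached-table refactor shown next replaces the per-statement LockForWrite calls with a registration in the transaction context: each of AddRecord, UpdateRecord and RemoveRecord now only records the table's lock handle, presumably so the write lock can be taken once later in the transaction rather than inside every row operation. A stripped-down sketch of that registration, with illustrative types:

package main

import "fmt"

// StateRemote stands in for the lock-management handle a cached table owns.
type StateRemote interface {
	LockForWrite(tid int64) (lease uint64, err error)
}

type txnCtx struct {
	cachedTables map[int64]StateRemote
}

// addCachedTable records that this transaction touched a cached table, the
// same bookkeeping txnCtxAddCachedTable performs in the diff below.
func (t *txnCtx) addCachedTable(tid int64, h StateRemote) {
	if t.cachedTables == nil {
		t.cachedTables = make(map[int64]StateRemote)
	}
	if _, ok := t.cachedTables[tid]; !ok {
		t.cachedTables[tid] = h
	}
}

func main() {
	ctx := &txnCtx{}
	ctx.addCachedTable(1, nil)
	ctx.addCachedTable(1, nil)         // idempotent: the first registration wins
	fmt.Println(len(ctx.cachedTables)) // 1
}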
+func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.AddRecord(sctx, r, opts...) +} -func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { - txn, err := ctx.Txn(true) - if err != nil { - return nil, err +func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.CachedTables == nil { + txnCtx.CachedTables = make(map[int64]interface{}) } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return nil, errors.Trace(err) + if _, ok := txnCtx.CachedTables[tid]; !ok { + txnCtx.CachedTables[tid] = handle } - ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) - return c.TableCommon.AddRecord(ctx, r, opts...) - } // UpdateRecord implements table.Table func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { - - txn, err := sctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } - sctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) - + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. - -func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txn, err := ctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } - ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) - return c.TableCommon.RemoveRecord(ctx, h, r) - +func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.RemoveRecord(sctx, h, r) } func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 0138a01ab94e1..aeddd5b972ab2 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -67,9 +67,7 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - - LockForWrite(ctx context.Context, tid int64, lease uint64) error - + LockForWrite(ctx context.Context, tid int64) (uint64, error) // RenewLease attempt to renew the read / write lock on the table with the specified tableID RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) @@ -134,33 +132,32 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } - -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { +// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. 
+func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { h.Lock() defer h.Unlock() + var ret uint64 for { - waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) + waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) if err != nil { - return err + return 0, err } if waitAndRetry == 0 { - + ret = lease break } time.Sleep(waitAndRetry) } - - return nil + return ret, nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { - +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - + ts = leaseFromTS(now) // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -221,38 +218,69 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() + switch op { + case RenewReadLease: + return h.renewReadLease(ctx, tid, newLease) + case RenewWriteLease: + return h.renewWriteLease(ctx, tid, newLease) + } + return false, errors.New("wrong renew lease type") +} +func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool - if op == RenewReadLease { - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } + + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) if err != nil { return errors.Trace(err) } - if now >= oldLease { - // read lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockRead { - // Not read lock, fail to renew - return nil - } + } + succ = true + return nil + }) + return succ, err +} - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) - if err != nil { - return errors.Trace(err) - } - } - succ = true +func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { + var succ bool + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // write lock had already expired, fail to renew return nil - }) - return succ, err - } - - // TODO: renew for write lease - return false, errors.New("not implement yet") + } + if lockType != CachedTableLockWrite { + // Not write lock, fail to renew + return nil + } + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "WRITE", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true + return nil + }) + return succ, err } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
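[Editor's note] The reworked LockForWrite no longer takes a lease from the caller: it derives one from the transaction's own timestamp, retries while an unexpired read lease still blocks the write, and returns the lease it settled on so callers can reuse it. A heavily simplified sketch of that loop follows; the real handle persists lock state in a system table and handles several more cases (intention locks, clearing orphan locks), which are omitted here:

package main

import (
	"fmt"
	"time"
)

type lockState struct {
	kind  string // "READ" or "WRITE"
	lease uint64
}

func leaseFrom(now uint64) uint64 { return now + 1000 }

// lockForWrite keeps retrying until no unexpired read lease blocks the write,
// then records the write lock and returns the lease it granted.
func lockForWrite(row *lockState, now func() uint64) uint64 {
	for {
		ts := leaseFrom(now())
		if now() > row.lease { // any old lock has expired: take over
			row.kind, row.lease = "WRITE", ts
			return ts
		}
		if row.kind == "WRITE" { // already write-locked: extend, never shrink
			if ts > row.lease {
				row.lease = ts
			}
			return row.lease
		}
		time.Sleep(10 * time.Millisecond) // read lease still live: wait and retry
	}
}

func main() {
	row := &lockState{kind: "READ", lease: 5}
	clock := uint64(10) // the read lease expired at 5, so the write proceeds
	fmt.Println(lockForWrite(row, func() uint64 { return clock })) // 1010
}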