diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 30cadd4a3a2d7..965213864f687 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -57,6 +57,7 @@ import (
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/set"
+	"github.com/pingcap/tidb/util/sqlexec"
 	"go.uber.org/zap"
 )

@@ -6580,6 +6581,16 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
 	if t.Meta().Partition != nil {
 		return errors.Trace(ErrOptOnCacheTable.GenWithStackByArgs("partition mode"))
 	}
+
+	// Initialize the cached table meta lock info in `mysql.table_cache_meta`.
+	// The operation shouldn't fail in most cases, and if it does, return the error directly.
+	// This DML and the following DDL are not atomic; that's not a problem.
+	_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(),
+		"insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	job := &model.Job{
 		SchemaID:   schema.ID,
 		SchemaName: schema.Name.L,
@@ -6590,8 +6601,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
 	}

 	err = d.doDDLJob(ctx, job)
-	err = d.callHookOnChanged(err)
-	return errors.Trace(err)
+	return d.callHookOnChanged(err)
 }

 func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error) {
@@ -6614,6 +6624,5 @@ func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error
 	}

 	err = d.doDDLJob(ctx, job)
-	err = d.callHookOnChanged(err)
-	return errors.Trace(err)
+	return d.callHookOnChanged(err)
 }
diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go
index c75891d90b349..28efb3bae3e3c 100644
--- a/ddl/placement_policy_ddl_test.go
+++ b/ddl/placement_policy_ddl_test.go
@@ -119,7 +119,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
 	t4.State = model.StatePublic
 	db1.Tables = append(db1.Tables, t4)

-	builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
+	builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(
 		[]*model.DBInfo{db1, db2, dbP},
 		nil,
 		[]*model.PolicyInfo{p1, p2, p3, p4, p5},
diff --git a/domain/domain.go b/domain/domain.go
index c5d8dc8246cd0..d3a9664b97fe7 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -95,6 +95,7 @@ type Domain struct {
 	isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.
 	renewLeaseCh         chan func()       // It is used to call the renewLease function of the cache table.
 	onClose              func()
+	sysExecutorFactory   func(*Domain) (pools.Resource, error)
 }

 // loadInfoSchema loads infoschema at startTS.
@@ -159,7 +160,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
 		return nil, false, currentSchemaVersion, nil, err
 	}

-	newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
+	newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
 	if err != nil {
 		return nil, false, currentSchemaVersion, nil, err
 	}
@@ -173,6 +174,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
 	return is, false, currentSchemaVersion, nil, nil
 }

+func (do *Domain) sysFacHack() (pools.Resource, error) {
+	// TODO: Here we create new sessions with sysFac in DDL,
+	// which will use `do` as Domain instead of calling `domap.Get`.
+	// That's because `domap.Get` requires a lock, but before
+	// Domain initialization finishes, we can't acquire that lock again.
+	// After we remove the lazy logic of creating Domain, we
+	// can simplify code here.
+	return do.sysExecutorFactory(do)
+}
+
 func (do *Domain) fetchPolicies(m *meta.Meta) ([]*model.PolicyInfo, error) {
 	allPolicies, err := m.ListPolicies()
 	if err != nil {
@@ -271,7 +282,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
 		}
 		diffs = append(diffs, diff)
 	}
-	builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest())
+	builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
 	phyTblIDs := make([]int64, 0, len(diffs))
 	actions := make([]uint64, 0, len(diffs))
 	for _, diff := range diffs {
@@ -711,7 +722,8 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
 const serverIDForStandalone = 1 // serverID for standalone deployment.

 // Init initializes a domain.
-func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
+func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) (pools.Resource, error)) error {
+	do.sysExecutorFactory = sysExecutorFactory
 	perfschema.Init()
 	if ebd, ok := do.store.(kv.EtcdBackend); ok {
 		var addrs []string
@@ -753,7 +765,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
 	// After we remove the lazy logic of creating Domain, we
 	// can simplify code here.
 	sysFac := func() (pools.Resource, error) {
-		return sysFactory(do)
+		return sysExecutorFactory(do)
 	}
 	sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, resourceIdleTimeout)
 	ctx, cancelFunc := context.WithCancel(context.Background())
diff --git a/executor/builder.go b/executor/builder.go
index 21cd0de21984f..109cc49915d58 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -4736,7 +4736,7 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
 				zap.Stack("stack trace"))
 		}
 	}()
-	err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS)
+	err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS)
 	if err != nil {
 		log.Warn("Update Lock Info Error")
 	}
diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go
index 05f85e8275b06..0744372305cb6 100644
--- a/executor/infoschema_reader_test.go
+++ b/executor/infoschema_reader_test.go
@@ -918,7 +918,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
 	tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
 		"test 2",
 	))
-	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 26)
+	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 27)

 	// More tests about the privileges.
tk.MustExec("create user 'testuser'@'localhost'") @@ -944,14 +944,14 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) { Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27")) c.Assert(tk.Se.Auth(&auth.UserIdentity{ Username: "testuser3", Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27")) } func (s *testInfoschemaTableSuite) TestSequences(c *C) { diff --git a/executor/main_test.go b/executor/main_test.go index 1e02955ee8e4c..bff65b72d6a2d 100644 --- a/executor/main_test.go +++ b/executor/main_test.go @@ -57,7 +57,6 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } callback := func(i int) int { testDataMap.GenerateOutputIfNeeded() diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 264128ce8c088..6d2e4e4870320 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index 2e99fc490dd8e..0665f2b2082ba 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -3890,7 +3890,14 @@ func TestPreparePlanCacheNotForCacheTable(t *testing.T) { tk.MustExec("create table t(a int);") tk.MustExec("alter table t cache") - tk.MustQuery("select * from t where a = 1") + var useCache bool + for i := 0; i < 50; i++ { + tk.MustQuery("select * from t where a = 1") + if tk.HasPlan("select * from t where a = 1", "Union") { + useCache = true + } + } + require.True(t, useCache) // already read cache after reading first time tk.MustQuery("explain format = 'brief' select * from t where a = 1").Check(testkit.Rows( "Projection 10.00 root test.t.a", diff --git a/expression/main_test.go b/expression/main_test.go index 3e99fa8cbf7ab..9a1e170078f65 100644 --- a/expression/main_test.go +++ b/expression/main_test.go @@ -54,7 +54,6 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } callback := func(i int) int { diff --git a/infoschema/builder.go b/infoschema/builder.go index d0ab2d58ac273..70ca84f5dd6c6 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -20,6 
+20,7 @@ import ( "sort" "strings" + "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -33,6 +34,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" + "github.com/pingcap/tidb/util/sqlexec" ) // Builder builds a new InfoSchema. @@ -43,6 +45,7 @@ type Builder struct { store kv.Storage // TODO: renewLeaseCh is only used to pass data between table and domain renewLeaseCh chan func() + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -630,7 +633,13 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf return nil, errors.Trace(err) } if t, ok := ret.(table.CachedTable); ok { - err = t.Init(b.renewLeaseCh) + var tmp pools.Resource + tmp, err = b.factory() + if err != nil { + return nil, errors.Trace(err) + } + + err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor)) if err != nil { return nil, errors.Trace(err) } @@ -674,7 +683,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage, renewCh chan func()) *Builder { +func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -684,6 +693,7 @@ func NewBuilder(store kv.Storage, renewCh chan func()) *Builder { sortedTablesBuckets: make([]sortedTables, bucketCount), }, renewLeaseCh: renewCh, + factory: factory, } } diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 87c1695833f4d..c66fc5e69241c 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index e4529bb407cc9..b40db2fe4ef5d 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4211,7 +4211,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if r := recover(); r != nil { } }() - err := cachedTable.UpdateLockForRead(store, startTS) + err := cachedTable.UpdateLockForRead(ctx, store, startTS) if err != nil { log.Warn("Update Lock Info Error") } diff --git a/session/bootstrap.go b/session/bootstrap.go index 573c17bdaaf2e..815e921c4a6fb 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -357,6 +357,14 @@ const ( last_analyzed_at TIMESTAMP, PRIMARY 
KEY (table_id, column_id) CLUSTERED );` + // CreateTableCacheMetaTable stores the cached table meta lock information. + CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta ( + tid bigint(11) NOT NULL DEFAULT 0, + lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE', + lease bigint(20) NOT NULL DEFAULT 0, + oldReadLease bigint(20) NOT NULL DEFAULT 0, + PRIMARY KEY (tid) + );` ) // bootstrap initiates system DB for a store. @@ -528,11 +536,13 @@ const ( version77 = 77 // version78 updates mysql.stats_buckets.lower_bound, mysql.stats_buckets.upper_bound and mysql.stats_histograms.last_analyze_pos from BLOB to LONGBLOB. version78 = 78 + // version79 adds the mysql.table_cache_meta table + version79 = 79 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version78 +var currentBootstrapVersion int64 = version79 var ( bootstrapVersion = []func(Session, int64){ @@ -614,6 +624,7 @@ var ( upgradeToVer76, upgradeToVer77, upgradeToVer78, + upgradeToVer79, } ) @@ -1612,6 +1623,13 @@ func upgradeToVer78(s Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms MODIFY last_analyze_pos LONGBLOB DEFAULT NULL") } +func upgradeToVer79(s Session, ver int64) { + if ver >= version79 { + return + } + doReentrantDDL(s, CreateTableCacheMetaTable) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1694,6 +1712,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateCapturePlanBaselinesBlacklist) // Create column_stats_usage table mustExecute(s, CreateColumnStatsUsageTable) + // Create table_cache_meta table. + mustExecute(s, CreateTableCacheMetaTable) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/statistics/testdata/stats_suite_out.json b/statistics/testdata/stats_suite_out.json index 2897c02fb3df2..df4c8498d845c 100644 --- a/statistics/testdata/stats_suite_out.json +++ b/statistics/testdata/stats_suite_out.json @@ -656,7 +656,7 @@ { "Start": 800, "End": 900, - "Count": 761.004166655054 + "Count": 752.004166655054 }, { "Start": 900, @@ -711,7 +711,7 @@ { "Start": 800, "End": 1000, - "Count": 1219.196869573942 + "Count": 1210.196869573942 }, { "Start": 900, @@ -736,7 +736,7 @@ { "Start": 200, "End": 400, - "Count": 1186.5288209899081 + "Count": 1230.0288209899081 }, { "Start": 200, diff --git a/table/table.go b/table/table.go index c96c77b587d19..1c38ed4335ca6 100644 --- a/table/table.go +++ b/table/table.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/sqlexec" ) // Type is used to distinguish between different tables that store data in different ways. @@ -252,12 +253,12 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - Init(renewCh chan func()) error + Init(renewCh chan func(), exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. 
TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table - UpdateLockForRead(store kv.Storage, ts uint64) error + UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error } diff --git a/table/tables/cache.go b/table/tables/cache.go index 0aa00578c5c51..c95379593c066 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -26,8 +26,10 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" + "go.uber.org/zap" ) // RenewLeaseType define the type for renew lease. @@ -41,7 +43,6 @@ const ( ) var ( - _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) @@ -88,41 +89,31 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { nowTime := oracle.GetTimeFromTS(ts) distance := leaseTime.Sub(nowTime) // TODO make this configurable in the following PRs - if distance >= 0 && distance <= (1*time.Second) { + if distance >= 0 && distance <= (1500*time.Millisecond) { c.renewCh <- c.renewLease(ts, RenewReadLease, data) } - return data + return data.MemBuffer } return nil } -// MockStateRemote represents the information of stateRemote. -// Exported it only for testing. -var MockStateRemote = struct { - Ch chan remoteTask - Data *mockStateRemoteData -}{} - -// NewCachedTable creates a new CachedTable Instance -func NewCachedTable(tbl *TableCommon) (table.Table, error) { - if MockStateRemote.Data == nil { - MockStateRemote.Data = newMockStateRemoteData() - MockStateRemote.Ch = make(chan remoteTask, 100) - go mockRemoteService(MockStateRemote.Data, MockStateRemote.Ch) - } - +// newCachedTable creates a new CachedTable Instance +func newCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ TableCommon: *tbl, - handle: &mockStateRemoteHandle{MockStateRemote.Ch}, - renewCh: make(chan func()), } return ret, nil } // Init is an extra operation for cachedTable after TableFromMeta, // Because cachedTable need some additional parameter that can't be passed in TableFromMeta. -func (c *cachedTable) Init(renewCh chan func()) error { +func (c *cachedTable) Init(renewCh chan func(), exec sqlexec.SQLExecutor) error { c.renewCh = renewCh + raw, ok := exec.(sqlExec) + if !ok { + return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") + } + c.handle = NewStateRemote(raw) return nil } @@ -167,11 +158,11 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return buffer, startTS, nil } -func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { +func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error { // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(context.Background(), tid, ts, lease) + succ, err := c.handle.LockForRead(ctx, tid, lease) if err != nil { return errors.Trace(err) } @@ -199,7 +190,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. 
} now := txn.StartTS() start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -215,7 +206,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } now := txn.StartTS() start := time.Now() - err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -231,7 +222,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type } now := txn.StartTS() start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -243,15 +234,15 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) return func() { tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(context.Background(), tid, ts, lease, op) + succ, err := c.handle.RenewLease(context.Background(), tid, lease, op) if err != nil { - log.Warn("Renew read lease error") + log.Warn("Renew read lease error", zap.Error(err)) } if succ { c.cacheData.Store(&cacheData{ Start: data.Start, Lease: lease, - MemBuffer: data, + MemBuffer: data.MemBuffer, }) } } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index b7efc743957b0..62e48ccd24c94 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,6 +15,7 @@ package tables_test import ( + "context" "testing" "time" @@ -184,7 +185,7 @@ func TestCacheCondition(t *testing.T) { } // Contains PointGet Delete should not trigger cache. 
- tk.MustExec("delete from t2 where id = 2") + tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) @@ -212,34 +213,36 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustExec("alter table write_tmp1 cache") // Read and add read lock - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid - for i := 0; i < 10; i++ { - if tk.HasPlan("select *from write_tmp1", "UnionScan") { + var i int + for i = 0; i < 10; i++ { + if tk.HasPlan("select * from write_tmp1", "UnionScan") { break } } + require.True(t, i < 10) + tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists - require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan")) + require.False(t, tk.HasPlan("select * from write_tmp1", "UnionScan")) // wait write lock expire and check cache can be used again - for !tk.HasPlan("select *from write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") - for !tk.HasPlan("select *from write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } func TestCacheTableComplexRead(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() - doneCh := make(chan struct{}, 1) tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") @@ -247,25 +250,29 @@ func TestCacheTableComplexRead(t *testing.T) { tk1.MustExec("create table complex_cache (id int primary key auto_increment, u int unique, v int)") tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)") tk1.MustExec("alter table complex_cache cache") + tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + var i int + for i = 0; i < 100; i++ { + time.Sleep(100 * time.Millisecond) + if tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break + } + } + require.True(t, i < 10) + tk1.MustExec("begin") - tk1.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - - go func() { - tk2.MustExec("begin") - tk2.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - var i int - for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) - if tk2.HasPlan("select *from complex_cache where id > 7", "UnionScan") { - break - } + tk2.MustExec("begin") + tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + for i = 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break } - require.True(t, i < 10) - tk2.MustExec("commit") 
-		doneCh <- struct{}{}
-	}()
-	<-doneCh
-	tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan")
+	}
+	require.True(t, i < 10)
+	tk2.MustExec("commit")
+
+	tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan")
 	tk1.MustExec("commit")
 }

@@ -285,10 +292,18 @@ func TestBeginSleepABA(t *testing.T) {
 	tk1.MustExec("insert into aba values (1, 1)")
 	tk1.MustExec("alter table aba cache")
 	tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1"))
+	cacheUsed := false
+	for i := 0; i < 100; i++ {
+		if tk1.HasPlan("select * from aba", "UnionScan") {
+			cacheUsed = true
+			break
+		}
+	}
+	require.True(t, cacheUsed)

 	// Begin, read from cache.
 	tk1.MustExec("begin")
-	cacheUsed := false
+	cacheUsed = false
 	for i := 0; i < 100; i++ {
 		if tk1.HasPlan("select * from aba", "UnionScan") {
 			cacheUsed = true
@@ -406,12 +421,28 @@ func TestRenewLease(t *testing.T) {
 	require.NoError(t, err)
 	var i int
 	tk.MustExec("select * from cache_renew_t")
-	_, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
+
+	tk1 := testkit.NewTestKit(t, store)
+	remote := tables.NewStateRemote(tk1.Session())
+	var leaseBefore uint64
+	for i = 0; i < 20; i++ {
+		time.Sleep(200 * time.Millisecond)
+		lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID)
+		require.NoError(t, err)
+		if lockType == tables.CachedTableLockRead {
+			leaseBefore = lease
+			break
+		}
+	}
+	require.True(t, i < 20)
+
 	for i = 0; i < 20; i++ {
 		time.Sleep(200 * time.Millisecond)
 		tk.MustExec("select * from cache_renew_t")
-		_, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
-		if lease != oldLease {
+		lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID)
+		require.NoError(t, err)
+		require.Equal(t, lockType, tables.CachedTableLockRead)
+		if leaseBefore != lease {
 			break
 		}
 	}
diff --git a/table/tables/main_test.go b/table/tables/main_test.go
index c7499ee0109aa..ebfceb2bd3bca 100644
--- a/table/tables/main_test.go
+++ b/table/tables/main_test.go
@@ -26,7 +26,6 @@ func TestMain(m *testing.M) {
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
 		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
-		goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
 	}
 	goleak.VerifyTestMain(m, opts...)
 }
diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go
index 5e18b3d7b950d..83fd06f7e410e 100644
--- a/table/tables/state_remote.go
+++ b/table/tables/state_remote.go
@@ -16,12 +16,11 @@ package tables

 import (
 	"context"
-	"fmt"
+	"strconv"
 	"sync"
 	"time"

 	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/sqlexec"
@@ -65,310 +64,17 @@ type StateRemote interface {
 	// If this operation succeed, according to the protocol, the TiKV data will not be
 	// modified until the lease expire. It's safe for the caller to load the table data,
 	// cache and use the data.
-	// The parameter `now` means the current tso. Because the tso is get from PD, in
-	// the TiDB side, its value lags behind the real one forever, this doesn't matter.
-	// Because `now` is only used to clean up the orphan lock, as long as it's smaller
-	// than the real one, the correctness of the algorithm is not violated.
-	LockForRead(ctx context.Context, tid int64, now, lease uint64) (bool, error)
+	LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error)

 	// LockForWrite try to add a write lock to the table with the specified tableID
-	LockForWrite(ctx context.Context, tid int64, now, ts uint64) error
+	LockForWrite(ctx context.Context, tid int64, lease uint64) error

 	// RenewLease attempt to renew the read / write lock on the table with the specified tableID
-	RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error)
-}
-
-// mockStateRemoteHandle implement the StateRemote interface.
-type mockStateRemoteHandle struct {
-	ch chan remoteTask
-}
-
-var _ StateRemote = &mockStateRemoteHandle{}
-
-func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
-	op := &loadOP{tid: tid}
-	op.Add(1)
-	r.ch <- op
-	op.Wait()
-	return op.lockType, op.lease, op.err
-}
-
-func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) {
-	op := &lockForReadOP{tid: tid, now: now, ts: ts}
-	op.Add(1)
-	r.ch <- op
-	op.Wait()
-	return op.succ, op.err
-}
-
-func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
-	op := &lockForWriteOP{tid: tid, now: now, ts: ts}
-	op.Add(1)
-	r.ch <- op
-	op.Wait()
-	if op.err != nil {
-		return errors.Trace(op.err)
-	}
-	// No block, finish.
-	if op.oldLease == 0 {
-		return nil
-	}
-
-	// Wait for read lock to expire.
-	t1 := oracle.GetTimeFromTS(op.oldLease)
-	t2 := oracle.GetTimeFromTS(now)
-	waitDuration := t1.Sub(t2)
-	time.Sleep(waitDuration)
-
-	// TODO: now should be a new ts
-	op = &lockForWriteOP{tid: tid, now: op.oldLease + 1, ts: leaseFromTS(op.oldLease + 1)}
-	op.Add(1)
-	r.ch <- op
-	op.Wait()
-	// op.oldLease should be 0 this time.
-	return op.err
-}
-
-func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) {
-	switch op {
-	case RenewReadLease:
-		op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs}
-		op.Add(1)
-		r.ch <- op
-		op.Wait()
-		return op.succ, op.err
-	case RenewWriteLease:
-		// TODO : Renew Write Lease will implement in next pr.
-	}
-	return false, errors.New("not implemented yet")
-}
-
-func mockRemoteService(r *mockStateRemoteData, ch chan remoteTask) {
-	for task := range ch {
-		task.Exec(r)
-	}
-}
-
-type remoteTask interface {
-	Exec(data *mockStateRemoteData)
-}
-
-// loadOP is a kind of remoteTask
-type loadOP struct {
-	sync.WaitGroup
-	// Input
-	tid int64
-
-	// Output
-	lockType CachedTableLockType
-	lease    uint64
-	err      error
-}
-
-func (op *loadOP) Exec(data *mockStateRemoteData) {
-	op.lockType, op.lease, op.err = data.Load(op.tid)
-	op.Done()
-}
-
-// lockForReadOP is a kind of rmoteTask
-type lockForReadOP struct {
-	sync.WaitGroup
-	// Input
-	tid int64
-	now uint64
-	ts  uint64
-
-	// Output
-	succ bool
-	err  error
-}
-
-func (op *lockForReadOP) Exec(r *mockStateRemoteData) {
-	op.succ, op.err = r.LockForRead(op.tid, op.now, op.ts)
-	op.Done()
-}
-
-// lockForWriteOP is a kind of remote task
-type lockForWriteOP struct {
-	sync.WaitGroup
-	// Input
-	tid int64
-	now uint64
-	ts  uint64
-
-	// Output
-	err      error
-	oldLease uint64
-}
-
-func (op *lockForWriteOP) Exec(data *mockStateRemoteData) {
-	op.oldLease, op.err = data.LockForWrite(op.tid, op.now, op.ts)
-	op.Done()
-}
-
-// renewForReadOP is a kind of remote task
-type renewLeaseForReadOP struct {
-	sync.WaitGroup
-	// Input
-	tid   int64
-	oldTs uint64
-	newTs uint64
-
-	// Output
-	succ bool
-	err  error
-}
-
-func (op *renewLeaseForReadOP) Exec(r *mockStateRemoteData) {
-	op.succ, op.err = r.renewLeaseForRead(op.tid, op.oldTs, op.newTs)
-	op.Done()
-}
-
-type mockStateRemoteData struct {
-	mu   sync.Mutex
-	data map[int64]*stateRecord
-}
-
-type stateRecord struct {
-	lockLease    uint64
-	oldReadLease uint64 // only use for intent lock, it means old read lease.
-	lockType     CachedTableLockType
-}
-
-func newMockStateRemoteData() *mockStateRemoteData {
-	return &mockStateRemoteData{
-		data: make(map[int64]*stateRecord),
-	}
-}
-
-func (r *mockStateRemoteData) Load(tid int64) (CachedTableLockType, uint64, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	record, ok := r.data[tid]
-	if !ok {
-		return CachedTableLockNone, 0, nil
-	}
-	return record.lockType, record.lockLease, nil
-}
-
-func (r *mockStateRemoteData) LockForRead(tid int64, now, ts uint64) (bool, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	record, ok := r.data[tid]
-	if !ok {
-		record = &stateRecord{
-			lockLease:    ts,
-			oldReadLease: ts,
-			lockType:     CachedTableLockRead,
-		}
-		r.data[tid] = record
-		return true, nil
-	}
-	switch record.lockType {
-	case CachedTableLockNone:
-		// Add the read lock
-		record.lockType = CachedTableLockRead
-		record.lockLease = ts
-		return true, nil
-	case CachedTableLockRead:
-		// Renew lease for this case.
-		if record.lockLease < ts {
-			record.lockLease = ts
-			return true, nil
-		}
-		// Already read locked.
-		return true, nil
-	case CachedTableLockWrite, CachedTableLockIntend:
-		if now > record.lockLease {
-			// Outdated...clear orphan lock
-			record.lockType = CachedTableLockRead
-			record.lockLease = ts
-			return true, nil
-		}
-		return false, nil
-	}
-	return false, errors.New("unknown lock type")
-}
-
-func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	record, ok := r.data[tid]
-	if !ok {
-		record = &stateRecord{
-			lockType:  CachedTableLockWrite,
-			lockLease: ts,
-		}
-		r.data[tid] = record
-		return 0, nil
-	}
-
-	switch record.lockType {
-	case CachedTableLockNone:
-		record.lockType = CachedTableLockWrite
-		record.lockLease = ts
-		return 0, nil
-	case CachedTableLockRead:
-		if now > record.lockLease {
-			// Outdated, clear orphan lock and add write lock directly.
-			record.lockType = CachedTableLockWrite
-			record.lockLease = ts
-			return 0, nil
-		}
-
-		// Change state to intend, prevent renew lease operation.
-		oldLease := record.lockLease
-		record.lockType = CachedTableLockIntend
-		record.lockLease = leaseFromTS(ts)
-		record.oldReadLease = oldLease
-		return oldLease, nil
-	case CachedTableLockWrite:
-		if ts > record.lockLease {
-			record.lockLease = ts
-		}
-	case CachedTableLockIntend:
-		// Add the write lock.
-		if now > record.oldReadLease {
-			record.lockType = CachedTableLockWrite
-			record.lockLease = ts
-		} else {
-			return record.oldReadLease, nil
-		}
-	default:
-		return 0, fmt.Errorf("wrong lock state %v", record.lockType)
-	}
-	return 0, nil
-}
-
-func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs uint64) (bool, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	record, ok := r.data[tid]
-	if !ok {
-		record = &stateRecord{
-			lockLease: newTs,
-			lockType:  CachedTableLockRead,
-		}
-		r.data[tid] = record
-		return true, nil
-	}
-	if record.lockType != CachedTableLockRead {
-		return false, errors.New("The read lock can be renewed only in the read lock state")
-	}
-	if record.lockLease < oldTs {
-		return false, errors.New("The remote Lease is invalid")
-	}
-	if record.lockLease <= newTs {
-		record.lockLease = newTs
-		return true, nil
-	}
-	return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation")
+	RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error)
 }

 type sqlExec interface {
-	AffectedRows() uint64
 	ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error)
-	GetStore() kv.Storage
 }

 type stateRemoteHandle struct {
@@ -390,13 +96,11 @@ func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLoc
 	return lockType, lease, err
 }

-// LockForRead try to lock the table, if this operation succeed, the remote data
-// is "read locked" and will not be modified according to the protocol, until the lease expire.
-func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) {
+func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint64) ( /*succ*/ bool, error) {
 	h.Lock()
 	defer h.Unlock()
 	succ := false
-	err := h.runInTxn(ctx, func(ctx context.Context) error {
+	err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
 		lockType, lease, _, err := h.loadRow(ctx, tid)
 		if err != nil {
 			return errors.Trace(err)
@@ -428,32 +132,24 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts
 	return succ, err
 }

-func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
+func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error {
 	h.Lock()
 	defer h.Unlock()
 	for {
-		waitAndRetry, err := h.lockForWriteOnce(ctx, tid, now, ts)
+		waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts)
 		if err != nil {
 			return err
 		}
 		if waitAndRetry == 0 {
 			break
 		}
-		time.Sleep(waitAndRetry)
-		store := h.exec.GetStore()
-		o := store.GetOracle()
-		newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope})
-		if err != nil {
-			return errors.Trace(err)
-		}
-		now, ts = newTS, leaseFromTS(newTS)
 	}
 	return nil
 }

-func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (waitAndRetry time.Duration, err error) {
-	err = h.runInTxn(ctx, func(ctx context.Context) error {
+func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) {
+	err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
 		lockType, lease, oldReadLease, err := h.loadRow(ctx, tid)
 		if err != nil {
 			return errors.Trace(err)
@@ -474,12 +170,17 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now
 		}
 	case CachedTableLockRead:
 		// Change from READ to INTEND
-		if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil {
+		if _, err = h.execSQL(ctx,
+			"update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?",
+			lease,
+			ts,
+			tid); err != nil {
 			return errors.Trace(err)
 		}
+
 		// Wait for lease to expire, and then retry.
-		waitAndRetry = waitForLeaseExpire(oldReadLease, now)
-	case CachedTableLockIntend, CachedTableLockWrite:
+		waitAndRetry = waitForLeaseExpire(lease, now)
+	case CachedTableLockIntend:
 		// `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here.
 		if now > oldReadLease {
 			if lockType == CachedTableLockIntend {
@@ -509,16 +210,40 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration {
 	return 0
 }

-func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, now, newTs uint64, op RenewLeaseType) (bool, error) {
+func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease uint64, op RenewLeaseType) (bool, error) {
 	h.Lock()
 	defer h.Unlock()
-	// TODO: `now` should use the real current tso to check the old lease is not expired.
-	_, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", newTs, tid)
-	if err != nil {
-		return false, errors.Trace(err)
+
+	var succ bool
+	if op == RenewReadLease {
+		err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
+			lockType, oldLease, _, err := h.loadRow(ctx, tid)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if now >= oldLease {
+				// The read lock has already expired; fail to renew.
+				return nil
+			}
+			if lockType != CachedTableLockRead {
+				// Not a read lock; fail to renew.
+				return nil
+			}
+
+			if newLease > oldLease { // lease should never decrease!
+				err = h.updateRow(ctx, tid, "READ", newLease)
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+			succ = true
+			return nil
+		})
+		return succ, err
 	}
-	succ := h.exec.AffectedRows() > 0
-	return succ, err
+
+	// TODO: renew for write lease
+	return false, errors.New("not implemented yet")
 }

 func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
@@ -536,13 +261,23 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error {
 	return err
 }

-func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error {
+func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error {
 	err := h.beginTxn(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}

-	err = fn(ctx)
+	rows, err := h.execSQL(ctx, "select @@tidb_current_ts")
+	if err != nil {
+		return errors.Trace(err)
+	}
+	resultStr := rows[0].GetString(0)
+	txnTS, err := strconv.ParseUint(resultStr, 10, 64)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = fn(ctx, txnTS)
 	if err != nil {
 		terror.Log(h.rollbackTxn(ctx))
 		return errors.Trace(err)
diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go
index 969f0fb56b0e0..227924d58c8db 100644
--- a/table/tables/state_remote_test.go
+++ b/table/tables/state_remote_test.go
@@ -17,11 +17,14 @@ package tables_test
 import (
 	"context"
 	"testing"
+	"time"

+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/oracle"
 )

 // CreateMetaLockForCachedTable initializes the cached table meta lock information.
@@ -64,65 +67,74 @@ func TestStateRemote(t *testing.T) {
 	require.Equal(t, lockType.String(), "NONE")
 	require.Equal(t, lease, uint64(0))

+	ts, err := se.GetStore().GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope})
+	require.NoError(t, err)
+	physicalTime := oracle.GetTimeFromTS(ts)
+	leaseVal := oracle.GoTimeToTS(physicalTime.Add(200 * time.Millisecond))
+
 	// Check read lock.
-	succ, err := h.LockForRead(ctx, 5, 1234, 1234)
+	succ, err := h.LockForRead(ctx, 5, leaseVal)
 	require.NoError(t, err)
 	require.True(t, succ)
 	lockType, lease, err = h.Load(ctx, 5)
 	require.NoError(t, err)
 	require.Equal(t, lockType, tables.CachedTableLockRead)
 	require.Equal(t, lockType.String(), "READ")
-	require.Equal(t, lease, uint64(1234))
+	require.Equal(t, lease, leaseVal)

 	// LockForRead when read lock is hold.
 	// This operation equals to renew lease.
-	succ, err = h.LockForRead(ctx, 5, 1235, 1235)
+	succ, err = h.LockForRead(ctx, 5, leaseVal+1)
 	require.NoError(t, err)
 	require.True(t, succ)
 	lockType, lease, err = h.Load(ctx, 5)
 	require.NoError(t, err)
 	require.Equal(t, lockType, tables.CachedTableLockRead)
 	require.Equal(t, lockType.String(), "READ")
-	require.Equal(t, lease, uint64(1235))
+	require.Equal(t, lease, leaseVal+1)

 	// Renew read lock lease operation.
-	succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease)
+	leaseVal = oracle.GoTimeToTS(physicalTime.Add(400 * time.Millisecond))
+	succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease)
 	require.NoError(t, err)
 	require.True(t, succ)
 	lockType, lease, err = h.Load(ctx, 5)
 	require.NoError(t, err)
 	require.Equal(t, lockType, tables.CachedTableLockRead)
 	require.Equal(t, lockType.String(), "READ")
-	require.Equal(t, lease, uint64(1264))
+	require.Equal(t, lease, leaseVal)

 	// Check write lock.
-	require.NoError(t, h.LockForWrite(ctx, 5, 2234, 2234))
+	leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond))
+	require.NoError(t, h.LockForWrite(ctx, 5, leaseVal))
 	lockType, lease, err = h.Load(ctx, 5)
 	require.NoError(t, err)
 	require.Equal(t, lockType, tables.CachedTableLockWrite)
 	require.Equal(t, lockType.String(), "WRITE")
-	require.Equal(t, lease, uint64(2234))
+	require.Equal(t, lease, leaseVal)

 	// Lock for write again
-	require.NoError(t, h.LockForWrite(ctx, 5, 3234, 3234))
-	lockType, lease, err = h.Load(ctx, 5)
+	leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond))
+	require.NoError(t, h.LockForWrite(ctx, 5, leaseVal))
+	lockType, _, err = h.Load(ctx, 5)
 	require.NoError(t, err)
 	require.Equal(t, lockType, tables.CachedTableLockWrite)
 	require.Equal(t, lockType.String(), "WRITE")
-	require.Equal(t, lease, uint64(3234))

 	// Renew read lock lease should fail when the write lock is hold.
-	succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease)
+	succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease)
 	require.NoError(t, err)
 	require.False(t, succ)

 	// Acquire read lock should also fail when the write lock is hold.
-	succ, err = h.LockForRead(ctx, 5, 1264, 1264)
+	succ, err = h.LockForRead(ctx, 5, leaseVal)
 	require.NoError(t, err)
 	require.False(t, succ)

 	// But clear orphan write lock should success.
-	succ, err = h.LockForRead(ctx, 5, 4234, 4234)
+	time.Sleep(time.Second)
+	leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second))
+	succ, err = h.LockForRead(ctx, 5, leaseVal)
 	require.NoError(t, err)
 	require.True(t, succ)
 }
diff --git a/table/tables/tables.go b/table/tables/tables.go
index 5adf626ca2f78..716b2e879cbcc 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -85,7 +85,7 @@ func MockTableFromMeta(tblInfo *model.TableInfo) table.Table {
 	var t TableCommon
 	initTableCommon(&t, tblInfo, tblInfo.ID, columns, nil)
 	if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
-		ret, err := NewCachedTable(&t)
+		ret, err := newCachedTable(&t)
 		if err != nil {
 			return nil
 		}
@@ -153,7 +153,7 @@ func TableFromMeta(allocs autoid.Allocators, tblInfo *model.TableInfo) (table.Ta
 		return nil, err
 	}
 	if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
-		return NewCachedTable(&t)
+		return newCachedTable(&t)
 	}
 	return &t, nil
 }
diff --git a/telemetry/main_test.go b/telemetry/main_test.go
index 3ab35518fd644..f498d16a2d564 100644
--- a/telemetry/main_test.go
+++ b/telemetry/main_test.go
@@ -32,7 +32,6 @@ func TestMain(m *testing.M) {
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
 		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
-		goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
 	}

 	goleak.VerifyTestMain(m, opts...)
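
Note on the locking protocol this patch persists in mysql.table_cache_meta: each cached table row carries a lock_type ('NONE', 'READ', 'INTEND', 'WRITE') plus lease and oldReadLease timestamps, and the in-process mockStateRemoteData is replaced by SQL against that table. The sketch below is a simplified, self-contained model of the read-lock admission rule as implemented by LockForRead; the names (canLockForRead, lockNone, ...) are illustrative only and not part of the patch.

package main

import "fmt"

// lockType mirrors the lock_type enum in mysql.table_cache_meta.
type lockType int

const (
	lockNone lockType = iota
	lockRead
	lockIntend
	lockWrite
)

// canLockForRead models the admission rule in LockForRead: a reader may
// take or renew the READ lock when the table is unlocked or already
// read-locked; a WRITE or INTEND lock blocks readers unless its lease has
// expired at tso `now`, in which case it is an orphan and may be cleared.
func canLockForRead(cur lockType, lease, now uint64) bool {
	switch cur {
	case lockNone, lockRead:
		return true
	case lockIntend, lockWrite:
		return now > lease
	}
	return false
}

func main() {
	fmt.Println(canLockForRead(lockWrite, 100, 200))  // true: orphan write lock, lease expired
	fmt.Println(canLockForRead(lockIntend, 300, 200)) // false: a writer is pending
}

Letting the lease tso, rather than explicit unlock, bound every lock is what makes a crashed writer harmless: once its lease passes, any reader can safely reclaim the row.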
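The rewritten runInTxn no longer trusts a caller-supplied `now`; it reads the transaction's own start tso via the TiDB session variable @@tidb_current_ts, so every lease comparison inside the closure uses the same snapshot timestamp the transaction will commit against. A minimal standalone sketch of that pattern over the MySQL protocol, under the assumption of a reachable TiDB server (the DSN below is a placeholder):

package main

import (
	"context"
	"database/sql"
	"fmt"
	"strconv"

	_ "github.com/go-sql-driver/mysql" // TiDB speaks the MySQL protocol
)

// txnStartTS returns the start tso of the transaction tx, using the same
// "select @@tidb_current_ts" trick as the patched runInTxn.
func txnStartTS(ctx context.Context, tx *sql.Tx) (uint64, error) {
	var s string
	if err := tx.QueryRowContext(ctx, "select @@tidb_current_ts").Scan(&s); err != nil {
		return 0, err
	}
	return strconv.ParseUint(s, 10, 64)
}

func main() {
	// Placeholder DSN; adjust for your deployment.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	ctx := context.Background()
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback()

	ts, err := txnStartTS(ctx, tx)
	if err != nil {
		panic(err)
	}
	fmt.Println("txn start tso:", ts)
}

Deriving `now` inside the transaction also removes the need for the deleted GetStore/GetOracle round trip to PD on every write-lock retry.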
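TryReadFromCache now schedules an asynchronous lease renewal when the remaining lease window shrinks to 1.5s or less (previously 1s). A small sketch of that condition using the oracle helpers from client-go; the 1500ms threshold is the constant from this patch and, per its TODO, is meant to become configurable in follow-up PRs.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// shouldRenew mirrors the condition in TryReadFromCache: renew only while
// the read tso still falls inside the lease and at most 1.5s remain.
func shouldRenew(lease, readTS uint64) bool {
	distance := oracle.GetTimeFromTS(lease).Sub(oracle.GetTimeFromTS(readTS))
	return distance >= 0 && distance <= 1500*time.Millisecond
}

func main() {
	now := time.Now()
	lease := oracle.GoTimeToTS(now.Add(time.Second))
	readTS := oracle.GoTimeToTS(now)
	fmt.Println(shouldRenew(lease, readTS)) // true: 1s remaining, within the 1.5s window
}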