Skip to content

Commit

Permalink
*: use the real StateRemote interface implementation for cached table (
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Dec 10, 2021
1 parent 03b6a8e commit 9a074f1
Show file tree
Hide file tree
Showing 22 changed files with 259 additions and 435 deletions.
17 changes: 13 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -6580,6 +6581,16 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
if t.Meta().Partition != nil {
return errors.Trace(ErrOptOnCacheTable.GenWithStackByArgs("partition mode"))
}

// Initialize the cached table meta lock info in `mysql.table_cache_meta`.
// The operation shouldn't fail in most cases, and if it does, return the error directly.
// This DML and the following DDL is not atomic, that's not a problem.
_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(),
"insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
SchemaName: schema.Name.L,
Expand All @@ -6590,8 +6601,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
return d.callHookOnChanged(err)
}

func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error) {
Expand All @@ -6614,6 +6624,5 @@ func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
return d.callHookOnChanged(err)
}
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
Expand Down
20 changes: 16 additions & 4 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Domain struct {
isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.
renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)
}

// loadInfoSchema loads infoschema at startTS.
Expand Down Expand Up @@ -159,7 +160,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
Expand All @@ -173,6 +174,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return is, false, currentSchemaVersion, nil, nil
}

func (do *Domain) sysFacHack() (pools.Resource, error) {
// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
return do.sysExecutorFactory(do)
}

func (do *Domain) fetchPolicies(m *meta.Meta) ([]*model.PolicyInfo, error) {
allPolicies, err := m.ListPolicies()
if err != nil {
Expand Down Expand Up @@ -271,7 +282,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest())
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
Expand Down Expand Up @@ -711,7 +722,8 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
const serverIDForStandalone = 1 // serverID for standalone deployment.

// Init initializes a domain.
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) (pools.Resource, error)) error {
do.sysExecutorFactory = sysExecutorFactory
perfschema.Init()
if ebd, ok := do.store.(kv.EtcdBackend); ok {
var addrs []string
Expand Down Expand Up @@ -753,7 +765,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return sysFactory(do)
return sysExecutorFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, resourceIdleTimeout)
ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4736,7 +4736,7 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
zap.Stack("stack trace"))
}
}()
err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS)
err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS)
if err != nil {
log.Warn("Update Lock Info Error")
}
Expand Down
6 changes: 3 additions & 3 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
"test 2",
))
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 26)
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 27)

// More tests about the privileges.
tk.MustExec("create user 'testuser'@'localhost'")
Expand All @@ -944,14 +944,14 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
Hostname: "localhost",
}, nil, nil), Equals, true)

tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26"))
tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27"))

c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser3",
Hostname: "localhost",
}, nil, nil), Equals, true)

tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26"))
tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27"))
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
Expand Down
1 change: 0 additions & 1 deletion executor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
}
callback := func(i int) int {
testDataMap.GenerateOutputIfNeeded()
Expand Down
2 changes: 1 addition & 1 deletion executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu
}

func newSlowQueryRetriever() (*slowQueryRetriever, error) {
newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0)
newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion expression/integration_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3890,7 +3890,14 @@ func TestPreparePlanCacheNotForCacheTable(t *testing.T) {
tk.MustExec("create table t(a int);")
tk.MustExec("alter table t cache")

tk.MustQuery("select * from t where a = 1")
var useCache bool
for i := 0; i < 50; i++ {
tk.MustQuery("select * from t where a = 1")
if tk.HasPlan("select * from t where a = 1", "Union") {
useCache = true
}
}
require.True(t, useCache)
// already read cache after reading first time
tk.MustQuery("explain format = 'brief' select * from t where a = 1").Check(testkit.Rows(
"Projection 10.00 root test.t.a",
Expand Down
1 change: 0 additions & 1 deletion expression/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
}

callback := func(i int) int {
Expand Down
14 changes: 12 additions & 2 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sort"
"strings"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/sqlexec"
)

// Builder builds a new InfoSchema.
Expand All @@ -43,6 +45,7 @@ type Builder struct {
store kv.Storage
// TODO: renewLeaseCh is only used to pass data between table and domain
renewLeaseCh chan func()
factory func() (pools.Resource, error)
}

// ApplyDiff applies SchemaDiff to the new InfoSchema.
Expand Down Expand Up @@ -630,7 +633,13 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf
return nil, errors.Trace(err)
}
if t, ok := ret.(table.CachedTable); ok {
err = t.Init(b.renewLeaseCh)
var tmp pools.Resource
tmp, err = b.factory()
if err != nil {
return nil, errors.Trace(err)
}

err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -674,7 +683,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc)
}

// NewBuilder creates a new Builder with a Handle.
func NewBuilder(store kv.Storage, renewCh chan func()) *Builder {
func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder {
return &Builder{
store: store,
is: &infoSchema{
Expand All @@ -684,6 +693,7 @@ func NewBuilder(store kv.Storage, renewCh chan func()) *Builder {
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
renewLeaseCh: renewCh,
factory: factory,
}
}

Expand Down
6 changes: 3 additions & 3 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) {
})
require.NoError(t, err)

builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1)
builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1)
require.NoError(t, err)

txn, err := store.Begin()
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down Expand Up @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4211,7 +4211,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if r := recover(); r != nil {
}
}()
err := cachedTable.UpdateLockForRead(store, startTS)
err := cachedTable.UpdateLockForRead(ctx, store, startTS)
if err != nil {
log.Warn("Update Lock Info Error")
}
Expand Down
22 changes: 21 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ const (
last_analyzed_at TIMESTAMP,
PRIMARY KEY (table_id, column_id) CLUSTERED
);`
// CreateTableCacheMetaTable stores the cached table meta lock information.
CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta (
tid bigint(11) NOT NULL DEFAULT 0,
lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE',
lease bigint(20) NOT NULL DEFAULT 0,
oldReadLease bigint(20) NOT NULL DEFAULT 0,
PRIMARY KEY (tid)
);`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -528,11 +536,13 @@ const (
version77 = 77
// version78 updates mysql.stats_buckets.lower_bound, mysql.stats_buckets.upper_bound and mysql.stats_histograms.last_analyze_pos from BLOB to LONGBLOB.
version78 = 78
// version79 adds the mysql.table_cache_meta table
version79 = 79
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version78
var currentBootstrapVersion int64 = version79

var (
bootstrapVersion = []func(Session, int64){
Expand Down Expand Up @@ -614,6 +624,7 @@ var (
upgradeToVer76,
upgradeToVer77,
upgradeToVer78,
upgradeToVer79,
}
)

Expand Down Expand Up @@ -1612,6 +1623,13 @@ func upgradeToVer78(s Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms MODIFY last_analyze_pos LONGBLOB DEFAULT NULL")
}

func upgradeToVer79(s Session, ver int64) {
if ver >= version79 {
return
}
doReentrantDDL(s, CreateTableCacheMetaTable)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -1694,6 +1712,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateCapturePlanBaselinesBlacklist)
// Create column_stats_usage table
mustExecute(s, CreateColumnStatsUsageTable)
// Create table_cache_meta table.
mustExecute(s, CreateTableCacheMetaTable)
}

// doDMLWorks executes DML statements in bootstrap stage.
Expand Down
6 changes: 3 additions & 3 deletions statistics/testdata/stats_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@
{
"Start": 800,
"End": 900,
"Count": 761.004166655054
"Count": 752.004166655054
},
{
"Start": 900,
Expand Down Expand Up @@ -711,7 +711,7 @@
{
"Start": 800,
"End": 1000,
"Count": 1219.196869573942
"Count": 1210.196869573942
},
{
"Start": 900,
Expand All @@ -736,7 +736,7 @@
{
"Start": 200,
"End": 400,
"Count": 1186.5288209899081
"Count": 1230.0288209899081
},
{
"Start": 200,
Expand Down
5 changes: 3 additions & 2 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/sqlexec"
)

// Type is used to distinguish between different tables that store data in different ways.
Expand Down Expand Up @@ -252,12 +253,12 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table
type CachedTable interface {
Table

Init(renewCh chan func()) error
Init(renewCh chan func(), exec sqlexec.SQLExecutor) error

// TryReadFromCache checks if the cache table is readable.
TryReadFromCache(ts uint64) kv.MemBuffer

// UpdateLockForRead If you cannot meet the conditions of the read buffer,
// you need to update the lock information and read the data from the original table
UpdateLockForRead(store kv.Storage, ts uint64) error
UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error
}
Loading

0 comments on commit 9a074f1

Please sign in to comment.