Skip to content

Commit

Permalink
ddl: concurrent safe api for using rule bundles cache (#22035)
Browse files Browse the repository at this point in the history
Signed-off-by: xhe <xw897002528@gmail.com>
  • Loading branch information
xhebox committed Jan 13, 2021
1 parent ba8223f commit 289bc7e
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 65 deletions.
29 changes: 10 additions & 19 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ PARTITION BY RANGE (c) (
);`)

is := s.dom.InfoSchema()
bundles := make(map[string]*placement.Bundle)
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
partDefs := tb.Meta().GetPartitionInfo().Definitions
p0ID := placement.GroupID(partDefs[0].ID)
bundles[p0ID] = &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: p0ID,
Rules: []*placement.Rule{{Role: placement.Leader, Count: 1}},
}
})

// normal cases
_, err = tk.Exec(`alter table t1 alter partition p0
Expand Down Expand Up @@ -379,12 +377,10 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) {
}()

initTable := func() []string {
bundles := make(map[string]*placement.Bundle)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int) partition by hash(id) partitions 2")

is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
Expand All @@ -393,10 +389,10 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) {
rows := []string{}
for k, v := range partDefs {
ptID := placement.GroupID(v.ID)
bundles[ptID] = &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: ptID,
Rules: []*placement.Rule{{Count: k}},
}
})
rows = append(rows, fmt.Sprintf("%s 0 test t1 %s <nil> %d ", ptID, v.Name.L, k))
}
return rows
Expand Down Expand Up @@ -433,9 +429,7 @@ PARTITION BY RANGE (c) (
PARTITION p3 VALUES LESS THAN (21)
);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
Expand All @@ -444,7 +438,7 @@ PARTITION BY RANGE (c) (
for _, def := range partDefs {
if def.Name.String() == "p0" {
groupID := placement.GroupID(def.ID)
bundles[groupID] = &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
Expand All @@ -460,10 +454,10 @@ PARTITION BY RANGE (c) (
},
},
},
}
})
} else if def.Name.String() == "p2" {
groupID := placement.GroupID(def.ID)
bundles[groupID] = &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
Expand All @@ -479,8 +473,7 @@ PARTITION BY RANGE (c) (
},
},
},
}

})
}
}

Expand Down Expand Up @@ -641,16 +634,14 @@ PARTITION BY RANGE (c) (
PARTITION p1 VALUES LESS THAN (11)
);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
pid, err := tables.FindPartitionByName(tb.Meta(), "p0")
c.Assert(err, IsNil)
groupID := placement.GroupID(pid)
bundles[groupID] = &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
Expand All @@ -666,7 +657,7 @@ PARTITION BY RANGE (c) (
},
},
},
}
})
dbInfo := testGetSchemaByName(c, tk.Se, "test")
tk2 := testkit.NewTestKit(c, s.store)
var chkErr error
Expand Down
7 changes: 3 additions & 4 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,8 @@ func (builder *RequestBuilder) verifyTxnScope() error {
}
}

bundles := builder.is.RuleBundles()
for phyTableID := range visitPhysicalTableID {
valid := VerifyTxnScope(builder.txnScope, phyTableID, bundles)
valid := VerifyTxnScope(builder.txnScope, phyTableID, builder.is)
if !valid {
var tblName string
var partName string
Expand Down Expand Up @@ -527,11 +526,11 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra
}

// VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation.
func VerifyTxnScope(txnScope string, physicalTableID int64, bundles map[string]*placement.Bundle) bool {
func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool {
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
return true
}
bundle, ok := bundles[placement.GroupID(physicalTableID)]
bundle, ok := is.BundleByName(placement.GroupID(physicalTableID))
if !ok {
return true
}
Expand Down
3 changes: 1 addition & 2 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,9 +1823,8 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context,
func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) error {
checker := privilege.GetPrivilegeManager(ctx)
is := infoschema.GetInfoSchema(ctx)
ruleBundles := is.RuleBundles()
var rows [][]types.Datum
for _, bundle := range ruleBundles {
for _, bundle := range is.RuleBundles() {
id, err := placement.ObjectIDFromGroupID(bundle.ID)
if err != nil {
return errors.Wrapf(err, "Restore bundle %s failed", bundle.ID)
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (e *PointGetExecutor) verifyTxnScope() error {
tblInfo, _ := is.TableByID(tblID)
tblName = tblInfo.Meta().Name.String()
}
valid := distsql.VerifyTxnScope(txnScope, tblID, is.RuleBundles())
valid := distsql.VerifyTxnScope(txnScope, tblID, is)
if valid {
return nil
}
Expand Down
7 changes: 2 additions & 5 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8520,17 +8520,15 @@ PARTITION BY RANGE (c) (
tk.MustExec(`insert into t1 (c,d,e) values (2,3,5);`)
tk.MustExec(`insert into t1 (c,d,e) values (3,5,7);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
setBundle := func(parName, dc string) {
pid, err := tables.FindPartitionByName(tb.Meta(), parName)
c.Assert(err, IsNil)
groupID := placement.GroupID(pid)
oldBundle := &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
Expand All @@ -8546,8 +8544,7 @@ PARTITION BY RANGE (c) (
},
},
},
}
bundles[groupID] = placement.BuildPlacementCopyBundle(oldBundle, pid)
})
}
setBundle("p0", "sh")
setBundle("p1", "bj")
Expand Down
12 changes: 6 additions & 6 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
}

func (b *Builder) applyPlacementDelete(id string) {
delete(b.is.ruleBundleMap, id)
b.is.deleteBundle(id)
}

func (b *Builder) applyPlacementUpdate(id string) error {
Expand All @@ -453,7 +453,7 @@ func (b *Builder) applyPlacementUpdate(id string) error {
}

if !bundle.IsEmpty() {
b.is.ruleBundleMap[id] = bundle
b.is.SetBundle(bundle)
}
return nil
}
Expand All @@ -475,8 +475,9 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) {
}

func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
for k, v := range oldIS.ruleBundleMap {
b.is.ruleBundleMap[k] = v
is := b.is
for _, v := range oldIS.RuleBundles() {
is.SetBundle(v)
}
}

Expand All @@ -500,9 +501,8 @@ func (b *Builder) copySchemaTables(dbName string) *model.DBInfo {
func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.Bundle, schemaVersion int64) (*Builder, error) {
info := b.is
info.schemaMetaVersion = schemaVersion
info.ruleBundleMap = make(map[string]*placement.Bundle, len(bundles))
for _, bundle := range bundles {
info.ruleBundleMap[bundle.ID] = bundle
info.SetBundle(bundle)
}

for _, di := range dbInfos {
Expand Down
38 changes: 29 additions & 9 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package infoschema
import (
"fmt"
"sort"
"sync"
"sync/atomic"

"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -56,10 +57,10 @@ type InfoSchema interface {
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition)
// BundleByName is used to get a rule bundle.
BundleByName(name string) (*placement.Bundle, bool)
// RuleBundles returns all placement rule bundles.
RuleBundles() map[string]*placement.Bundle
// MockBundles is only used for TEST.
MockBundles(map[string]*placement.Bundle)
// SetBundle is used internally to update rule bundles or mock tests.
SetBundle(*placement.Bundle)
// RuleBundles will return a copy of all rule bundles.
RuleBundles() []*placement.Bundle
}

type sortedTables []table.Table
Expand Down Expand Up @@ -95,7 +96,8 @@ const bucketCount = 512

type infoSchema struct {
// ruleBundleMap stores all placement rules
ruleBundleMap map[string]*placement.Bundle
ruleBundleMutex sync.RWMutex
ruleBundleMap map[string]*placement.Bundle

schemaMap map[string]*schemaTables

Expand All @@ -110,6 +112,7 @@ type infoSchema struct {
func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
tableNames := &schemaTables{
Expand All @@ -133,6 +136,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
tableNames := &schemaTables{
Expand Down Expand Up @@ -405,16 +409,32 @@ func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
}

func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) {
is.ruleBundleMutex.RLock()
defer is.ruleBundleMutex.RUnlock()
t, r := is.ruleBundleMap[name]
return t, r
}

func (is *infoSchema) RuleBundles() map[string]*placement.Bundle {
return is.ruleBundleMap
func (is *infoSchema) RuleBundles() []*placement.Bundle {
is.ruleBundleMutex.RLock()
defer is.ruleBundleMutex.RUnlock()
bundles := make([]*placement.Bundle, 0, len(is.ruleBundleMap))
for _, bundle := range is.ruleBundleMap {
bundles = append(bundles, bundle)
}
return bundles
}

func (is *infoSchema) SetBundle(bundle *placement.Bundle) {
is.ruleBundleMutex.Lock()
defer is.ruleBundleMutex.Unlock()
is.ruleBundleMap[bundle.ID] = bundle
}

func (is *infoSchema) MockBundles(ruleBundleMap map[string]*placement.Bundle) {
is.ruleBundleMap = ruleBundleMap
func (is *infoSchema) deleteBundle(id string) {
is.ruleBundleMutex.Lock()
defer is.ruleBundleMutex.Unlock()
delete(is.ruleBundleMap, id)
}

// GetBundle get the first available bundle by array of IDs, possibbly fallback to the default.
Expand Down
9 changes: 3 additions & 6 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,6 @@ func (*testSuite) TestGetBundle(c *C) {

is := handle.Get()

bundles := make(map[string]*placement.Bundle)
is.MockBundles(bundles)

bundle := &placement.Bundle{
ID: placement.PDBundleID,
Rules: []*placement.Rule{
Expand All @@ -366,7 +363,7 @@ func (*testSuite) TestGetBundle(c *C) {
},
},
}
bundles[placement.PDBundleID] = bundle
is.SetBundle(bundle)

b := infoschema.GetBundle(is, []int64{})
c.Assert(b.Rules, DeepEquals, bundle.Rules)
Expand All @@ -387,7 +384,7 @@ func (*testSuite) TestGetBundle(c *C) {
},
},
}
bundles[ptID] = bundle
is.SetBundle(bundle)

b = infoschema.GetBundle(is, []int64{2, 3})
c.Assert(b, DeepEquals, bundle)
Expand All @@ -408,7 +405,7 @@ func (*testSuite) TestGetBundle(c *C) {
},
},
}
bundles[ptID] = bundle
is.SetBundle(bundle)

b = infoschema.GetBundle(is, []int64{1, 2, 3})
c.Assert(b, DeepEquals, bundle)
Expand Down
13 changes: 5 additions & 8 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,12 +1403,10 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) {
c.Assert(err, IsNil)
partDefs := tb.Meta().GetPartitionInfo().Definitions

bundles := make(map[string]*placement.Bundle)
is.MockBundles(bundles)
tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows())

bundleID := "pd"
bundle := &placement.Bundle{
is.SetBundle(&placement.Bundle{
ID: bundleID,
Rules: []*placement.Rule{
{
Expand All @@ -1418,12 +1416,11 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) {
Count: 3,
},
},
}
bundles[bundleID] = bundle
})
tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows())

bundleID = fmt.Sprintf("%s%d", placement.BundleIDPrefix, partDefs[0].ID)
bundle = &placement.Bundle{
bundle := &placement.Bundle{
ID: bundleID,
Index: 3,
Override: true,
Expand All @@ -1443,7 +1440,7 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) {
},
},
}
bundles[bundleID] = bundle
is.SetBundle(bundle)
expected := fmt.Sprintf(`%s 3 0 test test_placement p0 <nil> voter 3 "+zone=bj"`, bundleID)
tk.MustQuery(`select group_id, group_index, rule_id, schema_name, table_name, partition_name, index_name,
role, replicas, constraints from information_schema.placement_policy`).Check(testkit.Rows(expected))
Expand All @@ -1459,7 +1456,7 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) {
bundle1.ID = bundleID
bundle1.Rules[0].GroupID = bundleID
bundle1.Rules[1].GroupID = bundleID
bundles[bundleID] = bundle1
is.SetBundle(bundle1)
tk.MustQuery("select rule_id, schema_name, table_name, partition_name from information_schema.placement_policy order by partition_name, rule_id").Check(testkit.Rows(
"0 test test_placement p0", "1 test test_placement p0", "0 test test_placement p1", "1 test test_placement p1"))
}
Loading

0 comments on commit 289bc7e

Please sign in to comment.