From 289bc7e79cdf5c3f2809a47d7c9737ba6637a32b Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 13 Jan 2021 10:58:58 +0800 Subject: [PATCH] ddl: concurrent safe api for using rule bundles cache (#22035) Signed-off-by: xhe --- ddl/placement_sql_test.go | 29 +++++++++----------------- distsql/request_builder.go | 7 +++---- executor/infoschema_reader.go | 3 +-- executor/point_get.go | 2 +- expression/integration_test.go | 7 ++----- infoschema/builder.go | 12 +++++------ infoschema/infoschema.go | 38 ++++++++++++++++++++++++++-------- infoschema/infoschema_test.go | 9 +++----- infoschema/tables_test.go | 13 +++++------- session/session_test.go | 7 ++----- 10 files changed, 62 insertions(+), 65 deletions(-) diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index a1b7d50b0c176..15d2aff04842c 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -45,17 +45,15 @@ PARTITION BY RANGE (c) ( );`) is := s.dom.InfoSchema() - bundles := make(map[string]*placement.Bundle) - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) partDefs := tb.Meta().GetPartitionInfo().Definitions p0ID := placement.GroupID(partDefs[0].ID) - bundles[p0ID] = &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: p0ID, Rules: []*placement.Rule{{Role: placement.Leader, Count: 1}}, - } + }) // normal cases _, err = tk.Exec(`alter table t1 alter partition p0 @@ -379,12 +377,10 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { }() initTable := func() []string { - bundles := make(map[string]*placement.Bundle) tk.MustExec("drop table if exists t1") tk.MustExec("create table t1(id int) partition by hash(id) partitions 2") is := s.dom.InfoSchema() - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) @@ -393,10 +389,10 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { rows := []string{} for k, v := range partDefs { ptID := placement.GroupID(v.ID) - bundles[ptID] = &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: ptID, Rules: []*placement.Rule{{Count: k}}, - } + }) rows = append(rows, fmt.Sprintf("%s 0 test t1 %s %d ", ptID, v.Name.L, k)) } return rows @@ -433,9 +429,7 @@ PARTITION BY RANGE (c) ( PARTITION p3 VALUES LESS THAN (21) );`) - bundles := make(map[string]*placement.Bundle) is := s.dom.InfoSchema() - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) @@ -444,7 +438,7 @@ PARTITION BY RANGE (c) ( for _, def := range partDefs { if def.Name.String() == "p0" { groupID := placement.GroupID(def.ID) - bundles[groupID] = &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: groupID, Rules: []*placement.Rule{ { @@ -460,10 +454,10 @@ PARTITION BY RANGE (c) ( }, }, }, - } + }) } else if def.Name.String() == "p2" { groupID := placement.GroupID(def.ID) - bundles[groupID] = &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: groupID, Rules: []*placement.Rule{ { @@ -479,8 +473,7 @@ PARTITION BY RANGE (c) ( }, }, }, - } - + }) } } @@ -641,16 +634,14 @@ PARTITION BY RANGE (c) ( PARTITION p1 VALUES LESS THAN (11) );`) - bundles := make(map[string]*placement.Bundle) is := s.dom.InfoSchema() - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) pid, err := tables.FindPartitionByName(tb.Meta(), "p0") c.Assert(err, IsNil) groupID := placement.GroupID(pid) - bundles[groupID] = &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: groupID, Rules: []*placement.Rule{ { @@ -666,7 +657,7 @@ PARTITION BY RANGE (c) ( }, }, }, - } + }) dbInfo := testGetSchemaByName(c, tk.Se, "test") tk2 := testkit.NewTestKit(c, s.store) var chkErr error diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 71f7c8b05dffd..87b44792e4c7d 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -284,9 +284,8 @@ func (builder *RequestBuilder) verifyTxnScope() error { } } - bundles := builder.is.RuleBundles() for phyTableID := range visitPhysicalTableID { - valid := VerifyTxnScope(builder.txnScope, phyTableID, bundles) + valid := VerifyTxnScope(builder.txnScope, phyTableID, builder.is) if !valid { var tblName string var partName string @@ -527,11 +526,11 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra } // VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation. -func VerifyTxnScope(txnScope string, physicalTableID int64, bundles map[string]*placement.Bundle) bool { +func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool { if txnScope == "" || txnScope == oracle.GlobalTxnScope { return true } - bundle, ok := bundles[placement.GroupID(physicalTableID)] + bundle, ok := is.BundleByName(placement.GroupID(physicalTableID)) if !ok { return true } diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 0d92d307e1594..cf733da680c0f 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -1823,9 +1823,8 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context, func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) error { checker := privilege.GetPrivilegeManager(ctx) is := infoschema.GetInfoSchema(ctx) - ruleBundles := is.RuleBundles() var rows [][]types.Datum - for _, bundle := range ruleBundles { + for _, bundle := range is.RuleBundles() { id, err := placement.ObjectIDFromGroupID(bundle.ID) if err != nil { return errors.Wrapf(err, "Restore bundle %s failed", bundle.ID) diff --git a/executor/point_get.go b/executor/point_get.go index 72929f33aa7df..91369979ed32f 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -390,7 +390,7 @@ func (e *PointGetExecutor) verifyTxnScope() error { tblInfo, _ := is.TableByID(tblID) tblName = tblInfo.Meta().Name.String() } - valid := distsql.VerifyTxnScope(txnScope, tblID, is.RuleBundles()) + valid := distsql.VerifyTxnScope(txnScope, tblID, is) if valid { return nil } diff --git a/expression/integration_test.go b/expression/integration_test.go index 22cb616dbc10a..dd5bda2dbf901 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -8520,9 +8520,7 @@ PARTITION BY RANGE (c) ( tk.MustExec(`insert into t1 (c,d,e) values (2,3,5);`) tk.MustExec(`insert into t1 (c,d,e) values (3,5,7);`) - bundles := make(map[string]*placement.Bundle) is := s.dom.InfoSchema() - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) @@ -8530,7 +8528,7 @@ PARTITION BY RANGE (c) ( pid, err := tables.FindPartitionByName(tb.Meta(), parName) c.Assert(err, IsNil) groupID := placement.GroupID(pid) - oldBundle := &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: groupID, Rules: []*placement.Rule{ { @@ -8546,8 +8544,7 @@ PARTITION BY RANGE (c) ( }, }, }, - } - bundles[groupID] = placement.BuildPlacementCopyBundle(oldBundle, pid) + }) } setBundle("p0", "sh") setBundle("p1", "bj") diff --git a/infoschema/builder.go b/infoschema/builder.go index 1a70930fdd780..aae0ed06f1cf5 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -443,7 +443,7 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ } func (b *Builder) applyPlacementDelete(id string) { - delete(b.is.ruleBundleMap, id) + b.is.deleteBundle(id) } func (b *Builder) applyPlacementUpdate(id string) error { @@ -453,7 +453,7 @@ func (b *Builder) applyPlacementUpdate(id string) error { } if !bundle.IsEmpty() { - b.is.ruleBundleMap[id] = bundle + b.is.SetBundle(bundle) } return nil } @@ -475,8 +475,9 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) { } func (b *Builder) copyBundlesMap(oldIS *infoSchema) { - for k, v := range oldIS.ruleBundleMap { - b.is.ruleBundleMap[k] = v + is := b.is + for _, v := range oldIS.RuleBundles() { + is.SetBundle(v) } } @@ -500,9 +501,8 @@ func (b *Builder) copySchemaTables(dbName string) *model.DBInfo { func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.Bundle, schemaVersion int64) (*Builder, error) { info := b.is info.schemaMetaVersion = schemaVersion - info.ruleBundleMap = make(map[string]*placement.Bundle, len(bundles)) for _, bundle := range bundles { - info.ruleBundleMap[bundle.ID] = bundle + info.SetBundle(bundle) } for _, di := range dbInfos { diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 4e2643ba3f1ae..4fcbdc042de85 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -16,6 +16,7 @@ package infoschema import ( "fmt" "sort" + "sync" "sync/atomic" "github.com/pingcap/parser/model" @@ -56,10 +57,10 @@ type InfoSchema interface { FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition) // BundleByName is used to get a rule bundle. BundleByName(name string) (*placement.Bundle, bool) - // RuleBundles returns all placement rule bundles. - RuleBundles() map[string]*placement.Bundle - // MockBundles is only used for TEST. - MockBundles(map[string]*placement.Bundle) + // SetBundle is used internally to update rule bundles or mock tests. + SetBundle(*placement.Bundle) + // RuleBundles will return a copy of all rule bundles. + RuleBundles() []*placement.Bundle } type sortedTables []table.Table @@ -95,7 +96,8 @@ const bucketCount = 512 type infoSchema struct { // ruleBundleMap stores all placement rules - ruleBundleMap map[string]*placement.Bundle + ruleBundleMutex sync.RWMutex + ruleBundleMap map[string]*placement.Bundle schemaMap map[string]*schemaTables @@ -110,6 +112,7 @@ type infoSchema struct { func MockInfoSchema(tbList []*model.TableInfo) InfoSchema { result := &infoSchema{} result.schemaMap = make(map[string]*schemaTables) + result.ruleBundleMap = make(map[string]*placement.Bundle) result.sortedTablesBuckets = make([]sortedTables, bucketCount) dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList} tableNames := &schemaTables{ @@ -133,6 +136,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema { func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema { result := &infoSchema{} result.schemaMap = make(map[string]*schemaTables) + result.ruleBundleMap = make(map[string]*placement.Bundle) result.sortedTablesBuckets = make([]sortedTables, bucketCount) dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList} tableNames := &schemaTables{ @@ -405,16 +409,32 @@ func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema { } func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) { + is.ruleBundleMutex.RLock() + defer is.ruleBundleMutex.RUnlock() t, r := is.ruleBundleMap[name] return t, r } -func (is *infoSchema) RuleBundles() map[string]*placement.Bundle { - return is.ruleBundleMap +func (is *infoSchema) RuleBundles() []*placement.Bundle { + is.ruleBundleMutex.RLock() + defer is.ruleBundleMutex.RUnlock() + bundles := make([]*placement.Bundle, 0, len(is.ruleBundleMap)) + for _, bundle := range is.ruleBundleMap { + bundles = append(bundles, bundle) + } + return bundles +} + +func (is *infoSchema) SetBundle(bundle *placement.Bundle) { + is.ruleBundleMutex.Lock() + defer is.ruleBundleMutex.Unlock() + is.ruleBundleMap[bundle.ID] = bundle } -func (is *infoSchema) MockBundles(ruleBundleMap map[string]*placement.Bundle) { - is.ruleBundleMap = ruleBundleMap +func (is *infoSchema) deleteBundle(id string) { + is.ruleBundleMutex.Lock() + defer is.ruleBundleMutex.Unlock() + delete(is.ruleBundleMap, id) } // GetBundle get the first available bundle by array of IDs, possibbly fallback to the default. diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 578d149e7382c..cc194617518e9 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -352,9 +352,6 @@ func (*testSuite) TestGetBundle(c *C) { is := handle.Get() - bundles := make(map[string]*placement.Bundle) - is.MockBundles(bundles) - bundle := &placement.Bundle{ ID: placement.PDBundleID, Rules: []*placement.Rule{ @@ -366,7 +363,7 @@ func (*testSuite) TestGetBundle(c *C) { }, }, } - bundles[placement.PDBundleID] = bundle + is.SetBundle(bundle) b := infoschema.GetBundle(is, []int64{}) c.Assert(b.Rules, DeepEquals, bundle.Rules) @@ -387,7 +384,7 @@ func (*testSuite) TestGetBundle(c *C) { }, }, } - bundles[ptID] = bundle + is.SetBundle(bundle) b = infoschema.GetBundle(is, []int64{2, 3}) c.Assert(b, DeepEquals, bundle) @@ -408,7 +405,7 @@ func (*testSuite) TestGetBundle(c *C) { }, }, } - bundles[ptID] = bundle + is.SetBundle(bundle) b = infoschema.GetBundle(is, []int64{1, 2, 3}) c.Assert(b, DeepEquals, bundle) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 35fad3f8d0ef9..32f347a1f9d79 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1403,12 +1403,10 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) { c.Assert(err, IsNil) partDefs := tb.Meta().GetPartitionInfo().Definitions - bundles := make(map[string]*placement.Bundle) - is.MockBundles(bundles) tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows()) bundleID := "pd" - bundle := &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: bundleID, Rules: []*placement.Rule{ { @@ -1418,12 +1416,11 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) { Count: 3, }, }, - } - bundles[bundleID] = bundle + }) tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows()) bundleID = fmt.Sprintf("%s%d", placement.BundleIDPrefix, partDefs[0].ID) - bundle = &placement.Bundle{ + bundle := &placement.Bundle{ ID: bundleID, Index: 3, Override: true, @@ -1443,7 +1440,7 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) { }, }, } - bundles[bundleID] = bundle + is.SetBundle(bundle) expected := fmt.Sprintf(`%s 3 0 test test_placement p0 voter 3 "+zone=bj"`, bundleID) tk.MustQuery(`select group_id, group_index, rule_id, schema_name, table_name, partition_name, index_name, role, replicas, constraints from information_schema.placement_policy`).Check(testkit.Rows(expected)) @@ -1459,7 +1456,7 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) { bundle1.ID = bundleID bundle1.Rules[0].GroupID = bundleID bundle1.Rules[1].GroupID = bundleID - bundles[bundleID] = bundle1 + is.SetBundle(bundle1) tk.MustQuery("select rule_id, schema_name, table_name, partition_name from information_schema.placement_policy order by partition_name, rule_id").Check(testkit.Rows( "0 test test_placement p0", "1 test test_placement p0", "0 test test_placement p1", "1 test test_placement p1")) } diff --git a/session/session_test.go b/session/session_test.go index b1c171670665e..b54f777ccc8bb 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3280,16 +3280,14 @@ PARTITION BY RANGE (c) ( PARTITION p1 VALUES LESS THAN (200) );`) // Config the Placement Rules - bundles := make(map[string]*placement.Bundle) is := s.dom.InfoSchema() - is.MockBundles(bundles) tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) c.Assert(err, IsNil) setBundle := func(parName, dc string) { pid, err := tables.FindPartitionByName(tb.Meta(), parName) c.Assert(err, IsNil) groupID := placement.GroupID(pid) - oldBundle := &placement.Bundle{ + is.SetBundle(&placement.Bundle{ ID: groupID, Rules: []*placement.Rule{ { @@ -3305,8 +3303,7 @@ PARTITION BY RANGE (c) ( }, }, }, - } - bundles[groupID] = placement.BuildPlacementCopyBundle(oldBundle, pid) + }) } setBundle("p0", "dc-1") setBundle("p1", "dc-2")