diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index 88bace1ca79..e6897862705 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -273,7 +273,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
 
 func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
 	ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
-	ordinaryPeers := make(map[uint64]*metapb.Peer)
+	ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
 	specialPeers := make(map[string]map[uint64]*metapb.Peer)
 	// Group peers by the engine of their stores
 	for _, peer := range region.GetPeers() {
@@ -282,24 +282,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 			return nil
 		}
 		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
-			ordinaryPeers[peer.GetId()] = peer
+			ordinaryPeers[peer.GetStoreId()] = peer
 		} else {
 			engine := store.GetLabelValue(filter.EngineKey)
 			if _, ok := specialPeers[engine]; !ok {
 				specialPeers[engine] = make(map[uint64]*metapb.Peer)
 			}
-			specialPeers[engine][peer.GetId()] = peer
+			specialPeers[engine][peer.GetStoreId()] = peer
 		}
 	}
 
-	targetPeers := make(map[uint64]*metapb.Peer)
-	selectedStores := make(map[uint64]struct{})
-	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
+	targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))   // StoreID -> Peer
+	selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))    // StoreID set
+	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
 		for _, peer := range peers {
-			candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
-			newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
-			targetPeers[newPeer.GetStoreId()] = newPeer
-			selectedStores[newPeer.GetStoreId()] = struct{}{}
+			if _, ok := selectedStores[peer.GetStoreId()]; ok {
+				// This store has already been selected as a target, so the peer is both source and target itself; no need to select.
+				continue
+			}
+			for {
+				candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
+				newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
+				targetPeers[newPeer.GetStoreId()] = newPeer
+				selectedStores[newPeer.GetStoreId()] = struct{}{}
+				// If the selected store already holds another peer of this region,
+				// that peer is considered to have selected itself, and the origin
+				// peer re-selects.
+				if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
+					break
+				}
+			}
 		}
 	}
diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go
index 8560af0c998..9b9e9f347bc 100644
--- a/server/schedule/region_scatterer_test.go
+++ b/server/schedule/region_scatterer_test.go
@@ -9,6 +9,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
@@ -353,8 +354,8 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
 		}
 		// For leader, we expect each store have about 20 leader for each group
 		checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
-		// For peer, we expect each store have about 50 peers for each group
-		checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
+		// For peer, we expect each store have about 60 peers for each group
+		checker(scatterer.ordinaryEngine.selectedPeer, 60, 15)
 	}
 }
@@ -474,3 +475,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
 	}
 	check(scatterer.ordinaryEngine.selectedPeer)
 }
+
+// TestSelectedStores tests that the peer count is not changed by the picking strategy.
+// Ref https://github.com/tikv/pd/issues/4565
+func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+	// Add 4 stores.
+	for i := uint64(1); i <= 4; i++ {
+		tc.AddRegionStore(i, 0)
+		// Prevent the store from being considered disconnected.
+		tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
+	}
+	group := "group"
+	scatterer := NewRegionScatterer(ctx, tc)
+
+	// Put a lot of regions in stores 1/2/3.
+	for i := uint64(1); i < 100; i++ {
+		region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
+		peers := make(map[uint64]*metapb.Peer, 3)
+		for _, peer := range region.GetPeers() {
+			peers[peer.GetStoreId()] = peer
+		}
+		scatterer.Put(peers, i%3+1, group)
+	}
+
+	// Try to scatter regions whose peers are on stores 2/3/4.
+	for i := uint64(1); i < 20; i++ {
+		region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
+		op := scatterer.scatterRegion(region, group)
+		c.Assert(isPeerCountChanged(op), IsFalse)
+	}
+}
+
+func isPeerCountChanged(op *operator.Operator) bool {
+	if op == nil {
+		return false
+	}
+	add, remove := 0, 0
+	for i := 0; i < op.Len(); i++ {
+		step := op.Step(i)
+		switch step.(type) {
+		case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
+			add++
+		case operator.RemovePeer:
+			remove++
+		}
+	}
+	return add != remove
+}