
Commit f4ed192

region_scatterer: fix the bug that could generate schedule with too few peers (#4570) (#4579)

* region_scatterer: fix the bug that could generate schedule with too few peers (#4570)

close #4565

Signed-off-by: HunDunDM <hundundm@gmail.com>

* region_scatterer: fix incorrect test (#4586)

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: HunDunDM <hundundm@gmail.com>
ti-chi-bot and HunDunDM authored Jan 21, 2022
1 parent 0f07368 commit f4ed192
Showing 2 changed files with 76 additions and 12 deletions.
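
Before the diff itself, it helps to see the failure mode behind #4565. Each source peer of a region picks a target store independently; when no candidate store is left, the selector falls back to the peer's own store, and because the target map is keyed by store ID, that fallback can overwrite an entry another peer already chose as its target. The target set then ends up smaller than the region's replica count, and the generated operator removes a peer without adding one. The following is a simplified, self-contained reconstruction of that collapse (toy helpers and hard-coded store IDs, not PD code):

package main

import "fmt"

// Toy reconstruction of how the pre-fix selection could shrink the target set:
// every source peer picks a target store independently, and when no candidate is
// left the selector falls back to the peer's own store. If that store was already
// chosen as a target for another peer, the map entry is overwritten and the region
// ends up with fewer target stores than it has peers.
func main() {
	sourceStores := []uint64{2, 3, 4} // stores currently holding the region's peers

	// Only store 4 is under-loaded, so it is the only candidate until selected.
	candidatesFor := func(selected map[uint64]struct{}) []uint64 {
		if _, used := selected[4]; !used {
			return []uint64{4}
		}
		return nil
	}

	targetStores := map[uint64]struct{}{} // StoreID set, mirrors targetPeers
	selected := map[uint64]struct{}{}     // mirrors selectedStores

	for _, src := range sourceStores {
		var target uint64
		if c := candidatesFor(selected); len(c) > 0 {
			target = c[0] // pick the least-loaded candidate
		} else {
			target = src // fallback: keep the peer where it is
		}
		// The peer on store 4 falls back to its own store and overwrites the
		// entry that the peer on store 2 already claimed.
		targetStores[target] = struct{}{}
		selected[target] = struct{}{}
	}
	fmt.Printf("source peers: %d, distinct target stores: %d\n",
		len(sourceStores), len(targetStores)) // prints 3 and 2
}

With three source peers on stores 2/3/4 and only store 4 under-loaded, the sketch ends with two distinct target stores instead of three, which is the "too few peers" schedule the commit title refers to.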
32 changes: 22 additions & 10 deletions server/schedule/region_scatterer.go
@@ -273,7 +273,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
 
 func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
 	ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
-	ordinaryPeers := make(map[uint64]*metapb.Peer)
+	ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
 	specialPeers := make(map[string]map[uint64]*metapb.Peer)
 	// Group peers by the engine of their stores
 	for _, peer := range region.GetPeers() {
@@ -282,24 +282,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 			return nil
 		}
 		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
-			ordinaryPeers[peer.GetId()] = peer
+			ordinaryPeers[peer.GetStoreId()] = peer
 		} else {
 			engine := store.GetLabelValue(filter.EngineKey)
 			if _, ok := specialPeers[engine]; !ok {
 				specialPeers[engine] = make(map[uint64]*metapb.Peer)
 			}
-			specialPeers[engine][peer.GetId()] = peer
+			specialPeers[engine][peer.GetStoreId()] = peer
 		}
 	}
 
-	targetPeers := make(map[uint64]*metapb.Peer)
-	selectedStores := make(map[uint64]struct{})
-	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
+	targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))    // StoreID -> Peer
+	selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))     // StoreID set
+	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
 		for _, peer := range peers {
-			candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
-			newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
-			targetPeers[newPeer.GetStoreId()] = newPeer
-			selectedStores[newPeer.GetStoreId()] = struct{}{}
+			if _, ok := selectedStores[peer.GetStoreId()]; ok {
+				// This store is both the source and an already-chosen target, so the peer stays; no selection needed.
+				continue
+			}
+			for {
+				candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
+				newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
+				targetPeers[newPeer.GetStoreId()] = newPeer
+				selectedStores[newPeer.GetStoreId()] = struct{}{}
+				// If the selected store already holds another peer of this region,
+				// that peer is expected to stay where it is (it "selects itself"),
+				// so the origin peer re-selects.
+				if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
+					break
+				}
+			}
 		}
 	}
 
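The core of the fix above is the skip plus the inner for loop: a source peer whose store was already picked as a target simply stays, and a freshly chosen target is accepted only if its store does not hold another peer of the region, or if it is the peer's own store. Each rejected store has already been recorded in selectedStores, so it is excluded from the next attempt, and the retry eventually either finds a free store or falls back to keeping the peer where it is. Below is a minimal standalone sketch of that accept/retry shape with a stubbed selector (pickTarget and selectOnce are illustrative names, not PD's API):

package main

import "fmt"

// pickTarget keeps re-selecting until the chosen store either equals the source
// store (the peer stays put) or is not occupied by another source peer of the region.
func pickTarget(src uint64, peers map[uint64]struct{}, selectOnce func() uint64) uint64 {
	for {
		target := selectOnce()
		// Accept a store outside the region's current stores, or the peer's own store.
		if _, occupied := peers[target]; !occupied || target == src {
			return target
		}
		// Otherwise the chosen store already holds a peer that is expected to stay,
		// so selecting it would shrink the target set; try again.
	}
}

func main() {
	peers := map[uint64]struct{}{2: {}, 3: {}, 4: {}} // StoreID set of the region's peers
	// Stubbed selector: first proposes store 3 (occupied), then store 1 (free).
	proposals := []uint64{3, 1}
	i := 0
	selectOnce := func() uint64 { t := proposals[i]; i++; return t }

	fmt.Println("target for the peer on store 2:", pickTarget(2, peers, selectOnce)) // -> 1
}

Here the stub first proposes store 3, which is rejected because another source peer lives there, and then store 1, which is accepted.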
56 changes: 54 additions & 2 deletions server/schedule/region_scatterer_test.go
@@ -9,6 +9,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
@@ -353,8 +354,8 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
 		}
 		// For leaders, we expect each store to have about 20 leaders for each group
 		checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
-		// For peers, we expect each store to have about 50 peers for each group
-		checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
+		// For peers, we expect each store to have about 60 peers for each group
+		checker(scatterer.ordinaryEngine.selectedPeer, 60, 15)
 	}
 }
 
@@ -474,3 +475,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
 	}
 	check(scatterer.ordinaryEngine.selectedPeer)
 }
+
+// TestSelectedStores checks that the picking strategy does not change the region's peer count.
+// Ref https://github.com/tikv/pd/issues/4565
+func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+	// Add 4 stores.
+	for i := uint64(1); i <= 4; i++ {
+		tc.AddRegionStore(i, 0)
+		// prevent the store from being considered disconnected
+		tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
+	}
+	group := "group"
+	scatterer := NewRegionScatterer(ctx, tc)
+
+	// Put a lot of regions in stores 1/2/3.
+	for i := uint64(1); i < 100; i++ {
+		region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
+		peers := make(map[uint64]*metapb.Peer, 3)
+		for _, peer := range region.GetPeers() {
+			peers[peer.GetStoreId()] = peer
+		}
+		scatterer.Put(peers, i%3+1, group)
+	}
+
+	// Try to scatter regions whose peers are on stores 2/3/4.
+	for i := uint64(1); i < 20; i++ {
+		region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
+		op := scatterer.scatterRegion(region, group)
+		c.Assert(isPeerCountChanged(op), IsFalse)
+	}
+}
+
+func isPeerCountChanged(op *operator.Operator) bool {
+	if op == nil {
+		return false
+	}
+	add, remove := 0, 0
+	for i := 0; i < op.Len(); i++ {
+		step := op.Step(i)
+		switch step.(type) {
+		case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
+			add++
+		case operator.RemovePeer:
+			remove++
+		}
+	}
+	return add != remove
+}
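
For comparison with the earlier sketch, the same under-loaded-store scenario can be run through the fixed shape, i.e. the skip and the retry shown in the region_scatterer.go hunk (again a toy reconstruction with illustrative helpers, not PD's selector). The target set now keeps all three stores, which is exactly the invariant isPeerCountChanged guards in the new test:

package main

import "fmt"

// Toy re-run of the earlier scenario with the two fixes applied: a source peer
// whose store was already chosen as a target is kept as-is, and a proposed target
// that lands on another source peer's store is rejected and re-selected.
func main() {
	sourceStores := []uint64{2, 3, 4}
	peerStores := map[uint64]struct{}{2: {}, 3: {}, 4: {}} // StoreID set of the region's peers

	// Only store 4 is under-loaded; it is a candidate until selected.
	candidatesFor := func(selected map[uint64]struct{}) []uint64 {
		if _, used := selected[4]; !used {
			return []uint64{4}
		}
		return nil
	}

	targetStores := map[uint64]struct{}{}
	selected := map[uint64]struct{}{}

	for _, src := range sourceStores {
		if _, ok := selected[src]; ok {
			continue // the store is already both source and target; the peer stays
		}
		for {
			var target uint64
			if c := candidatesFor(selected); len(c) > 0 {
				target = c[0]
			} else {
				target = src // fallback: keep the peer on its own store
			}
			targetStores[target] = struct{}{}
			selected[target] = struct{}{}
			if _, occupied := peerStores[target]; !occupied || target == src {
				break // accepted: either a fresh store or the peer stays put
			}
			// the chosen store already holds another source peer; re-select
		}
	}
	fmt.Printf("source peers: %d, distinct target stores: %d\n",
		len(sourceStores), len(targetStores)) // prints 3 and 3 -> peer count preserved
}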
