Skip to content

Commit

Permalink
statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396) (#4432)
Browse files Browse the repository at this point in the history

* This is an automated cherry-pick of #4396

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

* fix conflict without sync test

Signed-off-by: lhy1024 <admin@liudos.us>

* fix lint

Signed-off-by: lhy1024 <admin@liudos.us>

* ref #4390

Signed-off-by: lhy1024 <admin@liudos.us>

* statistics: fix hot peer cache (#4446)

* fix hot peer cache

Signed-off-by: lhy1024 <admin@liudos.us>

* fix

Signed-off-by: lhy1024 <admin@liudos.us>

* fix

Signed-off-by: lhy1024 <admin@liudos.us>

* add more test

Signed-off-by: lhy1024 <admin@liudos.us>

* fix ci

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* ref #4390

Signed-off-by: lhy1024 <admin@liudos.us>

* add comment and test

Signed-off-by: lhy1024 <admin@liudos.us>

* address comments

Signed-off-by: lhy1024 <admin@liudos.us>

* fix

Signed-off-by: lhy1024 <admin@liudos.us>

* add more test

Signed-off-by: lhy1024 <admin@liudos.us>

* add comment

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ShuNing <nolouch@gmail.com>
Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* revert log

Signed-off-by: lhy1024 <admin@liudos.us>

* remove todo

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: lhy1024 <admin@liudos.us>
Co-authored-by: ShuNing <nolouch@gmail.com>
  • Loading branch information
3 people authored Dec 21, 2021
1 parent 073e629 commit 552c53e
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 39 deletions.
21 changes: 21 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,24 @@ func (aot *AvgOverTime) Set(avg float64) {
// IsFull reports whether the accumulated interval sum has reached the
// configured averaging window (avgInterval), i.e. the moving average now
// covers a full period of observed data.
func (aot *AvgOverTime) IsFull() bool {
	return aot.intervalSum >= aot.avgInterval
}

// Clone returns a deep copy of the AvgOverTime. The underlying queue is
// cloned and the margin is copied by value, so the returned instance can
// be mutated without affecting the receiver.
func (aot *AvgOverTime) Clone() *AvgOverTime {
	return &AvgOverTime{
		que: aot.que.Clone(),
		margin: deltaWithInterval{
			delta:    aot.margin.delta,
			interval: aot.margin.interval,
		},
		deltaSum:    aot.deltaSum,
		intervalSum: aot.intervalSum,
		avgInterval: aot.avgInterval,
	}
}

// GetIntervalSum returns the sum of all intervals currently covered by
// this moving-average window.
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
	return aot.intervalSum
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/median_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@ func (r *MedianFilter) Set(n float64) {
r.records[0] = n
r.count = 1
}

// Clone returns a deep copy of the MedianFilter. The records slice is
// duplicated so that writes through the copy never alias the original.
func (r *MedianFilter) Clone() *MedianFilter {
	dup := &MedianFilter{
		size:  r.size,
		count: r.count,
	}
	dup.records = append([]float64(nil), r.records...)
	return dup
}
15 changes: 15 additions & 0 deletions pkg/movingaverage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,18 @@ func (sq *SafeQueue) PopFront() interface{} {
defer sq.mu.Unlock()
return sq.que.PopFront()
}

// Clone returns a copy of SafeQueue. Under the receiver's lock, each
// element is popped and immediately re-pushed (leaving the receiver's
// content and order unchanged) while also being appended to the new queue.
func (sq *SafeQueue) Clone() *SafeQueue {
	sq.mu.Lock()
	defer sq.mu.Unlock()
	cloned := queue.New().Init()
	for n := sq.que.Len(); n > 0; n-- {
		front := sq.que.PopFront()
		sq.que.PushBack(front)
		cloned.PushBack(front)
	}
	return &SafeQueue{que: cloned}
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@ func (t *testMovingAvg) TestQueue(c *C) {
c.Assert(1, Equals, v1.(int))
c.Assert(2, Equals, v2.(int))
}

// TestClone verifies that Clone produces an independent queue: fully
// draining the copy must leave the original's contents untouched.
func (t *testMovingAvg) TestClone(c *C) {
	src := NewSafeQueue()
	src.PushBack(1)
	src.PushBack(2)
	dup := src.Clone()
	dup.PopFront()
	dup.PopFront()
	c.Assert(src.que.Len(), Equals, 2)
	c.Assert(dup.que.Len(), Equals, 0)
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/time_median.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,14 @@ func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
// GetInstantaneous returns the most recently recorded instantaneous value.
func (t *TimeMedian) GetInstantaneous() float64 {
	return t.instantaneous
}

// Clone returns a deep copy of the TimeMedian: the nested AvgOverTime and
// MedianFilter are cloned, while the size configuration and the
// instantaneous value are copied directly.
func (t *TimeMedian) Clone() *TimeMedian {
	dup := &TimeMedian{
		aotSize:       t.aotSize,
		mfSize:        t.mfSize,
		instantaneous: t.instantaneous,
	}
	dup.aot = t.aot.Clone()
	dup.mf = t.mf.Clone()
	return dup
}
29 changes: 26 additions & 3 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func (d *dimStat) Get() float64 {
return d.Rolling.Get()
}

// Clone returns a deep copy of the dimStat; both rolling statistics are
// cloned so the copy evolves independently of the receiver.
func (d *dimStat) Clone() *dimStat {
	cloned := &dimStat{typ: d.typ}
	cloned.Rolling = d.Rolling.Clone()
	cloned.LastAverage = d.LastAverage.Clone()
	return cloned
}

// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
Expand Down Expand Up @@ -96,6 +104,11 @@ type HotPeerStat struct {
thresholds [dimLen]float64
peers []uint64
lastTransferLeaderTime time.Time
// source represents the statistics item source, such as directly, inherit, adopt.
source sourceKind
// If the item in storeA is just adopted from storeB,
// then other store, such as storeC, will be forbidden to adopt from storeA until the item in storeA is hot.
allowAdopt bool
}

// ID returns region ID. Implementing TopNItem.
Expand Down Expand Up @@ -128,12 +141,15 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
zap.Float64("key-rate", stat.GetKeyRate()),
zap.Float64("key-rate-instant", stat.KeyRate),
zap.Float64("key-rate-threshold", stat.thresholds[keyDim]),
zap.Bool("is-leader", stat.isLeader),
zap.String("type", stat.Kind.String()),
zap.Int("hot-degree", stat.HotDegree),
zap.Int("hot-anti-count", stat.AntiCount),
zap.Bool("just-transfer-leader", stat.justTransferLeader),
zap.Bool("is-leader", stat.isLeader),
zap.Duration("sum-interval", stat.getIntervalSum()),
zap.Bool("need-delete", stat.IsNeedDelete()),
zap.String("type", stat.Kind.String()),
zap.String("source", stat.source.String()),
zap.Bool("allow-adopt", stat.allowAdopt),
zap.Bool("just-transfer-leader", stat.justTransferLeader),
zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
}

Expand Down Expand Up @@ -197,3 +213,10 @@ func (stat *HotPeerStat) clearLastAverage() {
stat.rollingByteRate.clearLastAverage()
stat.rollingKeyRate.clearLastAverage()
}

// getIntervalSum returns the interval sum accumulated by the byte-rate
// LastAverage window. It is safe to call on a nil stat or one whose
// rolling byte-rate statistic has not been initialized, returning 0 then.
func (stat *HotPeerStat) getIntervalSum() time.Duration {
	if stat != nil && stat.rollingByteRate != nil {
		return stat.rollingByteRate.LastAverage.GetIntervalSum()
	}
	return 0
}
24 changes: 20 additions & 4 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,19 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
interval: interval,
peers: peers,
thresholds: thresholds,
source: direct,
}

if oldItem == nil {
if tmpItem != nil { // use the tmpItem cached from the store where this region was in before
if tmpItem != nil && tmpItem.AntiCount > 0 { // use the tmpItem cached from the store where this region was in before
newItem.source = inherit
oldItem = tmpItem
tmpItem = nil
} else { // new item is new peer after adding replica
for _, storeID := range storeIDs {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil {
if oldItem != nil && oldItem.allowAdopt {
newItem.source = adopt
break
}
}
Expand Down Expand Up @@ -394,6 +398,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
if interval.Seconds() >= RegionHeartBeatReportInterval {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowAdopt = true
}
newItem.isNew = true
newItem.rollingByteRate = newDimStat(byteDim)
Expand All @@ -406,8 +411,15 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
return newItem
}

newItem.rollingByteRate = oldItem.rollingByteRate
newItem.rollingKeyRate = oldItem.rollingKeyRate
if newItem.source == adopt {
newItem.rollingByteRate = oldItem.rollingByteRate.Clone()
newItem.rollingKeyRate = oldItem.rollingKeyRate.Clone()
newItem.allowAdopt = false
} else {
newItem.rollingByteRate = oldItem.rollingByteRate
newItem.rollingKeyRate = oldItem.rollingKeyRate
newItem.allowAdopt = oldItem.allowAdopt
}

if newItem.justTransferLeader {
// skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate
Expand All @@ -431,18 +443,22 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
if newItem.isFullAndHot() {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowAdopt = true
} else {
newItem.needDelete = true
}
} else {
if newItem.isFullAndHot() {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowAdopt = true
} else {
newItem.HotDegree = oldItem.HotDegree - 1
newItem.AntiCount = oldItem.AntiCount - 1
if newItem.AntiCount <= 0 {
newItem.needDelete = true
} else {
newItem.allowAdopt = true
}
}
}
Expand Down
Loading

0 comments on commit 552c53e

Please sign in to comment.