Skip to content

Commit

Permalink
stats: merge non-overlapped feedback when update bucket count (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and db-storage committed May 29, 2019
1 parent c2de5f9 commit ede530e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 8 deletions.
65 changes: 63 additions & 2 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,13 @@ func (b *BucketFeedback) splitBucket(newNumBkts int, totalCount float64, originB
// Split the bucket.
bounds := b.getBoundaries(newNumBkts + 1)
bkts := make([]bucket, 0, len(bounds)-1)
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
for i := 1; i < len(bounds); i++ {
newBkt := bucket{&bounds[i-1], bounds[i].Copy(), 0, 0}
// get bucket count
_, ratio := getOverlapFraction(Feedback{b.lower, b.upper, int64(originBucketCount), 0}, newBkt)
countInNewBkt := originBucketCount * ratio
countInNewBkt = b.refineBucketCount(newBkt, countInNewBkt)
countInNewBkt = b.refineBucketCount(sc, newBkt, countInNewBkt)
// do not split if the count of result bucket is too small.
if countInNewBkt < minBucketFraction*totalCount {
bounds[i] = bounds[i-1]
Expand Down Expand Up @@ -454,11 +455,71 @@ func getOverlapFraction(fb Feedback, bkt bucket) (float64, float64) {
return overlap, ratio
}

// mergeFullyContainedFeedback greedily selects mutually non-overlapping
// feedbacks from b.feedback and accumulates them for the bucket bkt.
//
// It returns, in order:
//   - the sum of the overlap fractions (first result of getOverlapFraction)
//     of the selected feedbacks with bkt,
//   - the sum of the Count fields of the selected feedbacks,
//   - whether the merge could be carried out.
//
// The merge is abandoned (0, 0, false) when b.feedback is empty, when
// outOfRange yields a non-zero result or an error for either end point of any
// feedback checked against [bkt.Lower, bkt.Upper], or when comparing two
// datums fails. Note that a single feedback that is not inside the bucket
// aborts the whole merge; it is not merely skipped.
func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContext, bkt bucket) (float64, float64, bool) {
	// inBucket reports whether outOfRange places d inside the bucket without error.
	inBucket := func(d *types.Datum) bool {
		pos, err := outOfRange(sc, bkt.Lower, bkt.Upper, d)
		return err == nil && pos == 0
	}

	candidates := make([]Feedback, 0, len(b.feedback))
	for _, fb := range b.feedback {
		// Lower is checked before Upper; the first failure stops everything.
		if !inBucket(fb.Lower) || !inBucket(fb.Upper) {
			return 0, 0, false
		}
		candidates = append(candidates, fb)
	}
	if len(candidates) == 0 {
		return 0, 0, false
	}

	// Order the candidates by upper bound, breaking ties by lower bound.
	// sort.Slice cannot propagate an error, so a failed comparison is recorded
	// in cmpFailed and inspected once sorting has finished. After a failure the
	// tie-break on the lower bound is no longer attempted.
	cmpFailed := false
	sort.Slice(candidates, func(i, j int) bool {
		left, right := &candidates[i], &candidates[j]
		order, err := left.Upper.CompareDatum(sc, right.Upper)
		if err != nil {
			cmpFailed = true
		}
		if !cmpFailed && order == 0 {
			order, err = left.Lower.CompareDatum(sc, right.Lower)
			if err != nil {
				cmpFailed = true
			}
		}
		return order < 0
	})
	if cmpFailed {
		return 0, 0, false
	}

	// Walk the sorted candidates and keep each one whose lower bound is not
	// smaller than the upper bound of the last kept feedback.
	// NOTE(review): lastEnd starts as a zero-value Datum; presumably it never
	// compares greater than a real lower bound, so the first candidate is
	// always kept — confirm against CompareDatum.
	var (
		fractionSum float64
		countSum    float64
	)
	lastEnd := new(types.Datum)
	for i := range candidates {
		fb := candidates[i]
		order, err := lastEnd.CompareDatum(sc, fb.Lower)
		if err != nil {
			return 0, 0, false
		}
		if order > 0 {
			// Overlaps with the previously kept feedback; leave it out.
			continue
		}
		overlap, _ := getOverlapFraction(fb, bkt)
		fractionSum += overlap
		countSum += float64(fb.Count)
		lastEnd = fb.Upper
	}
	return fractionSum, countSum, true
}

// refineBucketCount refines the count of a newly split bucket. It uses the feedback that overlaps most
// with the bucket to get the bucket count.
func (b *BucketFeedback) refineBucketCount(bkt bucket, defaultCount float64) float64 {
func (b *BucketFeedback) refineBucketCount(sc *stmtctx.StatementContext, bkt bucket, defaultCount float64) float64 {
bestFraction := minBucketFraction
count := defaultCount
sumFraction, sumCount, ok := b.mergeFullyContainedFeedback(sc, bkt)
if ok && sumFraction > bestFraction {
bestFraction = sumFraction
count = sumCount / sumFraction
}
for _, fb := range b.feedback {
fraction, ratio := getOverlapFraction(fb, bkt)
// choose the max overlap fraction
Expand Down
19 changes: 17 additions & 2 deletions statistics/feedback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) {
defaultBucketCount = 7
defer func() { defaultBucketCount = originBucketCount }()
c.Assert(UpdateHistogram(q.Hist, q).ToString(0), Equals,
"column:0 ndv:10057 totColSize:0\n"+
"column:0 ndv:10058 totColSize:0\n"+
"num: 10000 lower_bound: 0 upper_bound: 1 repeats: 0\n"+
"num: 8 lower_bound: 2 upper_bound: 7 repeats: 0\n"+
"num: 9 lower_bound: 2 upper_bound: 7 repeats: 0\n"+
"num: 11 lower_bound: 8 upper_bound: 19 repeats: 0\n"+
"num: 0 lower_bound: 20 upper_bound: 20 repeats: 0\n"+
"num: 18 lower_bound: 21 upper_bound: 39 repeats: 0\n"+
Expand Down Expand Up @@ -152,6 +152,21 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 0 lower_bound: 11 upper_bound: 1000000 repeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{true, true})
c.Assert(totalCount, Equals, int64(1))

// test merge the non-overlapped feedbacks.
h = NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 10000)
feedbacks = feedbacks[:0]
feedbacks = append(feedbacks, newFeedback(0, 4000, 4000))
feedbacks = append(feedbacks, newFeedback(4001, 9999, 1000))
q = NewQueryFeedback(0, h, 0, false)
q.Feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist, q)
c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 5001 lower_bound: 0 upper_bound: 10000 repeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false})
c.Assert(totalCount, Equals, int64(5001))
}

func (s *testFeedbackSuite) TestMergeBuckets(c *C) {
Expand Down
32 changes: 28 additions & 4 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,38 +1483,62 @@ func (s *testStatsSuite) TestUnsignedFeedbackRanges(c *C) {

testKit.MustExec("use test")
testKit.MustExec("create table t (a tinyint unsigned, primary key(a))")
testKit.MustExec("create table t1 (a bigint unsigned, primary key(a))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d)", i))
testKit.MustExec(fmt.Sprintf("insert into t1 values (%d)", i))
}
h.HandleDDLEvent(<-h.DDLEventCh())
h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
testKit.MustExec("analyze table t with 3 buckets")
testKit.MustExec("analyze table t, t1 with 3 buckets")
for i := 30; i < 40; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d)", i))
testKit.MustExec(fmt.Sprintf("insert into t1 values (%d)", i))
}
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
tests := []struct {
sql string
hist string
sql string
hist string
tblName string
}{
{
sql: "select * from t where a <= 50",
hist: "column:1 ndv:30 totColSize:0\n" +
"num: 8 lower_bound: 0 upper_bound: 7 repeats: 0\n" +
"num: 8 lower_bound: 8 upper_bound: 15 repeats: 0\n" +
"num: 14 lower_bound: 16 upper_bound: 50 repeats: 0",
tblName: "t",
},
{
sql: "select count(*) from t",
hist: "column:1 ndv:30 totColSize:0\n" +
"num: 8 lower_bound: 0 upper_bound: 7 repeats: 0\n" +
"num: 8 lower_bound: 8 upper_bound: 15 repeats: 0\n" +
"num: 14 lower_bound: 16 upper_bound: 255 repeats: 0",
tblName: "t",
},
{
sql: "select * from t1 where a <= 50",
hist: "column:1 ndv:30 totColSize:0\n" +
"num: 8 lower_bound: 0 upper_bound: 7 repeats: 0\n" +
"num: 8 lower_bound: 8 upper_bound: 15 repeats: 0\n" +
"num: 14 lower_bound: 16 upper_bound: 50 repeats: 0",
tblName: "t1",
},
{
sql: "select count(*) from t1",
hist: "column:1 ndv:30 totColSize:0\n" +
"num: 8 lower_bound: 0 upper_bound: 7 repeats: 0\n" +
"num: 8 lower_bound: 8 upper_bound: 15 repeats: 0\n" +
"num: 14 lower_bound: 16 upper_bound: 18446744073709551615 repeats: 0",
tblName: "t1",
},
}
is := s.do.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
for i, t := range tests {
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(t.tblName))
c.Assert(err, IsNil)
testKit.MustQuery(t.sql)
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
Expand Down

0 comments on commit ede530e

Please sign in to comment.