statistics: add bench for MergeGlobalStatsTopNByConcurrency (#45998) (#…
ti-chi-bot committed Aug 14, 2023
1 parent 98f87e8 commit ff50dad
Showing 4 changed files with 257 additions and 9 deletions.
1 change: 1 addition & 0 deletions statistics/BUILD.bazel
@@ -71,6 +71,7 @@ go_test(
name = "statistics_test",
timeout = "short",
srcs = [
"cmsketch_bench_test.go",
"cmsketch_test.go",
"feedback_test.go",
"fmsketch_test.go",
161 changes: 161 additions & 0 deletions statistics/cmsketch_bench_test.go
@@ -0,0 +1,161 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics_test

import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
)

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/statistics
func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := &stmtctx.StatementContext{TimeZone: loc}
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merging the partition TopNs into a global top-10.
_, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
}
}

// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/statistics
func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := &stmtctx.StatementContext{TimeZone: loc}
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
wrapper := &statistics.StatsWrapper{
AllTopN: topNs,
AllHg: hists,
}
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > handle.MaxPartitionMergeBatchSize {
batchSize = handle.MaxPartitionMergeBatchSize
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merging the partition TopNs into a global top-10.
_, _, _, _ = handle.MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
}
}

var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000, 1000000, 10000000, 100000000}

func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
})
}
}

func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) {
for _, size := range benchmarkConcurrencySizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b)
})
}
}
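
The batch-size computation in the concurrency benchmark above (and in mergeGlobalStatsTopN in handle.go below) divides the partition count by the worker count and clamps the result to the range [1, MaxPartitionMergeBatchSize]. Below is a minimal standalone sketch of that arithmetic with a few worked values; computeMergeBatchSize is an illustrative helper, not part of this commit.

package main

import "fmt"

// MaxPartitionMergeBatchSize mirrors the constant exported from statistics/handle.
const MaxPartitionMergeBatchSize = 256

// computeMergeBatchSize reproduces the clamping done before calling
// MergeGlobalStatsTopNByConcurrency: partitions/concurrency, clamped to [1, 256].
func computeMergeBatchSize(partitions, mergeConcurrency int) int {
    batchSize := partitions / mergeConcurrency
    if batchSize < 1 {
        return 1
    }
    if batchSize > MaxPartitionMergeBatchSize {
        return MaxPartitionMergeBatchSize
    }
    return batchSize
}

func main() {
    // With the benchmark's mergeConcurrency of 4:
    // 100 partitions -> 25, 1000 -> 250, and every larger size clamps to 256.
    for _, size := range []int{100, 1000, 10000, 100000000} {
        fmt.Println(size, computeMergeBatchSize(size, 4))
    }
}

With mergeConcurrency fixed at 4, every benchmark size from 10000 upward clamps to a batch size of 256, so the larger runs vary the number of batches handed to the workers rather than the batch size itself.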
86 changes: 86 additions & 0 deletions statistics/cmsketch_test.go
@@ -19,9 +19,11 @@ import (
"math"
"math/rand"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
@@ -303,3 +305,87 @@ func TestCMSketchCodingTopN(t *testing.T) {
_, _, err = DecodeCMSketchAndTopN([]byte{}, rows)
require.NoError(t, err)
}

func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
loc := time.UTC
sc := &stmtctx.StatementContext{TimeZone: loc}
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
topNs = append(topNs, topN)
}

// Test merge 2 topN with nil hists.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}

func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) {
loc := time.UTC
sc := &stmtctx.StatementContext{TimeZone: loc}
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*Histogram, 0, 10)
for i := 0; i < 10; i++ {
// Construct Hist
h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}

// Test merge 2 topN.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55 rows")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}
18 changes: 9 additions & 9 deletions statistics/handle/handle.go
@@ -58,8 +58,8 @@ const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
maxPartitionMergeBatchSize = 256
// MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
MaxPartitionMergeBatchSize = 256
)

// Handle can update stats info periodically.
@@ -804,7 +804,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
if err != nil {
return
}
@@ -836,7 +836,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
return
}

func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
@@ -848,17 +848,17 @@ func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > maxPartitionMergeBatchSize {
batchSize = maxPartitionMergeBatchSize
} else if batchSize > MaxPartitionMergeBatchSize {
batchSize = MaxPartitionMergeBatchSize
}
return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
return MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
}

// mergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
// MergeGlobalStatsTopNByConcurrency merges the partition TopNs concurrently.
// To merge the global stats TopN concurrently, we split the partition TopNs into batches and hand each batch to a worker.
// mergeConcurrency controls the total number of concurrent workers, and mergeBatchSize controls
// the number of partitions each worker handles.
func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool, killed *uint32) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
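
The comment above describes the approach only at a high level. The following is a rough, self-contained sketch of the fan-out/fan-in pattern it describes, not the actual TiDB implementation: the topN type and the mergeBatch helper are placeholders standing in for *statistics.TopN and the per-batch call to MergePartTopN2GlobalTopN.

package main

import (
    "fmt"
    "sync"
)

// topN is a toy stand-in for *statistics.TopN: value -> row count.
type topN map[string]uint64

// mergeBatch stands in for the per-worker merge step.
func mergeBatch(batch []topN) topN {
    out := topN{}
    for _, t := range batch {
        for k, v := range t {
            out[k] += v
        }
    }
    return out
}

// mergeConcurrently splits the partition TopNs into batches of batchSize and
// lets a pool of mergeConcurrency workers merge them, then folds the partial
// results into one global TopN.
func mergeConcurrently(allTopN []topN, mergeConcurrency, batchSize int) topN {
    batches := make([][]topN, 0, (len(allTopN)+batchSize-1)/batchSize)
    for start := 0; start < len(allTopN); start += batchSize {
        end := start + batchSize
        if end > len(allTopN) {
            end = len(allTopN)
        }
        batches = append(batches, allTopN[start:end])
    }
    taskCh := make(chan []topN, len(batches))
    resultCh := make(chan topN, len(batches))
    var wg sync.WaitGroup
    for i := 0; i < mergeConcurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for batch := range taskCh {
                resultCh <- mergeBatch(batch)
            }
        }()
    }
    for _, b := range batches {
        taskCh <- b
    }
    close(taskCh)
    wg.Wait()
    close(resultCh)
    partial := make([]topN, 0, len(batches))
    for r := range resultCh {
        partial = append(partial, r)
    }
    return mergeBatch(partial)
}

func main() {
    parts := []topN{{"a": 2}, {"a": 3, "b": 1}, {"b": 4}}
    fmt.Println(mergeConcurrently(parts, 2, 2)) // map[a:5 b:5]
}

The real function additionally trims the merged result to the top n values, tracks the popped entries, and takes the partition histograms into account (as the WithHists tests above exercise); the sketch only illustrates how batches are distributed across workers.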
