Skip to content

Commit

Permalink
statistics: compute and store column order correlation with handle
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka committed Feb 22, 2019
1 parent 266ff4b commit f4995ce
Show file tree
Hide file tree
Showing 16 changed files with 213 additions and 63 deletions.
10 changes: 6 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ type AnalyzeColumnsExec struct {
pkInfo *model.ColumnInfo
concurrency int
priority int
keepOrder bool
analyzePB *tipb.AnalyzeReq
resultHandler *tableResultHandler
maxNumBuckets uint64
Expand All @@ -268,7 +267,7 @@ func (e *AnalyzeColumnsExec) open() error {
ranges = ranger.FullIntRange(false)
}
e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := splitRanges(ranges, e.keepOrder)
firstPartRanges, secondPartRanges := splitRanges(ranges, true)
firstResult, err := e.buildResp(firstPartRanges)
if err != nil {
return errors.Trace(err)
Expand All @@ -289,9 +288,11 @@ func (e *AnalyzeColumnsExec) open() error {

func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := builder.SetTableRanges(e.physicalTableID, ranges, nil).
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(e.keepOrder).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -363,7 +364,8 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
}
for i, col := range e.colsInfo {
for j, s := range collectors[i].Samples {
collectors[i].Samples[j], err = tablecodec.DecodeColumnValue(s.GetBytes(), &col.FieldType, timeZone)
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ PARTITION BY RANGE ( a ) (
for _, def := range pi.Definitions {
statsTbl := handle.GetPartitionStats(table.Meta(), def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
c.Assert(len(statsTbl.Columns), Equals, 2)
c.Assert(len(statsTbl.Columns), Equals, 3)
c.Assert(len(statsTbl.Indices), Equals, 1)
for _, col := range statsTbl.Columns {
c.Assert(col.Len(), Greater, 0)
Expand All @@ -80,7 +80,7 @@ PARTITION BY RANGE ( a ) (
statsTbl := handle.GetPartitionStats(table.Meta(), def.ID)
if i == 0 {
c.Assert(statsTbl.Pseudo, IsFalse)
c.Assert(len(statsTbl.Columns), Equals, 2)
c.Assert(len(statsTbl.Columns), Equals, 3)
c.Assert(len(statsTbl.Indices), Equals, 1)
} else {
c.Assert(statsTbl.Pseudo, IsTrue)
Expand Down
3 changes: 0 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,9 +1360,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, maxNumBuckets uint64) *AnalyzeColumnsExec {
cols := task.ColsInfo
keepOrder := false
if task.PKInfo != nil {
keepOrder = true
cols = append([]*model.ColumnInfo{task.PKInfo}, cols...)
}

Expand All @@ -1373,7 +1371,6 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency,
keepOrder: keepOrder,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeColumn,
StartTs: math.MaxUint64,
Expand Down
1 change: 1 addition & 0 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (e *ShowExec) histogramToRow(dbName, tblName, partitionName, colName string
hist.NDV,
hist.NullCount,
avgColSize,
hist.Correlation,
})
}

Expand Down
8 changes: 5 additions & 3 deletions executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,16 @@ func (s *testSuite1) TestShowPartitionStats(c *C) {
c.Assert(result.Rows()[0][2], Equals, "p0")

result = tk.MustQuery("show stats_histograms").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][2], Equals, "p0")
c.Assert(result.Rows()[0][3], Equals, "a")
c.Assert(result.Rows()[1][2], Equals, "p0")
c.Assert(result.Rows()[1][3], Equals, "idx")
c.Assert(result.Rows()[1][3], Equals, "b")
c.Assert(result.Rows()[2][2], Equals, "p0")
c.Assert(result.Rows()[2][3], Equals, "idx")

result = tk.MustQuery("show stats_buckets").Sort()
result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1"))
result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 b 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1"))

result = tk.MustQuery("show stats_healthy")
result.Check(testkit.Rows("test t p0 100"))
Expand Down
8 changes: 4 additions & 4 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) {
}{
{
sql: "analyze table t3",
best: "Analyze{Index(a),Table(b)}",
best: "Analyze{Index(a),Table(a, b)}",
},
// Test analyze full table.
{
Expand Down Expand Up @@ -708,10 +708,10 @@ func (s *testAnalyzeSuite) TestCorrelatedEstimation(c *C) {
" ├─TableReader_12 10.00 root data:TableScan_11",
" │ └─TableScan_11 10.00 cop table:t, range:[-inf,+inf], keep order:false",
" └─MaxOneRow_13 1.00 root ",
" └─Projection_14 0.00 root concat(cast(t1.a), \",\", cast(t1.b))",
" └─IndexLookUp_21 0.00 root ",
" └─Projection_14 0.10 root concat(cast(t1.a), \",\", cast(t1.b))",
" └─IndexLookUp_21 0.10 root ",
" ├─IndexScan_18 1.00 cop table:t1, index:c, range: decided by [eq(t1.c, test.t.c)], keep order:false",
" └─Selection_20 0.00 cop eq(t1.a, test.t.a)",
" └─Selection_20 0.10 cop eq(t1.a, test.t.a)",
" └─TableScan_19 1.00 cop table:t, keep order:false",
))
}
Expand Down
23 changes: 5 additions & 18 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,36 +645,23 @@ func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string)
// getColsInfo returns the info of index columns, normal columns and primary key.
func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo []*model.ColumnInfo, pkCol *model.ColumnInfo) {
tbl := tn.TableInfo
// idxNames contains all the normal columns that can be analyzed more effectively, because those columns occur as index
// columns or primary key columns with integer type.
var idxNames []string
if tbl.PKIsHandle {
for _, col := range tbl.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
idxNames = append(idxNames, col.Name.L)
pkCol = col
}
}
}
for _, idx := range tn.TableInfo.Indices {
if idx.State == model.StatePublic {
indicesInfo = append(indicesInfo, idx)
if len(idx.Columns) == 1 {
idxNames = append(idxNames, idx.Columns[0].Name.L)
}
}
}
for _, col := range tbl.Columns {
isIndexCol := false
for _, idx := range idxNames {
if idx == col.Name.L {
isIndexCol = true
break
}
}
if !isIndexCol {
colsInfo = append(colsInfo, col)
if col == pkCol {
continue
}
colsInfo = append(colsInfo, col)
}
return
}
Expand Down Expand Up @@ -1782,9 +1769,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong}
case ast.ShowStatsHistograms:
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size"}
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size", "Correlation"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeTiny, mysql.TypeDatetime,
mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble}
mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeDouble}
case ast.ShowStatsBuckets:
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Bucket_id", "Count",
"Repeats", "Lower_Bound", "Upper_Bound"}
Expand Down
10 changes: 10 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ const (
cm_sketch blob,
stats_ver bigint(64) NOT NULL DEFAULT 0,
flag bigint(64) NOT NULL DEFAULT 0,
correlation double NOT NULL DEFAULT 0,
unique index tbl(table_id, is_index, hist_id)
);`

Expand Down Expand Up @@ -286,6 +287,7 @@ const (
version24 = 24
version25 = 25
version26 = 26
version27 = 27
)

func checkBootstrapped(s Session) (bool, error) {
Expand Down Expand Up @@ -448,6 +450,10 @@ func upgrade(s Session) {
upgradeToVer26(s)
}

if ver < version27 {
upgradeToVer27(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

Expand Down Expand Up @@ -719,6 +725,10 @@ func upgradeToVer26(s Session) {
mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_role_priv='Y',Drop_role_priv='Y'")
}

func upgradeToVer27(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `correlation` double NOT NULL DEFAULT 0", infoschema.ErrColumnExists)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
Expand Down
3 changes: 2 additions & 1 deletion statistics/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat
continue
}
hist := NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize)
hist.Correlation = row.GetFloat64(9)
table.Columns[hist.ID] = &Column{
Histogram: *hist,
PhysicalID: table.PhysicalID,
Expand All @@ -134,7 +135,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver from mysql.stats_histograms"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
Expand Down
32 changes: 26 additions & 6 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
err := types.SortDatums(sc, samples)
err := SortSampleItems(sc, samples)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize)

Expand All @@ -124,9 +124,11 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
bucketIdx := 0
var lastCount int64
hg.AppendBucket(&samples[0], &samples[0], int64(sampleFactor), int64(ndvFactor))
var corrXYSum float64
hg.AppendBucket(&samples[0].Value, &samples[0].Value, int64(sampleFactor), int64(ndvFactor))
for i := int64(1); i < sampleNum; i++ {
cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i])
corrXYSum += float64(i) * float64(samples[i].Ordinal)
cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i].Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -143,14 +145,32 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
} else if totalCount-float64(lastCount) <= valuesPerBucket {
// The bucket still have room to store a new item, update the bucket.
hg.updateLastBucket(&samples[i], int64(totalCount), int64(ndvFactor))
hg.updateLastBucket(&samples[i].Value, int64(totalCount), int64(ndvFactor))
} else {
lastCount = hg.Buckets[bucketIdx].Count
// The bucket is full, store the item in the next bucket.
bucketIdx++
hg.AppendBucket(&samples[i], &samples[i], int64(totalCount), int64(ndvFactor))
hg.AppendBucket(&samples[i].Value, &samples[i].Value, int64(totalCount), int64(ndvFactor))
}
}
// Compute column order correlation with handle.
if sampleNum == 1 {
hg.Correlation = 1
return hg, nil
}
// X means the ordinal of the item in original sequence, Y means the oridnal of the item in the
// sorted sequence, we know that X and Y value sets are both:
// 0, 1, ..., sampleNum-1
// we can simply compute sum(X) = sum(Y) =
// (sampleNum-1)*sampleNum / 2
// and sum(X^2) = sum(Y^2) =
// (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6
// The formula for computing correlation is borrowed from PostgreSQL.
// Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1.
itemsCount := float64(sampleNum)
corrXSum := (itemsCount - 1) * itemsCount / 2.0
corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0
hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
return hg, nil
}

Expand Down
4 changes: 2 additions & 2 deletions statistics/fmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func (s *testStatisticsSuite) TestSketch(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
maxSize := 1000
sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize)
sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize)
c.Check(err, IsNil)
c.Check(ndv, Equals, int64(6232))

Expand Down Expand Up @@ -51,7 +51,7 @@ func (s *testStatisticsSuite) TestSketch(c *C) {
func (s *testStatisticsSuite) TestSketchProtoConversion(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
maxSize := 1000
sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize)
sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize)
c.Check(err, IsNil)
c.Check(ndv, Equals, int64(6232))

Expand Down
65 changes: 65 additions & 0 deletions statistics/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,68 @@ func newStoreWithBootstrap(statsLease time.Duration) (kv.Storage, *domain.Domain
do.SetStatsUpdating(true)
return store, do, errors.Trace(err)
}

func (s *testStatsSuite) TestCorrelation(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
testKit.MustExec("create table t(c1 int primary key, c2 int)")
testKit.MustExec("insert into t values(1,1),(3,12),(4,20),(2,7),(5,21)")
testKit.MustExec("analyze table t")
result := testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "1")
testKit.MustExec("insert into t values(8,18)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "0.828571")

testKit.MustExec("truncate table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 0)
testKit.MustExec("insert into t values(1,21),(3,12),(4,7),(2,20),(5,1)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "-1")
testKit.MustExec("insert into t values(8,4)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "-0.942857")

testKit.MustExec("drop table t")
testKit.MustExec("create table t(c1 int, c2 int)")
testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(4,20),(5,21),(8,18)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "1")
c.Assert(result.Rows()[1][9], Equals, "0.828571")

testKit.MustExec("truncate table t")
testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0.828571")
c.Assert(result.Rows()[1][9], Equals, "1")

testKit.MustExec("drop table t")
testKit.MustExec("create table t(c1 int primary key, c2 int, c3 int, key idx_c2(c2))")
testKit.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 0").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "1")
c.Assert(result.Rows()[2][9], Equals, "1")
result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 1").Sort()
c.Assert(len(result.Rows()), Equals, 1)
c.Assert(result.Rows()[0][9], Equals, "0")
}
Loading

0 comments on commit f4995ce

Please sign in to comment.