Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: compute and store column order correlation with handle #9315

Merged
merged 7 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ type AnalyzeColumnsExec struct {
pkInfo *model.ColumnInfo
concurrency int
priority int
keepOrder bool
analyzePB *tipb.AnalyzeReq
resultHandler *tableResultHandler
maxNumBuckets uint64
Expand All @@ -268,7 +267,7 @@ func (e *AnalyzeColumnsExec) open() error {
ranges = ranger.FullIntRange(false)
}
e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := splitRanges(ranges, e.keepOrder)
firstPartRanges, secondPartRanges := splitRanges(ranges, true)
firstResult, err := e.buildResp(firstPartRanges)
if err != nil {
return errors.Trace(err)
Expand All @@ -289,9 +288,11 @@ func (e *AnalyzeColumnsExec) open() error {

func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := builder.SetTableRanges(e.physicalTableID, ranges, nil).
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(e.keepOrder).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -363,7 +364,8 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
}
for i, col := range e.colsInfo {
for j, s := range collectors[i].Samples {
collectors[i].Samples[j], err = tablecodec.DecodeColumnValue(s.GetBytes(), &col.FieldType, timeZone)
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ PARTITION BY RANGE ( a ) (
for _, def := range pi.Definitions {
statsTbl := handle.GetPartitionStats(table.Meta(), def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
c.Assert(len(statsTbl.Columns), Equals, 2)
c.Assert(len(statsTbl.Columns), Equals, 3)
c.Assert(len(statsTbl.Indices), Equals, 1)
for _, col := range statsTbl.Columns {
c.Assert(col.Len(), Greater, 0)
Expand All @@ -80,7 +80,7 @@ PARTITION BY RANGE ( a ) (
statsTbl := handle.GetPartitionStats(table.Meta(), def.ID)
if i == 0 {
c.Assert(statsTbl.Pseudo, IsFalse)
c.Assert(len(statsTbl.Columns), Equals, 2)
c.Assert(len(statsTbl.Columns), Equals, 3)
c.Assert(len(statsTbl.Indices), Equals, 1)
} else {
c.Assert(statsTbl.Pseudo, IsTrue)
Expand Down
3 changes: 0 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,9 +1360,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, maxNumBuckets uint64) *AnalyzeColumnsExec {
cols := task.ColsInfo
keepOrder := false
if task.PKInfo != nil {
keepOrder = true
cols = append([]*model.ColumnInfo{task.PKInfo}, cols...)
}

Expand All @@ -1373,7 +1371,6 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency,
keepOrder: keepOrder,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeColumn,
StartTs: math.MaxUint64,
Expand Down
1 change: 1 addition & 0 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (e *ShowExec) histogramToRow(dbName, tblName, partitionName, colName string
hist.NDV,
hist.NullCount,
avgColSize,
hist.Correlation,
})
}

Expand Down
8 changes: 5 additions & 3 deletions executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,16 @@ func (s *testSuite1) TestShowPartitionStats(c *C) {
c.Assert(result.Rows()[0][2], Equals, "p0")

result = tk.MustQuery("show stats_histograms").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][2], Equals, "p0")
c.Assert(result.Rows()[0][3], Equals, "a")
c.Assert(result.Rows()[1][2], Equals, "p0")
c.Assert(result.Rows()[1][3], Equals, "idx")
c.Assert(result.Rows()[1][3], Equals, "b")
c.Assert(result.Rows()[2][2], Equals, "p0")
c.Assert(result.Rows()[2][3], Equals, "idx")

result = tk.MustQuery("show stats_buckets").Sort()
result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1"))
result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 b 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1"))

result = tk.MustQuery("show stats_healthy")
result.Check(testkit.Rows("test t p0 100"))
Expand Down
8 changes: 4 additions & 4 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) {
}{
{
sql: "analyze table t3",
best: "Analyze{Index(a),Table(b)}",
best: "Analyze{Index(a),Table(a, b)}",
},
// Test analyze full table.
{
Expand Down Expand Up @@ -708,10 +708,10 @@ func (s *testAnalyzeSuite) TestCorrelatedEstimation(c *C) {
" ├─TableReader_12 10.00 root data:TableScan_11",
" │ └─TableScan_11 10.00 cop table:t, range:[-inf,+inf], keep order:false",
" └─MaxOneRow_13 1.00 root ",
" └─Projection_14 0.00 root concat(cast(t1.a), \",\", cast(t1.b))",
" └─IndexLookUp_21 0.00 root ",
" └─Projection_14 0.10 root concat(cast(t1.a), \",\", cast(t1.b))",
" └─IndexLookUp_21 0.10 root ",
" ├─IndexScan_18 1.00 cop table:t1, index:c, range: decided by [eq(t1.c, test.t.c)], keep order:false",
" └─Selection_20 0.00 cop eq(t1.a, test.t.a)",
" └─Selection_20 0.10 cop eq(t1.a, test.t.a)",
" └─TableScan_19 1.00 cop table:t, keep order:false",
))
}
Expand Down
23 changes: 5 additions & 18 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,36 +645,23 @@ func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string)
// getColsInfo returns the info of index columns, normal columns and primary key.
func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo []*model.ColumnInfo, pkCol *model.ColumnInfo) {
tbl := tn.TableInfo
// idxNames contains all the normal columns that can be analyzed more effectively, because those columns occur as index
// columns or primary key columns with integer type.
var idxNames []string
if tbl.PKIsHandle {
for _, col := range tbl.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
idxNames = append(idxNames, col.Name.L)
pkCol = col
}
}
}
for _, idx := range tn.TableInfo.Indices {
if idx.State == model.StatePublic {
indicesInfo = append(indicesInfo, idx)
if len(idx.Columns) == 1 {
idxNames = append(idxNames, idx.Columns[0].Name.L)
}
}
}
for _, col := range tbl.Columns {
isIndexCol := false
for _, idx := range idxNames {
if idx == col.Name.L {
isIndexCol = true
break
}
}
if !isIndexCol {
colsInfo = append(colsInfo, col)
if col == pkCol {
continue
}
colsInfo = append(colsInfo, col)
}
return
}
Expand Down Expand Up @@ -1782,9 +1769,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong}
case ast.ShowStatsHistograms:
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size"}
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size", "Correlation"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeTiny, mysql.TypeDatetime,
mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble}
mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeDouble}
case ast.ShowStatsBuckets:
names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Bucket_id", "Count",
"Repeats", "Lower_Bound", "Upper_Bound"}
Expand Down
10 changes: 10 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ const (
cm_sketch blob,
stats_ver bigint(64) NOT NULL DEFAULT 0,
flag bigint(64) NOT NULL DEFAULT 0,
correlation double NOT NULL DEFAULT 0,
unique index tbl(table_id, is_index, hist_id)
);`

Expand Down Expand Up @@ -286,6 +287,7 @@ const (
version24 = 24
version25 = 25
version26 = 26
version27 = 27
)

func checkBootstrapped(s Session) (bool, error) {
Expand Down Expand Up @@ -448,6 +450,10 @@ func upgrade(s Session) {
upgradeToVer26(s)
}

if ver < version27 {
upgradeToVer27(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

Expand Down Expand Up @@ -719,6 +725,10 @@ func upgradeToVer26(s Session) {
mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_role_priv='Y',Drop_role_priv='Y'")
}

func upgradeToVer27(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `correlation` double NOT NULL DEFAULT 0", infoschema.ErrColumnExists)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
Expand Down
3 changes: 2 additions & 1 deletion statistics/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat
continue
}
hist := NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize)
hist.Correlation = row.GetFloat64(9)
table.Columns[hist.ID] = &Column{
Histogram: *hist,
PhysicalID: table.PhysicalID,
Expand All @@ -134,7 +135,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver from mysql.stats_histograms"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
Expand Down
32 changes: 26 additions & 6 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
err := types.SortDatums(sc, samples)
err := SortSampleItems(sc, samples)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize)

Expand All @@ -124,9 +124,11 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
bucketIdx := 0
var lastCount int64
hg.AppendBucket(&samples[0], &samples[0], int64(sampleFactor), int64(ndvFactor))
var corrXYSum float64
hg.AppendBucket(&samples[0].Value, &samples[0].Value, int64(sampleFactor), int64(ndvFactor))
for i := int64(1); i < sampleNum; i++ {
cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i])
corrXYSum += float64(i) * float64(samples[i].Ordinal)
cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i].Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -143,14 +145,32 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
}
} else if totalCount-float64(lastCount) <= valuesPerBucket {
// The bucket still have room to store a new item, update the bucket.
hg.updateLastBucket(&samples[i], int64(totalCount), int64(ndvFactor))
hg.updateLastBucket(&samples[i].Value, int64(totalCount), int64(ndvFactor))
} else {
lastCount = hg.Buckets[bucketIdx].Count
// The bucket is full, store the item in the next bucket.
bucketIdx++
hg.AppendBucket(&samples[i], &samples[i], int64(totalCount), int64(ndvFactor))
hg.AppendBucket(&samples[i].Value, &samples[i].Value, int64(totalCount), int64(ndvFactor))
}
}
// Compute column order correlation with handle.
if sampleNum == 1 {
hg.Correlation = 1
return hg, nil
}
// X means the ordinal of the item in original sequence, Y means the oridnal of the item in the
// sorted sequence, we know that X and Y value sets are both:
// 0, 1, ..., sampleNum-1
// we can simply compute sum(X) = sum(Y) =
// (sampleNum-1)*sampleNum / 2
// and sum(X^2) = sum(Y^2) =
// (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6
// The formula for computing correlation is borrowed from PostgreSQL.
// Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1.
itemsCount := float64(sampleNum)
corrXSum := (itemsCount - 1) * itemsCount / 2.0
corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0
hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
return hg, nil
}

Expand Down
4 changes: 2 additions & 2 deletions statistics/fmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func (s *testStatisticsSuite) TestSketch(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
maxSize := 1000
sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize)
sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize)
c.Check(err, IsNil)
c.Check(ndv, Equals, int64(6232))

Expand Down Expand Up @@ -51,7 +51,7 @@ func (s *testStatisticsSuite) TestSketch(c *C) {
func (s *testStatisticsSuite) TestSketchProtoConversion(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
maxSize := 1000
sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize)
sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize)
c.Check(err, IsNil)
c.Check(ndv, Equals, int64(6232))

Expand Down
65 changes: 65 additions & 0 deletions statistics/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,68 @@ func newStoreWithBootstrap(statsLease time.Duration) (kv.Storage, *domain.Domain
do.SetStatsUpdating(true)
return store, do, errors.Trace(err)
}

func (s *testStatsSuite) TestCorrelation(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
testKit.MustExec("create table t(c1 int primary key, c2 int)")
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
testKit.MustExec("insert into t values(1,1),(3,12),(4,20),(2,7),(5,21)")
testKit.MustExec("analyze table t")
result := testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "1")
testKit.MustExec("insert into t values(8,18)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "0.828571")

testKit.MustExec("truncate table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 0)
testKit.MustExec("insert into t values(1,21),(3,12),(4,7),(2,20),(5,1)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "-1")
testKit.MustExec("insert into t values(8,4)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "-0.942857")

testKit.MustExec("drop table t")
testKit.MustExec("create table t(c1 int, c2 int)")
testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(4,20),(5,21),(8,18)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "1")
c.Assert(result.Rows()[1][9], Equals, "0.828571")

testKit.MustExec("truncate table t")
testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 2)
c.Assert(result.Rows()[0][9], Equals, "0.828571")
c.Assert(result.Rows()[1][9], Equals, "1")

testKit.MustExec("drop table t")
testKit.MustExec("create table t(c1 int primary key, c2 int, c3 int, key idx_c2(c2))")
testKit.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3)")
testKit.MustExec("analyze table t")
result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 0").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][9], Equals, "0")
c.Assert(result.Rows()[1][9], Equals, "1")
c.Assert(result.Rows()[2][9], Equals, "1")
result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 1").Sort()
c.Assert(len(result.Rows()), Equals, 1)
c.Assert(result.Rows()[0][9], Equals, "0")
}
Loading