diff --git a/pkg/statistics/handle/storage/json.go b/pkg/statistics/handle/storage/json.go
new file mode 100644
index 0000000000000..a174d38aaa566
--- /dev/null
+++ b/pkg/statistics/handle/storage/json.go
@@ -0,0 +1,344 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+    "bytes"
+    "encoding/json"
+    "io"
+    "sync/atomic"
+    "time"
+
+    "github.com/klauspost/compress/gzip"
+    "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/pkg/parser/model"
+    "github.com/pingcap/tidb/pkg/parser/mysql"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
+    "github.com/pingcap/tidb/pkg/statistics"
+    "github.com/pingcap/tidb/pkg/statistics/handle/util"
+    "github.com/pingcap/tidb/pkg/types"
+    compressutil "github.com/pingcap/tidb/pkg/util/compress"
+    "github.com/pingcap/tidb/pkg/util/logutil"
+    "github.com/pingcap/tidb/pkg/util/memory"
+    "go.uber.org/zap"
+)
+
+func dumpJSONExtendedStats(statsColl *statistics.ExtendedStatsColl) []*util.JSONExtendedStats {
+    if statsColl == nil || len(statsColl.Stats) == 0 {
+        return nil
+    }
+    stats := make([]*util.JSONExtendedStats, 0, len(statsColl.Stats))
+    for name, item := range statsColl.Stats {
+        js := &util.JSONExtendedStats{
+            StatsName:  name,
+            ColIDs:     item.ColIDs,
+            Tp:         item.Tp,
+            ScalarVals: item.ScalarVals,
+            StringVals: item.StringVals,
+        }
+        stats = append(stats, js)
+    }
+    return stats
+}
+
+func extendedStatsFromJSON(statsColl []*util.JSONExtendedStats) *statistics.ExtendedStatsColl {
+    if len(statsColl) == 0 {
+        return nil
+    }
+    stats := statistics.NewExtendedStatsColl()
+    for _, js := range statsColl {
+        item := &statistics.ExtendedStatsItem{
+            ColIDs:     js.ColIDs,
+            Tp:         js.Tp,
+            ScalarVals: js.ScalarVals,
+            StringVals: js.StringVals,
+        }
+        stats.Stats[js.StatsName] = item
+    }
+    return stats
+}
+
+func dumpJSONCol(hist *statistics.Histogram, cmsketch *statistics.CMSketch, topn *statistics.TopN, fmsketch *statistics.FMSketch, statsVer *int64) *util.JSONColumn {
+    jsonCol := &util.JSONColumn{
+        Histogram:         statistics.HistogramToProto(hist),
+        NullCount:         hist.NullCount,
+        TotColSize:        hist.TotColSize,
+        LastUpdateVersion: hist.LastUpdateVersion,
+        Correlation:       hist.Correlation,
+        StatsVer:          statsVer,
+    }
+    if cmsketch != nil || topn != nil {
+        jsonCol.CMSketch = statistics.CMSketchToProto(cmsketch, topn)
+    }
+    if fmsketch != nil {
+        jsonCol.FMSketch = statistics.FMSketchToProto(fmsketch)
+    }
+    return jsonCol
+}
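+
+// For reference, one column/index entry produced by dumpJSONCol serializes
+// roughly as follows. The field names come from the JSON tags on
+// util.JSONColumn; the concrete values below are illustrative only:
+//
+//  {
+//    "histogram":           {...},
+//    "cm_sketch":           {...},  // present only when a CMSketch or TopN exists
+//    "fm_sketch":           {...},  // present only when an FMSketch exists
+//    "stats_ver":           2,
+//    "null_count":          0,
+//    "tot_col_size":        1024,
+//    "last_update_version": 441838221352960000,
+//    "correlation":         1
+//  }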
+
+// GenJSONTableFromStats generates a JSONTable from the table info and its statistics.
+func GenJSONTableFromStats(sctx sessionctx.Context, dbName string, tableInfo *model.TableInfo, tbl *statistics.Table) (*util.JSONTable, error) {
+    tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1)
+    tracker.AttachTo(sctx.GetSessionVars().MemTracker)
+    defer tracker.Detach()
+    jsonTbl := &util.JSONTable{
+        DatabaseName: dbName,
+        TableName:    tableInfo.Name.L,
+        Columns:      make(map[string]*util.JSONColumn, len(tbl.Columns)),
+        Indices:      make(map[string]*util.JSONColumn, len(tbl.Indices)),
+        Count:        tbl.RealtimeCount,
+        ModifyCount:  tbl.ModifyCount,
+        Version:      tbl.Version,
+    }
+    for _, col := range tbl.Columns {
+        sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC)
+        hist, err := col.ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
+        proto := dumpJSONCol(hist, col.CMSketch, col.TopN, col.FMSketch, &col.StatsVer)
+        tracker.Consume(proto.TotalMemoryUsage())
+        if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 {
+            return nil, errors.Trace(statistics.ErrQueryInterrupted)
+        }
+        jsonTbl.Columns[col.Info.Name.L] = proto
+        col.FMSketch.DestroyAndPutToPool()
+    }
+    for _, idx := range tbl.Indices {
+        proto := dumpJSONCol(&idx.Histogram, idx.CMSketch, idx.TopN, nil, &idx.StatsVer)
+        tracker.Consume(proto.TotalMemoryUsage())
+        if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 {
+            return nil, errors.Trace(statistics.ErrQueryInterrupted)
+        }
+        jsonTbl.Indices[idx.Info.Name.L] = proto
+    }
+    jsonTbl.ExtStats = dumpJSONExtendedStats(tbl.ExtendedStats)
+    return jsonTbl, nil
+}
+
+// TableStatsFromJSON loads statistics from a JSONTable and returns the statistics.Table they describe.
+func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) (*statistics.Table, error) {
+    newHistColl := statistics.HistColl{
+        PhysicalID:     physicalID,
+        HavePhysicalID: true,
+        RealtimeCount:  jsonTbl.Count,
+        ModifyCount:    jsonTbl.ModifyCount,
+        Columns:        make(map[int64]*statistics.Column, len(jsonTbl.Columns)),
+        Indices:        make(map[int64]*statistics.Index, len(jsonTbl.Indices)),
+    }
+    tbl := &statistics.Table{
+        HistColl: newHistColl,
+    }
+    for id, jsonIdx := range jsonTbl.Indices {
+        for _, idxInfo := range tableInfo.Indices {
+            if idxInfo.Name.L != id {
+                continue
+            }
+            hist := statistics.HistogramFromProto(jsonIdx.Histogram)
+            hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.Correlation = idxInfo.ID, jsonIdx.NullCount, jsonIdx.LastUpdateVersion, jsonIdx.Correlation
+            cm, topN := statistics.CMSketchAndTopNFromProto(jsonIdx.CMSketch)
+            statsVer := int64(statistics.Version0)
+            if jsonIdx.StatsVer != nil {
+                statsVer = *jsonIdx.StatsVer
+            } else if jsonIdx.Histogram.Ndv > 0 || jsonIdx.NullCount > 0 {
+                // If the statistics were collected without a stats version (which happens in v4.0 and earlier versions),
+                // we set it to 1.
+                statsVer = int64(statistics.Version1)
+            }
+            idx := &statistics.Index{
+                Histogram:         *hist,
+                CMSketch:          cm,
+                TopN:              topN,
+                Info:              idxInfo,
+                StatsVer:          statsVer,
+                PhysicalID:        physicalID,
+                StatsLoadedStatus: statistics.NewStatsFullLoadStatus(),
+            }
+            tbl.Indices[idx.ID] = idx
+        }
+    }
+
+    for id, jsonCol := range jsonTbl.Columns {
+        for _, colInfo := range tableInfo.Columns {
+            if colInfo.Name.L != id {
+                continue
+            }
+            hist := statistics.HistogramFromProto(jsonCol.Histogram)
+            sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC)
+            tmpFT := colInfo.FieldType
+            // For new collation data, when storing the bounds of the histogram, we store the collate key instead of the
+            // original value.
+            // But there is additional conversion logic for new collation data, and the collate key might be longer than
+            // the FieldType.flen.
+            // If we used the original FieldType here, there might be errors like "Invalid utf8mb4 character string"
+            // or "Data too long".
+            // So we change it to TypeBlob to bypass that logic here.
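+            // For example, under the new collation utf8mb4_general_ci the stored bound for
+            // a string value is its sort key: an opaque byte sequence that is usually not
+            // valid utf8mb4 and may exceed the column's flen, so converting through
+            // TypeBlob keeps the raw bytes intact. (utf8mb4_general_ci is only an
+            // illustrative choice of new collation here.)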
+            if colInfo.FieldType.EvalType() == types.ETString && colInfo.FieldType.GetType() != mysql.TypeEnum && colInfo.FieldType.GetType() != mysql.TypeSet {
+                tmpFT = *types.NewFieldType(mysql.TypeBlob)
+            }
+            hist, err := hist.ConvertTo(sc, &tmpFT)
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            cm, topN := statistics.CMSketchAndTopNFromProto(jsonCol.CMSketch)
+            fms := statistics.FMSketchFromProto(jsonCol.FMSketch)
+            hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.TotColSize, hist.Correlation = colInfo.ID, jsonCol.NullCount, jsonCol.LastUpdateVersion, jsonCol.TotColSize, jsonCol.Correlation
+            statsVer := int64(statistics.Version0)
+            if jsonCol.StatsVer != nil {
+                statsVer = *jsonCol.StatsVer
+            } else if jsonCol.Histogram.Ndv > 0 || jsonCol.NullCount > 0 {
+                // If the statistics were collected without a stats version (which happens in v4.0 and earlier versions),
+                // we set it to 1.
+                statsVer = int64(statistics.Version1)
+            }
+            col := &statistics.Column{
+                PhysicalID:        physicalID,
+                Histogram:         *hist,
+                CMSketch:          cm,
+                TopN:              topN,
+                FMSketch:          fms,
+                Info:              colInfo,
+                IsHandle:          tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+                StatsVer:          statsVer,
+                StatsLoadedStatus: statistics.NewStatsFullLoadStatus(),
+            }
+            tbl.Columns[col.ID] = col
+        }
+    }
+    tbl.ExtendedStats = extendedStatsFromJSON(jsonTbl.ExtStats)
+    return tbl, nil
+}
+
+// JSONTableToBlocks converts a JSONTable to JSON, gzip-compresses the result, and splits it into blocks.
+func JSONTableToBlocks(jsTable *util.JSONTable, blockSize int) ([][]byte, error) {
+    data, err := json.Marshal(jsTable)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    var gzippedData bytes.Buffer
+    gzipWriter := compressutil.GzipWriterPool.Get().(*gzip.Writer)
+    defer compressutil.GzipWriterPool.Put(gzipWriter)
+    gzipWriter.Reset(&gzippedData)
+    if _, err := gzipWriter.Write(data); err != nil {
+        return nil, errors.Trace(err)
+    }
+    if err := gzipWriter.Close(); err != nil {
+        return nil, errors.Trace(err)
+    }
+    blocksNum := gzippedData.Len() / blockSize
+    if gzippedData.Len()%blockSize != 0 {
+        blocksNum = blocksNum + 1
+    }
+    blocks := make([][]byte, blocksNum)
+    for i := 0; i < blocksNum-1; i++ {
+        blocks[i] = gzippedData.Bytes()[blockSize*i : blockSize*(i+1)]
+    }
+    blocks[blocksNum-1] = gzippedData.Bytes()[blockSize*(blocksNum-1):]
+    return blocks, nil
+}
+
+// BlocksToJSONTable converts gzip-compressed blocks back to a JSONTable.
+func BlocksToJSONTable(blocks [][]byte) (*util.JSONTable, error) {
+    if len(blocks) == 0 {
+        return nil, errors.New("Block empty error")
+    }
+    data := blocks[0]
+    for i := 1; i < len(blocks); i++ {
+        data = append(data, blocks[i]...)
+    }
+    gzippedData := bytes.NewReader(data)
+    gzipReader := compressutil.GzipReaderPool.Get().(*gzip.Reader)
+    if err := gzipReader.Reset(gzippedData); err != nil {
+        compressutil.GzipReaderPool.Put(gzipReader)
+        return nil, err
+    }
+    defer func() {
+        compressutil.GzipReaderPool.Put(gzipReader)
+    }()
+    if err := gzipReader.Close(); err != nil {
+        return nil, err
+    }
+    jsonStr, err := io.ReadAll(gzipReader)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    jsonTbl := util.JSONTable{}
+    err = json.Unmarshal(jsonStr, &jsonTbl)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    return &jsonTbl, nil
+}
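+
+// A minimal round-trip sketch for the two helpers above (the 256 KiB block
+// size is an illustrative choice, not a value mandated by this package):
+//
+//  blocks, err := JSONTableToBlocks(jsonTbl, 256*1024)
+//  if err != nil {
+//      return err
+//  }
+//  // ... persist the blocks, e.g. one row per block ordered by seq_no ...
+//  restored, err := BlocksToJSONTable(blocks)
+//  if err != nil {
+//      return err
+//  }
+//  // restored should now be equivalent to the original jsonTbl.
+
+// TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable.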
+func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) {
+    if _, err := util.Exec(sctx, "begin"); err != nil {
+        return nil, false, err
+    }
+    defer func() {
+        err = util.FinishTransaction(sctx, err)
+    }()
+
+    // get meta version
+    rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
+    if err != nil {
+        return nil, false, errors.AddStack(err)
+    }
+    if len(rows) < 1 {
+        logutil.BgLogger().Warn("failed to get records of stats_meta_history",
+            zap.Int64("table-id", physicalID),
+            zap.Uint64("snapshotTS", snapshot))
+        return nil, false, nil
+    }
+    statsMetaVersion := rows[0].GetInt64(0)
+    // get stats meta
+    rows, _, err = util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
+    if err != nil {
+        return nil, false, errors.AddStack(err)
+    }
+    modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
+
+    // get stats version
+    rows, _, err = util.ExecRows(sctx, "select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
+    if err != nil {
+        return nil, false, errors.AddStack(err)
+    }
+    if len(rows) < 1 {
+        logutil.BgLogger().Warn("failed to get record of stats_history",
+            zap.Int64("table-id", physicalID),
+            zap.Uint64("snapshotTS", snapshot))
+        return nil, false, nil
+    }
+    statsVersion := rows[0].GetInt64(0)
+
+    // get stats
+    rows, _, err = util.ExecRows(sctx, "select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
+    if err != nil {
+        return nil, false, errors.AddStack(err)
+    }
+    blocks := make([][]byte, 0)
+    for _, row := range rows {
+        blocks = append(blocks, row.GetBytes(0))
+    }
+    jsonTbl, err := BlocksToJSONTable(blocks)
+    if err != nil {
+        return nil, false, errors.AddStack(err)
+    }
+    jsonTbl.Count = count
+    jsonTbl.ModifyCount = modifyCount
+    jsonTbl.IsHistoricalStats = true
+    return jsonTbl, true, nil
+}
diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go
new file mode 100644
index 0000000000000..b35879bce7c4e
--- /dev/null
+++ b/pkg/statistics/handle/storage/stats_read_writer.go
@@ -0,0 +1,617 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+    "context"
+    "fmt"
+    "runtime"
+    "sync"
+    "sync/atomic"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/pkg/config"
+    "github.com/pingcap/tidb/pkg/infoschema"
+    "github.com/pingcap/tidb/pkg/parser/model"
+    "github.com/pingcap/tidb/pkg/parser/mysql"
+    "github.com/pingcap/tidb/pkg/parser/terror"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/sessionctx/variable"
+    "github.com/pingcap/tidb/pkg/statistics"
+    handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
+    "github.com/pingcap/tidb/pkg/statistics/handle/util"
+    "github.com/pingcap/tidb/pkg/types"
+    "github.com/pingcap/tidb/pkg/util/intest"
+    "github.com/pingcap/tidb/pkg/util/sqlexec"
+)
+
+// statsReadWriter implements the util.StatsReadWriter interface.
+type statsReadWriter struct {
+    statsHandler util.StatsHandle
+}
+
+// NewStatsReadWriter creates a new StatsReadWriter.
+func NewStatsReadWriter(statsHandler util.StatsHandle) util.StatsReadWriter {
+    return &statsReadWriter{statsHandler: statsHandler}
+}
+
+// InsertColStats2KV inserts a record into stats_histograms with distinct_count 1 and a bucket with the default value into stats_buckets.
+// This operation also updates the version.
+func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
+    statsVer := uint64(0)
+    defer func() {
+        if err == nil && statsVer != 0 {
+            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+        }
+    }()
+
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        startTS, err := util.GetStartTS(sctx)
+        if err != nil {
+            return errors.Trace(err)
+        }
+
+        // First of all, we update the version.
+        _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
+        if err != nil {
+            return err
+        }
+        statsVer = startTS
+        // If the last SQL didn't update anything, the stats of this table do not exist, so there is nothing more to do.
+        if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
+            // This step fetches the row count of the table, from which we derive the count and repeats of the bucket below.
+            var rs sqlexec.RecordSet
+            rs, err = util.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
+            if err != nil {
+                return err
+            }
+            defer terror.Call(rs.Close)
+            req := rs.NewChunk(nil)
+            err = rs.Next(context.Background(), req)
+            if err != nil {
+                return err
+            }
+            count := req.GetRow(0).GetInt64(0)
+            for _, colInfo := range colInfos {
+                value := types.NewDatum(colInfo.GetOriginDefaultValue())
+                value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, &colInfo.FieldType)
+                if err != nil {
+                    return err
+                }
+                if value.IsNull() {
+                    // If the added column has a null default value, all existing rows have a null value in the newly added column.
+                    if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
+                        return err
+                    }
+                } else {
+                    // Otherwise, we insert the histogram meta first; the distinct_count is always one, since every existing row shares the same default value.
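+                    // For example, adding a column with DEFAULT 'foo' to a table of 100 rows
+                    // records distinct_count = 1 and tot_col_size = 3 * 100, since each of
+                    // the 100 existing rows holds the same 3-byte default value. (The column
+                    // default and row count are illustrative only.)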
+                    if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
+                        return err
+                    }
+                    value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
+                    if err != nil {
+                        return err
+                    }
+                    // There must be only one bucket for this new column, and its value is the default value.
+                    if _, err := util.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
+                        return err
+                    }
+                }
+            }
+        }
+        return nil
+    }, util.FlagWrapTxn)
+}
+
+// InsertTableStats2KV inserts a record for a new table into stats_meta, plus records for the
+// new columns and indices that belong to this table.
+func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
+    statsVer := uint64(0)
+    defer func() {
+        if err == nil && statsVer != 0 {
+            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+        }
+    }()
+
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        startTS, err := util.GetStartTS(sctx)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if _, err := util.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
+            return err
+        }
+        statsVer = startTS
+        for _, col := range info.Columns {
+            if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
+                return err
+            }
+        }
+        for _, idx := range info.Indices {
+            if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
+                return err
+            }
+        }
+        return nil
+    }, util.FlagWrapTxn)
+}
+
+// ChangeGlobalStatsID changes the table ID in global-stats to the new table ID.
+func (s *statsReadWriter) ChangeGlobalStatsID(from, to int64) (err error) {
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
+            _, err = util.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
+            if err != nil {
+                return err
+            }
+        }
+        return nil
+    }, util.FlagWrapTxn)
+}
+
+// ResetTableStats2KVForDrop updates the version of the dropped table's stats meta record.
+func (s *statsReadWriter) ResetTableStats2KVForDrop(physicalID int64) (err error) {
+    statsVer := uint64(0)
+    defer func() {
+        if err == nil && statsVer != 0 {
+            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+        }
+    }()
+
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        startTS, err := util.GetStartTS(sctx)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if _, err := util.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
+            return err
+        }
+        return nil
+    }, util.FlagWrapTxn)
+}
+
+// UpdateStatsVersion will set the statistics version to the newest TS,
+// so that every tidb-server will reload the stats automatically.
+func (s *statsReadWriter) UpdateStatsVersion() error {
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        return UpdateStatsVersion(sctx)
+    }, util.FlagWrapTxn)
+}
+
+// SaveTableStatsToStorage saves the stats of a table to storage.
+func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        tableID := results.TableID.GetStatisticsID()
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
+    }
+    return err
+}
+
+// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
+func (s *statsReadWriter) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) {
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        count, modifyCount, _, err = StatsMetaCountAndModifyCount(sctx, tableID)
+        return err
+    }, util.FlagWrapTxn)
+    return
+}
+
+// TableStatsFromStorage loads table stats info from storage.
+func (s *statsReadWriter) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) {
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        var ok bool
+        statsTbl, ok = s.statsHandler.Get(physicalID)
+        if !ok {
+            statsTbl = nil
+        }
+        statsTbl, err = TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, s.statsHandler.Lease(), statsTbl)
+        return err
+    }, util.FlagWrapTxn)
+    return
+}
+
+// SaveStatsToStorage saves the stats to storage.
+// If count is negative, neither count nor modify count is used or written to the stats_meta table.
+// Otherwise, the corresponding fields in the stats_meta table are updated.
+// TODO: refactor to reduce the number of parameters
+func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
+    cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = SaveStatsToStorage(sctx, tableID,
+            count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
+    }
+    return
+}
+
+// saveMetaToStorage saves stats meta to the storage.
+func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
+    }
+    return
+}
+
+// InsertExtendedStats inserts a record into mysql.stats_extended and updates the version in mysql.stats_meta.
+func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+    }
+    return
+}
+
+// MarkExtendedStatsDeleted updates the status of mysql.stats_extended to `deleted` and the version of mysql.stats_meta.
+func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+    }
+    return
+}
+
+// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
+func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
+    var statsVer uint64
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad)
+        return err
+    })
+    if err == nil && statsVer != 0 {
+        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+    }
+    return
+}
+
+// LoadNeededHistograms will load histograms for those needed columns/indices.
+func (s *statsReadWriter) LoadNeededHistograms() (err error) {
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
+        return LoadNeededHistograms(sctx, s.statsHandler, loadFMSketch)
+    }, util.FlagWrapTxn)
+    return err
+}
+
+// ReloadExtendedStatistics drops the cache for extended statistics and reloads data from mysql.stats_extended.
+func (s *statsReadWriter) ReloadExtendedStatistics() error {
+    return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        tables := make([]*statistics.Table, 0, s.statsHandler.Len())
+        for _, tbl := range s.statsHandler.Values() {
+            t, err := ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true)
+            if err != nil {
+                return err
+            }
+            tables = append(tables, t)
+        }
+        s.statsHandler.UpdateStatsCache(tables, nil)
+        return nil
+    }, util.FlagWrapTxn)
+}
+
+// DumpStatsToJSON dumps statistics to JSON.
+func (s *statsReadWriter) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
+    historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*util.JSONTable, error) {
+    var snapshot uint64
+    if historyStatsExec != nil {
+        sctx := historyStatsExec.(sessionctx.Context)
+        snapshot = sctx.GetSessionVars().SnapshotTS
+    }
+    return s.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats)
+}
+
+// DumpHistoricalStatsBySnapshot dumps JSON tables from mysql.stats_meta_history and mysql.stats_history.
+// As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it falls back
+// to the latest stats, and those table names (and partition names) are returned in fallbackTbls.
+func (s *statsReadWriter) DumpHistoricalStatsBySnapshot(
+    dbName string,
+    tableInfo *model.TableInfo,
+    snapshot uint64,
+) (
+    jt *util.JSONTable,
+    fallbackTbls []string,
+    err error,
+) {
+    historicalStatsEnabled, err := s.statsHandler.CheckHistoricalStatsEnable()
+    if err != nil {
+        return nil, nil, errors.Errorf("check %v failed: %v", variable.TiDBEnableHistoricalStats, err)
+    }
+    if !historicalStatsEnabled {
+        return nil, nil, errors.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats)
+    }
+
+    defer func() {
+        if err == nil {
+            handle_metrics.DumpHistoricalStatsSuccessCounter.Inc()
+        } else {
+            handle_metrics.DumpHistoricalStatsFailedCounter.Inc()
+        }
+    }()
+    pi := tableInfo.GetPartitionInfo()
+    if pi == nil {
+        jt, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot)
+        if fallback {
+            fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s", dbName, tableInfo.Name.O))
+        }
+        return jt, fallbackTbls, err
+    }
+    jsonTbl := &util.JSONTable{
+        DatabaseName: dbName,
+        TableName:    tableInfo.Name.L,
+        Partitions:   make(map[string]*util.JSONTable, len(pi.Definitions)),
+    }
+    for _, def := range pi.Definitions {
+        tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, def.ID, snapshot)
+        if err != nil {
+            return nil, nil, errors.Trace(err)
+        }
+        if fallback {
+            fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s %s", dbName, tableInfo.Name.O, def.Name.O))
+        }
+        jsonTbl.Partitions[def.Name.L] = tbl
+    }
+    tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot)
+    if err != nil {
+        return nil, nil, err
+    }
+    if fallback {
+        fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s global", dbName, tableInfo.Name.O))
+    }
+    // dump the table's global-stats if they exist
+    if tbl != nil {
+        jsonTbl.Partitions[util.TiDBGlobalStats] = tbl
+    }
+    return jsonTbl, fallbackTbls, nil
+}
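+
+// For a partitioned table, the dumped JSONTable nests one JSONTable per
+// partition plus the global stats under the "global" key, roughly as below
+// (the database, table, and partition names are illustrative):
+//
+//  {
+//    "database_name": "test",
+//    "table_name":    "t",
+//    "partitions":    {"p0": {...}, "p1": {...}, "global": {...}}
+//  }
+
+// DumpStatsToJSONBySnapshot dumps statistics to JSON at the given snapshot.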
+func (s *statsReadWriter) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*util.JSONTable, error) {
+    pruneMode, err := s.statsHandler.GetCurrentPruneMode()
+    if err != nil {
+        return nil, err
+    }
+    isDynamicMode := variable.PartitionPruneMode(pruneMode) == variable.Dynamic
+    pi := tableInfo.GetPartitionInfo()
+    if pi == nil {
+        return s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
+    }
+    jsonTbl := &util.JSONTable{
+        DatabaseName: dbName,
+        TableName:    tableInfo.Name.L,
+        Partitions:   make(map[string]*util.JSONTable, len(pi.Definitions)),
+    }
+    // dump partition stats only in static mode, or when the dumpPartitionStats flag is set in dynamic mode
+    if !isDynamicMode || dumpPartitionStats {
+        for _, def := range pi.Definitions {
+            tbl, err := s.TableStatsToJSON(dbName, tableInfo, def.ID, snapshot)
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            if tbl == nil {
+                continue
+            }
+            jsonTbl.Partitions[def.Name.L] = tbl
+        }
+    }
+    // dump the table's global-stats if they exist
+    tbl, err := s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    if tbl != nil {
+        jsonTbl.Partitions[util.TiDBGlobalStats] = tbl
+    }
+    return jsonTbl, nil
+}
+
+// getTableHistoricalStatsToJSONWithFallback tries to get the table's historical stats; if they do not exist,
+// it falls back to the latest stats directly, and the second return value is true.
+func (s *statsReadWriter) getTableHistoricalStatsToJSONWithFallback(
+    dbName string,
+    tableInfo *model.TableInfo,
+    physicalID int64,
+    snapshot uint64,
+) (
+    *util.JSONTable,
+    bool,
+    error,
+) {
+    jt, exist, err := s.tableHistoricalStatsToJSON(physicalID, snapshot)
+    if err != nil {
+        return nil, false, err
+    }
+    if !exist {
+        jt, err = s.TableStatsToJSON(dbName, tableInfo, physicalID, 0)
+        fallback := true
+        if snapshot == 0 {
+            fallback = false
+        }
+        return jt, fallback, err
+    }
+    return jt, false, nil
+}
+
+func (s *statsReadWriter) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) {
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        jt, exist, err = TableHistoricalStatsToJSON(sctx, physicalID, snapshot)
+        return err
+    }, util.FlagWrapTxn)
+    return
+}
+
+// TableStatsToJSON dumps the statistics of one physical table to JSON.
+func (s *statsReadWriter) TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*util.JSONTable, error) {
+    tbl, err := s.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
+    if err != nil || tbl == nil {
+        return nil, err
+    }
+    var jsonTbl *util.JSONTable
+    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
+        tbl.Version, tbl.ModifyCount, tbl.RealtimeCount, err = StatsMetaByTableIDFromStorage(sctx, physicalID, snapshot)
+        if err != nil {
+            return err
+        }
+        jsonTbl, err = GenJSONTableFromStats(sctx, dbName, tableInfo, tbl)
+        return err
+    })
+    if err != nil {
+        return nil, err
+    }
+    return jsonTbl, nil
+}
+
+// TestLoadStatsErr is only for test.
+type TestLoadStatsErr struct{}
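+
+// A dump/load round trip through this interface looks roughly like the sketch
+// below (error handling elided; rw, is, and tblInfo are assumed to be an
+// initialized util.StatsReadWriter, an infoschema.InfoSchema, and the target
+// table's *model.TableInfo):
+//
+//  jsonTbl, err := rw.DumpStatsToJSONBySnapshot("test", tblInfo, 0, true)
+//  // ... persist or transfer jsonTbl ...
+//  err = rw.LoadStatsFromJSON(context.Background(), is, jsonTbl, 0)
+
+// LoadStatsFromJSON will load statistics from the JSONTable and save them to storage.
+// Finally, it will also update the stats cache.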
+func (s *statsReadWriter) LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema,
+    jsonTbl *util.JSONTable, concurrencyForPartition uint8) error {
+    if err := s.LoadStatsFromJSONNoUpdate(ctx, is, jsonTbl, concurrencyForPartition); err != nil {
+        return errors.Trace(err)
+    }
+    return errors.Trace(s.statsHandler.Update(is))
+}
+
+// LoadStatsFromJSONNoUpdate will load statistics from the JSONTable and save them to storage.
+func (s *statsReadWriter) LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema,
+    jsonTbl *util.JSONTable, concurrencyForPartition uint8) error {
+    nCPU := uint8(runtime.GOMAXPROCS(0))
+    if concurrencyForPartition == 0 {
+        concurrencyForPartition = nCPU / 2 // default
+    }
+    if concurrencyForPartition > nCPU {
+        concurrencyForPartition = nCPU // for safety
+    }
+
+    table, err := is.TableByName(model.NewCIStr(jsonTbl.DatabaseName), model.NewCIStr(jsonTbl.TableName))
+    if err != nil {
+        return errors.Trace(err)
+    }
+    tableInfo := table.Meta()
+    pi := tableInfo.GetPartitionInfo()
+    if pi == nil || jsonTbl.Partitions == nil {
+        err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, jsonTbl)
+        if err != nil {
+            return errors.Trace(err)
+        }
+    } else {
+        // load partition statistics concurrently
+        taskCh := make(chan model.PartitionDefinition, len(pi.Definitions))
+        for _, def := range pi.Definitions {
+            taskCh <- def
+        }
+        close(taskCh)
+        var wg sync.WaitGroup
+        e := new(atomic.Pointer[error])
+        for i := 0; i < int(concurrencyForPartition); i++ {
+            wg.Add(1)
+            s.statsHandler.GPool().Go(func() {
+                defer func() {
+                    if r := recover(); r != nil {
+                        err := fmt.Errorf("%v", r)
+                        e.CompareAndSwap(nil, &err)
+                    }
+                    wg.Done()
+                }()
+
+                for def := range taskCh {
+                    tbl := jsonTbl.Partitions[def.Name.L]
+                    if tbl == nil {
+                        continue
+                    }
+
+                    loadFunc := s.loadStatsFromJSON
+                    if intest.InTest && ctx.Value(TestLoadStatsErr{}) != nil {
+                        loadFunc = ctx.Value(TestLoadStatsErr{}).(func(*model.TableInfo, int64, *util.JSONTable) error)
+                    }
+
+                    err := loadFunc(tableInfo, def.ID, tbl)
+                    if err != nil {
+                        e.CompareAndSwap(nil, &err)
+                        return
+                    }
+                    if e.Load() != nil {
+                        return
+                    }
+                }
+            })
+        }
+        wg.Wait()
+        if e.Load() != nil {
+            return *e.Load()
+        }
+
+        // load global-stats if they exist
+        if globalStats, ok := jsonTbl.Partitions[util.TiDBGlobalStats]; ok {
+            if err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil {
+                return errors.Trace(err)
+            }
+        }
+    }
+    return nil
+}
+
+func (s *statsReadWriter) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) error {
+    tbl, err := TableStatsFromJSON(tableInfo, physicalID, jsonTbl)
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    for _, col := range tbl.Columns {
+        // loadStatsFromJSON doesn't support partitioned tables yet.
+        // The table-level count and modify_count will be overridden by the saveMetaToStorage call below, so we don't need
+        // to care about them here.
+        err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats)
+        if err != nil {
+            return errors.Trace(err)
+        }
+    }
+    for _, idx := range tbl.Indices {
+        // loadStatsFromJSON doesn't support partitioned tables yet.
+        // The table-level count and modify_count will be overridden by the saveMetaToStorage call below, so we don't need
+        // to care about them here.
+        err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats)
+        if err != nil {
+            return errors.Trace(err)
+        }
+    }
+    err = s.SaveExtendedStatsToStorage(tbl.PhysicalID, tbl.ExtendedStats, true)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    return s.saveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats)
+}
diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go
new file mode 100644
index 0000000000000..5ed0da604c73c
--- /dev/null
+++ b/pkg/statistics/handle/util/util.go
@@ -0,0 +1,296 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+    "context"
+    "strconv"
+    "time"
+
+    "github.com/ngaut/pools"
+    "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/pkg/infoschema"
+    "github.com/pingcap/tidb/pkg/kv"
+    "github.com/pingcap/tidb/pkg/parser/ast"
+    "github.com/pingcap/tidb/pkg/parser/model"
+    "github.com/pingcap/tidb/pkg/parser/terror"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/sessionctx/variable"
+    "github.com/pingcap/tidb/pkg/util/chunk"
+    "github.com/pingcap/tidb/pkg/util/intest"
+    "github.com/pingcap/tidb/pkg/util/sqlexec"
+    "github.com/pingcap/tidb/pkg/util/sqlexec/mock"
+    "github.com/pingcap/tipb/go-tipb"
+    "github.com/tikv/client-go/v2/oracle"
+)
+
+const (
+    // StatsMetaHistorySourceAnalyze indicates that the stats history meta comes from analyze.
+    StatsMetaHistorySourceAnalyze = "analyze"
+    // StatsMetaHistorySourceLoadStats indicates that the stats history meta comes from loading stats.
+    StatsMetaHistorySourceLoadStats = "load stats"
+    // StatsMetaHistorySourceFlushStats indicates that the stats history meta comes from flushing stats.
+    StatsMetaHistorySourceFlushStats = "flush stats"
+    // StatsMetaHistorySourceSchemaChange indicates that the stats history meta comes from a schema change.
+    StatsMetaHistorySourceSchemaChange = "schema change"
+    // StatsMetaHistorySourceExtendedStats indicates that the stats history meta comes from extended stats.
+    StatsMetaHistorySourceExtendedStats = "extended stats"
+
+    // TiDBGlobalStats represents the global-stats for a partitioned table.
+    TiDBGlobalStats = "global"
+)
+
+var (
+    // UseCurrentSessionOpt makes sure the SQL is executed in the current session.
+    UseCurrentSessionOpt = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
+
+    // StatsCtx is used to mark that the request comes from the stats module.
+    StatsCtx = kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+)
+
+// SessionPool is used to recycle sessionctx.
+type SessionPool interface {
+    Get() (pools.Resource, error)
+    Put(pools.Resource)
+}
+
+// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`.
+func FinishTransaction(sctx sessionctx.Context, err error) error {
+    if err == nil {
+        _, _, err = ExecRows(sctx, "commit")
+    } else {
+        _, _, err1 := ExecRows(sctx, "rollback")
+        terror.Log(errors.Trace(err1))
+    }
+    return errors.Trace(err)
+}
+
+var (
+    // FlagWrapTxn indicates whether to wrap a transaction.
+    FlagWrapTxn = 0
+)
+
+// CallWithSCtx allocates an sctx from the pool and calls f() with it.
+func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) {
+    se, err := pool.Get()
+    if err != nil {
+        return err
+    }
+    defer func() {
+        if err == nil { // only recycle when no error
+            pool.Put(se)
+        }
+    }()
+    sctx := se.(sessionctx.Context)
+    if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically
+        return err
+    }
+
+    wrapTxn := false
+    for _, flag := range flags {
+        if flag == FlagWrapTxn {
+            wrapTxn = true
+        }
+    }
+    if wrapTxn {
+        err = WrapTxn(sctx, f)
+    } else {
+        err = f(sctx)
+    }
+    return err
+}
+
+// UpdateSCtxVarsForStats updates all the necessary variables that may affect the behavior of statistics.
+func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
+    // analyzer version
+    verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion)
+    if err != nil {
+        return err
+    }
+    ver, err := strconv.ParseInt(verInString, 10, 64)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().AnalyzeVersion = int(ver)
+
+    // enable historical stats
+    val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val)
+
+    // partition mode
+    pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode)
+
+    // enable analyze snapshot
+    analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot)
+
+    // analyze skip column types
+    val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val)
+
+    // skip missing partition stats
+    val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val)
+    verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency)
+    if err != nil {
+        return err
+    }
+    ver, err = strconv.ParseInt(verInString, 10, 64)
+    if err != nil {
+        return err
+    }
+    sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver)
+    return nil
+}
+
+// WrapTxn wraps f in a transaction, so that the different SQLs in this operation have the same data visibility.
+func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) {
+    // TODO: check whether this sctx is already in a txn
+    if _, _, err := ExecRows(sctx, "begin"); err != nil {
+        return err
+    }
+    defer func() {
+        err = FinishTransaction(sctx, err)
+    }()
+    err = f(sctx)
+    return
+}
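+
+// Callers typically combine CallWithSCtx with FlagWrapTxn so that a
+// multi-statement operation sees a single consistent snapshot. A minimal
+// sketch (pool is any SessionPool, tblID an arbitrary table ID):
+//
+//  err := CallWithSCtx(pool, func(sctx sessionctx.Context) error {
+//      rows, _, err := ExecRows(sctx, "select count from mysql.stats_meta where table_id = %?", tblID)
+//      _ = rows
+//      return err
+//  }, FlagWrapTxn)
+
+// GetStartTS gets the start ts of the current transaction.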
+func GetStartTS(sctx sessionctx.Context) (uint64, error) {
+    txn, err := sctx.Txn(true)
+    if err != nil {
+        return 0, err
+    }
+    return txn.StartTS(), nil
+}
+
+// Exec is a helper function to execute SQL and return a RecordSet.
+func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
+    sqlExec, ok := sctx.(sqlexec.SQLExecutor)
+    if !ok {
+        return nil, errors.Errorf("invalid sql executor")
+    }
+    // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor
+    return sqlExec.ExecuteInternal(StatsCtx, sql, args...)
+}
+
+// ExecRows is a helper function to execute SQL and return rows and fields.
+func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
+    if intest.InTest {
+        if v := sctx.Value(mock.MockRestrictedSQLExecutorKey{}); v != nil {
+            return v.(*mock.MockRestrictedSQLExecutor).ExecRestrictedSQL(StatsCtx,
+                UseCurrentSessionOpt, sql, args...)
+        }
+    }
+
+    sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
+    if !ok {
+        return nil, nil, errors.Errorf("invalid sql executor")
+    }
+    return sqlExec.ExecRestrictedSQL(StatsCtx, UseCurrentSessionOpt, sql, args...)
+}
+
+// ExecWithOpts is a helper function to execute SQL with the given options and return rows and fields.
+func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
+    sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
+    if !ok {
+        return nil, nil, errors.Errorf("invalid sql executor")
+    }
+    return sqlExec.ExecRestrictedSQL(StatsCtx, opts, sql, args...)
+}
+
+// DurationToTS converts a duration to a timestamp.
+func DurationToTS(d time.Duration) uint64 {
+    return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0)
+}
+
+// GetFullTableName returns the full table name.
+func GetFullTableName(is infoschema.InfoSchema, tblInfo *model.TableInfo) string {
+    for _, schema := range is.AllSchemas() {
+        if t, err := is.TableByName(schema.Name, tblInfo.Name); err == nil {
+            if t.Meta().ID == tblInfo.ID {
+                return schema.Name.O + "." + tblInfo.Name.O
+            }
+        }
+    }
+    return strconv.FormatInt(tblInfo.ID, 10)
+}
+
+// JSONTable is used for dumping statistics.
+type JSONTable struct {
+    Columns           map[string]*JSONColumn `json:"columns"`
+    Indices           map[string]*JSONColumn `json:"indices"`
+    Partitions        map[string]*JSONTable  `json:"partitions"`
+    DatabaseName      string                 `json:"database_name"`
+    TableName         string                 `json:"table_name"`
+    ExtStats          []*JSONExtendedStats   `json:"ext_stats"`
+    Count             int64                  `json:"count"`
+    ModifyCount       int64                  `json:"modify_count"`
+    Version           uint64                 `json:"version"`
+    IsHistoricalStats bool                   `json:"is_historical_stats"`
+}
+
+// JSONExtendedStats is used for dumping extended statistics.
+type JSONExtendedStats struct {
+    StatsName  string  `json:"stats_name"`
+    StringVals string  `json:"string_vals"`
+    ColIDs     []int64 `json:"cols"`
+    ScalarVals float64 `json:"scalar_vals"`
+    Tp         uint8   `json:"type"`
+}
+
+// JSONColumn is used for dumping statistics.
+type JSONColumn struct {
+    Histogram *tipb.Histogram `json:"histogram"`
+    CMSketch  *tipb.CMSketch  `json:"cm_sketch"`
+    FMSketch  *tipb.FMSketch  `json:"fm_sketch"`
+    // StatsVer is a pointer here since old-version JSON files do not contain version information.
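+    // A nil StatsVer is interpreted at load time: TableStatsFromJSON assumes
+    // statistics.Version1 when the histogram carries data (ndv or null_count > 0)
+    // and statistics.Version0 otherwise.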
+    StatsVer          *int64  `json:"stats_ver"`
+    NullCount         int64   `json:"null_count"`
+    TotColSize        int64   `json:"tot_col_size"`
+    LastUpdateVersion uint64  `json:"last_update_version"`
+    Correlation       float64 `json:"correlation"`
+}
+
+// TotalMemoryUsage returns the total memory usage of this column.
+func (col *JSONColumn) TotalMemoryUsage() (size int64) {
+    if col.Histogram != nil {
+        size += int64(col.Histogram.Size())
+    }
+    if col.CMSketch != nil {
+        size += int64(col.CMSketch.Size())
+    }
+    if col.FMSketch != nil {
+        size += int64(col.FMSketch.Size())
+    }
+    return size
+}