ddl: enhance row size estimation before adding index (pingcap#52086)
tangenta committed Mar 27, 2024
1 parent 7b8fd37 commit a804eaf
Showing 8 changed files with 149 additions and 29 deletions.
1 change: 0 additions & 1 deletion pkg/ddl/BUILD.bazel
@@ -117,7 +117,6 @@ go_library(
"//pkg/sessiontxn",
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/util",
"//pkg/store/copr",
"//pkg/store/driver/backoff",
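Dropping `//pkg/statistics/handle/cache` from the build deps mirrors the import removal in `pkg/ddl/backfilling_scheduler.go` below: worker sizing no longer reads the stats cache directly, but instead uses a row-size estimate computed once and threaded through the task meta.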
6 changes: 4 additions & 2 deletions pkg/ddl/backfilling_dist_executor.go
@@ -44,6 +44,7 @@ type BackfillTaskMeta struct {
EleTypeKey []byte `json:"ele_type_key"`

CloudStorageURI string `json:"cloud_storage_uri"`
EstimateRowSize int `json:"estimate_row_size"`
}

// BackfillSubTaskMeta is the sub-task meta for backfilling index.
@@ -107,15 +108,16 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
indexInfos = append(indexInfos, indexInfo)
}
cloudStorageURI := s.taskMeta.CloudStorageURI
estRowSize := s.taskMeta.EstimateRowSize

switch stage {
case proto.BackfillStepReadIndex:
jc := ddlObj.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
ddlObj.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, s.getBackendCtx, cloudStorageURI)
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, s.getBackendCtx, cloudStorageURI, estRowSize)
case proto.BackfillStepMergeSort:
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI)
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI, estRowSize)
case proto.BackfillStepWriteAndIngest:
if len(cloudStorageURI) == 0 {
return nil, errors.Errorf("local import does not have write & ingest step")
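The new `EstimateRowSize` field rides along in the JSON-encoded task meta, so an estimate computed once on the DDL owner reaches every distributed subtask executor. A minimal, self-contained sketch of that round trip (the struct mirrors only the two relevant fields; the values are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// backfillTaskMeta mirrors the two BackfillTaskMeta fields relevant here.
type backfillTaskMeta struct {
	CloudStorageURI string `json:"cloud_storage_uri"`
	EstimateRowSize int    `json:"estimate_row_size"`
}

func main() {
	// The DDL owner computes the estimate once and serializes it...
	raw, err := json.Marshal(backfillTaskMeta{
		CloudStorageURI: "s3://bucket/prefix", // illustrative URI
		EstimateRowSize: 512,                  // illustrative bytes-per-row estimate
	})
	if err != nil {
		panic(err)
	}

	// ...and each subtask executor decodes it before building its pipeline,
	// as newBackfillSubtaskExecutor does with s.taskMeta.EstimateRowSize.
	var meta backfillTaskMeta
	if err := json.Unmarshal(raw, &meta); err != nil {
		panic(err)
	}
	fmt.Println(meta.EstimateRowSize) // 512
}
```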
14 changes: 9 additions & 5 deletions pkg/ddl/backfilling_merge_sort.go
@@ -33,10 +33,12 @@ import (

type mergeSortExecutor struct {
taskexecutor.EmptyStepExecutor
jobID int64
idxNum int
ptbl table.PhysicalTable
cloudStoreURI string
jobID int64
idxNum int
ptbl table.PhysicalTable
avgRowSize int
cloudStoreURI string

mu sync.Mutex
subtaskSortedKVMeta *external.SortedKVMeta
}
@@ -46,12 +48,14 @@ func newMergeSortExecutor(
idxNum int,
ptbl table.PhysicalTable,
cloudStoreURI string,
avgRowSize int,
) (*mergeSortExecutor, error) {
return &mergeSortExecutor{
jobID: jobID,
idxNum: idxNum,
ptbl: ptbl,
cloudStoreURI: cloudStoreURI,
avgRowSize: avgRowSize,
}, nil
}

@@ -85,7 +89,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}

prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID)))
partSize, err := getMergeSortPartSize(m.ptbl.Meta(), int(variable.GetDDLReorgWorkerCounter()), m.idxNum)
partSize, err := getMergeSortPartSize(m.avgRowSize, int(variable.GetDDLReorgWorkerCounter()), m.idxNum)
if err != nil {
return err
}
16 changes: 9 additions & 7 deletions pkg/ddl/backfilling_operators.go
@@ -112,11 +112,11 @@ func (ctx *OperatorCtx) OperatorErr() error {
return *err
}

func getWriterMemSize(tblInfo *model.TableInfo, idxNum int) (uint64, error) {
func getWriterMemSize(avgRowSize int, idxNum int) (uint64, error) {
failpoint.Inject("mockWriterMemSize", func() {
failpoint.Return(1*size.GB, nil)
})
_, writerCnt := expectedIngestWorkerCnt(tblInfo)
_, writerCnt := expectedIngestWorkerCnt(avgRowSize)
memTotal, err := memory.MemTotal()
if err != nil {
return 0, err
@@ -131,8 +131,8 @@ func getWriterMemSize(tblInfo *model.TableInfo, idxNum int) (uint64, error) {
return memSize, nil
}

func getMergeSortPartSize(tblInfo *model.TableInfo, concurrency int, idxNum int) (uint64, error) {
writerMemSize, err := getWriterMemSize(tblInfo, idxNum)
func getMergeSortPartSize(avgRowSize int, concurrency int, idxNum int) (uint64, error) {
writerMemSize, err := getWriterMemSize(avgRowSize, idxNum)
if err != nil {
return 0, nil
}
@@ -155,6 +155,7 @@ func NewAddIndexIngestPipeline(
totalRowCount *atomic.Int64,
metricCounter prometheus.Counter,
reorgMeta *model.DDLReorgMeta,
avgRowSize int,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -171,7 +172,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
readerCnt, writerCnt := expectedIngestWorkerCnt(tbl.Meta())
readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize)

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
@@ -207,6 +208,7 @@ func NewWriteIndexToExternalStoragePipeline(
metricCounter prometheus.Counter,
onClose external.OnCloseFunc,
reorgMeta *model.DDLReorgMeta,
avgRowSize int,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -223,7 +225,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
readerCnt, writerCnt := expectedIngestWorkerCnt(tbl.Meta())
readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
if err != nil {
@@ -234,7 +236,7 @@
return nil, err
}

memSize, err := getWriterMemSize(tbl.Meta(), len(indexes))
memSize, err := getWriterMemSize(avgRowSize, len(indexes))
if err != nil {
return nil, err
}
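Both sizing helpers now take `avgRowSize` directly instead of a `*model.TableInfo`, decoupling the sizing math from the statistics cache. Their bodies are collapsed above, so only the signatures are certain; a rough sketch of the shape they suggest, assuming a fixed share of total memory is split among the index writers (the names, the 1/2 share, and the per-concurrency split are all assumptions, not from this diff):

```go
package ddlsketch

// writerMemSizeSketch: split an assumed half of total memory among all
// external-sort writers across the indexes being added. Hypothetical only.
func writerMemSizeSketch(writerCnt, idxNum int, memTotal uint64) uint64 {
	return memTotal / 2 / uint64(writerCnt*idxNum)
}

// mergeSortPartSizeSketch: give each concurrent merge-sort reader an equal
// slice of the writer budget. Hypothetical only.
func mergeSortPartSizeSketch(writerMemSize uint64, concurrency int) uint64 {
	return writerMemSize / uint64(concurrency)
}
```

Whatever the exact arithmetic, the substantive change is that a single integer estimate now drives both memory budgeting and merge-sort partitioning.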
9 changes: 7 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -44,6 +44,7 @@ type readIndexExecutor struct {
ptbl table.PhysicalTable
jc *JobContext

avgRowSize int
cloudStorageURI string

bc ingest.BackendCtx
@@ -65,6 +66,7 @@ func newReadIndexExecutor(
jc *JobContext,
bcGetter func() (ingest.BackendCtx, error),
cloudStorageURI string,
avgRowSize int,
) (*readIndexExecutor, error) {
bc, err := bcGetter()
if err != nil {
@@ -78,6 +80,7 @@
jc: jc,
bc: bc,
cloudStorageURI: cloudStorageURI,
avgRowSize: avgRowSize,
curRowCount: &atomic.Int64{},
}, nil
}
@@ -241,7 +244,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
return NewAddIndexIngestPipeline(
opCtx, d.store, d.sessPool, r.bc, engines, sessCtx, r.job.ID,
tbl, r.indexes, start, end, totalRowCount, counter, r.job.ReorgMeta)
tbl, r.indexes, start, end, totalRowCount, counter, r.job.ReorgMeta, r.avgRowSize)
}

func (r *readIndexExecutor) buildExternalStorePipeline(
@@ -282,5 +285,7 @@
totalRowCount,
counter,
onClose,
r.job.ReorgMeta)
r.job.ReorgMeta,
r.avgRowSize,
)
}
31 changes: 21 additions & 10 deletions pkg/ddl/backfilling_scheduler.go
@@ -34,7 +34,6 @@ import (
poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
@@ -88,7 +87,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P
jobCtx *JobContext) (backfillScheduler, error) {
if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ctx = logutil.WithCategory(ctx, "ddl-ingest")
return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false), nil
return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false)
}
return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
}
@@ -314,6 +313,7 @@ type ingestBackfillScheduler struct {
sessPool *sess.Pool
tbl table.PhysicalTable
distribute bool
avgRowSize int

closed bool

@@ -329,18 +329,30 @@
checkpointMgr *ingest.CheckpointManager
}

func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo,
sessPool *sess.Pool, tbl table.PhysicalTable, distribute bool) *ingestBackfillScheduler {
func newIngestBackfillScheduler(
ctx context.Context,
info *reorgInfo,
sessPool *sess.Pool,
tbl table.PhysicalTable,
distribute bool,
) (*ingestBackfillScheduler, error) {
sctx, err := sessPool.Get()
if err != nil {
return nil, err
}
defer sessPool.Put(sctx)
avgRowSize := estimateTableRowSize(ctx, info.d.store, sctx.GetRestrictedSQLExecutor(), tbl)
return &ingestBackfillScheduler{
ctx: ctx,
reorgInfo: info,
sessPool: sessPool,
tbl: tbl,
avgRowSize: avgRowSize,
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
poolErr: make(chan error),
distribute: distribute,
}
}, nil
}

func (b *ingestBackfillScheduler) setupWorkers() error {
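`estimateTableRowSize` is defined in one of the files not shown on this page, so only its call site is visible: the constructor borrows a session from the pool (hence the new error return) and computes the estimate once per scheduler rather than once per worker. An explicitly hypothetical sketch of such a helper; the `rowStats` interface and the zero-on-failure fallback (which routes callers into the default-concurrency branch of `expectedIngestWorkerCnt`) are assumptions:

```go
package ddlsketch

// rowStats abstracts whatever source the real helper consults (statistics
// tables, or approximate region size/keys from the KV store). Hypothetical.
type rowStats interface {
	ApproxDataSize(tableID int64) (uint64, error) // bytes in the table's key range
	ApproxRowCount(tableID int64) (uint64, error) // rows in the table's key range
}

// estimateTableRowSizeSketch returns the average row size in bytes, or 0
// when no usable statistics exist. Returning 0 instead of an error keeps
// the DDL job going: callers fall back to the default worker concurrency.
func estimateTableRowSizeSketch(s rowStats, tableID int64) int {
	size, err := s.ApproxDataSize(tableID)
	if err != nil || size == 0 {
		return 0
	}
	rows, err := s.ApproxRowCount(tableID)
	if err != nil || rows == 0 {
		return 0
	}
	return int(size / rows)
}
```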
@@ -506,13 +518,12 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
}

func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
return expectedIngestWorkerCnt(b.tbl.Meta())
return expectedIngestWorkerCnt(b.avgRowSize)
}

func expectedIngestWorkerCnt(tblInfo *model.TableInfo) (readerCnt, writerCnt int) {
func expectedIngestWorkerCnt(avgRowSize int) (readerCnt, writerCnt int) {
workerCnt := int(variable.GetDDLReorgWorkerCounter())
rowCount, avgRowLen, _, _ := cache.TableRowStatsCache.EstimateDataLength(tblInfo)
if rowCount == 0 {
if avgRowSize == 0 {
// Statistic data not exist, use default concurrency.
readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
readerCnt = max(readerCnt, 1)
@@ -523,7 +534,7 @@
readerRatio := []float64{0.5, 1, 2, 4, 8}
rowSize := []uint64{200, 500, 1000, 3000, math.MaxUint64}
for i, s := range rowSize {
if avgRowLen <= s {
if uint64(avgRowSize) <= s {
readerCnt = max(int(float64(workerCnt)*readerRatio[i]), 1)
writerCnt = max(workerCnt, 1)
break
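The tier table above is the heart of the change: once an average row size is known, wider rows get proportionally more table-scan readers per writer, presumably because coprocessor reads of wide rows saturate before ingest does. A self-contained reproduction of the lookup (tiers and ratios copied from the diff; the writer count in the no-stats branch is collapsed above and therefore assumed, `maxBackfillWorkerSize` is illustrative, and the `min`/`max` builtins need Go 1.21+):

```go
package main

import (
	"fmt"
	"math"
)

const maxBackfillWorkerSize = 16 // assumed value; the real constant is defined elsewhere

func expectedIngestWorkerCnt(avgRowSize, workerCnt int) (readerCnt, writerCnt int) {
	if avgRowSize == 0 {
		// No statistics: use the default split (writer count assumed here).
		readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
		readerCnt = max(readerCnt, 1)
		writerCnt = max(workerCnt/2, 1)
		return
	}
	// Wider rows get a higher reader-to-writer ratio.
	readerRatio := []float64{0.5, 1, 2, 4, 8}
	rowSize := []uint64{200, 500, 1000, 3000, math.MaxUint64}
	for i, s := range rowSize {
		if uint64(avgRowSize) <= s {
			readerCnt = max(int(float64(workerCnt)*readerRatio[i]), 1)
			writerCnt = max(workerCnt, 1)
			break
		}
	}
	return
}

func main() {
	// With the default tidb_ddl_reorg_worker_cnt of 4:
	for _, sz := range []int{100, 400, 800, 2000, 5000} {
		r, w := expectedIngestWorkerCnt(sz, 4)
		fmt.Printf("avgRowSize=%4d -> readers=%2d writers=%d\n", sz, r, w)
	}
	// avgRowSize= 100 -> readers= 2 writers=4
	// avgRowSize= 400 -> readers= 4 writers=4
	// avgRowSize= 800 -> readers= 8 writers=4
	// avgRowSize=2000 -> readers=16 writers=4
	// avgRowSize=5000 -> readers=32 writers=4
}
```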
… 2 more changed files not shown.
