Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
restore: give priority to small tables for importing
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Apr 4, 2019
1 parent 7bae12e commit 204b718
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
5 changes: 4 additions & 1 deletion lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type MDTableMeta struct {
SchemaFile string
DataFiles []string
charSet string
TotalSize int64
}

func (m *MDTableMeta) GetSchema() string {
Expand Down Expand Up @@ -121,6 +122,7 @@ func (ftype fileType) String() string {
type fileInfo struct {
tableName filter.Table
path string
size int64
}

var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
Expand Down Expand Up @@ -184,6 +186,7 @@ func (s *mdLoaderSetup) setup(dir string) error {
}
}
tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo.path)
tableMeta.TotalSize += fileInfo.size
}

return nil
Expand All @@ -205,7 +208,7 @@ func (s *mdLoaderSetup) listFiles(dir string) error {
fname := strings.TrimSpace(f.Name())
lowerFName := strings.ToLower(fname)

info := fileInfo{path: path}
info := fileInfo{path: path, size: f.Size()}

var (
ftype fileType
Expand Down
30 changes: 24 additions & 6 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"os"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -461,12 +462,34 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
stopPeriodicActions := make(chan struct{}, 1)
go rc.runPeriodicActions(ctx, stopPeriodicActions)

type task struct {
tr *TableRestore
cp *TableCheckpoint
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
err := task.tr.restoreTable(ctx, rc, task.cp)
metric.RecordTableCount("completed", err)
restoreErr.Set(task.tr.tableName, err)
wg.Done()
}
}()
}

for _, dbMeta := range rc.dbMetas {
dbInfo, ok := rc.dbInfos[dbMeta.Name]
if !ok {
common.AppLogger.Errorf("database %s not found in rc.dbInfos", dbMeta.Name)
continue
}
// Put the small table in the front of the slice which can avoid large table
// take a long time to import and block small table to release index worker.
sort.Slice(dbMeta.Tables, func(i, j int) bool {
return dbMeta.Tables[i].TotalSize < dbMeta.Tables[j].TotalSize
})
for _, tableMeta := range dbMeta.Tables {
tableInfo, ok := dbInfo.Tables[tableMeta.Name]
if !ok {
Expand All @@ -493,12 +516,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

wg.Add(1)
go func(t *TableRestore, cp *TableCheckpoint) {
defer wg.Done()
err := t.restoreTable(ctx, rc, cp)
metric.RecordTableCount("completed", err)
restoreErr.Set(t.tableName, err)
}(tr, cp)
taskCh <- task{tr: tr, cp: cp}
}
}

Expand Down

0 comments on commit 204b718

Please sign in to comment.