Commit 6472af0

fix: add parallelisation to migrate-lid and improve logging (#1689)

* improve logging, add parallelism

* use atomic counters

* move back unlock

* use atomic.load
LexLuthr committed Sep 12, 2023
1 parent 59ff559 commit 6472af0
Showing 1 changed file with 85 additions and 43 deletions.
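
The core pattern in this commit, per the bullets above, is a fixed-size pool of errgroup workers draining a pre-filled, closed channel, with shared counters updated via sync/atomic so any worker can log a consistent running total. Here is a minimal, self-contained sketch of that pattern; the package scaffolding, job type, work function, and timings are illustrative stand-ins, not code from the commit:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

// work stands in for the per-item job (migrating one index in the commit).
func work(string) error { time.Sleep(10 * time.Millisecond); return nil }

func main() {
	ctx := context.Background()
	jobs := []string{"a", "b", "c", "d", "e"}

	// Buffered channel sized to hold every job, filled up front and then
	// closed, as migrateIndices does with its idxPath queue.
	queue := make(chan string, len(jobs))
	for _, j := range jobs {
		queue <- j
	}
	close(queue)

	// Shared counters are plain int64s updated with sync/atomic, so any
	// worker can report progress without holding a mutex.
	var processed, errCount int64

	parallel := 4 // mirrors the --parallel flag's default value
	var eg errgroup.Group
	for i := 0; i < parallel; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case j, ok := <-queue:
					if !ok {
						return nil // queue drained, worker exits
					}
					if err := work(j); err != nil {
						atomic.AddInt64(&errCount, 1)
					}
					n := atomic.AddInt64(&processed, 1)
					fmt.Printf("processed %d/%d (%s)\n", n, len(jobs), j)
				}
			}
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("workers stopped early:", err)
	}
	fmt.Printf("done: %d processed, %d errors\n",
		atomic.LoadInt64(&processed), atomic.LoadInt64(&errCount))
}
```

Closing the queue after it is filled is the shutdown signal: each worker drains until the channel reports !ok and returns nil, so eg.Wait surfaces only genuine failures or context cancellation.
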
cmd/migrate-lid/migrate_lid.go (85 additions, 43 deletions)
@@ -8,6 +8,8 @@ import (
 	"path"
 	"sort"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/filecoin-project/boost-gfm/piecestore"
@@ -32,6 +34,7 @@ import (
 	"github.com/schollz/progressbar/v3"
 	"github.com/urfave/cli/v2"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )

 // The methods on the store that are used for migration
@@ -65,6 +68,12 @@ var commonFlags = []cli.Flag{
 		Usage:    "if the index has already been migrated, overwrite it",
 		Required: false,
 	},
+	&cli.IntFlag{
+		Name:     "parallel",
+		Usage:    "the number of indexes to be processed in parallel",
+		Required: false,
+		Value:    4,
+	},
 }

 var migrateLevelDBCmd = &cli.Command{
@@ -198,7 +207,7 @@ func migrate(cctx *cli.Context, dbType string, store StoreMigrationApi, migrateT
 	if migrateType == "dagstore" {
 		// Migrate the indices
 		bar.Describe("Migrating indices...")
-		errCount, err := migrateIndices(ctx, logger, bar, repoDir, store, cctx.Bool("force"))
+		errCount, err := migrateIndices(ctx, logger, bar, repoDir, store, cctx.Bool("force"), cctx.Int("parallel"))
 		if errCount > 0 {
 			msg := fmt.Sprintf("Warning: there were errors migrating %d indices.", errCount)
 			msg += " See the log for details:\n" + logPath
Expand Down Expand Up @@ -227,7 +236,12 @@ func migrate(cctx *cli.Context, dbType string, store StoreMigrationApi, migrateT
return nil
}

func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi, force bool) (int, error) {
type idxTime struct {
t time.Duration
lck sync.Mutex
}

func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi, force bool, parallel int) (int64, error) {
indicesPath := path.Join(repoDir, "dagstore", "index")
logger.Infof("migrating dagstore indices at %s", indicesPath)

@@ -240,44 +254,65 @@ func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progres
 	bar.ChangeMax(len(idxPaths))

 	indicesStart := time.Now()
-	var count int
-	var errCount int
-	var indexTime time.Duration
-	for i, ipath := range idxPaths {
-		if ctx.Err() != nil {
-			return errCount, fmt.Errorf("index migration cancelled")
-		}
-
-		start := time.Now()
-
-		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second)
-		defer timeoutCancel()
-
-		indexed, err := migrateIndexWithTimeout(timeoutCtx, ipath, store, force)
-		bar.Add(1) //nolint:errcheck
-		if err != nil {
-			took := time.Since(start)
-			indexTime += took
-
-			logger.Errorw("migrate index failed", "piece cid", ipath.name, "took", took.String(), "err", err)
-
-			errCount++
-			continue
-		}
-
-		if indexed {
-			count++
-			took := time.Since(start)
-			indexTime += took
-			logger.Infow("migrated index", "piece cid", ipath.name, "processed", i+1, "total", len(idxPaths),
-				"took", took.String(), "average", (indexTime / time.Duration(count)).String())
-		} else {
-			logger.Infow("index already migrated", "piece cid", ipath.name, "processed", i+1, "total", len(idxPaths))
-		}
+	var count int64
+	var errCount int64
+	var indexTime idxTime
+	var processed int64
+
+	queue := make(chan idxPath, len(idxPaths))
+	for _, ipath := range idxPaths {
+		queue <- ipath
+	}
+	close(queue)
+
+	var eg errgroup.Group
+	for i := 0; i < parallel; i++ {
+		eg.Go(func() error {
+			for ctx.Err() == nil {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case p, ok := <-queue:
+					if !ok {
+						// Finished adding all the queued items, exit the goroutine
+						return nil
+					}
+					start := time.Now()
+
+					indexed, perr := migrateIndexWithTimeout(ctx, p, store, force)
+					bar.Add(1) //nolint:errcheck
+
+					took := time.Since(start)
+					indexTime.lck.Lock()
+					indexTime.t += took
+					indexTime.lck.Unlock()
+
+					if perr != nil {
+						logger.Errorw("migrate index failed", "piece cid", p.name, "took", took.String(), "err", perr)
+						atomic.AddInt64(&errCount, 1)
+					}
+
+					if indexed {
+						atomic.AddInt64(&count, 1)
+						atomic.AddInt64(&processed, 1)
+						logger.Infow("migrated index", "piece cid", p.name, "processed", atomic.LoadInt64(&processed), "total", len(idxPaths),
+							"took", took.String(), "average", (indexTime.t / time.Duration(atomic.LoadInt64(&count))).String())
+
+					} else {
+						atomic.AddInt64(&processed, 1)
+						logger.Infow("index already migrated", "piece cid", p.name, "processed", atomic.LoadInt64(&processed), "total", len(idxPaths))
+					}
+				}
+			}
+			return ctx.Err()
+		})
 	}

+	err = eg.Wait()
+	logger.Errorw("waiting for indexing threads to finish", "err", err)

 	logger.Infow("migrated indices", "total", len(idxPaths), "took", time.Since(indicesStart).String())
-	return errCount, nil
+	return atomic.LoadInt64(&errCount), nil
 }

 type migrateIndexResult struct {
Expand All @@ -286,13 +321,20 @@ type migrateIndexResult struct {
}

func migrateIndexWithTimeout(ctx context.Context, ipath idxPath, store StoreMigrationApi, force bool) (bool, error) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second)
defer timeoutCancel()

return execMigrateIndexWithTimeout(timeoutCtx, ipath, store, force)
}

func execMigrateIndexWithTimeout(ctx context.Context, ipath idxPath, store StoreMigrationApi, force bool) (bool, error) {
result := make(chan migrateIndexResult, 1)
go func() {
result <- doMigrateIndex(ctx, ipath, store, force)
}()
select {
case <-time.After(75 * time.Second):
return false, errors.New("index migration timed out after 75 seconds")
case <-ctx.Done():
return false, errors.New("index migration timed out after 60 seconds")
case result := <-result:
return result.Indexed, result.Error
}
@@ -324,21 +366,25 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f
 	}

 	// Load the index file
+	readStart := time.Now()
 	idx, err := loadIndex(ipath.path)
 	if err != nil {
 		return false, fmt.Errorf("loading index %s from disk: %w", ipath.path, err)
 	}
+	log.Debugw("ReadIndex", "took", time.Since(readStart).String())

 	itidx, ok := idx.(index.IterableIndex)
 	if !ok {
 		return false, fmt.Errorf("index %s is not iterable for piece %s", ipath.path, pieceCid)
 	}

 	// Convert from IterableIndex to an array of records
+	convStart := time.Now()
 	records, err := getRecords(itidx)
 	if err != nil {
 		return false, fmt.Errorf("getting records for index %s: %w", ipath.path, err)
 	}
+	log.Debugw("ConvertIndex", "took", time.Since(convStart).String())

 	// Add the index to the store
 	addStart := time.Now()
@@ -550,10 +596,6 @@ func getIndexPaths(pathDir string) ([]idxPath, error) {
 }

 func loadIndex(path string) (index.Index, error) {
-	defer func(now time.Time) {
-		log.Debugw("loadindex", "took", time.Since(now))
-	}(time.Now())
-
 	idxf, err := os.Open(path)
 	if err != nil {
 		return nil, err
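The other notable refactor above is the timeout plumbing: instead of racing the work against a fixed time.After(75 * time.Second), migrateIndexWithTimeout now derives a 60-second context and execMigrateIndexWithTimeout selects on ctx.Done(). Below is a minimal sketch of that shape with a placeholder task in place of doMigrateIndex; the names withTimeout, exec, and task are hypothetical stand-ins:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	ok  bool
	err error
}

// withTimeout mirrors migrateIndexWithTimeout: derive a deadline context
// and delegate, so callers never manage their own timer.
func withTimeout(ctx context.Context, d time.Duration) (bool, error) {
	tctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()
	return exec(tctx)
}

// exec mirrors execMigrateIndexWithTimeout: run the task in a goroutine
// and race its result channel against the context deadline.
func exec(ctx context.Context) (bool, error) {
	res := make(chan result, 1) // buffered so the goroutine never blocks
	go func() {
		res <- task(ctx) // placeholder for doMigrateIndex
	}()
	select {
	case <-ctx.Done():
		return false, errors.New("timed out or cancelled: " + ctx.Err().Error())
	case r := <-res:
		return r.ok, r.err
	}
}

func task(context.Context) result {
	time.Sleep(50 * time.Millisecond)
	return result{ok: true}
}

func main() {
	ok, err := withTimeout(context.Background(), 60*time.Second)
	fmt.Println(ok, err)
}
```

Because ctx.Done() also fires on caller cancellation, the commit's "timed out after 60 seconds" message can appear for an ordinary cancellation as well; the buffered result channel ensures the worker goroutine can still send its result and exit after the deadline passes.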
