From 6472af0159fa318e59ebb782a422d3a9d8567b7c Mon Sep 17 00:00:00 2001
From: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Date: Tue, 12 Sep 2023 13:05:45 +0400
Subject: [PATCH] fix: add parallelisation to migrate-lid and improve logging
 (#1689)

* improve logging, add parallelism
* use atomic counters
* move back unlock
* use atomic.load
---
 cmd/migrate-lid/migrate_lid.go | 131 +++++++++++++++++++++++-----------
 1 file changed, 88 insertions(+), 43 deletions(-)

diff --git a/cmd/migrate-lid/migrate_lid.go b/cmd/migrate-lid/migrate_lid.go
index 4a70b8255..c27e1d57b 100644
--- a/cmd/migrate-lid/migrate_lid.go
+++ b/cmd/migrate-lid/migrate_lid.go
@@ -8,6 +8,8 @@ import (
 	"path"
 	"sort"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/filecoin-project/boost-gfm/piecestore"
@@ -32,6 +34,7 @@ import (
 	"github.com/schollz/progressbar/v3"
 	"github.com/urfave/cli/v2"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 // The methods on the store that are used for migration
@@ -65,6 +68,12 @@ var commonFlags = []cli.Flag{
 		Usage:    "if the index has already been migrated, overwrite it",
 		Required: false,
 	},
+	&cli.IntFlag{
+		Name:     "parallel",
+		Usage:    "the number of indexes to be processed in parallel",
+		Required: false,
+		Value:    4,
+	},
 }
 
 var migrateLevelDBCmd = &cli.Command{
@@ -198,7 +207,7 @@ func migrate(cctx *cli.Context, dbType string, store StoreMigrationApi, migrateT
 	if migrateType == "dagstore" {
 		// Migrate the indices
 		bar.Describe("Migrating indices...")
-		errCount, err := migrateIndices(ctx, logger, bar, repoDir, store, cctx.Bool("force"))
+		errCount, err := migrateIndices(ctx, logger, bar, repoDir, store, cctx.Bool("force"), cctx.Int("parallel"))
 		if errCount > 0 {
 			msg := fmt.Sprintf("Warning: there were errors migrating %d indices.", errCount)
 			msg += " See the log for details:\n" + logPath
@@ -227,7 +236,12 @@ func migrate(cctx *cli.Context, dbType string, store StoreMigrationApi, migrateT
 	return nil
 }
 
-func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi, force bool) (int, error) {
+type idxTime struct {
+	t   time.Duration
+	lck sync.Mutex
+}
+
+func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi, force bool, parallel int) (int64, error) {
 	indicesPath := path.Join(repoDir, "dagstore", "index")
 	logger.Infof("migrating dagstore indices at %s", indicesPath)
 
@@ -240,44 +254,68 @@ func migrateIndices(ctx context.Context, logger *zap.SugaredLogger, bar *progres
 	bar.ChangeMax(len(idxPaths))
 
 	indicesStart := time.Now()
-	var count int
-	var errCount int
-	var indexTime time.Duration
-	for i, ipath := range idxPaths {
-		if ctx.Err() != nil {
-			return errCount, fmt.Errorf("index migration cancelled")
-		}
-
-		start := time.Now()
-
-		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second)
-		defer timeoutCancel()
-
-		indexed, err := migrateIndexWithTimeout(timeoutCtx, ipath, store, force)
-		bar.Add(1) //nolint:errcheck
-		if err != nil {
-			took := time.Since(start)
-			indexTime += took
-
-			logger.Errorw("migrate index failed", "piece cid", ipath.name, "took", took.String(), "err", err)
-
-			errCount++
-			continue
-		}
-
-		if indexed {
-			count++
-			took := time.Since(start)
-			indexTime += took
-			logger.Infow("migrated index", "piece cid", ipath.name, "processed", i+1, "total", len(idxPaths),
-				"took", took.String(), "average", (indexTime / time.Duration(count)).String())
-		} else {
logger.Infow("index already migrated", "piece cid", ipath.name, "processed", i+1, "total", len(idxPaths)) - } + var count int64 + var errCount int64 + var indexTime idxTime + var processed int64 + + queue := make(chan idxPath, len(idxPaths)) + for _, ipath := range idxPaths { + queue <- ipath + } + close(queue) + + var eg errgroup.Group + for i := 0; i < parallel; i++ { + eg.Go(func() error { + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return ctx.Err() + case p, ok := <-queue: + if !ok { + // Finished adding all the queued items, exit the thread + return nil + } + start := time.Now() + + indexed, perr := migrateIndexWithTimeout(ctx, p, store, force) + bar.Add(1) //nolint:errcheck + + took := time.Since(start) + indexTime.lck.Lock() + indexTime.t += took + indexTime.lck.Unlock() + + if perr != nil { + logger.Errorw("migrate index failed", "piece cid", p.name, "took", took.String(), "err", perr) + atomic.AddInt64(&errCount, 1) + } + + if indexed { + atomic.AddInt64(&count, 1) + atomic.AddInt64(&processed, 1) + logger.Infow("migrated index", "piece cid", p.name, "processed", atomic.LoadInt64(&processed), "total", len(idxPaths), + "took", took.String(), "average", (indexTime.t / time.Duration(atomic.LoadInt64(&count))).String()) + + } else { + atomic.AddInt64(&processed, 1) + logger.Infow("index already migrated", "piece cid", p.name, "processed", atomic.LoadInt64(&processed), "total", len(idxPaths)) + } + } + } + return ctx.Err() + }) } + err = eg.Wait() + logger.Errorw("waiting for indexing threads to finish", err) + logger.Infow("migrated indices", "total", len(idxPaths), "took", time.Since(indicesStart).String()) - return errCount, nil + return atomic.LoadInt64(&errCount), nil } type migrateIndexResult struct { @@ -286,13 +321,20 @@ type migrateIndexResult struct { } func migrateIndexWithTimeout(ctx context.Context, ipath idxPath, store StoreMigrationApi, force bool) (bool, error) { + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second) + defer timeoutCancel() + + return execMigrateIndexWithTimeout(timeoutCtx, ipath, store, force) +} + +func execMigrateIndexWithTimeout(ctx context.Context, ipath idxPath, store StoreMigrationApi, force bool) (bool, error) { result := make(chan migrateIndexResult, 1) go func() { result <- doMigrateIndex(ctx, ipath, store, force) }() select { - case <-time.After(75 * time.Second): - return false, errors.New("index migration timed out after 75 seconds") + case <-ctx.Done(): + return false, errors.New("index migration timed out after 60 seconds") case result := <-result: return result.Indexed, result.Error } @@ -324,10 +366,12 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f } // Load the index file + readStart := time.Now() idx, err := loadIndex(ipath.path) if err != nil { return false, fmt.Errorf("loading index %s from disk: %w", ipath.path, err) } + log.Debugw("ReadIndex", "took", time.Since(readStart).String()) itidx, ok := idx.(index.IterableIndex) if !ok { @@ -335,10 +379,12 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f } // Convert from IterableIndex to an array of records + convStart := time.Now() records, err := getRecords(itidx) if err != nil { return false, fmt.Errorf("getting records for index %s: %w", ipath.path, err) } + log.Debugw("ConvertIndex", "took", time.Since(convStart).String()) // Add the index to the store addStart := time.Now() @@ -550,10 +596,6 @@ func getIndexPaths(pathDir string) ([]idxPath, error) { } func loadIndex(path string) 
-	defer func(now time.Time) {
-		log.Debugw("loadindex", "took", time.Since(now))
-	}(time.Now())
-
 	idxf, err := os.Open(path)
 	if err != nil {
 		return nil, err
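
A note on the concurrency pattern for reviewers: the new migrateIndices fans the index paths out over a fixed number of goroutines that drain a pre-filled, closed channel, with the shared totals updated through atomic counters. The stand-alone sketch below shows the same queue/errgroup/atomic shape in isolation; processAll and handleItem are illustrative names, not code from this patch.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// processAll drains a buffered, closed channel from a fixed number of worker
// goroutines, mirroring the queue/errgroup structure of migrateIndices.
func processAll(ctx context.Context, items []string, parallel int) (int64, int64, error) {
	var processed, errCount int64

	// Buffer the queue so every item fits up front, then close it so each
	// worker sees ok == false once the queue is drained and exits cleanly.
	queue := make(chan string, len(items))
	for _, it := range items {
		queue <- it
	}
	close(queue)

	var eg errgroup.Group
	for i := 0; i < parallel; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case it, ok := <-queue:
					if !ok {
						return nil // queue drained
					}
					if err := handleItem(it); err != nil {
						// Atomic adds keep the shared totals race-free
						// without a mutex.
						atomic.AddInt64(&errCount, 1)
						continue
					}
					atomic.AddInt64(&processed, 1)
				}
			}
		})
	}

	err := eg.Wait()
	return atomic.LoadInt64(&processed), atomic.LoadInt64(&errCount), err
}

func handleItem(item string) error {
	fmt.Println("processing", item)
	return nil
}

func main() {
	done, failed, err := processAll(context.Background(), []string{"a", "b", "c"}, 2)
	fmt.Printf("processed=%d errors=%d err=%v\n", done, failed, err)
}

Buffering the channel to len(items) and closing it before the workers start is what lets each worker use the two-valued receive as its exit condition, the same design migrateIndices relies on.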
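
The per-index timeout follows a second common shape: derive a deadline context from the parent, run the work in a goroutine, and race its result channel against ctx.Done(). A minimal sketch with the 60-second deadline taken from the patch; runWithTimeout and doWork are assumed names:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout mirrors the migrateIndexWithTimeout/execMigrateIndexWithTimeout
// split above: the wrapper owns the deadline, the inner select races the work
// against cancellation.
func runWithTimeout(ctx context.Context, doWork func() error) error {
	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	// Buffered so the worker goroutine can always deliver its result and
	// exit, even if this function has already returned on timeout.
	result := make(chan error, 1)
	go func() {
		result <- doWork()
	}()

	select {
	case <-ctx.Done():
		return errors.New("work timed out or was cancelled")
	case err := <-result:
		return err
	}
}

func main() {
	err := runWithTimeout(context.Background(), func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for migrating one index
		return nil
	})
	fmt.Println("err:", err)
}

The capacity-1 result channel matches make(chan migrateIndexResult, 1) in the patch: without the buffer, a worker that finishes after the timeout would block forever on its send and leak.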