diff --git a/cmd/boostd/main.go b/cmd/boostd/main.go index ba8dd1c73..10102888a 100644 --- a/cmd/boostd/main.go +++ b/cmd/boostd/main.go @@ -50,6 +50,7 @@ func main() { dagstoreCmd, netCmd, pieceDirCmd, + recoverCmd, }, } app.Setup() diff --git a/cmd/boostd/recover.go b/cmd/boostd/recover.go index 210650927..001225ba6 100644 --- a/cmd/boostd/recover.go +++ b/cmd/boostd/recover.go @@ -17,7 +17,6 @@ import ( "github.com/filecoin-project/boost/cmd/lib" "github.com/filecoin-project/boost/db" "github.com/filecoin-project/boost/piecedirectory" - bdclient "github.com/filecoin-project/boostd-data/client" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-commp-utils/writer" @@ -191,14 +190,14 @@ func action(cctx *cli.Context) error { if ignoreLID { pd = nil } else { - cl := bdclient.NewStore() - defer cl.Close(ctx) - err = cl.Dial(ctx, cctx.String("api-lid")) + pdClient := piecedirectory.NewStore() + defer pdClient.Close(ctx) + err = pdClient.Dial(ctx, cctx.String("api-lid")) if err != nil { return fmt.Errorf("connecting to local index directory service: %w", err) } pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa} - pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle")) + pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle")) pd.Start(ctx) } diff --git a/cmd/migrate-lid/migrate_lid.go b/cmd/migrate-lid/migrate_lid.go index 92e2d04bd..16641fc4d 100644 --- a/cmd/migrate-lid/migrate_lid.go +++ b/cmd/migrate-lid/migrate_lid.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "github.com/filecoin-project/boost-gfm/piecestore" + "github.com/filecoin-project/boost/cmd/lib" "github.com/filecoin-project/boost/db" "github.com/filecoin-project/boostd-data/couchbase" "github.com/filecoin-project/boostd-data/ldb" @@ -17,20 +19,11 @@ import ( "github.com/filecoin-project/boostd-data/svc" "github.com/filecoin-project/boostd-data/svc/types" "github.com/filecoin-project/go-address" - vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm" - "github.com/filecoin-project/go-fil-markets/piecestore" - piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-statemachine/fsm" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/lib/backupds" "github.com/filecoin-project/lotus/node/modules" - "github.com/filecoin-project/lotus/node/repo" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" "github.com/ipld/go-car/v2/index" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multicodec" @@ -347,7 +340,7 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi) (int, error) { // Open the datastore in the existing repo - ds, err := openDataStore(repoDir) + ds, err := lib.OpenDataStore(repoDir) if err != nil { return 0, fmt.Errorf("creating piece store from repo %s: %w", repoDir, err) } @@ -364,12 +357,12 @@ func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *prog // Create a mapping of on-chain deal ID to deal proposal cid. // This is needed below so that we can map from the legacy piece store // info to a legacy deal. - propCidByChainDealID, err := getPropCidByChainDealID(ctx, ds) + propCidByChainDealID, err := lib.GetPropCidByChainDealID(ctx, ds) if err != nil { return 0, fmt.Errorf("building chain deal id -> proposal cid map: %w", err) } - ps, err := openPieceStore(ctx, ds) + ps, err := lib.OpenPieceStore(ctx, ds) if err != nil { return 0, fmt.Errorf("opening piece store: %w", err) } @@ -484,81 +477,6 @@ func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *prog return errorCount, nil } -func getPropCidByChainDealID(ctx context.Context, ds *backupds.Datastore) (map[abi.DealID]cid.Cid, error) { - deals, err := getLegacyDealsFSM(ctx, ds) - if err != nil { - return nil, err - } - - // Build a mapping of chain deal ID to proposal CID - var list []storagemarket.MinerDeal - if err := deals.List(&list); err != nil { - return nil, err - } - - byChainDealID := make(map[abi.DealID]cid.Cid, len(list)) - for _, d := range list { - if d.DealID != 0 { - byChainDealID[d.DealID] = d.ProposalCid - } - } - - return byChainDealID, nil -} - -func getLegacyDealsFSM(ctx context.Context, ds *backupds.Datastore) (fsm.Group, error) { - // Get the deals FSM - provDS := namespace.Wrap(ds, datastore.NewKey("/deals/provider")) - deals, migrate, err := vfsm.NewVersionedFSM(provDS, fsm.Parameters{ - StateType: storagemarket.MinerDeal{}, - StateKeyField: "State", - }, nil, "2") - if err != nil { - return nil, fmt.Errorf("reading legacy deals from datastore: %w", err) - } - - err = migrate(ctx) - if err != nil { - return nil, fmt.Errorf("running provider fsm migration script: %w", err) - } - - return deals, err -} - -func openDataStore(path string) (*backupds.Datastore, error) { - ctx := context.Background() - - rpo, err := repo.NewFS(path) - if err != nil { - return nil, fmt.Errorf("could not open repo %s: %w", path, err) - } - - exists, err := rpo.Exists() - if err != nil { - return nil, fmt.Errorf("checking repo %s exists: %w", path, err) - } - if !exists { - return nil, fmt.Errorf("repo does not exist: %s", path) - } - - lr, err := rpo.Lock(repo.StorageMiner) - if err != nil { - return nil, fmt.Errorf("locking repo %s: %w", path, err) - } - - mds, err := lr.Datastore(ctx, "/metadata") - if err != nil { - return nil, err - } - - bds, err := backupds.Wrap(mds, "") - if err != nil { - return nil, fmt.Errorf("opening backupds: %w", err) - } - - return bds, nil -} - func getRecords(subject index.Index) ([]model.Record, error) { records := make([]model.Record, 0) @@ -743,13 +661,13 @@ func migrateDBReverse(cctx *cli.Context, repoDir string, dbType string, pieceDir logger.Infof("starting migration of %d piece infos from %s local index directory to piece store", len(pcids), dbType) // Open the datastore - ds, err := openDataStore(repoDir) + ds, err := lib.OpenDataStore(repoDir) if err != nil { return fmt.Errorf("creating datastore from repo %s: %w", repoDir, err) } // Open the Piece Store - ps, err := openPieceStore(ctx, ds) + ps, err := lib.OpenPieceStore(ctx, ds) if err != nil { return fmt.Errorf("opening piece store: %w", err) } @@ -830,36 +748,6 @@ func migrateReversePiece(ctx context.Context, pieceCid cid.Cid, pieceDir StoreMi return migrated, nil } -func openPieceStore(ctx context.Context, ds *backupds.Datastore) (piecestore.PieceStore, error) { - // Open the piece store - ps, err := piecestoreimpl.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket"))) - if err != nil { - return nil, fmt.Errorf("creating piece store from datastore : %w", err) - } - - // Wait for the piece store to be ready - ch := make(chan error, 1) - ps.OnReady(func(e error) { - ch <- e - }) - - err = ps.Start(ctx) - if err != nil { - return nil, fmt.Errorf("starting piece store: %w", err) - } - - select { - case err = <-ch: - if err != nil { - return nil, fmt.Errorf("waiting for piece store to be ready: %w", err) - } - case <-ctx.Done(): - return nil, errors.New("cancelled while waiting for piece store to be ready") - } - - return ps, nil -} - func createLogger(logPath string) (*zap.SugaredLogger, error) { logCfg := zap.NewDevelopmentConfig() logCfg.OutputPaths = []string{logPath}