diff --git a/cmd/boostd/recover.go b/cmd/boostd/recover.go index 04b155827..3c108f7a0 100644 --- a/cmd/boostd/recover.go +++ b/cmd/boostd/recover.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/boost/piecedirectory" bdclient "github.com/filecoin-project/boostd-data/client" "github.com/filecoin-project/boostd-data/model" + "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-commp-utils/writer" "github.com/filecoin-project/go-jsonrpc" @@ -33,7 +34,6 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" - carv2 "github.com/ipld/go-car/v2" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multibase" "github.com/urfave/cli/v2" @@ -461,7 +461,7 @@ func (dr *DisasterRecovery) CompleteSector(s abi.SectorNumber) error { } // safeUnsealSector tries to return a reader to an unsealed sector or times out -func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (io.ReadCloser, bool, error) { +func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (mount.Reader, bool, error) { mid, _ := address.IDFromAddress(maddr) sid := abi.SectorID{ @@ -474,7 +474,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi logger.Errorw("storage find sector", "err", err) } - var reader io.ReadCloser + var reader mount.Reader var isUnsealed bool done := make(chan struct{}) @@ -507,7 +507,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi logger.Debugw("sa.IsUnsealed return true", "sector", sectorid) go func() { - reader, err = sa.UnsealSector(ctx, sectorid, offset, piecesize.Unpadded()) + reader, err = sa.UnsealSectorAt(ctx, sectorid, offset, piecesize.Unpadded()) if err != nil { logger.Errorw("sa.UnsealSector return error", "sector", sectorid, "err", err) return @@ -574,25 +574,12 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab if err != nil { return err } + defer reader.Close() if !isUnsealed { return fmt.Errorf("sector %d is not unsealed", sid) } dr.Sectors[sid].Deals[cdi].IsUnsealed = true - - readerAt := reader.(Reader) - - opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)} - rr, err := carv2.NewReader(readerAt, opts...) - if err != nil { - return err - } - - drr, err := rr.DataReader() - if err != nil { - return err - } - dr.Sectors[sid].Deals[cdi].GotDataReader = true if !ignoreLID { // populate LID @@ -657,7 +644,7 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab if !ignoreCommp { // commp over data reader w := &writer.Writer{} - _, err = io.CopyBuffer(w, drr, make([]byte, writer.CommPBuf)) + _, err = io.CopyBuffer(w, reader, make([]byte, writer.CommPBuf)) if err != nil { return fmt.Errorf("copy into commp writer: %w", err) } @@ -670,9 +657,6 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab encoder := cidenc.Encoder{Base: multibase.MustNewEncoder(multibase.Base32)} _ = encoder - //fmt.Println("CommP CID: ", encoder.Encode(commp.PieceCID)) - //fmt.Println("Piece size: ", types.NewInt(uint64(commp.PieceSize.Unpadded().Padded()))) - if !commp.PieceCID.Equals(piececid) { return fmt.Errorf("calculated commp doesnt match on-chain data, expected %s, got %s", piececid, commp.PieceCID) } @@ -776,13 +760,6 @@ func getActorAddress(ctx context.Context, cctx *cli.Context) (maddr address.Addr return maddr, nil } -type Reader interface { - io.Closer - io.Reader - io.ReaderAt - io.Seeker -} - func createLogger(logPath string) (*zap.SugaredLogger, error) { logCfg := zap.NewDevelopmentConfig() logCfg.OutputPaths = []string{"stdout", logPath}