Skip to content

Commit

Permalink
fix boostd-data client and service; working itests for piecemeta;
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Jun 22, 2022
1 parent 154f164 commit d25e4c4
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 43 deletions.
35 changes: 32 additions & 3 deletions cmd/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/filecoin-project/boost/cmd/boostd-data/model"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
carindex "github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/index"
mh "github.com/multiformats/go-multihash"
)

Expand All @@ -28,13 +28,40 @@ func NewStore(addr string) (*Store, error) {
}, nil
}

func (s *Store) GetIndex(pieceCid cid.Cid) ([]carindex.Record, error) {
var resp []carindex.Record
func (s *Store) GetIndex(pieceCid cid.Cid) (index.Index, error) {
var resp []model.Record
err := s.client.Call(&resp, "boostddata_getIndex", pieceCid)
if err != nil {
return nil, err
}

//TODO: figure out how to remove this conversion
var records []index.Record
for _, r := range resp {
records = append(records, index.Record{
r.Cid,
r.Offset,
})
}

mis := make(index.MultihashIndexSorted)
err = mis.Load(records)
if err != nil {
return nil, err
}

return &mis, nil
}

func (s *Store) GetRecords(pieceCid cid.Cid) ([]model.Record, error) {
var resp []model.Record
err := s.client.Call(&resp, "boostddata_getIndex", pieceCid)
if err != nil {
return nil, err
}

log.Warnw("get-index", "piece-cid", pieceCid, "records", len(resp))

return resp, nil
}

Expand Down Expand Up @@ -63,6 +90,8 @@ func (s *Store) AddDealForPiece(pieceCid cid.Cid, dealInfo model.DealInfo) error
}

func (s *Store) AddIndex(pieceCid cid.Cid, records []model.Record) error {
log.Warnw("add-index", "piece-cid", pieceCid, "records", len(records))

return s.client.Call(nil, "boostddata_addIndex", pieceCid, records)
}

Expand Down
17 changes: 7 additions & 10 deletions cmd/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/ipld/go-car/v2/index"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
)
Expand All @@ -31,8 +30,7 @@ var (
prefixMhtoPieceCids uint64 = 2
sprefixMhtoPieceCids string

size = binary.MaxVarintLen64
cutsize = size + 2
size = binary.MaxVarintLen64
)

func init() {
Expand Down Expand Up @@ -190,15 +188,14 @@ func (db *DB) GetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid) (mode
}

// AllRecords
func (db *DB) AllRecords(ctx context.Context, cursor uint64) ([]index.Record, error) {
var records []index.Record
func (db *DB) AllRecords(ctx context.Context, cursor uint64) ([]model.Record, error) {
var records []model.Record

buf := make([]byte, size)
binary.PutUvarint(buf, cursor)

var q query.Query
q.Prefix = string(buf)

q.Prefix = fmt.Sprintf("%d/", cursor)
results, err := db.Query(ctx, q)
if err != nil {
return nil, err
Expand All @@ -210,18 +207,18 @@ func (db *DB) AllRecords(ctx context.Context, cursor uint64) ([]index.Record, er
break
}

k := r.Key[cutsize:]
k := r.Key[len(q.Prefix)+1:]

m, err := multihash.FromHexString(k)
if err != nil {
panic(err)
return nil, err
}

kcid := cid.NewCidV1(cid.Raw, m)

offset, _ := binary.Uvarint(r.Value)

records = append(records, index.Record{
records = append(records, model.Record{
Cid: kcid,
Offset: offset,
})
Expand Down
12 changes: 7 additions & 5 deletions cmd/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *Store) AddDealForPiece(pieceCid cid.Cid, dealInfo model.DealInfo) error
return nil
}

func (s *Store) GetRecords(pieceCid cid.Cid) ([]carindex.Record, error) {
func (s *Store) GetRecords(pieceCid cid.Cid) ([]model.Record, error) {
log.Debugw("handle.get-iterable-index", "piece-cid", pieceCid)

defer func(now time.Time) {
Expand Down Expand Up @@ -186,11 +186,11 @@ func (s *Store) PiecesContainingMultihash(m mh.Multihash) ([]cid.Cid, error) {
return s.db.GetPieceCidsByMultihash(ctx, m)
}

func (s *Store) GetIndex(pieceCid cid.Cid) ([]carindex.Record, error) {
log.Debugw("handle.get-index", "pieceCid", pieceCid)
func (s *Store) GetIndex(pieceCid cid.Cid) ([]model.Record, error) {
log.Warnw("handle.get-index", "pieceCid", pieceCid)

defer func(now time.Time) {
log.Debugw("handled.get-index", "took", fmt.Sprintf("%s", time.Since(now)))
log.Warnw("handled.get-index", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

s.Lock()
Expand All @@ -208,6 +208,8 @@ func (s *Store) GetIndex(pieceCid cid.Cid) ([]carindex.Record, error) {
return nil, err
}

log.Warnw("handle.get-index.records", "len(records)", len(records))

return records, nil
}

Expand Down Expand Up @@ -283,7 +285,7 @@ func (s *Store) AddIndex(pieceCid cid.Cid, records []model.Record) error {
return err
}

err = s.db.Sync(ctx, datastore.NewKey(keyCursorPrefix))
err = s.db.Sync(ctx, datastore.NewKey(fmt.Sprintf("%d", cursor)))
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions cmd/boostd-data/svc/svc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package svc

import (
"bufio"
"bytes"
"context"
"encoding/hex"
"errors"
Expand Down Expand Up @@ -122,6 +124,28 @@ func TestLdbService(t *testing.T) {
t.Fatal("expected pieceCid to be indexed")
}

recs, err := cl.GetRecords(pieceCid)
if err != nil {
t.Fatal(err)
}

if len(recs) == 0 {
t.Fatal("expected to get records back from GetIndex")
}

loadedSubject, err := cl.GetIndex(pieceCid)
if err != nil {
t.Fatal(err)
}

ok, err := compareIndices(subject, loadedSubject)
if err != nil {
t.Fatal(err)
}
if !ok {
log.Fatal("compare failed")
}

log.Debug("sleeping for a while.. running tests..")

cleanup()
Expand Down Expand Up @@ -207,3 +231,19 @@ func getRecords(subject index.Index) ([]model.Record, error) {
}
return records, nil
}

func compareIndices(subject, subjectDb index.Index) (bool, error) {
var b bytes.Buffer
w := bufio.NewWriter(&b)

subject.Marshal(w)

var b2 bytes.Buffer
w2 := bufio.NewWriter(&b2)

subjectDb.Marshal(w2)

res := bytes.Compare(b.Bytes(), b2.Bytes())

return res == 0, nil
}
5 changes: 4 additions & 1 deletion indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ type Wrapper struct {

func NewWrapper() func(lc fx.Lifecycle, r repo.LockedRepo, dealsDB *db.DealsDB,
legacyProv lotus_storagemarket.StorageProvider, prov provider.Interface,
pieceMeta *piecemeta.PieceMeta,
meshCreator idxprov.MeshCreator) *Wrapper {

return func(lc fx.Lifecycle, r repo.LockedRepo, dealsDB *db.DealsDB,
legacyProv lotus_storagemarket.StorageProvider, prov provider.Interface,
pieceMeta *piecemeta.PieceMeta,
meshCreator idxprov.MeshCreator) *Wrapper {

_, isDisabled := prov.(*DisabledIndexProvider)
Expand All @@ -55,7 +57,8 @@ func NewWrapper() func(lc fx.Lifecycle, r repo.LockedRepo, dealsDB *db.DealsDB,
prov: prov,
meshCreator: meshCreator,
//cfg: cfg,
enabled: !isDisabled,
enabled: !isDisabled,
pieceMeta: pieceMeta,
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions node/modules/piecemeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/stores"
"github.com/filecoin-project/lotus/api/v1api"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/markets/sectoraccessor"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
carindex "github.com/ipld/go-car/v2/index"
)

func NewPieceMeta(pp sectorstorage.PieceProvider) *piecemeta.PieceMeta {
// TODO: pass params
return piecemeta.NewPieceMeta()
func NewPieceMeta(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) *piecemeta.PieceMeta {
sa := sectoraccessor.NewSectorAccessor(maddr, secb, pp, full)

return piecemeta.NewPieceMeta(sa)
}

func NewPieceStore(pm *piecemeta.PieceMeta) piecestore.PieceStore {
Expand Down
Loading

0 comments on commit d25e4c4

Please sign in to comment.