Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Jun 10, 2022
1 parent 2d1007f commit dcee16d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 51 deletions.
13 changes: 2 additions & 11 deletions cmd/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,8 @@ func (s *PieceMeta) AddDealForPiece(pieceCid cid.Cid, dealInfo model.DealInfo) e
// Add mh => piece index to store ;;; multihash -> []cid.Cid ; given multihash, which pieces is it in ; already in dagstore as a datastore impl

// Add mh => offset index to store ;;; piececid, multihash, offset -> adding the offset
func (s *PieceMeta) AddIndex(pieceCid cid.Cid, records []carindex.Record) error {

// first: see inverted index in dagstore today

// second:
// allocate index for pieceCid
// foreach record -> add cid -> offset (for the given pieceCid)

// TODO: mark that indexing is complete ; metadata value for each piece
// pieceCid -> {cursor ; isIndexed ; []dealInfo }
return nil
func (s *PieceMeta) AddIndex(pieceCid cid.Cid, records []model.Record) error {
return s.client.Call(nil, "boostddata_addIndex", pieceCid, records)
}

func (s *PieceMeta) IsIndexed(pieceCid cid.Cid) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (db *DB) SetNextCursor(ctx context.Context, cursor uint64) error {
return db.Put(ctx, dskeyNextCursor, buf)
}

// SetPieceCidToCursor
// SetPieceCidToMetadata
func (db *DB) SetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid, cursor uint64) error {
key := datastore.NewKey(fmt.Sprintf("%s%s", sprefixPieceCidToCursor, pieceCid.String()))

Expand All @@ -64,7 +64,7 @@ func (db *DB) SetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid, curso
return db.Put(ctx, key, value)
}

// GetPieceCidToCursor
// GetPieceCidToMetadata
func (db *DB) GetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
key := datastore.NewKey(fmt.Sprintf("%s%s", sprefixPieceCidToCursor, pieceCid.String()))

Expand Down
90 changes: 55 additions & 35 deletions cmd/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ldb

import (
"context"
"errors"
"fmt"
"io/ioutil"
"time"
Expand All @@ -11,7 +12,10 @@ import (
"github.com/ipfs/go-datastore"
levelds "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car/v2/index"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
mh "github.com/multiformats/go-multihash"
"github.com/syndtr/goleveldb/leveldb/opt"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
Expand Down Expand Up @@ -42,8 +46,10 @@ func newDB(path string, readonly bool) (*DB, error) {
return &DB{ldb}, nil
}

func NewPieceMetaService(repopath string) *PieceMetaService {
if repopath == "" {
func NewPieceMetaService(_repopath string) *PieceMetaService {
// tests
repopath := _repopath
if _repopath == "" {
var err error
repopath, err = ioutil.TempDir("", "ds-leveldb")
if err != nil {
Expand All @@ -56,6 +62,13 @@ func NewPieceMetaService(repopath string) *PieceMetaService {
panic(err)
}

// tests
if _repopath == "" {
// prepare db
log.Debug("preparing db with next cursor")
db.SetNextCursor(context.Background(), 100)
}

log.Debugw("new piece meta service", "repo path", repopath)

return &PieceMetaService{
Expand All @@ -70,13 +83,9 @@ func (s *PieceMetaService) AddDealForPiece(pieceCid cid.Cid, dealInfo model.Deal
log.Debugw("handled.add-deal-for-piece", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

//panic("not implemented")

return nil
}

// TODO: maybe implement over rpc subscription
// TODO: maybe pass ctx in func signature
func (s *PieceMetaService) GetRecords(pieceCid cid.Cid) ([]carindex.Record, error) {
log.Debugw("handle.get-iterable-index", "piece-cid", pieceCid)

Expand Down Expand Up @@ -137,23 +146,33 @@ func (s *PieceMetaService) PiecesContainingMultihash(m mh.Multihash) ([]cid.Cid,
return nil, nil
}

func (s *PieceMetaService) AddIndex(pieceCid cid.Cid, records []carindex.Record) error {
func (s *PieceMetaService) AddIndex(pieceCid cid.Cid, records []model.Record) error {
log.Debugw("handle.add-index", "records", len(records))

defer func(now time.Time) {
log.Debugw("handled.add-index", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
//fmt.Println("got r: ", r.Cid, r.Offset)
}
// --- first ---:

// TODO first: see inverted index in dagstore today

// --- second ---:
// foreach record -> add cid -> offset (for the given pieceCid)

ctx := context.Background()

// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
if err != nil {
return err
return fmt.Errorf("couldnt generate next cursor: %w", err)
}

// allocate metadata for pieceCid
Expand All @@ -162,42 +181,43 @@ func (s *PieceMetaService) AddIndex(pieceCid cid.Cid, records []carindex.Record)
return err
}

// put pieceCid in pieceCid->cursor table
err = s.db.SetPieceCidToMetadata(ctx, pieceCid, cursor)
mis := make(index.MultihashIndexSorted)
err = mis.Load(recs)
if err != nil {
return err
}

var subject index.Index
subject = &mis

// process index and store entries
//switch idx := subject.(type) {
//case carindex.IterableIndex:
//i := 0
//err := idx.ForEach(func(m multihash.Multihash, offset uint64) error {
//i++
//gi++

//err := db.AddOffset(ctx, keyCursorPrefix, m, offset)
//if err != nil {
//return err
//}

//return nil
//})
//if err != nil {
//return err
//}

//log.Debugf(fmt.Sprintf("processed %d index entries for piece cid %s", i, pieceCid.String()))
//default:
//panic(fmt.Sprintf("wanted %v but got %v\n", multicodec.CarMultihashIndexSorted, idx.Codec()))
//}
switch idx := subject.(type) {
case index.IterableIndex:
err := idx.ForEach(func(m multihash.Multihash, offset uint64) error {

return s.db.AddOffset(ctx, keyCursorPrefix, m, offset)
})
if err != nil {
return err
}

default:
return errors.New(fmt.Sprintf("wanted %v but got %v\n", multicodec.CarMultihashIndexSorted, idx.Codec()))
}

// TODO: mark that indexing is complete ; metadata value for each piece
// pieceCid -> {cursor ; isIndexed ; []dealInfo }
// put pieceCid in pieceCid->cursor table
// right now we store just cursor
err = s.db.SetPieceCidToMetadata(ctx, pieceCid, cursor)
if err != nil {
return err
}

err = s.db.Sync(ctx, datastore.NewKey(keyCursorPrefix))
if err != nil {
return err
}

// TODO: mark that indexing is complete ; metadata value for each piece
// pieceCid -> {cursor ; isIndexed ; []dealInfo }
return nil
}
6 changes: 6 additions & 0 deletions cmd/boostd-data/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
)

type DealInfo struct {
Expand All @@ -21,3 +22,8 @@ type Metadata struct {
Cursor uint64 `json:"cursor"`
IsIndexed bool `json:"is_indexed"`
}

// Record is a single index entry: a CID paired with an offset.
// It carries the same two fields (Cid, Offset) as a CAR index record, declared
// locally in the model package.
type Record struct {
// Cid identifies the entry.
Cid cid.Cid
// Offset is the position associated with Cid.
// NOTE(review): presumably a byte offset into the piece's CAR data — confirm.
Offset uint64
}
Binary file not shown.
94 changes: 91 additions & 3 deletions cmd/boostd-data/svc/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package svc

import (
"context"
"encoding/hex"
"errors"
"fmt"
"net"
"net/http"
"os"
"testing"
"time"

"github.com/filecoin-project/boost/cmd/boostd-data/client"
"github.com/filecoin-project/boost/cmd/boostd-data/model"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)

func init() {
Expand All @@ -25,16 +32,51 @@ func TestLdbService(t *testing.T) {
t.Fatal(err)
}

pieceCid, err := cid.Parse("baga6ea4seaqj2j4zfi2xk7okc7fnuw42pip6vjv2tnc4ojsbzlt3rfrdroa7qly")
sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx"

pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka")
if err != nil {
t.Fatal(err)
}

subject, err := loadIndex(sampleidx)
if err != nil {
t.Fatal(err)
}

records, err := getRecords(subject)
if err != nil {
t.Fatal(err)
}

err = cl.AddIndex(pieceCid, records)
if err != nil {
t.Fatal(err)
}

b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842")
if err != nil {
t.Fatal(err)
}

mhash, err := multihash.Cast(b)
if err != nil {
t.Fatal(err)
}
dealInfo := model.DealInfo{}
err = cl.AddDealForPiece(pieceCid, dealInfo)

offset, err := cl.GetOffset(pieceCid, mhash)
if err != nil {
t.Fatal(err)
}

fmt.Println(offset)

//dealInfo := model.DealInfo{}
//err = cl.AddDealForPiece(pieceCid, dealInfo)
//if err != nil {
//t.Fatal(err)
//}

log.Debug("sleeping for a while.. running tests..")
time.Sleep(2 * time.Second)

Expand Down Expand Up @@ -77,3 +119,49 @@ func setupService(t *testing.T, db string) (string, func()) {

return ln.Addr().String(), cleanup
}

// loadIndex opens the file at path and decodes it as a CAR index via
// index.ReadFrom. The elapsed time is logged at debug level when the function
// returns, whether it succeeds or fails.
//
// The file is closed before returning. An error from opening the file is
// returned unchanged (os.Open already names the path in its error); a decode
// error is wrapped with the path so the offending file can be identified.
func loadIndex(path string) (index.Index, error) {
	defer func(start time.Time) {
		log.Debugw("loadindex", "took", time.Since(start).String())
	}(time.Now())

	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	idx, err := index.ReadFrom(f)
	if err != nil {
		return nil, fmt.Errorf("reading index %q: %w", path, err)
	}

	return idx, nil
}

// getRecords flattens an iterable index into a slice of model.Record, one
// record per (multihash, offset) pair visited by ForEach.
//
// Each multihash is wrapped in a CIDv1 with the raw codec, because the index
// yields bare multihashes while model.Record stores a cid.Cid.
//
// It returns an error when subject is nil, when subject does not implement
// index.IterableIndex, or when iteration itself fails. On success the returned
// slice is always non-nil, even for an empty index.
func getRecords(subject index.Index) ([]model.Record, error) {
	// Guard explicitly: a nil interface would otherwise panic on Codec() below.
	if subject == nil {
		return nil, errors.New("index is nil")
	}

	iterable, ok := subject.(index.IterableIndex)
	if !ok {
		return nil, fmt.Errorf("wanted %v but got %v", multicodec.CarMultihashIndexSorted, subject.Codec())
	}

	// Deliberately non-nil so callers see an empty slice rather than nil.
	records := make([]model.Record, 0)
	err := iterable.ForEach(func(m multihash.Multihash, offset uint64) error {
		records = append(records, model.Record{
			Cid:    cid.NewCidV1(cid.Raw, m),
			Offset: offset,
		})
		return nil
	})
	if err != nil {
		return nil, err
	}

	return records, nil
}

0 comments on commit dcee16d

Please sign in to comment.