Skip to content

Commit

Permalink
add inverted index
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Jun 15, 2022
1 parent 2c726f4 commit 1021556
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 10 deletions.
3 changes: 2 additions & 1 deletion cmd/boostd-data/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module github.com/filecoin-project/boost/cmd/boostd-data

require (
github.com/couchbase/gocb/v2 v2.5.0
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf
github.com/docker/go-connections v0.4.0
github.com/ethereum/go-ethereum v1.10.17
Expand All @@ -13,6 +14,7 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.0
github.com/ipld/go-car/v2 v2.1.1
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61
github.com/multiformats/go-multihash v0.1.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d
Expand Down Expand Up @@ -46,7 +48,6 @@ require (
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
Expand Down
97 changes: 97 additions & 0 deletions cmd/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/boost/cmd/boostd-data/model"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
Expand All @@ -18,11 +19,17 @@ var (
// LevelDB key value for storing next free cursor.
keyNextCursor uint64 = 0
dskeyNextCursor datastore.Key

// LevelDB key prefix for PieceCid to cursor table.
// LevelDB keys will be built by concatenating PieceCid to this prefix.
prefixPieceCidToCursor uint64 = 1
sprefixPieceCidToCursor string

// LevelDB key prefix for Multihash to PieceCids table.
// LevelDB keys will be built by concatenating Multihash to this prefix.
prefixMhtoPieceCids uint64 = 2
sprefixMhtoPieceCids string

size = binary.MaxVarintLen64
cutsize = size + 2
)
Expand All @@ -35,6 +42,10 @@ func init() {
buf = make([]byte, size)
binary.PutUvarint(buf, prefixPieceCidToCursor)
sprefixPieceCidToCursor = string(buf)

buf = make([]byte, size)
binary.PutUvarint(buf, prefixMhtoPieceCids)
sprefixMhtoPieceCids = string(buf)
}

// NextCursor
Expand All @@ -56,6 +67,83 @@ func (db *DB) SetNextCursor(ctx context.Context, cursor uint64) error {
return db.Put(ctx, dskeyNextCursor, buf)
}

// GetPieceCidsByMultihash
func (db *DB) GetPieceCidsByMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) {
key := datastore.NewKey(fmt.Sprintf("%s%s", sprefixMhtoPieceCids, mh.String()))

val, err := db.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to get value for multihash %s, err: %w", mh, err)
}

var pcids []cid.Cid
if err := json.Unmarshal(val, &pcids); err != nil {
return nil, fmt.Errorf("failed to unmarshal pieceCids slice: %w", err)
}

return pcids, nil
}

// SetMultihashToPieceCid
func (db *DB) SetMultihashToPieceCid(ctx context.Context, mh multihash.Multihash, pieceCid cid.Cid) error {
batch, err := db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create ds batch: %w", err)
}

key := datastore.NewKey(fmt.Sprintf("%s%s", sprefixMhtoPieceCids, mh.String()))

// do we already have an entry for this multihash ?
val, err := db.Get(ctx, key)
if err != nil && err != ds.ErrNotFound {
return fmt.Errorf("failed to get value for multihash %s, err: %w", mh, err)
}

// if we don't have an existing entry for this mh, create one
if err == ds.ErrNotFound {
v := []cid.Cid{pieceCid}
b, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("failed to marshal pieceCids slice: %w", err)
}
if err := batch.Put(ctx, key, b); err != nil {
return fmt.Errorf("failed to batch put mh=%s, err=%w", mh, err)
}
return nil
}

// else, append the pieceCid to the existing list
var pcids []cid.Cid
if err := json.Unmarshal(val, &pcids); err != nil {
return fmt.Errorf("failed to unmarshal pieceCids slice: %w", err)
}

// if we already have the pieceCid indexed for the multihash, nothing to do here.
if has(pcids, pieceCid) {
return nil
}

pcids = append(pcids, pieceCid)

b, err := json.Marshal(pcids)
if err != nil {
return fmt.Errorf("failed to marshal pieceCids slice: %w", err)
}
if err := batch.Put(ctx, key, b); err != nil {
return fmt.Errorf("failed to batch put mh=%s, err%w", mh, err)
}

if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

if err := db.Sync(ctx, key); err != nil {
return fmt.Errorf("failed to sync puts: %w", err)
}

return nil
}

// SetPieceCidToMetadata
func (db *DB) SetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid, md model.Metadata) error {
b, err := json.Marshal(md)
Expand Down Expand Up @@ -150,3 +238,12 @@ func (db *DB) GetOffset(ctx context.Context, cursorPrefix string, m multihash.Mu
offset, _ := binary.Uvarint(b)
return offset, nil
}

func has(list []cid.Cid, v cid.Cid) bool {
for _, l := range list {
if l.Equals(v) {
return true
}
}
return false
}
27 changes: 18 additions & 9 deletions cmd/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/filecoin-project/boost/cmd/boostd-data/model"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -157,9 +158,8 @@ func (s *PieceMetaService) PiecesContainingMultihash(m mh.Multihash) ([]cid.Cid,
log.Debugw("handled.pieces-containing-mh", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

// TODO: inverted index

return nil, nil
ctx := context.Background()
return s.db.GetPieceCidsByMultihash(ctx, m)
}

func (s *PieceMetaService) AddIndex(pieceCid cid.Cid, records []model.Record) error {
Expand All @@ -169,21 +169,30 @@ func (s *PieceMetaService) AddIndex(pieceCid cid.Cid, records []model.Record) er
log.Debugw("handled.add-index", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

ctx := context.Background()

var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
//fmt.Println("got r: ", r.Cid, r.Offset)
}
// --- first ---:

// TODO first: see inverted index in dagstore today
spew.Dump(r.Cid)

// --- second ---:
// set inverted index, i.e. multihash -> pieceCid
dmh, err := multihash.Decode(r.Cid.Bytes())
if err != nil {
return fmt.Errorf("cannot decode multihash: %w", err)
}

ctx := context.Background()
_, mh, err := multihash.MHFromBytes(dmh.Digest)
if err != nil {
return fmt.Errorf("cannot create multihash from bytes: %w", err)
}

s.db.SetMultihashToPieceCid(ctx, mh, pieceCid)
}

// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
Expand Down

0 comments on commit 1021556

Please sign in to comment.