Skip to content

Commit

Permalink
refactor: clean up interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Nov 3, 2022
1 parent d982f72 commit 8d46505
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 68 deletions.
7 changes: 3 additions & 4 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type Store struct {
GetIndex func(context.Context, cid.Cid) ([]model.Record, error)
GetOffsetSize func(context.Context, cid.Cid, mh.Multihash) (*model.OffsetSize, error)
GetPieceDeals func(context.Context, cid.Cid) ([]model.DealInfo, error)
GetRecords func(context.Context, cid.Cid) ([]model.Record, error)
IndexedAt func(context.Context, cid.Cid) (time.Time, error)
PiecesContainingMultihash func(context.Context, mh.Multihash) ([]cid.Cid, error)
}
Expand Down Expand Up @@ -56,8 +55,8 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (index.Index, er
var records []index.Record
for _, r := range resp {
records = append(records, index.Record{
r.Cid,
r.Offset,
Cid: r.Cid,
Offset: r.Offset,
})
}

Expand All @@ -76,7 +75,7 @@ func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Recor
return nil, err
}

log.Debugw("get-index", "piece-cid", pieceCid, "records", len(resp))
log.Debugw("get-records", "piece-cid", pieceCid, "records", len(resp))

return resp, nil
}
Expand Down
9 changes: 4 additions & 5 deletions extern/boostd-data/couchbase/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/filecoin-project/boost/tracing"
"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
)

Expand Down Expand Up @@ -167,15 +166,15 @@ func (db *DB) GetPieceCidsByMultihash(ctx context.Context, mh multihash.Multihas

const throttleSize = 32

// SetMultihashToPieceCid
func (db *DB) SetMultihashesToPieceCid(ctx context.Context, recs []carindex.Record, pieceCid cid.Cid) error {
// SetMultihashesToPieceCid
func (db *DB) SetMultihashesToPieceCid(ctx context.Context, mhs []multihash.Multihash, pieceCid cid.Cid) error {
ctx, span := tracing.Tracer.Start(ctx, "db.set_multihashes_to_piece_cid")
defer span.End()

throttle := make(chan struct{}, throttleSize)
var eg errgroup.Group
for _, r := range recs {
mh := r.Cid.Hash()
for _, mh := range mhs {
mh := mh

throttle <- struct{}{}
eg.Go(func() error {
Expand Down
40 changes: 8 additions & 32 deletions extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
carindex "github.com/ipld/go-car/v2/index"
mh "github.com/multiformats/go-multihash"
)

Expand Down Expand Up @@ -100,28 +99,6 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) (
return s.db.GetPieceCidsByMultihash(ctx, m)
}

// TODO: Why do we have both GetRecords and GetIndex?
func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
log.Debugw("handle.get-iterable-index", "piece-cid", pieceCid)

ctx, span := tracing.Tracer.Start(ctx, "store.get_records")
defer span.End()

defer func(now time.Time) {
log.Debugw("handled.get-iterable-index", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

s.pieceLocks[toStripedLockIndex(pieceCid)].RLock()
defer s.pieceLocks[toStripedLockIndex(pieceCid)].RUnlock()

records, err := s.db.AllRecords(ctx, pieceCid)
if err != nil {
return nil, err
}

return records, nil
}

func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
log.Debugw("handle.get-index", "pieceCid", pieceCid)

Expand Down Expand Up @@ -157,30 +134,29 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
s.pieceLocks[toStripedLockIndex(pieceCid)].Lock()
defer s.pieceLocks[toStripedLockIndex(pieceCid)].Unlock()

// TODO: use array of cids instead of array of Records
var recs []carindex.Record
// Add a mapping from multihash -> piece cid so that clients can look up
// which pieces contain a multihash
mhs := make([]mh.Multihash, 0, len(records))
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
mhs = append(mhs, r.Cid.Hash())
}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, recs, pieceCid)
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())

// process index and store entries
// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
return err
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())

// mark that indexing is complete
// Mark indexing as complete
md := model.Metadata{
IndexedAt: time.Now(),
Deals: []model.DealInfo{},
Expand Down
26 changes: 0 additions & 26 deletions extern/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,6 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
return nil
}

func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
log.Debugw("handle.get-iterable-index", "piece-cid", pieceCid)

ctx, span := tracing.Tracer.Start(ctx, "store.get_records")
defer span.End()

defer func(now time.Time) {
log.Debugw("handled.get-iterable-index", "took", fmt.Sprintf("%s", time.Since(now)))
}(time.Now())

s.Lock()
defer s.Unlock()

md, err := s.db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
return nil, err
}

records, err := s.db.AllRecords(ctx, md.Cursor)
if err != nil {
return nil, err
}

return records, nil
}

func (s *Store) GetOffsetSize(ctx context.Context, pieceCid cid.Cid, hash mh.Multihash) (*model.OffsetSize, error) {
log.Debugw("handle.get-offset-size", "piece-cid", pieceCid)

Expand Down
20 changes: 19 additions & 1 deletion extern/boostd-data/svc/types/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
package types

import "context"
import (
"context"
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)

type Service interface {
AddDealForPiece(context.Context, cid.Cid, model.DealInfo) error
AddIndex(context.Context, cid.Cid, []model.Record) error
GetIndex(context.Context, cid.Cid) ([]model.Record, error)
GetOffsetSize(context.Context, cid.Cid, mh.Multihash) (*model.OffsetSize, error)
GetPieceDeals(context.Context, cid.Cid) ([]model.DealInfo, error)
IndexedAt(context.Context, cid.Cid) (time.Time, error)
PiecesContainingMultihash(context.Context, mh.Multihash) ([]cid.Cid, error)
}

type ServiceImpl interface {
Service
Start(ctx context.Context) error
}

0 comments on commit 8d46505

Please sign in to comment.