Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change service GetIndex / AddIndex to return channel instead of array #1444

Merged
merged 16 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cmd/migrate-lid/couch-to-yuga.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/filecoin-project/boostd-data/couchbase"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/yugabyte"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -211,16 +212,26 @@ func migrateLidToLidIndex(ctx context.Context, pieceCid cid.Cid, source StoreMig
}

// Load the index from the source store
records, err := source.GetIndex(ctx, pieceCid)
idx, err := source.GetIndex(ctx, pieceCid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it won't be a better idea to change the signature to receive a channel as an argument. This way the caller can control if they want to have a buffered or unbuffered channel as a receiver and also decide how to control blocking.

Not sure if this is super relevant here though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then again I am not sure if the reflection magic would work to convert the interface correctly to the JSON streaming API...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a look at the jsonrpc docs, I don't believe channel as a parameter is supported:
https://github.com/filecoin-project/go-jsonrpc#supported-function-signatures

if err != nil {
return false, fmt.Errorf("loading index %s: %w", pieceCid, err)
}

var records []model.Record
for r := range idx {
if r.Error != nil {
return false, r.Error
}
records = append(records, r.Record)
}

// Add the index to the destination store
addStart := time.Now()
err = dest.AddIndex(ctx, pieceCid, records, true)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", pieceCid, err)
respch := dest.AddIndex(ctx, pieceCid, records, true)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", pieceCid, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
13 changes: 8 additions & 5 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boostd-data/ldb"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-address"
vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -43,8 +44,8 @@ import (
type StoreMigrationApi interface {
Start(ctx context.Context) error
IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex(context.Context, cid.Cid) ([]model.Record, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error
GetIndex(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress
AddDealForPiece(ctx context.Context, pcid cid.Cid, info model.DealInfo) error
ListPieces(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand Down Expand Up @@ -333,9 +334,11 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f

// Add the index to the store
addStart := time.Now()
err = store.AddIndex(ctx, pieceCid, records, false)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", ipath.path, err)
respch := store.AddIndex(ctx, pieceCid, records, false)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", ipath.path, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
38 changes: 29 additions & 9 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-jsonrpc"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
Expand All @@ -18,10 +19,10 @@ var log = logger.Logger("boostd-data-client")
type Store struct {
client struct {
AddDealForPiece func(context.Context, cid.Cid, model.DealInfo) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) <-chan types.AddIndexProgress
IsIndexed func(ctx context.Context, pieceCid cid.Cid) (bool, error)
IsCompleteIndex func(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex func(context.Context, cid.Cid) ([]model.Record, error)
GetIndex func(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
GetOffsetSize func(context.Context, cid.Cid, mh.Multihash) (*model.OffsetSize, error)
ListPieces func(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata func(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand All @@ -37,16 +38,17 @@ type Store struct {
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
}
closer jsonrpc.ClientCloser
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
}

func NewStore() *Store {
return &Store{}
func NewStore(dialOpts ...jsonrpc.Option) *Store {
return &Store{dialOpts: dialOpts}
}

func (s *Store) Dial(ctx context.Context, addr string) error {
var err error
s.closer, err = jsonrpc.NewClient(ctx, addr, "boostddata", &s.client, nil)
s.closer, err = jsonrpc.NewMergeClient(ctx, addr, "boostddata", []interface{}{&s.client}, nil, s.dialOpts...)
if err != nil {
return fmt.Errorf("dialing local index directory server: %w", err)
}
Expand All @@ -66,7 +68,10 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (index.Index, er
}

var records []index.Record
for _, r := range resp {
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, index.Record{
Cid: r.Cid,
Offset: r.Offset,
Expand All @@ -90,7 +95,15 @@ func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Recor

log.Debugw("get-records", "piece-cid", pieceCid, "records", len(resp))

return resp, nil
var records []model.Record
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, r.Record)
}

return records, nil
}

func (s *Store) GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) {
Expand All @@ -112,7 +125,14 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
log.Debugw("add-index", "piece-cid", pieceCid, "records", len(records))

return s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
respch := s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
for resp := range respch {
if resp.Err != "" {
return fmt.Errorf("add index with piece cid %s: %s", pieceCid, resp.Err)
}
//fmt.Printf("%s: Percent complete: %f%%\n", time.Now(), resp.Progress*100)
}
return nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
Expand Down
54 changes: 35 additions & 19 deletions extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) (
return pcids, normalizeMultihashError(m, err)
}

func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.IndexRecord, error) {
log.Debugw("handle.get-index", "pieceCid", pieceCid)

ctx, span := tracing.Tracer.Start(ctx, "store.get_index")
Expand All @@ -152,7 +152,13 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record,

log.Debugw("handle.get-index.records", "len(records)", len(records))

return records, nil
recs := make(chan types.IndexRecord, len(records))
for _, r := range records {
recs <- types.IndexRecord{Record: r}
}
close(recs)

return recs, nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
Expand Down Expand Up @@ -181,7 +187,7 @@ func (s *Store) IsCompleteIndex(ctx context.Context, pieceCid cid.Cid) (bool, er
return md.CompleteIndex, nil
}

func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress {
log.Debugw("handle.add-index", "records", len(records))

ctx, span := tracing.Tracer.Start(ctx, "store.add_index")
Expand All @@ -197,22 +203,32 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
mhs = append(mhs, r.Cid.Hash())
}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
return err
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())

return s.db.MarkIndexingComplete(ctx, pieceCid, len(records), isCompleteIndex)
progress := make(chan types.AddIndexProgress, 1)
go func() {
defer close(progress)
progress <- types.AddIndexProgress{Progress: 0}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())
progress <- types.AddIndexProgress{Progress: 0.5}

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())
progress <- types.AddIndexProgress{Progress: 1}
}()

return progress
}

func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, error) {
Expand Down
Loading