Skip to content

Commit

Permalink
feat: flagged pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jun 8, 2023
1 parent b8626da commit 12ea959
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 49 deletions.
12 changes: 6 additions & 6 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Store struct {
NextPiecesToCheck func(ctx context.Context) ([]cid.Cid, error)
FlagPiece func(ctx context.Context, pieceCid cid.Cid, hasUnsealedDeal bool) error
UnflagPiece func(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
FlaggedPiecesList func(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error)
}
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
Expand Down Expand Up @@ -179,10 +179,10 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return s.client.UnflagPiece(ctx, pieceCid)
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
return s.client.FlaggedPiecesList(ctx, cursor, offset, limit)
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
return s.client.FlaggedPiecesList(ctx, filter, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
return s.client.FlaggedPiecesCount(ctx)
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
return s.client.FlaggedPiecesCount(ctx, filter)
}
4 changes: 2 additions & 2 deletions extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return s.db.UnflagPiece(ctx, pieceCid)
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, f *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces")
var spanCursor int
if cursor != nil {
Expand All @@ -299,7 +299,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
return s.db.FlaggedPiecesList(ctx, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
defer span.End()

Expand Down
53 changes: 48 additions & 5 deletions extern/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -615,7 +617,7 @@ func (db *DB) RemoveIndexes(ctx context.Context, cursor uint64, pieceCid cid.Cid
return nil
}

func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, error) {
func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, o int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "db.list_flagged_pieces")
defer span.End()

Expand Down Expand Up @@ -647,19 +649,48 @@ func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, erro
return nil, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value)
}

records = append(records, model.FlaggedPiece{CreatedAt: v.CreatedAt, PieceCid: pieceCid})
if filter != nil && filter.HasUnsealedCopy != v.HasUnsealedCopy {
continue
}

if cursor != nil && v.CreatedAt.Before(*cursor) {
continue
}

records = append(records, model.FlaggedPiece{
CreatedAt: v.CreatedAt,
UpdatedAt: v.UpdatedAt,
PieceCid: pieceCid,
HasUnsealedCopy: v.HasUnsealedCopy,
})
}

sort.Slice(records, func(i, j int) bool {
return records[i].CreatedAt.Before(records[j].CreatedAt)
})

if offset > 0 {
if offset >= len(records) {
records = []model.FlaggedPiece{}
} else {
records = records[offset:]
}
}

if len(records) > limit {
records = records[:limit]
}

return records, nil
}

func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "db.flagged_pieces_count")
defer span.End()

q := query.Query{
Prefix: "/" + sprefixPieceCidToFlagged + "/",
KeysOnly: true,
KeysOnly: filter == nil,
}
results, err := db.Query(ctx, q)
if err != nil {
Expand All @@ -668,11 +699,23 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) {

var i int
for {
_, ok := results.NextSync()
r, ok := results.NextSync()
if !ok {
break
}

if filter != nil {
var v LeveldbFlaggedMetadata
err = json.Unmarshal(r.Value, &v)
if err != nil {
return 0, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value)
}

if filter.HasUnsealedCopy != v.HasUnsealedCopy {
continue
}
}

i++
}

Expand Down
8 changes: 4 additions & 4 deletions extern/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return nil
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
log.Debugw("handle.flagged-pieces-list")

ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_list")
Expand All @@ -465,10 +465,10 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
log.Debugw("handled.flagged-pieces-list", "took", time.Since(now).String())
}(time.Now())

return s.db.ListFlaggedPieces(ctx)
return s.db.ListFlaggedPieces(ctx, filter, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
log.Debugw("handle.flagged-pieces-count")

ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
Expand All @@ -478,7 +478,7 @@ func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
log.Debugw("handled.flagged-pieces-count", "took", time.Since(now).String())
}(time.Now())

return s.db.FlaggedPiecesCount(ctx)
return s.db.FlaggedPiecesCount(ctx, filter)
}

func normalizePieceCidError(pieceCid cid.Cid, err error) error {
Expand Down
8 changes: 6 additions & 2 deletions extern/boostd-data/svc/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type AddIndexProgress struct {
Err string `json:"e,omitempty"`
}

type FlaggedPiecesListFilter struct {
HasUnsealedCopy bool
}

type Service interface {
AddDealForPiece(context.Context, cid.Cid, model.DealInfo) error
AddIndex(context.Context, cid.Cid, []model.Record, bool) <-chan AddIndexProgress
Expand All @@ -52,8 +56,8 @@ type Service interface {
NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error)
FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error
UnflagPiece(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount(ctx context.Context) (int, error)
FlaggedPiecesList(ctx context.Context, filter *FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount(ctx context.Context, filter *FlaggedPiecesListFilter) (int, error)
}

type ServiceImpl interface {
Expand Down
29 changes: 24 additions & 5 deletions extern/boostd-data/yugabyte/piecedoctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/ipfs/go-cid"
"github.com/jackc/pgtype"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -220,7 +221,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return nil
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces")
var spanCursor int
if cursor != nil {
Expand All @@ -233,12 +234,24 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset

var args []interface{}
idx := 0
qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged`
qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged `
where := ""
if cursor != nil {
qry += `WHERE CreatedAt < $1 `
where += `WHERE CreatedAt < $1 `
args = append(args, cursor)
idx++
}
if filter != nil {
if where == "" {
where += `WHERE `
} else {
where += `AND `
}
where += fmt.Sprintf(`HasUnsealedCopy = $%d `, idx+1)
args = append(args, filter.HasUnsealedCopy)
idx++
}
qry += where
qry += `ORDER BY CreatedAt desc `

qry += fmt.Sprintf(`LIMIT $%d OFFSET $%d`, idx+1, idx+2)
Expand Down Expand Up @@ -275,13 +288,19 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
return pieces, nil
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
defer span.End()

var args []interface{}
var count int
qry := `SELECT COUNT(*) FROM PieceFlagged`
err := s.db.QueryRow(ctx, qry).Scan(&count)
if filter != nil {
qry += ` WHERE HasUnsealedCopy = $1`
args = append(args, filter.HasUnsealedCopy)
}

err := s.db.QueryRow(ctx, qry, args...).Scan(&count)
if err != nil {
return 0, fmt.Errorf("getting flagged pieces count: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion gql/resolver_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *resolver) LID(ctx context.Context) (*lidState, error) {
}
}

fp, err := r.piecedirectory.FlaggedPiecesCount(ctx)
fp, err := r.piecedirectory.FlaggedPiecesCount(ctx, nil)
if err != nil {
return nil, err
}
Expand Down
16 changes: 11 additions & 5 deletions gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ type flaggedPieceResolver struct {
}

type piecesFlaggedArgs struct {
Cursor *gqltypes.BigInt // CreatedAt in milli-seconds
Offset graphql.NullInt
Limit graphql.NullInt
HasUnsealedCopy graphql.NullBool
Cursor *gqltypes.BigInt // CreatedAt in milli-seconds
Offset graphql.NullInt
Limit graphql.NullInt
}

type flaggedPieceListResolver struct {
Expand All @@ -92,10 +93,15 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
limit = int(*args.Limit.Value)
}

var filter *types.FlaggedPiecesListFilter
if args.HasUnsealedCopy.Set && args.HasUnsealedCopy.Value != nil {
filter = &types.FlaggedPiecesListFilter{HasUnsealedCopy: *args.HasUnsealedCopy.Value}
}

// Fetch one extra row so that we can check if there are more rows
// beyond the limit
cursor := bigIntToTime(args.Cursor)
flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, cursor, offset, limit+1)
flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, filter, cursor, offset, limit+1)
if err != nil {
return nil, err
}
Expand All @@ -106,7 +112,7 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
}

// Get the total row count
count, err := r.piecedirectory.FlaggedPiecesCount(ctx)
count, err := r.piecedirectory.FlaggedPiecesCount(ctx, filter)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ type RootQuery {
retrievalLogsCount: RetrievalStatesCount!

"""Get a list of pieces that have been flagged as having problems"""
piecesFlagged(cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList!
piecesFlagged(hasUnsealedCopy: Boolean, cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList!

"""Get information about a piece from the piece store, DAG store and database"""
pieceStatus(pieceCid: String!): PieceStatus!
Expand Down
16 changes: 8 additions & 8 deletions piecedirectory/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should be flagged because there is no index for it
count, err := cl.FlaggedPiecesCount(ctx)
count, err := cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, count)

pcids, err := cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err := cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 1, len(pcids))

Expand All @@ -315,11 +315,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should no longer be flagged
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 0, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 0, len(pcids))

Expand All @@ -331,11 +331,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should be flagged because there is no unsealed copy
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 1, len(pcids))

Expand All @@ -347,11 +347,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should no longer be flagged
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 0, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 0, len(pcids))
}
Loading

0 comments on commit 12ea959

Please sign in to comment.