Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flagged pieces filter #1501

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Store struct {
NextPiecesToCheck func(ctx context.Context) ([]cid.Cid, error)
FlagPiece func(ctx context.Context, pieceCid cid.Cid, hasUnsealedDeal bool) error
UnflagPiece func(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
FlaggedPiecesList func(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error)
}
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
Expand Down Expand Up @@ -179,10 +179,10 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return s.client.UnflagPiece(ctx, pieceCid)
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
return s.client.FlaggedPiecesList(ctx, cursor, offset, limit)
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
return s.client.FlaggedPiecesList(ctx, filter, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
return s.client.FlaggedPiecesCount(ctx)
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
return s.client.FlaggedPiecesCount(ctx, filter)
}
4 changes: 2 additions & 2 deletions extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return s.db.UnflagPiece(ctx, pieceCid)
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, f *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces")
var spanCursor int
if cursor != nil {
Expand All @@ -299,7 +299,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
return s.db.FlaggedPiecesList(ctx, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
defer span.End()

Expand Down
53 changes: 48 additions & 5 deletions extern/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -615,7 +617,7 @@ func (db *DB) RemoveIndexes(ctx context.Context, cursor uint64, pieceCid cid.Cid
return nil
}

func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, error) {
func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, o int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "db.list_flagged_pieces")
defer span.End()

Expand Down Expand Up @@ -647,19 +649,48 @@ func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, erro
return nil, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value)
}

records = append(records, model.FlaggedPiece{CreatedAt: v.CreatedAt, PieceCid: pieceCid})
if filter != nil && filter.HasUnsealedCopy != v.HasUnsealedCopy {
continue
}

if cursor != nil && v.CreatedAt.Before(*cursor) {
continue
}

records = append(records, model.FlaggedPiece{
CreatedAt: v.CreatedAt,
UpdatedAt: v.UpdatedAt,
PieceCid: pieceCid,
HasUnsealedCopy: v.HasUnsealedCopy,
})
}

sort.Slice(records, func(i, j int) bool {
return records[i].CreatedAt.Before(records[j].CreatedAt)
})

if offset > 0 {
if offset >= len(records) {
records = []model.FlaggedPiece{}
} else {
records = records[offset:]
}
}

if len(records) > limit {
records = records[:limit]
}

return records, nil
}

func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "db.flagged_pieces_count")
defer span.End()

q := query.Query{
Prefix: "/" + sprefixPieceCidToFlagged + "/",
KeysOnly: true,
KeysOnly: filter == nil,
}
results, err := db.Query(ctx, q)
if err != nil {
Expand All @@ -668,11 +699,23 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) {

var i int
for {
_, ok := results.NextSync()
r, ok := results.NextSync()
if !ok {
break
}

if filter != nil {
var v LeveldbFlaggedMetadata
err = json.Unmarshal(r.Value, &v)
if err != nil {
return 0, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value)
}

if filter.HasUnsealedCopy != v.HasUnsealedCopy {
continue
}
}

i++
}

Expand Down
8 changes: 4 additions & 4 deletions extern/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return nil
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
log.Debugw("handle.flagged-pieces-list")

ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_list")
Expand All @@ -465,10 +465,10 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
log.Debugw("handled.flagged-pieces-list", "took", time.Since(now).String())
}(time.Now())

return s.db.ListFlaggedPieces(ctx)
return s.db.ListFlaggedPieces(ctx, filter, cursor, offset, limit)
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
log.Debugw("handle.flagged-pieces-count")

ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
Expand All @@ -478,7 +478,7 @@ func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
log.Debugw("handled.flagged-pieces-count", "took", time.Since(now).String())
}(time.Now())

return s.db.FlaggedPiecesCount(ctx)
return s.db.FlaggedPiecesCount(ctx, filter)
}

func normalizePieceCidError(pieceCid cid.Cid, err error) error {
Expand Down
8 changes: 6 additions & 2 deletions extern/boostd-data/svc/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type AddIndexProgress struct {
Err string `json:"e,omitempty"`
}

type FlaggedPiecesListFilter struct {
HasUnsealedCopy bool
}

type Service interface {
AddDealForPiece(context.Context, cid.Cid, model.DealInfo) error
AddIndex(context.Context, cid.Cid, []model.Record, bool) <-chan AddIndexProgress
Expand All @@ -52,8 +56,8 @@ type Service interface {
NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error)
FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error
UnflagPiece(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount(ctx context.Context) (int, error)
FlaggedPiecesList(ctx context.Context, filter *FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount(ctx context.Context, filter *FlaggedPiecesListFilter) (int, error)
}

type ServiceImpl interface {
Expand Down
29 changes: 24 additions & 5 deletions extern/boostd-data/yugabyte/piecedoctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/ipfs/go-cid"
"github.com/jackc/pgtype"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -220,7 +221,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
return nil
}

func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces")
var spanCursor int
if cursor != nil {
Expand All @@ -233,12 +234,24 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset

var args []interface{}
idx := 0
qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged`
qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged `
where := ""
if cursor != nil {
qry += `WHERE CreatedAt < $1 `
where += `WHERE CreatedAt < $1 `
args = append(args, cursor)
idx++
}
if filter != nil {
if where == "" {
where += `WHERE `
} else {
where += `AND `
}
where += fmt.Sprintf(`HasUnsealedCopy = $%d `, idx+1)
args = append(args, filter.HasUnsealedCopy)
idx++
}
qry += where
qry += `ORDER BY CreatedAt desc `

qry += fmt.Sprintf(`LIMIT $%d OFFSET $%d`, idx+1, idx+2)
Expand Down Expand Up @@ -275,13 +288,19 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
return pieces, nil
}

func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) {
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count")
defer span.End()

var args []interface{}
var count int
qry := `SELECT COUNT(*) FROM PieceFlagged`
err := s.db.QueryRow(ctx, qry).Scan(&count)
if filter != nil {
qry += ` WHERE HasUnsealedCopy = $1`
args = append(args, filter.HasUnsealedCopy)
}

err := s.db.QueryRow(ctx, qry, args...).Scan(&count)
if err != nil {
return 0, fmt.Errorf("getting flagged pieces count: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion gql/resolver_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *resolver) LID(ctx context.Context) (*lidState, error) {
}
}

fp, err := r.piecedirectory.FlaggedPiecesCount(ctx)
fp, err := r.piecedirectory.FlaggedPiecesCount(ctx, nil)
if err != nil {
return nil, err
}
Expand Down
16 changes: 11 additions & 5 deletions gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ type flaggedPieceResolver struct {
}

type piecesFlaggedArgs struct {
Cursor *gqltypes.BigInt // CreatedAt in milli-seconds
Offset graphql.NullInt
Limit graphql.NullInt
HasUnsealedCopy graphql.NullBool
Cursor *gqltypes.BigInt // CreatedAt in milli-seconds
Offset graphql.NullInt
Limit graphql.NullInt
}

type flaggedPieceListResolver struct {
Expand All @@ -92,10 +93,15 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
limit = int(*args.Limit.Value)
}

var filter *types.FlaggedPiecesListFilter
if args.HasUnsealedCopy.Set && args.HasUnsealedCopy.Value != nil {
filter = &types.FlaggedPiecesListFilter{HasUnsealedCopy: *args.HasUnsealedCopy.Value}
}

// Fetch one extra row so that we can check if there are more rows
// beyond the limit
cursor := bigIntToTime(args.Cursor)
flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, cursor, offset, limit+1)
flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, filter, cursor, offset, limit+1)
if err != nil {
return nil, err
}
Expand All @@ -106,7 +112,7 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
}

// Get the total row count
count, err := r.piecedirectory.FlaggedPiecesCount(ctx)
count, err := r.piecedirectory.FlaggedPiecesCount(ctx, filter)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ type RootQuery {
retrievalLogsCount: RetrievalStatesCount!

"""Get a list of pieces that have been flagged as having problems"""
piecesFlagged(cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList!
piecesFlagged(hasUnsealedCopy: Boolean, cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList!

"""Get information about a piece from the piece store, DAG store and database"""
pieceStatus(pieceCid: String!): PieceStatus!
Expand Down
16 changes: 8 additions & 8 deletions piecedirectory/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should be flagged because there is no index for it
count, err := cl.FlaggedPiecesCount(ctx)
count, err := cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, count)

pcids, err := cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err := cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 1, len(pcids))

Expand All @@ -315,11 +315,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should no longer be flagged
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 0, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 0, len(pcids))

Expand All @@ -331,11 +331,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should be flagged because there is no unsealed copy
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 1, len(pcids))

Expand All @@ -347,11 +347,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)

// The piece should no longer be flagged
count, err = cl.FlaggedPiecesCount(ctx)
count, err = cl.FlaggedPiecesCount(ctx, nil)
require.NoError(t, err)
require.Equal(t, 0, count)

pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10)
pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10)
require.NoError(t, err)
require.Equal(t, 0, len(pcids))
}
Loading