diff --git a/extern/boostd-data/client/client.go b/extern/boostd-data/client/client.go index c563c5076..83502f474 100644 --- a/extern/boostd-data/client/client.go +++ b/extern/boostd-data/client/client.go @@ -35,8 +35,8 @@ type Store struct { NextPiecesToCheck func(ctx context.Context) ([]cid.Cid, error) FlagPiece func(ctx context.Context, pieceCid cid.Cid, hasUnsealedDeal bool) error UnflagPiece func(ctx context.Context, pieceCid cid.Cid) error - FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) - FlaggedPiecesCount func(ctx context.Context) (int, error) + FlaggedPiecesList func(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) + FlaggedPiecesCount func(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) } closer jsonrpc.ClientCloser dialOpts []jsonrpc.Option @@ -179,10 +179,10 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error { return s.client.UnflagPiece(ctx, pieceCid) } -func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { - return s.client.FlaggedPiecesList(ctx, cursor, offset, limit) +func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { + return s.client.FlaggedPiecesList(ctx, filter, cursor, offset, limit) } -func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { - return s.client.FlaggedPiecesCount(ctx) +func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { + return s.client.FlaggedPiecesCount(ctx, filter) } diff --git a/extern/boostd-data/couchbase/service.go b/extern/boostd-data/couchbase/service.go index ae06541c0..4248ab54c 100644 --- a/extern/boostd-data/couchbase/service.go +++ b/extern/boostd-data/couchbase/service.go @@ -285,7 +285,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error { return s.db.UnflagPiece(ctx, pieceCid) } -func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { +func (s *Store) FlaggedPiecesList(ctx context.Context, f *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces") var spanCursor int if cursor != nil { @@ -299,7 +299,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset return s.db.FlaggedPiecesList(ctx, cursor, offset, limit) } -func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { +func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count") defer span.End() diff --git a/extern/boostd-data/ldb/db.go b/extern/boostd-data/ldb/db.go index 523ad12eb..6decc2068 100644 --- a/extern/boostd-data/ldb/db.go +++ b/extern/boostd-data/ldb/db.go @@ -6,10 +6,12 @@ import ( "encoding/json" "errors" "fmt" + "sort" "time" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" + "github.com/filecoin-project/boostd-data/svc/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore" @@ -615,7 +617,7 @@ func (db *DB) RemoveIndexes(ctx context.Context, cursor uint64, pieceCid cid.Cid return nil } -func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, error) { +func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, o int, limit int) ([]model.FlaggedPiece, error) { ctx, span := tracing.Tracer.Start(ctx, "db.list_flagged_pieces") defer span.End() @@ -647,19 +649,48 @@ func (db *DB) ListFlaggedPieces(ctx context.Context) ([]model.FlaggedPiece, erro return nil, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value) } - records = append(records, model.FlaggedPiece{CreatedAt: v.CreatedAt, PieceCid: pieceCid}) + if filter != nil && filter.HasUnsealedCopy != v.HasUnsealedCopy { + continue + } + + if cursor != nil && v.CreatedAt.Before(*cursor) { + continue + } + + records = append(records, model.FlaggedPiece{ + CreatedAt: v.CreatedAt, + UpdatedAt: v.UpdatedAt, + PieceCid: pieceCid, + HasUnsealedCopy: v.HasUnsealedCopy, + }) + } + + sort.Slice(records, func(i, j int) bool { + return records[i].CreatedAt.Before(records[j].CreatedAt) + }) + + if offset > 0 { + if offset >= len(records) { + records = []model.FlaggedPiece{} + } else { + records = records[offset:] + } + } + + if len(records) > limit { + records = records[:limit] } return records, nil } -func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) { +func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { ctx, span := tracing.Tracer.Start(ctx, "db.flagged_pieces_count") defer span.End() q := query.Query{ Prefix: "/" + sprefixPieceCidToFlagged + "/", - KeysOnly: true, + KeysOnly: filter == nil, } results, err := db.Query(ctx, q) if err != nil { @@ -668,11 +699,23 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context) (int, error) { var i int for { - _, ok := results.NextSync() + r, ok := results.NextSync() if !ok { break } + if filter != nil { + var v LeveldbFlaggedMetadata + err = json.Unmarshal(r.Value, &v) + if err != nil { + return 0, fmt.Errorf("failed to unmarshal LeveldbFlaggedMetadata: %w; %v", err, r.Value) + } + + if filter.HasUnsealedCopy != v.HasUnsealedCopy { + continue + } + } + i++ } diff --git a/extern/boostd-data/ldb/service.go b/extern/boostd-data/ldb/service.go index db737aa4a..55ef13313 100644 --- a/extern/boostd-data/ldb/service.go +++ b/extern/boostd-data/ldb/service.go @@ -455,7 +455,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error { return nil } -func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { +func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { log.Debugw("handle.flagged-pieces-list") ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_list") @@ -465,10 +465,10 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset log.Debugw("handled.flagged-pieces-list", "took", time.Since(now).String()) }(time.Now()) - return s.db.ListFlaggedPieces(ctx) + return s.db.ListFlaggedPieces(ctx, filter, cursor, offset, limit) } -func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { +func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { log.Debugw("handle.flagged-pieces-count") ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count") @@ -478,7 +478,7 @@ func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { log.Debugw("handled.flagged-pieces-count", "took", time.Since(now).String()) }(time.Now()) - return s.db.FlaggedPiecesCount(ctx) + return s.db.FlaggedPiecesCount(ctx, filter) } func normalizePieceCidError(pieceCid cid.Cid, err error) error { diff --git a/extern/boostd-data/svc/types/types.go b/extern/boostd-data/svc/types/types.go index af93e6f61..626aa1173 100644 --- a/extern/boostd-data/svc/types/types.go +++ b/extern/boostd-data/svc/types/types.go @@ -34,6 +34,10 @@ type AddIndexProgress struct { Err string `json:"e,omitempty"` } +type FlaggedPiecesListFilter struct { + HasUnsealedCopy bool +} + type Service interface { AddDealForPiece(context.Context, cid.Cid, model.DealInfo) error AddIndex(context.Context, cid.Cid, []model.Record, bool) <-chan AddIndexProgress @@ -52,8 +56,8 @@ type Service interface { NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error UnflagPiece(ctx context.Context, pieceCid cid.Cid) error - FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) - FlaggedPiecesCount(ctx context.Context) (int, error) + FlaggedPiecesList(ctx context.Context, filter *FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) + FlaggedPiecesCount(ctx context.Context, filter *FlaggedPiecesListFilter) (int, error) } type ServiceImpl interface { diff --git a/extern/boostd-data/yugabyte/piecedoctor.go b/extern/boostd-data/yugabyte/piecedoctor.go index 28f01a8e3..02d054b25 100644 --- a/extern/boostd-data/yugabyte/piecedoctor.go +++ b/extern/boostd-data/yugabyte/piecedoctor.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" + "github.com/filecoin-project/boostd-data/svc/types" "github.com/ipfs/go-cid" "github.com/jackc/pgtype" "go.opentelemetry.io/otel/attribute" @@ -220,7 +221,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error { return nil } -func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { +func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces") var spanCursor int if cursor != nil { @@ -233,12 +234,24 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset var args []interface{} idx := 0 - qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged` + qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged ` + where := "" if cursor != nil { - qry += `WHERE CreatedAt < $1 ` + where += `WHERE CreatedAt < $1 ` args = append(args, cursor) idx++ } + if filter != nil { + if where == "" { + where += `WHERE ` + } else { + where += `AND ` + } + where += fmt.Sprintf(`HasUnsealedCopy = $%d `, idx+1) + args = append(args, filter.HasUnsealedCopy) + idx++ + } + qry += where qry += `ORDER BY CreatedAt desc ` qry += fmt.Sprintf(`LIMIT $%d OFFSET $%d`, idx+1, idx+2) @@ -275,13 +288,19 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset return pieces, nil } -func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { +func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count") defer span.End() + var args []interface{} var count int qry := `SELECT COUNT(*) FROM PieceFlagged` - err := s.db.QueryRow(ctx, qry).Scan(&count) + if filter != nil { + qry += ` WHERE HasUnsealedCopy = $1` + args = append(args, filter.HasUnsealedCopy) + } + + err := s.db.QueryRow(ctx, qry, args...).Scan(&count) if err != nil { return 0, fmt.Errorf("getting flagged pieces count: %w", err) } diff --git a/gql/resolver_lid.go b/gql/resolver_lid.go index 380d41294..2e2c5ad6b 100644 --- a/gql/resolver_lid.go +++ b/gql/resolver_lid.go @@ -60,7 +60,7 @@ func (r *resolver) LID(ctx context.Context) (*lidState, error) { } } - fp, err := r.piecedirectory.FlaggedPiecesCount(ctx) + fp, err := r.piecedirectory.FlaggedPiecesCount(ctx, nil) if err != nil { return nil, err } diff --git a/gql/resolver_piece.go b/gql/resolver_piece.go index 65fc7a569..4e585e675 100644 --- a/gql/resolver_piece.go +++ b/gql/resolver_piece.go @@ -70,9 +70,10 @@ type flaggedPieceResolver struct { } type piecesFlaggedArgs struct { - Cursor *gqltypes.BigInt // CreatedAt in milli-seconds - Offset graphql.NullInt - Limit graphql.NullInt + HasUnsealedCopy graphql.NullBool + Cursor *gqltypes.BigInt // CreatedAt in milli-seconds + Offset graphql.NullInt + Limit graphql.NullInt } type flaggedPieceListResolver struct { @@ -92,10 +93,15 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (* limit = int(*args.Limit.Value) } + var filter *types.FlaggedPiecesListFilter + if args.HasUnsealedCopy.Set && args.HasUnsealedCopy.Value != nil { + filter = &types.FlaggedPiecesListFilter{HasUnsealedCopy: *args.HasUnsealedCopy.Value} + } + // Fetch one extra row so that we can check if there are more rows // beyond the limit cursor := bigIntToTime(args.Cursor) - flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, cursor, offset, limit+1) + flaggedPieces, err := r.piecedirectory.FlaggedPiecesList(ctx, filter, cursor, offset, limit+1) if err != nil { return nil, err } @@ -106,7 +112,7 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (* } // Get the total row count - count, err := r.piecedirectory.FlaggedPiecesCount(ctx) + count, err := r.piecedirectory.FlaggedPiecesCount(ctx, filter) if err != nil { return nil, err } diff --git a/gql/schema.graphql b/gql/schema.graphql index 9b23cf61a..70860119b 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -483,7 +483,7 @@ type RootQuery { retrievalLogsCount: RetrievalStatesCount! """Get a list of pieces that have been flagged as having problems""" - piecesFlagged(cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList! + piecesFlagged(hasUnsealedCopy: Boolean, cursor: BigInt, offset: Int, limit: Int): FlaggedPiecesList! """Get information about a piece from the piece store, DAG store and database""" pieceStatus(pieceCid: String!): PieceStatus! diff --git a/piecedirectory/doctor_test.go b/piecedirectory/doctor_test.go index c54671fb3..47ef7b337 100644 --- a/piecedirectory/doctor_test.go +++ b/piecedirectory/doctor_test.go @@ -297,11 +297,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // The piece should be flagged because there is no index for it - count, err := cl.FlaggedPiecesCount(ctx) + count, err := cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 1, count) - pcids, err := cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err := cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 1, len(pcids)) @@ -315,11 +315,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // The piece should no longer be flagged - count, err = cl.FlaggedPiecesCount(ctx) + count, err = cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 0, count) - pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 0, len(pcids)) @@ -331,11 +331,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // The piece should be flagged because there is no unsealed copy - count, err = cl.FlaggedPiecesCount(ctx) + count, err = cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 1, count) - pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 1, len(pcids)) @@ -347,11 +347,11 @@ func testCheckPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // The piece should no longer be flagged - count, err = cl.FlaggedPiecesCount(ctx) + count, err = cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 0, count) - pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 0, len(pcids)) } diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index a05162e2b..3e16b3e3d 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -13,6 +13,7 @@ import ( bdclient "github.com/filecoin-project/boostd-data/client" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" + bdtypes "github.com/filecoin-project/boostd-data/svc/types" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/markets/dagstore" "github.com/hashicorp/go-multierror" @@ -54,12 +55,12 @@ func (ps *PieceDirectory) Start(ctx context.Context) { ps.ctx = ctx } -func (ps *PieceDirectory) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { - return ps.store.FlaggedPiecesList(ctx, cursor, offset, limit) +func (ps *PieceDirectory) FlaggedPiecesList(ctx context.Context, filter *bdtypes.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { + return ps.store.FlaggedPiecesList(ctx, filter, cursor, offset, limit) } -func (ps *PieceDirectory) FlaggedPiecesCount(ctx context.Context) (int, error) { - return ps.store.FlaggedPiecesCount(ctx) +func (ps *PieceDirectory) FlaggedPiecesCount(ctx context.Context, filter *bdtypes.FlaggedPiecesListFilter) (int, error) { + return ps.store.FlaggedPiecesCount(ctx, filter) } // Get all metadata about a particular piece diff --git a/piecedirectory/piecedirectory_test.go b/piecedirectory/piecedirectory_test.go index eabb14f77..4ae8252b2 100644 --- a/piecedirectory/piecedirectory_test.go +++ b/piecedirectory/piecedirectory_test.go @@ -278,11 +278,11 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // No pieces flagged, count and list of pieces should be empty - count, err := cl.FlaggedPiecesCount(ctx) + count, err := cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 0, count) - pcids, err := cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err := cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 0, len(pcids)) @@ -291,24 +291,41 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) { require.NoError(t, err) // Count and list of pieces should contain one piece - count, err = cl.FlaggedPiecesCount(ctx) + count, err = cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 1, count) - pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 1, len(pcids)) + // Test that setting the filter returns the correct results + count, err = cl.FlaggedPiecesCount(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: false}) + require.NoError(t, err) + require.Equal(t, 1, count) + + count, err = cl.FlaggedPiecesCount(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: true}) + require.NoError(t, err) + require.Equal(t, 0, count) + + pcids, err = cl.FlaggedPiecesList(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: false}, nil, 0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(pcids)) + + pcids, err = cl.FlaggedPiecesList(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: true}, nil, 0, 10) + require.NoError(t, err) + require.Equal(t, 0, len(pcids)) + // Unflag the piece err = cl.UnflagPiece(ctx, commpCalc.PieceCID) require.NoError(t, err) // Count and list of pieces should be empty - count, err = cl.FlaggedPiecesCount(ctx) + count, err = cl.FlaggedPiecesCount(ctx, nil) require.NoError(t, err) require.Equal(t, 0, count) - pcids, err = cl.FlaggedPiecesList(ctx, nil, 0, 10) + pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) require.NoError(t, err) require.Equal(t, 0, len(pcids)) }