From 48e9eae2eefad991c93474267e7f31b9d575d873 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Mon, 3 Jul 2023 16:45:47 +0200 Subject: [PATCH] refactor: separate yugabyte / leveldb tests for easier local testing (#1553) --- .../svc/setup_yugabyte_test_util.go | 7 + extern/boostd-data/svc/svc_size_test.go | 19 - extern/boostd-data/svc/svc_test.go | 506 +----------------- piecedirectory/piecedirectory_test.go | 414 -------------- 4 files changed, 22 insertions(+), 924 deletions(-) delete mode 100644 piecedirectory/piecedirectory_test.go diff --git a/extern/boostd-data/svc/setup_yugabyte_test_util.go b/extern/boostd-data/svc/setup_yugabyte_test_util.go index 179b6b8f1..6934a0390 100644 --- a/extern/boostd-data/svc/setup_yugabyte_test_util.go +++ b/extern/boostd-data/svc/setup_yugabyte_test_util.go @@ -18,6 +18,13 @@ var TestYugabyteSettings = yugabyte.DBSettings{ ConnectString: "postgresql://postgres:postgres@yugabyte:5433", } +// Use when testing against a local yugabyte instance. +// Warning: This will delete all tables in the local yugabyte instance. +var TestYugabyteSettingsLocal = yugabyte.DBSettings{ + Hosts: []string{"localhost"}, + ConnectString: "postgresql://postgres:postgres@localhost:5433", +} + func SetupYugabyte(t *testing.T) { ctx := context.Background() diff --git a/extern/boostd-data/svc/svc_size_test.go b/extern/boostd-data/svc/svc_size_test.go index 403e0c8fe..0810d938e 100644 --- a/extern/boostd-data/svc/svc_size_test.go +++ b/extern/boostd-data/svc/svc_size_test.go @@ -14,8 +14,6 @@ import ( "github.com/filecoin-project/boostd-data/model" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-car/v2/index" - "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -122,20 +120,3 @@ func generateRandomCid(baseCid []byte) (cid.Cid, error) { return c, nil } - -func toEntries(idx index.Index) (map[string]uint64, bool) { - it, ok := idx.(index.IterableIndex) - if !ok { - return nil, false - } - - entries := make(map[string]uint64) - err := it.ForEach(func(mh multihash.Multihash, o uint64) error { - entries[mh.String()] = o - return nil - }) - if err != nil { - return nil, false - } - return entries, true -} diff --git a/extern/boostd-data/svc/svc_test.go b/extern/boostd-data/svc/svc_test.go index e133604bb..511b35d9a 100644 --- a/extern/boostd-data/svc/svc_test.go +++ b/extern/boostd-data/svc/svc_test.go @@ -1,522 +1,46 @@ -//go:build test_lid -// +build test_lid - package svc import ( "context" - "encoding/hex" - "fmt" - "math/rand" - "os" - "testing" - "time" - - "github.com/filecoin-project/boost/testutil" - "github.com/filecoin-project/boostd-data/client" - "github.com/filecoin-project/boostd-data/model" - "github.com/filecoin-project/go-state-types/abi" - "github.com/google/uuid" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/index" - "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" + "testing" + "time" ) -func TestService(t *testing.T) { +func TestServiceLevelDB(t *testing.T) { _ = logging.SetLogLevel("cbtest", "debug") - t.Run("leveldb", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - - testService(ctx, t, bdsvc, "localhost:0") - }) - - t.Run("yugabyte", func(t *testing.T) { - _ = logging.SetLogLevel("boostd-data-yb", "debug") - - // Running yugabyte tests may require download the docker container - // so set a high timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - - SetupYugabyte(t) - - bdsvc := NewYugabyte(TestYugabyteSettings) - - addr := "localhost:0" - testService(ctx, t, bdsvc, addr) - }) -} - -func testService(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" - - pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") - require.NoError(t, err) - - subject, err := loadIndex(sampleidx) - require.NoError(t, err) - - records, err := getRecords(subject) - require.NoError(t, err) - - randomuuid, err := uuid.Parse("4d8f5ce6-dbfd-40dc-8b03-29308e97357b") - require.NoError(t, err) - - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: randomuuid.String(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - - // Add a deal for the piece - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // Add the same deal a second time to test uniqueness - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // There should only be one deal - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - require.Equal(t, di, dis[0]) - - // Add a second deal - di2 := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(11), - PieceOffset: 11, - PieceLength: 12, - CarLength: 13, - } - err = cl.AddDealForPiece(ctx, pieceCid, di2) - require.NoError(t, err) - - // There should now be two deals - dis, err = cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 2) - require.Contains(t, dis, di) - require.Contains(t, dis, di2) - - b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") - require.NoError(t, err) - - mhash, err := multihash.Cast(b) - require.NoError(t, err) - - offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) - require.NoError(t, err) - require.EqualValues(t, 3039040395, offset.Offset) - require.EqualValues(t, 0, offset.Size) - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - require.NoError(t, err) - require.Len(t, pcids, 1) - require.Equal(t, pieceCid, pcids[0]) - - allPieceCids, err := cl.ListPieces(ctx) - require.NoError(t, err) - require.Len(t, allPieceCids, 1) - require.Equal(t, pieceCid, allPieceCids[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - recs, err := cl.GetRecords(ctx, pieceCid) - require.NoError(t, err) - require.Equal(t, len(records), len(recs)) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - loadedSubject, err := cl.GetIndex(ctx, pieceCid) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - ok, err := compareIndices(subject, loadedSubject) - require.NoError(t, err) - require.True(t, ok) + testService(ctx, t, bdsvc, "localhost:0") } -func TestServiceFuzz(t *testing.T) { +func TestServiceFuzzLevelDB(t *testing.T) { t.Skip() _ = logging.SetLogLevel("*", "info") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - addr := "localhost:0" - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - testServiceFuzz(ctx, t, ln.String()) - }) - - t.Run("yugabyte", func(t *testing.T) { - SetupYugabyte(t) - bdsvc := NewYugabyte(TestYugabyteSettings) - - addr := "localhost:0" - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - testServiceFuzz(ctx, t, ln.String()) - }) -} - -func testServiceFuzz(ctx context.Context, t *testing.T, addr string) { - cl := client.NewStore() - err := cl.Dial(context.Background(), "ws://"+addr) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - defer cl.Close(ctx) - - var idxs []index.Index - for i := 0; i < 10; i++ { - size := (5 + (i % 3)) << 20 - idxs = append(idxs, createCarIndex(t, size, i+1)) - } - - throttle := make(chan struct{}, 64) - var eg errgroup.Group - for _, idx := range idxs { - idx := idx - eg.Go(func() error { - records, err := getRecords(idx) - require.NoError(t, err) - - randomuuid := uuid.New() - pieceCid := testutil.GenerateCid() - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: randomuuid.String(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - require.Equal(t, di, dis[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - recs, err := cl.GetRecords(ctx, pieceCid) - require.NoError(t, err) - require.Equal(t, len(records), len(recs)) - - var offsetEG errgroup.Group - for _, r := range recs { - if rand.Float32() > 0.1 { - continue - } - - idx := idx - c := r.Cid - throttle <- struct{}{} - offsetEG.Go(func() error { - defer func() { <-throttle }() - - mhash := c.Hash() - var err error - err1 := idx.GetAll(c, func(expected uint64) bool { - var offsetSize *model.OffsetSize - offsetSize, err = cl.GetOffsetSize(ctx, pieceCid, mhash) - if err != nil { - return false - } - if expected != offsetSize.Offset { - err = fmt.Errorf("cid %s: expected offset %d, got offset %d", c, expected, offsetSize.Offset) - return false - } - return true - }) - if err != nil { - return err - } - if err1 != nil { - return err1 - } - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - if err != nil { - return err - } - if len(pcids) != 1 { - return fmt.Errorf("expected 1 piece, got %d", len(pcids)) - } - if pieceCid != pcids[0] { - return fmt.Errorf("expected piece %s, got %s", pieceCid, pcids[0]) - } - return nil - }) - } - err = offsetEG.Wait() - require.NoError(t, err) - - loadedSubject, err := cl.GetIndex(ctx, pieceCid) - require.NoError(t, err) - - ok, err := compareIndices(idx, loadedSubject) - require.NoError(t, err) - require.True(t, ok) - - return nil - }) - } - - err = eg.Wait() - require.NoError(t, err) -} - -func createCarIndex(t *testing.T, size int, rseed int) index.Index { - // Create a CAR file - randomFilePath, err := testutil.CreateRandomFile(t.TempDir(), rseed, size) - require.NoError(t, err) - _, carFilePath, err := testutil.CreateDenseCARv2(t.TempDir(), randomFilePath) - require.NoError(t, err) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - idx, err := car.ReadOrGenerateIndex(carFile) + addr := "localhost:0" + ln, err := bdsvc.Start(ctx, addr) require.NoError(t, err) - return idx -} - -func loadIndex(path string) (index.Index, error) { - defer func(now time.Time) { - log.Debugw("loadindex", "took", time.Since(now).String()) - }(time.Now()) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - subject, err := index.ReadFrom(idxf) - if err != nil { - return nil, err - } - - return subject, nil -} - -func getRecords(subject index.Index) ([]model.Record, error) { - records := make([]model.Record, 0) - - switch idx := subject.(type) { - case index.IterableIndex: - err := idx.ForEach(func(m multihash.Multihash, offset uint64) error { - - cid := cid.NewCidV1(cid.Raw, m) - - records = append(records, model.Record{ - Cid: cid, - OffsetSize: model.OffsetSize{ - Offset: offset, - Size: 0, - }, - }) - - return nil - }) - if err != nil { - return nil, err - } - default: - return nil, fmt.Errorf("wanted %v but got %v\n", multicodec.CarMultihashIndexSorted, idx.Codec()) - } - return records, nil + testServiceFuzz(ctx, t, ln.String()) } -func compareIndices(subject, subjectDb index.Index) (bool, error) { - var b bytes.Buffer - w := bufio.NewWriter(&b) - - _, err := subject.Marshal(w) - if err != nil { - return false, err - } - - var b2 bytes.Buffer - w2 := bufio.NewWriter(&b2) - - _, err = subjectDb.Marshal(w2) - if err != nil { - return false, err - } - - equal := bytes.Equal(b.Bytes(), b2.Bytes()) - - if !equal { - a, oka := toEntries(subject) - b, okb := toEntries(subjectDb) - if oka && okb { - if len(a) != len(b) { - return false, fmt.Errorf("index length mismatch: first %d / second %d", len(a), len(b)) - } - for mh, oa := range a { - ob, ok := b[mh] - if !ok { - return false, fmt.Errorf("second index missing multihash %s", mh) - } - if oa != ob { - return false, fmt.Errorf("offset mismatch for multihash %s: first %d / second %d", mh, oa, ob) - } - } - } - } - - return equal, nil -} - -func TestCleanup(t *testing.T) { +func TestCleanupLevelDB(t *testing.T) { _ = logging.SetLogLevel("*", "debug") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - testCleanup(ctx, t, bdsvc, "localhost:0") - }) - - t.Run("yugabyte", func(t *testing.T) { - // Running yugabyte tests may require download the docker container - // so set a high timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - - SetupYugabyte(t) - - bdsvc := NewYugabyte(TestYugabyteSettings) - testCleanup(ctx, t, bdsvc, "localhost:0") - }) -} - -func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" - - pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") - require.NoError(t, err) - - subject, err := loadIndex(sampleidx) - require.NoError(t, err) - - records, err := getRecords(subject) - require.NoError(t, err) - - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - di2 := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(10), - PieceOffset: 11, - PieceLength: 12, - CarLength: 13, - } - - // Add two deals for the piece - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - err = cl.AddDealForPiece(ctx, pieceCid, di2) - require.NoError(t, err) - - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 2) - - // Remove one deal for the piece - err = cl.RemoveDealForPiece(ctx, pieceCid, di.DealUuid) - require.NoError(t, err) - - dis, err = cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - - b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") - require.NoError(t, err) - - mhash, err := multihash.Cast(b) - require.NoError(t, err) - - offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) - require.NoError(t, err) - require.EqualValues(t, 3039040395, offset.Offset) - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - require.NoError(t, err) - require.Len(t, pcids, 1) - require.Equal(t, pieceCid, pcids[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - // Remove the other deal for the piece. - // After this call there are no deals left, so it should also cause the - // piece metadata and indexes to be removed. - err = cl.RemoveDealForPiece(ctx, pieceCid, di2.DealUuid) - require.NoError(t, err) - - _, err = cl.GetPieceDeals(ctx, pieceCid) - require.ErrorContains(t, err, "not found") - - _, err = cl.GetOffsetSize(ctx, pieceCid, mhash) - require.ErrorContains(t, err, "not found") - - _, err = cl.GetRecords(ctx, pieceCid) - require.ErrorContains(t, err, "not found") - - _, err = cl.PiecesContainingMultihash(ctx, mhash) - require.ErrorContains(t, err, "not found") - - indexed, err = cl.IsIndexed(ctx, pieceCid) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - require.False(t, indexed) + testCleanup(ctx, t, bdsvc, "localhost:0") } diff --git a/piecedirectory/piecedirectory_test.go b/piecedirectory/piecedirectory_test.go deleted file mode 100644 index 6e114713b..000000000 --- a/piecedirectory/piecedirectory_test.go +++ /dev/null @@ -1,414 +0,0 @@ -//go:build test_lid -// +build test_lid - -package piecedirectory - -import ( - "bytes" - "context" - "fmt" - "io" - "os" - "testing" - "time" - - pdTypes "github.com/filecoin-project/boost/piecedirectory/types" - mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks" - "github.com/filecoin-project/boostd-data/client" - "github.com/filecoin-project/boostd-data/model" - "github.com/filecoin-project/boostd-data/svc" - "github.com/filecoin-project/boostd-data/svc/types" - "github.com/filecoin-project/go-state-types/abi" - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/ipfs/go-cid" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/blockstore" - "github.com/stretchr/testify/require" -) - -func TestPieceDirectory(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) - defer cancel() - - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := svc.NewLevelDB("") - require.NoError(t, err) - testPieceDirectory(ctx, t, bdsvc) - }) - - t.Run("yugabyte", func(t *testing.T) { - svc.SetupYugabyte(t) - bdsvc := svc.NewYugabyte(svc.TestYugabyteSettings) - testPieceDirectory(ctx, t, bdsvc) - }) -} - -func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) { - ln, err := bdsvc.Start(ctx, "localhost:0") - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - t.Run("not found", func(t *testing.T) { - testPieceDirectoryNotFound(ctx, t, cl) - }) - - t.Run("basic blockstore", func(t *testing.T) { - testBasicBlockstoreMethods(ctx, t, cl) - }) - - t.Run("imported index", func(t *testing.T) { - testImportedIndex(ctx, t, cl) - }) - - t.Run("flagging pieces", func(t *testing.T) { - testFlaggingPieces(ctx, t, cl) - }) - - t.Run("reIndexing pieces from multiple sectors", func(t *testing.T) { - testReIndexMultiSector(ctx, t, cl) - }) -} - -func testPieceDirectoryNotFound(ctx context.Context, t *testing.T, cl *client.Store) { - ctrl := gomock.NewController(t) - pr := mock_piecedirectory.NewMockPieceReader(ctrl) - pm := NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - - nonExistentPieceCid, err := cid.Parse("bafkqaaa") - require.NoError(t, err) - - _, err = pm.GetPieceMetadata(ctx, nonExistentPieceCid) - require.True(t, types.IsNotFound(err)) - require.Error(t, err) - - _, err = pm.GetPieceDeals(ctx, nonExistentPieceCid) - require.True(t, types.IsNotFound(err)) - require.Error(t, err) - - _, err = pm.GetOffsetSize(ctx, nonExistentPieceCid, nonExistentPieceCid.Hash()) - require.True(t, types.IsNotFound(err)) - require.Error(t, err) - - _, err = pm.GetIterableIndex(ctx, nonExistentPieceCid) - require.True(t, types.IsNotFound(err)) - require.Error(t, err) - - _, err = pm.PiecesContainingMultihash(ctx, nonExistentPieceCid.Hash()) - require.True(t, types.IsNotFound(err)) - require.Error(t, err) -} - -// Verify that Has, GetSize and Get block work -func testBasicBlockstoreMethods(ctx context.Context, t *testing.T, cl *client.Store) { - carFilePath := CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - // Create a random CAR file - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - // Any calls to get a reader over data should return a reader over the random CAR file - pr := CreateMockPieceReader(t, carv1Reader) - - pm := NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - pieceCid := CalculateCommp(t, carv1Reader).PieceCID - - // Add deal info for the piece - it doesn't matter what it is, the piece - // just needs to have at least one deal associated with it - di := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 1, - SectorID: 2, - PieceOffset: 0, - PieceLength: 0, - } - err = pm.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // Create a CARv2 index over the CAR file - carFileIdxReader, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFileIdxReader.Close() - idx, err := car.ReadOrGenerateIndex(carFileIdxReader) - require.NoError(t, err) - - // Create a blockstore from the CAR file and index - _, err = carFile.Seek(0, io.SeekStart) - require.NoError(t, err) - bs, err := blockstore.NewReadOnly(carFile, idx) - require.NoError(t, err) - - // Get the index (offset and size information) - recs := GetRecords(t, carv1Reader) - - // Verify that blockstore has, get and get size work - for _, rec := range recs { - blk, err := bs.Get(ctx, rec.Cid) - require.NoError(t, err) - - has, err := pm.BlockstoreHas(ctx, rec.Cid) - require.NoError(t, err) - require.True(t, has) - - sz, err := pm.BlockstoreGetSize(ctx, rec.Cid) - require.NoError(t, err) - require.Equal(t, len(blk.RawData()), sz) - - pmblk, err := pm.BlockstoreGet(ctx, rec.Cid) - require.NoError(t, err) - require.True(t, bytes.Equal(blk.RawData(), pmblk)) - } -} - -// Verify that if the index has been imported from the DAG store, meaning -// it has offset information but not block size information, the local index directory -// will re-build the index -func testImportedIndex(ctx context.Context, t *testing.T, cl *client.Store) { - // Create a random CAR file - carFilePath := CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - // Any calls to get a reader over data should return a reader over the random CAR file - pr := CreateMockPieceReader(t, carv1Reader) - - recs := GetRecords(t, carv1Reader) - pieceCid := CalculateCommp(t, carv1Reader).PieceCID - err = cl.AddIndex(ctx, pieceCid, recs, false) - require.NoError(t, err) - - // Add deal info for the piece - it doesn't matter what it is, the piece - // just needs to have at least one deal associated with it - di := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 1, - SectorID: 2, - PieceOffset: 0, - PieceLength: 0, - } - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // Remove size information from the index records. - // This is to simulate what happens when an index is imported from the - // DAG store: only the offset information is imported (not size - // information) - for i, r := range recs { - recs[i] = model.Record{ - Cid: r.Cid, - OffsetSize: model.OffsetSize{ - Offset: r.Offset, - Size: 0, - }, - } - } - - // Add the index to the local index directory, marked as incomplete - err = cl.AddIndex(ctx, pieceCid, recs, false) - require.NoError(t, err) - - // Create a CARv2 index over the CAR file - carFileIdxReader, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFileIdxReader.Close() - idx, err := car.ReadOrGenerateIndex(carFileIdxReader) - require.NoError(t, err) - - // Create a blockstore from the CAR file and index - _, err = carFile.Seek(0, io.SeekStart) - require.NoError(t, err) - bs, err := blockstore.NewReadOnly(carFile, idx) - require.NoError(t, err) - - // Pick a record in the middle of the DAG - rec := recs[len(recs)/2] - blk, err := bs.Get(ctx, rec.Cid) - require.NoError(t, err) - - // Verify that getting the size of a block works correctly: - // There is no size information in the index so the piece - // directory should re-build the index and then return the size. - pm := NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - sz, err := pm.BlockstoreGetSize(ctx, rec.Cid) - require.NoError(t, err) - require.Equal(t, len(blk.RawData()), sz) -} - -func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) { - // Create a random CAR file - carFilePath := CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - recs := GetRecords(t, carv1Reader) - commpCalc := CalculateCommp(t, carv1Reader) - err = cl.AddIndex(ctx, commpCalc.PieceCID, recs, true) - require.NoError(t, err) - - // Add deal info for the piece - di := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 1, - SectorID: 1, - PieceOffset: 0, - PieceLength: commpCalc.PieceSize, - } - err = cl.AddDealForPiece(ctx, commpCalc.PieceCID, di) - require.NoError(t, err) - - // No pieces flagged, count and list of pieces should be empty - count, err := cl.FlaggedPiecesCount(ctx, nil) - require.NoError(t, err) - require.Equal(t, 0, count) - - pcids, err := cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) - require.NoError(t, err) - require.Equal(t, 0, len(pcids)) - - // Flag a piece - err = cl.FlagPiece(ctx, commpCalc.PieceCID, false) - require.NoError(t, err) - - // Count and list of pieces should contain one piece - count, err = cl.FlaggedPiecesCount(ctx, nil) - require.NoError(t, err) - require.Equal(t, 1, count) - - pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) - require.NoError(t, err) - require.Equal(t, 1, len(pcids)) - - // Test that setting the filter returns the correct results - count, err = cl.FlaggedPiecesCount(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: false}) - require.NoError(t, err) - require.Equal(t, 1, count) - - count, err = cl.FlaggedPiecesCount(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: true}) - require.NoError(t, err) - require.Equal(t, 0, count) - - pcids, err = cl.FlaggedPiecesList(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: false}, nil, 0, 10) - require.NoError(t, err) - require.Equal(t, 1, len(pcids)) - - pcids, err = cl.FlaggedPiecesList(ctx, &types.FlaggedPiecesListFilter{HasUnsealedCopy: true}, nil, 0, 10) - require.NoError(t, err) - require.Equal(t, 0, len(pcids)) - - // Unflag the piece - err = cl.UnflagPiece(ctx, commpCalc.PieceCID) - require.NoError(t, err) - - // Count and list of pieces should be empty - count, err = cl.FlaggedPiecesCount(ctx, nil) - require.NoError(t, err) - require.Equal(t, 0, count) - - pcids, err = cl.FlaggedPiecesList(ctx, nil, nil, 0, 10) - require.NoError(t, err) - require.Equal(t, 0, len(pcids)) -} - -// Verify that BuildIndexForPiece iterates over all deals return error if none of the deals (sectors) -// can be used to read the piece. We are testing 2 conditions here: -// 1. No eligible piece is found for both deals - error is expected -// 2. 1 eligible piece is found - no error is expected -func testReIndexMultiSector(ctx context.Context, t *testing.T, cl *client.Store) { - ctrl := gomock.NewController(t) - pr := mock_piecedirectory.NewMockPieceReader(ctrl) - pm := NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - - // Create a random CAR file - carFilePath := CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - // Return error first 3 time as during the first attempt we want to surface errors from - // failed BuildIndexForPiece operation for both deals. 3rd time to return error for first deal - // in the second run where we want the method to succeed eventually. - pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("piece error")).Times(3) - pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( - func(_ context.Context, _ abi.SectorNumber, _ abi.PaddedPieceSize, _ abi.PaddedPieceSize) (pdTypes.SectionReader, error) { - _, err := carv1Reader.Seek(0, io.SeekStart) - return MockSectionReader{carv1Reader}, err - }) - - pieceCid := CalculateCommp(t, carv1Reader).PieceCID - - // Add deal info for the piece - it doesn't matter what it is, the piece - // just needs to have 2 deals. One with no available pieceReader (simulating no unsealed sector) - // and other one with correct pieceReader - d1 := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 1, - SectorID: 2, - PieceOffset: 0, - PieceLength: 0, - } - - d2 := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 2, - SectorID: 3, - PieceOffset: 0, - PieceLength: 0, - } - - err = cl.AddDealForPiece(ctx, pieceCid, d1) - require.NoError(t, err) - - err = cl.AddDealForPiece(ctx, pieceCid, d2) - require.NoError(t, err) - - b, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.False(t, b) - - // Expect error as GetReader() mock will return error for both deals - err = pm.BuildIndexForPiece(ctx, pieceCid) - require.ErrorContains(t, err, "piece error") - - // No error is expected as GetReader() mock will return error for first deal - // but correct reader for the second deal - err = pm.BuildIndexForPiece(ctx, pieceCid) - require.NoError(t, err) - - b, err = cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, b) -}