From fda921a9c825a65ced0033f5b2d2901219e7b24e Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 1 Sep 2023 19:10:29 +1000 Subject: [PATCH] fix: offsets to CAR section starts (#1660) * fix: offsets to CAR section starts these are required for carv2 indexes when we pass them back to a Blockstore * fix: use ReadNode and compare CID we find --- go.mod | 2 +- piecedirectory/piecedirectory.go | 25 +++++++++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index cdef0210d..a0b65b68b 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 - github.com/multiformats/go-varint v0.0.7 // indirect + github.com/multiformats/go-varint v0.0.7 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pressly/goose/v3 v3.14.0 github.com/prometheus/client_golang v1.15.1 diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index b60dd7c33..eb4da030d 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -26,12 +26,14 @@ import ( format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-car" + "github.com/ipld/go-car/util" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" carindex "github.com/ipld/go-car/v2/index" "github.com/jellydator/ttlcache/v2" "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash" + "github.com/multiformats/go-varint" "go.opentelemetry.io/otel/attribute" ) @@ -255,10 +257,14 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid blockMetadata, err := blockReader.SkipNext() for err == nil { + // blockMetadata.SourceOffset gives us the offset of the block data, we need to rewind to + // the offset of the CAR section, which is before the CID and the uvarint length prefix + offset := blockMetadata.SourceOffset - uint64(blockMetadata.Cid.ByteLen()) + offset -= uint64(varint.UvarintSize(blockMetadata.Size + uint64(blockMetadata.Cid.ByteLen()))) recs = append(recs, model.Record{ Cid: blockMetadata.Cid, OffsetSize: model.OffsetSize{ - Offset: blockMetadata.SourceOffset, + Offset: offset, Size: blockMetadata.Size, }, }) @@ -530,12 +536,16 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, pieceCid, err) } - // Seek to the block offset + // Seek to the section offset readerAt := readerutil.NewReadSeekerFromReaderAt(reader, int64(offsetSize.Offset)) - data := make([]byte, offsetSize.Size) - if _, err = io.ReadFull(readerAt, data); err != nil { + // Read the block data + readCid, data, err := util.ReadNode(bufio.NewReader(readerAt)) + if err != nil { return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err) } + if !bytes.Equal(readCid.Hash(), c.Hash()) { + return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, pieceCid, c) + } return data, nil }() if err != nil { @@ -665,7 +675,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( if err != nil { return nil, fmt.Errorf("getting car length for piece %s: %w", pieceCid, err) } - sectionReader := io.NewSectionReader(reader, carv2.HeaderSize, size-carv2.HeaderSize) + sectionReader := io.NewSectionReader(reader, carv2.HeaderSize+carv2.PragmaSize, size-carv2.HeaderSize-carv2.PragmaSize) ch, err := car.ReadHeader(bufio.NewReader(sectionReader)) if err != nil { return nil, fmt.Errorf("reading car header for piece %s: %w", pieceCid, err) @@ -674,11 +684,14 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( if err := car.WriteHeader(ch, headerBuf); err != nil { return nil, fmt.Errorf("copying car header for piece %s: %w", pieceCid, err) } - empty := [carv2.HeaderSize]byte{} + empty := [carv2.HeaderSize + carv2.PragmaSize]byte{} headerBuf.Write(empty[:]) bsR = carutil.NewMultiReaderAt(bytes.NewReader(headerBuf.Bytes()), sectionReader) } else { bsR = reader + if _, err := reader.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("seeking back to start of piece %s: %w", pieceCid, err) + } } // Create a blockstore from the index and the piece reader bs, err := blockstore.NewReadOnly(bsR, idx, carv2.ZeroLengthSectionAsEOF(true))