Skip to content

Commit

Permalink
fix: offsets to CAR section starts (#1660)
Browse files Browse the repository at this point in the history
* fix: offsets to CAR section starts

these are required for carv2 indexes when we pass them back to a Blockstore

* fix: use ReadNode and compare CID we find
  • Loading branch information
rvagg committed Sep 1, 2023
1 parent ccb2738 commit fda921a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/multiformats/go-varint v0.0.7
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/pressly/goose/v3 v3.14.0
github.com/prometheus/client_golang v1.15.1
Expand Down
25 changes: 19 additions & 6 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carindex "github.com/ipld/go-car/v2/index"
"github.com/jellydator/ttlcache/v2"
"github.com/multiformats/go-multihash"
mh "github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -255,10 +257,14 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid

blockMetadata, err := blockReader.SkipNext()
for err == nil {
// blockMetadata.SourceOffset gives us the offset of the block data, we need to rewind to
// the offset of the CAR section, which is before the CID and the uvarint length prefix
offset := blockMetadata.SourceOffset - uint64(blockMetadata.Cid.ByteLen())
offset -= uint64(varint.UvarintSize(blockMetadata.Size + uint64(blockMetadata.Cid.ByteLen())))
recs = append(recs, model.Record{
Cid: blockMetadata.Cid,
OffsetSize: model.OffsetSize{
Offset: blockMetadata.SourceOffset,
Offset: offset,
Size: blockMetadata.Size,
},
})
Expand Down Expand Up @@ -530,12 +536,16 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte,
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, pieceCid, err)
}

// Seek to the block offset
// Seek to the section offset
readerAt := readerutil.NewReadSeekerFromReaderAt(reader, int64(offsetSize.Offset))
data := make([]byte, offsetSize.Size)
if _, err = io.ReadFull(readerAt, data); err != nil {
// Read the block data
readCid, data, err := util.ReadNode(bufio.NewReader(readerAt))
if err != nil {
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err)
}
if !bytes.Equal(readCid.Hash(), c.Hash()) {
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, pieceCid, c)
}
return data, nil
}()
if err != nil {
Expand Down Expand Up @@ -665,7 +675,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) (
if err != nil {
return nil, fmt.Errorf("getting car length for piece %s: %w", pieceCid, err)
}
sectionReader := io.NewSectionReader(reader, carv2.HeaderSize, size-carv2.HeaderSize)
sectionReader := io.NewSectionReader(reader, carv2.HeaderSize+carv2.PragmaSize, size-carv2.HeaderSize-carv2.PragmaSize)
ch, err := car.ReadHeader(bufio.NewReader(sectionReader))
if err != nil {
return nil, fmt.Errorf("reading car header for piece %s: %w", pieceCid, err)
Expand All @@ -674,11 +684,14 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) (
if err := car.WriteHeader(ch, headerBuf); err != nil {
return nil, fmt.Errorf("copying car header for piece %s: %w", pieceCid, err)
}
empty := [carv2.HeaderSize]byte{}
empty := [carv2.HeaderSize + carv2.PragmaSize]byte{}
headerBuf.Write(empty[:])
bsR = carutil.NewMultiReaderAt(bytes.NewReader(headerBuf.Bytes()), sectionReader)
} else {
bsR = reader
if _, err := reader.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("seeking back to start of piece %s: %w", pieceCid, err)
}
}
// Create a blockstore from the index and the piece reader
bs, err := blockstore.NewReadOnly(bsR, idx, carv2.ZeroLengthSectionAsEOF(true))
Expand Down

0 comments on commit fda921a

Please sign in to comment.