Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: offsets to CAR section starts #1660

Merged
merged 2 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/multiformats/go-varint v0.0.7
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/pressly/goose/v3 v3.14.0
github.com/prometheus/client_golang v1.15.1
Expand Down
25 changes: 19 additions & 6 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carindex "github.com/ipld/go-car/v2/index"
"github.com/jellydator/ttlcache/v2"
"github.com/multiformats/go-multihash"
mh "github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -255,10 +257,14 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid

blockMetadata, err := blockReader.SkipNext()
for err == nil {
// blockMetadata.SourceOffset gives us the offset of the block data, we need to rewind to
// the offset of the CAR section, which is before the CID and the uvarint length prefix
offset := blockMetadata.SourceOffset - uint64(blockMetadata.Cid.ByteLen())
offset -= uint64(varint.UvarintSize(blockMetadata.Size + uint64(blockMetadata.Cid.ByteLen())))
recs = append(recs, model.Record{
Cid: blockMetadata.Cid,
OffsetSize: model.OffsetSize{
Offset: blockMetadata.SourceOffset,
Offset: offset,
Size: blockMetadata.Size,
},
})
Expand Down Expand Up @@ -530,12 +536,16 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte,
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, pieceCid, err)
}

// Seek to the block offset
// Seek to the section offset
readerAt := readerutil.NewReadSeekerFromReaderAt(reader, int64(offsetSize.Offset))
data := make([]byte, offsetSize.Size)
if _, err = io.ReadFull(readerAt, data); err != nil {
// Read the block data
readCid, data, err := util.ReadNode(bufio.NewReader(readerAt))
if err != nil {
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err)
}
if !bytes.Equal(readCid.Hash(), c.Hash()) {
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, pieceCid, c)
}
return data, nil
}()
if err != nil {
Expand Down Expand Up @@ -665,7 +675,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) (
if err != nil {
return nil, fmt.Errorf("getting car length for piece %s: %w", pieceCid, err)
}
sectionReader := io.NewSectionReader(reader, carv2.HeaderSize, size-carv2.HeaderSize)
sectionReader := io.NewSectionReader(reader, carv2.HeaderSize+carv2.PragmaSize, size-carv2.HeaderSize-carv2.PragmaSize)
ch, err := car.ReadHeader(bufio.NewReader(sectionReader))
if err != nil {
return nil, fmt.Errorf("reading car header for piece %s: %w", pieceCid, err)
Expand All @@ -674,11 +684,14 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) (
if err := car.WriteHeader(ch, headerBuf); err != nil {
return nil, fmt.Errorf("copying car header for piece %s: %w", pieceCid, err)
}
empty := [carv2.HeaderSize]byte{}
empty := [carv2.HeaderSize + carv2.PragmaSize]byte{}
headerBuf.Write(empty[:])
bsR = carutil.NewMultiReaderAt(bytes.NewReader(headerBuf.Bytes()), sectionReader)
} else {
bsR = reader
if _, err := reader.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("seeking back to start of piece %s: %w", pieceCid, err)
}
}
// Create a blockstore from the index and the piece reader
bs, err := blockstore.NewReadOnly(bsR, idx, carv2.ZeroLengthSectionAsEOF(true))
Expand Down