generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 88
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wip: initial import of backend gateway
- Loading branch information
Showing
8 changed files
with
1,955 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,137 @@ | ||
package gateway | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ipfs/boxo/verifcid" | ||
blocks "github.com/ipfs/go-block-format" | ||
"github.com/ipfs/go-cid" | ||
"github.com/ipfs/go-unixfsnode" | ||
"github.com/ipld/go-car" | ||
"github.com/ipld/go-ipld-prime" | ||
"github.com/ipld/go-ipld-prime/datamodel" | ||
"github.com/ipld/go-ipld-prime/linking" | ||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
"github.com/multiformats/go-multihash" | ||
) | ||
|
||
type getBlock func(ctx context.Context, cid cid.Cid) (blocks.Block, error) | ||
|
||
var ErrNilBlock = errors.New("received a nil block with no error") | ||
|
||
func carToLinearBlockGetter(ctx context.Context, reader io.Reader, metrics *GraphGatewayMetrics) (getBlock, error) { | ||
cr, err := car.NewCarReaderWithOptions(reader, car.WithErrorOnEmptyRoots(false)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cbCtx, cncl := context.WithCancel(ctx) | ||
|
||
type blockRead struct { | ||
block blocks.Block | ||
err error | ||
} | ||
|
||
blkCh := make(chan blockRead, 1) | ||
go func() { | ||
defer cncl() | ||
defer close(blkCh) | ||
for { | ||
blk, rdErr := cr.Next() | ||
select { | ||
case blkCh <- blockRead{blk, rdErr}: | ||
if rdErr != nil { | ||
cncl() | ||
} | ||
case <-cbCtx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
|
||
isFirstBlock := true | ||
mx := sync.Mutex{} | ||
|
||
return func(ctx context.Context, c cid.Cid) (blocks.Block, error) { | ||
mx.Lock() | ||
defer mx.Unlock() | ||
if err := verifcid.ValidateCid(verifcid.DefaultAllowlist, c); err != nil { | ||
return nil, err | ||
} | ||
|
||
isId, bdata := extractIdentityMultihashCIDContents(c) | ||
if isId { | ||
return blocks.NewBlockWithCid(bdata, c) | ||
} | ||
|
||
// initially set a higher timeout here so that if there's an initial timeout error we get it from the car reader. | ||
var t *time.Timer | ||
if isFirstBlock { | ||
t = time.NewTimer(getBlockTimeout * 2) | ||
} else { | ||
t = time.NewTimer(getBlockTimeout) | ||
} | ||
var blkRead blockRead | ||
var ok bool | ||
select { | ||
case blkRead, ok = <-blkCh: | ||
if !t.Stop() { | ||
<-t.C | ||
} | ||
t.Reset(getBlockTimeout) | ||
case <-t.C: | ||
return nil, ErrGatewayTimeout | ||
} | ||
if !ok || blkRead.err != nil { | ||
if !ok || errors.Is(blkRead.err, io.EOF) { | ||
return nil, io.ErrUnexpectedEOF | ||
} | ||
return nil, GatewayError(blkRead.err) | ||
} | ||
if blkRead.block != nil { | ||
metrics.carBlocksFetchedMetric.Inc() | ||
if !blkRead.block.Cid().Equals(c) { | ||
return nil, errors.New(fmt.Sprintf("received block with cid %s, expected %s", blkRead.block.Cid(), c)) | ||
} | ||
return blkRead.block, nil | ||
} | ||
return nil, ErrNilBlock | ||
}, nil | ||
} | ||
|
||
// extractIdentityMultihashCIDContents will check if a given CID has an identity multihash and if so return true and | ||
// the bytes encoded in the digest, otherwise will return false. | ||
// Taken from https://github.com/ipfs/boxo/blob/b96767cc0971ca279feb36e7844e527a774309ab/blockstore/idstore.go#L30 | ||
func extractIdentityMultihashCIDContents(k cid.Cid) (bool, []byte) { | ||
// Pre-check by calling Prefix(), this much faster than extracting the hash. | ||
if k.Prefix().MhType != multihash.IDENTITY { | ||
return false, nil | ||
} | ||
|
||
dmh, err := multihash.Decode(k.Hash()) | ||
if err != nil || dmh.Code != multihash.IDENTITY { | ||
return false, nil | ||
} | ||
return true, dmh.Digest | ||
} | ||
|
||
func getLinksystem(fn getBlock) *ipld.LinkSystem { | ||
lsys := cidlink.DefaultLinkSystem() | ||
lsys.StorageReadOpener = func(linkContext linking.LinkContext, link datamodel.Link) (io.Reader, error) { | ||
c := link.(cidlink.Link).Cid | ||
blk, err := fn(linkContext.Ctx, c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return bytes.NewReader(blk.RawData()), nil | ||
} | ||
lsys.TrustedStorage = true | ||
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) | ||
return &lsys | ||
} |
Oops, something went wrong.