Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat(graph-gateway): add some (hacky) support for fetching blocks as …
Browse files Browse the repository at this point in the history
…well as CARs
  • Loading branch information
aschmahmann committed Mar 24, 2023
1 parent ce1ffd4 commit d8b4d4e
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 17 deletions.
2 changes: 1 addition & 1 deletion blockstore_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"
"fmt"
"github.com/ipfs/bifrost-gateway/lib"
"io"
"log"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/ipfs/bifrost-gateway/lib"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"
Expand Down
8 changes: 7 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC
return nil, err
}
} else {
gwAPI, err = lib.NewGraphGatewayBackend(bs.(lib.CarFetcher), lib.WithValueStore(routing))
// Sets up an exchange based on the given Block Store
exch, err := newExchange(bs)
if err != nil {
return nil, err
}

gwAPI, err = lib.NewGraphGatewayBackend(bs.(lib.CarFetcher), exch, lib.WithValueStore(routing))
if err != nil {
return nil, err
}
Expand Down
158 changes: 143 additions & 15 deletions lib/graph_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.uber.org/multierr"
"io"
"net/http"
)
Expand Down Expand Up @@ -70,14 +71,15 @@ func WithBlockstore(bs blockstore.Blockstore) GraphGatewayOption {
type GraphGatewayOption func(gwOptions *gwOptions) error

type GraphGateway struct {
fetcher CarFetcher
routing routing.ValueStore
namesys namesys.NameSystem
bstore blockstore.Blockstore
bsrv blockservice.BlockService
fetcher CarFetcher
blockFetcher exchange.Fetcher
routing routing.ValueStore
namesys namesys.NameSystem
bstore blockstore.Blockstore
bsrv blockservice.BlockService
}

func NewGraphGatewayBackend(f CarFetcher, opts ...GraphGatewayOption) (*GraphGateway, error) {
func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ...GraphGatewayOption) (*GraphGateway, error) {
var compiledOptions gwOptions
for _, o := range opts {
if err := o(&compiledOptions); err != nil {
Expand Down Expand Up @@ -118,10 +120,11 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...GraphGatewayOption) (*GraphGat
}

return &GraphGateway{
fetcher: f,
routing: vs,
namesys: ns,
bstore: bs,
fetcher: f,
blockFetcher: blockFetcher,
routing: vs,
namesys: ns,
bstore: bs,
}, nil
}

Expand All @@ -141,17 +144,21 @@ func (api *GraphGateway) loadRequestIntoMemoryBlockstoreAndBlocksGateway(ctx con
return nil, err
}
bstore := blockstore.NewBlockstore(ds)
exch := newInboundBlockExchange()

doneWithFetcher := make(chan error, 1)
carFetchingExch := newInboundBlockExchange()
doneWithFetcher := make(chan struct{}, 1)
exch := &handoffExchange{
startingExchange: carFetchingExch,
followupExchange: &blockFetcherExchWrapper{api.blockFetcher},
handoffCh: doneWithFetcher,
}

go func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered fetcher error", r)
}
}()
doneWithFetcher <- api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error {
err := api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error {
cr, err := car.NewCarReader(reader)
if err != nil {
return err
Expand All @@ -167,11 +174,16 @@ func (api *GraphGateway) loadRequestIntoMemoryBlockstoreAndBlocksGateway(ctx con
if err := bstore.Put(ctx, blk); err != nil {
return err
}
if err := exch.NotifyNewBlocks(ctx, blk); err != nil {
if err := carFetchingExch.NotifyNewBlocks(ctx, blk); err != nil {
return err
}
}
})
if err != nil {
goLog.Error(err)
}
doneWithFetcher <- struct{}{}
close(doneWithFetcher)
}()

bserv := blockservice.New(bstore, exch)
Expand Down Expand Up @@ -352,3 +364,119 @@ func (i *inboundBlockExchange) Close() error {
}

var _ exchange.Interface = (*inboundBlockExchange)(nil)

type handoffExchange struct {
startingExchange, followupExchange exchange.Interface
handoffCh <-chan struct{}
}

func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c})
if err != nil {
return nil, err
}
blk, ok := <-blkCh
if ok {
return blk, nil
}

select {
case <-f.handoffCh:
goLog.Infof("needed to use use a backup fetcher for cid %s", c)
return f.followupExchange.GetBlock(ctx, c)
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
blkCh, err := f.startingExchange.GetBlocks(ctx, cids)
if err != nil {
return nil, err
}

retCh := make(chan blocks.Block)

go func() {
cs := cid.NewSet()
for cs.Len() < len(cids) {
blk, ok := <-blkCh
if !ok {
break
}
select {
case retCh <- blk:
cs.Add(blk.Cid())
case <-ctx.Done():
}
}

for cs.Len() < len(cids) {
select {
case <-ctx.Done():
return
case <-f.handoffCh:
var newCidArr []cid.Cid
for _, c := range cids {
if !cs.Has(c) {
newCidArr = append(newCidArr, c)
}
}
goLog.Infof("needed to use use a backup fetcher for cids %v", newCidArr)
fch, err := f.followupExchange.GetBlocks(ctx, newCidArr)
if err != nil {
goLog.Error(fmt.Errorf("error getting blocks from followup exchange %w", err))
return
}
for cs.Len() < len(cids) {
select {
case blk := <-fch:
select {
case retCh <- blk:
cs.Add(blk.Cid())
case <-ctx.Done():
return
}
}
}
}
}
}()
return retCh, nil
}

func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...)
err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...)
return multierr.Combine(err1, err2)
}

func (f *handoffExchange) Close() error {
err1 := f.startingExchange.Close()
err2 := f.followupExchange.Close()
return multierr.Combine(err1, err2)
}

var _ exchange.Interface = (*handoffExchange)(nil)

type blockFetcherExchWrapper struct {
f exchange.Fetcher
}

func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return b.f.GetBlock(ctx, c)
}

func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
return b.f.GetBlocks(ctx, cids)
}

func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
return nil
}

func (b *blockFetcherExchWrapper) Close() error {
return nil
}

var _ exchange.Interface = (*blockFetcherExchWrapper)(nil)

0 comments on commit d8b4d4e

Please sign in to comment.