From d8b4d4e577f9e5e2960a0b5d8da9543913c18962 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 24 Mar 2023 07:13:02 -0400 Subject: [PATCH] feat(graph-gateway): add some (hacky) support for fetching blocks as well as CARs --- blockstore_proxy.go | 2 +- handlers.go | 8 ++- lib/graph_gateway.go | 158 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 151 insertions(+), 17 deletions(-) diff --git a/blockstore_proxy.go b/blockstore_proxy.go index 0c12c3e..5f17450 100644 --- a/blockstore_proxy.go +++ b/blockstore_proxy.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "github.com/ipfs/bifrost-gateway/lib" "io" "log" "math/rand" @@ -11,6 +10,7 @@ import ( "net/url" "time" + "github.com/ipfs/bifrost-gateway/lib" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-libipfs/blocks" diff --git a/handlers.go b/handlers.go index 9e82978..23136b1 100644 --- a/handlers.go +++ b/handlers.go @@ -72,7 +72,13 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC return nil, err } } else { - gwAPI, err = lib.NewGraphGatewayBackend(bs.(lib.CarFetcher), lib.WithValueStore(routing)) + // Sets up an exchange based on the given Block Store + exch, err := newExchange(bs) + if err != nil { + return nil, err + } + + gwAPI, err = lib.NewGraphGatewayBackend(bs.(lib.CarFetcher), exch, lib.WithValueStore(routing)) if err != nil { return nil, err } diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index 3f344b9..5edda8b 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -24,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" + "go.uber.org/multierr" "io" "net/http" ) @@ -70,14 +71,15 @@ func WithBlockstore(bs blockstore.Blockstore) GraphGatewayOption { type GraphGatewayOption func(gwOptions *gwOptions) error type GraphGateway struct { - fetcher CarFetcher - routing routing.ValueStore - namesys namesys.NameSystem - bstore blockstore.Blockstore - bsrv blockservice.BlockService + fetcher CarFetcher + blockFetcher exchange.Fetcher + routing routing.ValueStore + namesys namesys.NameSystem + bstore blockstore.Blockstore + bsrv blockservice.BlockService } -func NewGraphGatewayBackend(f CarFetcher, opts ...GraphGatewayOption) (*GraphGateway, error) { +func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ...GraphGatewayOption) (*GraphGateway, error) { var compiledOptions gwOptions for _, o := range opts { if err := o(&compiledOptions); err != nil { @@ -118,10 +120,11 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...GraphGatewayOption) (*GraphGat } return &GraphGateway{ - fetcher: f, - routing: vs, - namesys: ns, - bstore: bs, + fetcher: f, + blockFetcher: blockFetcher, + routing: vs, + namesys: ns, + bstore: bs, }, nil } @@ -141,9 +144,13 @@ func (api *GraphGateway) loadRequestIntoMemoryBlockstoreAndBlocksGateway(ctx con return nil, err } bstore := blockstore.NewBlockstore(ds) - exch := newInboundBlockExchange() - - doneWithFetcher := make(chan error, 1) + carFetchingExch := newInboundBlockExchange() + doneWithFetcher := make(chan struct{}, 1) + exch := &handoffExchange{ + startingExchange: carFetchingExch, + followupExchange: &blockFetcherExchWrapper{api.blockFetcher}, + handoffCh: doneWithFetcher, + } go func() { defer func() { @@ -151,7 +158,7 @@ func (api *GraphGateway) loadRequestIntoMemoryBlockstoreAndBlocksGateway(ctx con fmt.Println("Recovered fetcher error", r) } }() - doneWithFetcher <- api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error { + err := api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error { cr, err := car.NewCarReader(reader) if err != nil { return err @@ -167,11 +174,16 @@ func (api *GraphGateway) loadRequestIntoMemoryBlockstoreAndBlocksGateway(ctx con if err := bstore.Put(ctx, blk); err != nil { return err } - if err := exch.NotifyNewBlocks(ctx, blk); err != nil { + if err := carFetchingExch.NotifyNewBlocks(ctx, blk); err != nil { return err } } }) + if err != nil { + goLog.Error(err) + } + doneWithFetcher <- struct{}{} + close(doneWithFetcher) }() bserv := blockservice.New(bstore, exch) @@ -352,3 +364,119 @@ func (i *inboundBlockExchange) Close() error { } var _ exchange.Interface = (*inboundBlockExchange)(nil) + +type handoffExchange struct { + startingExchange, followupExchange exchange.Interface + handoffCh <-chan struct{} +} + +func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c}) + if err != nil { + return nil, err + } + blk, ok := <-blkCh + if ok { + return blk, nil + } + + select { + case <-f.handoffCh: + goLog.Infof("needed to use use a backup fetcher for cid %s", c) + return f.followupExchange.GetBlock(ctx, c) + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, cids) + if err != nil { + return nil, err + } + + retCh := make(chan blocks.Block) + + go func() { + cs := cid.NewSet() + for cs.Len() < len(cids) { + blk, ok := <-blkCh + if !ok { + break + } + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + } + } + + for cs.Len() < len(cids) { + select { + case <-ctx.Done(): + return + case <-f.handoffCh: + var newCidArr []cid.Cid + for _, c := range cids { + if !cs.Has(c) { + newCidArr = append(newCidArr, c) + } + } + goLog.Infof("needed to use use a backup fetcher for cids %v", newCidArr) + fch, err := f.followupExchange.GetBlocks(ctx, newCidArr) + if err != nil { + goLog.Error(fmt.Errorf("error getting blocks from followup exchange %w", err)) + return + } + for cs.Len() < len(cids) { + select { + case blk := <-fch: + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + return + } + } + } + } + } + }() + return retCh, nil +} + +func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...) + err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...) + return multierr.Combine(err1, err2) +} + +func (f *handoffExchange) Close() error { + err1 := f.startingExchange.Close() + err2 := f.followupExchange.Close() + return multierr.Combine(err1, err2) +} + +var _ exchange.Interface = (*handoffExchange)(nil) + +type blockFetcherExchWrapper struct { + f exchange.Fetcher +} + +func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return b.f.GetBlock(ctx, c) +} + +func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + return b.f.GetBlocks(ctx, cids) +} + +func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + return nil +} + +func (b *blockFetcherExchWrapper) Close() error { + return nil +} + +var _ exchange.Interface = (*blockFetcherExchWrapper)(nil)