From e3d3206eb8a73bb8fade789a3665e38ce0263573 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Thu, 7 Mar 2024 14:30:03 +0100 Subject: [PATCH] wip: initial import of backend gateway --- examples/go.mod | 7 + examples/go.sum | 17 + gateway/backend_blocks.go | 20 +- gateway/backend_remote.go | 136 ++++ gateway/backend_remote_2.go | 1227 +++++++++++++++++++++++++++++++ gateway/backend_remote_files.go | 533 ++++++++++++++ go.mod | 9 +- go.sum | 16 + 8 files changed, 1955 insertions(+), 10 deletions(-) create mode 100644 gateway/backend_remote_2.go create mode 100644 gateway/backend_remote_files.go diff --git a/examples/go.mod b/examples/go.mod index 906c0dbeb..db42aed6f 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -60,7 +60,11 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect + github.com/ipfs/go-blockservice v0.5.0 // indirect + github.com/ipfs/go-ipfs-blockstore v1.3.0 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect + github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect + github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect @@ -69,9 +73,12 @@ require ( github.com/ipfs/go-ipld-legacy v0.2.1 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect + github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/go-unixfsnode v1.9.0 // indirect + github.com/ipfs/go-verifcid v0.0.2 // indirect + github.com/ipld/go-car v0.6.2 // indirect github.com/ipld/go-codec-dagpb v1.6.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 8b731f057..9f2df360d 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -132,6 +132,7 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 h1:dHLYa5D8/Ta0aLR2XcPsrkpAgGeFs6thhMcQK0oQ0n8= github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -161,28 +162,37 @@ github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= +github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs= github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM= github.com/ipfs/go-blockservice v0.5.0 h1:B2mwhhhVQl2ntW2EIpaWPwSCxSuqr5fFA93Ms4bYLEY= +github.com/ipfs/go-blockservice v0.5.0/go.mod h1:W6brZ5k20AehbmERplmERn8o2Ni3ZZubvAxaIUeaT6w= +github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ipfs-blockstore v1.3.0 h1:m2EXaWgwTzAfsmt5UdJ7Is6l4gJcaM/A12XwJyvYvMM= +github.com/ipfs/go-ipfs-blockstore v1.3.0/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7NapWLY8= +github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= +github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE= github.com/ipfs/go-ipfs-pq v0.0.3/go.mod h1:btNw5hsHBpRcSSgZtiNm/SLj5gYIZ18AKtv3kERkRb4= github.com/ipfs/go-ipfs-redirects-file v0.1.1 h1:Io++k0Vf/wK+tfnhEh63Yte1oQK5VGT2hIEYpD0Rzx8= github.com/ipfs/go-ipfs-redirects-file v0.1.1/go.mod h1:tAwRjCV0RjLTjH8DR/AU7VYvfQECg+lpUy2Mdzv7gyk= +github.com/ipfs/go-ipfs-routing v0.3.0 h1:9W/W3N+g+y4ZDeffSgqhgo7BsBSJwPMcyssET9OWevc= github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= github.com/ipfs/go-ipfs-util v0.0.3/go.mod h1:LHzG1a0Ig4G+iZ26UUOMjHd+lfM84LZCrn17xAKWBvs= github.com/ipfs/go-ipld-cbor v0.1.0 h1:dx0nS0kILVivGhfWuB6dUpMa/LAwElHPw1yOGYopoYs= @@ -197,6 +207,7 @@ github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Ax github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/ipfs/go-merkledag v0.11.0 h1:DgzwK5hprESOzS4O1t/wi6JDpyVQdvm9Bs59N/jqfBY= +github.com/ipfs/go-merkledag v0.11.0/go.mod h1:Q4f/1ezvBiJV0YCIXvt51W/9/kqJGH4I1LsA7+djsM4= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= @@ -205,6 +216,9 @@ github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= github.com/ipfs/go-unixfsnode v1.9.0 h1:ubEhQhr22sPAKO2DNsyVBW7YB/zA8Zkif25aBvz8rc8= github.com/ipfs/go-unixfsnode v1.9.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= +github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= +github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= +github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= @@ -234,6 +248,7 @@ github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZY github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -317,6 +332,7 @@ github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2 github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= +github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= @@ -676,6 +692,7 @@ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/gateway/backend_blocks.go b/gateway/backend_blocks.go index 0cfe45a10..d8414183d 100644 --- a/gateway/backend_blocks.go +++ b/gateway/backend_blocks.go @@ -341,9 +341,17 @@ func (bb *BlocksBackend) GetCAR(ctx context.Context, p path.ImmutablePath, param unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) lsys.StorageReadOpener = blockOpener(ctx, blockGetter) + // First resolve the path since we always need to. + lastCid, remainder, err := pathResolver.ResolveToLastNode(ctx, p) + if err != nil { + // io.PipeWriter.CloseWithError always returns nil. + _ = w.CloseWithError(err) + return + } + // TODO: support selectors passed as request param: https://github.com/ipfs/kubo/issues/8769 // TODO: this is very slow if blocks are remote due to linear traversal. Do we need deterministic traversals here? - carWriteErr := walkGatewaySimpleSelector(ctx, p, params, &lsys, pathResolver) + carWriteErr := walkGatewaySimpleSelector(ctx, lastCid, remainder, params, &lsys) // io.PipeWriter.CloseWithError always returns nil. _ = w.CloseWithError(carWriteErr) @@ -353,19 +361,13 @@ func (bb *BlocksBackend) GetCAR(ctx context.Context, p path.ImmutablePath, param } // walkGatewaySimpleSelector walks the subgraph described by the path and terminal element parameters -func walkGatewaySimpleSelector(ctx context.Context, p path.ImmutablePath, params CarParams, lsys *ipld.LinkSystem, pathResolver resolver.Resolver) error { - // First resolve the path since we always need to. - lastCid, remainder, err := pathResolver.ResolveToLastNode(ctx, p) - if err != nil { - return err - } - +func walkGatewaySimpleSelector(ctx context.Context, lastCid cid.Cid, remainder []string, params CarParams, lsys *ipld.LinkSystem) error { lctx := ipld.LinkContext{Ctx: ctx} pathTerminalCidLink := cidlink.Link{Cid: lastCid} // If the scope is the block, now we only need to retrieve the root block of the last element of the path. if params.Scope == DagScopeBlock { - _, err = lsys.LoadRaw(lctx, pathTerminalCidLink) + _, err := lsys.LoadRaw(lctx, pathTerminalCidLink) return err } diff --git a/gateway/backend_remote.go b/gateway/backend_remote.go index fa0ed3c2c..10e661589 100644 --- a/gateway/backend_remote.go +++ b/gateway/backend_remote.go @@ -1 +1,137 @@ package gateway + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/ipfs/boxo/verifcid" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-unixfsnode" + "github.com/ipld/go-car" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multihash" +) + +type getBlock func(ctx context.Context, cid cid.Cid) (blocks.Block, error) + +var ErrNilBlock = errors.New("received a nil block with no error") + +func carToLinearBlockGetter(ctx context.Context, reader io.Reader, metrics *GraphGatewayMetrics) (getBlock, error) { + cr, err := car.NewCarReaderWithOptions(reader, car.WithErrorOnEmptyRoots(false)) + if err != nil { + return nil, err + } + + cbCtx, cncl := context.WithCancel(ctx) + + type blockRead struct { + block blocks.Block + err error + } + + blkCh := make(chan blockRead, 1) + go func() { + defer cncl() + defer close(blkCh) + for { + blk, rdErr := cr.Next() + select { + case blkCh <- blockRead{blk, rdErr}: + if rdErr != nil { + cncl() + } + case <-cbCtx.Done(): + return + } + } + }() + + isFirstBlock := true + mx := sync.Mutex{} + + return func(ctx context.Context, c cid.Cid) (blocks.Block, error) { + mx.Lock() + defer mx.Unlock() + if err := verifcid.ValidateCid(verifcid.DefaultAllowlist, c); err != nil { + return nil, err + } + + isId, bdata := extractIdentityMultihashCIDContents(c) + if isId { + return blocks.NewBlockWithCid(bdata, c) + } + + // initially set a higher timeout here so that if there's an initial timeout error we get it from the car reader. + var t *time.Timer + if isFirstBlock { + t = time.NewTimer(getBlockTimeout * 2) + } else { + t = time.NewTimer(getBlockTimeout) + } + var blkRead blockRead + var ok bool + select { + case blkRead, ok = <-blkCh: + if !t.Stop() { + <-t.C + } + t.Reset(getBlockTimeout) + case <-t.C: + return nil, ErrGatewayTimeout + } + if !ok || blkRead.err != nil { + if !ok || errors.Is(blkRead.err, io.EOF) { + return nil, io.ErrUnexpectedEOF + } + return nil, GatewayError(blkRead.err) + } + if blkRead.block != nil { + metrics.carBlocksFetchedMetric.Inc() + if !blkRead.block.Cid().Equals(c) { + return nil, errors.New(fmt.Sprintf("received block with cid %s, expected %s", blkRead.block.Cid(), c)) + } + return blkRead.block, nil + } + return nil, ErrNilBlock + }, nil +} + +// extractIdentityMultihashCIDContents will check if a given CID has an identity multihash and if so return true and +// the bytes encoded in the digest, otherwise will return false. +// Taken from https://github.com/ipfs/boxo/blob/b96767cc0971ca279feb36e7844e527a774309ab/blockstore/idstore.go#L30 +func extractIdentityMultihashCIDContents(k cid.Cid) (bool, []byte) { + // Pre-check by calling Prefix(), this much faster than extracting the hash. + if k.Prefix().MhType != multihash.IDENTITY { + return false, nil + } + + dmh, err := multihash.Decode(k.Hash()) + if err != nil || dmh.Code != multihash.IDENTITY { + return false, nil + } + return true, dmh.Digest +} + +func getLinksystem(fn getBlock) *ipld.LinkSystem { + lsys := cidlink.DefaultLinkSystem() + lsys.StorageReadOpener = func(linkContext linking.LinkContext, link datamodel.Link) (io.Reader, error) { + c := link.(cidlink.Link).Cid + blk, err := fn(linkContext.Ctx, c) + if err != nil { + return nil, err + } + return bytes.NewReader(blk.RawData()), nil + } + lsys.TrustedStorage = true + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + return &lsys +} diff --git a/gateway/backend_remote_2.go b/gateway/backend_remote_2.go new file mode 100644 index 000000000..3b0dd1c7b --- /dev/null +++ b/gateway/backend_remote_2.go @@ -0,0 +1,1227 @@ +package gateway + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/ipfs/boxo/files" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/unixfs" + "github.com/ipfs/boxo/path" + "github.com/ipfs/boxo/path/resolver" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-unixfsnode" + ufsData "github.com/ipfs/go-unixfsnode/data" + "github.com/ipfs/go-unixfsnode/hamt" + ufsiter "github.com/ipfs/go-unixfsnode/iter" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/schema" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/multiformats/go-multicodec" + "github.com/prometheus/client_golang/prometheus" +) + +const GetBlockTimeout = time.Second * 60 + +type DataCallback = func(resource string, reader io.Reader) error + +var ErrFetcherUnexpectedEOF = fmt.Errorf("failed to fetch IPLD data") + +type CarFetcher interface { + Fetch(ctx context.Context, path string, cb DataCallback) error +} + +type GraphGateway struct { + baseBackend + + fetcher CarFetcher + + pc traversal.LinkTargetNodePrototypeChooser + + metrics *GraphGatewayMetrics +} + +type GraphGatewayMetrics struct { + contextAlreadyCancelledMetric prometheus.Counter + carFetchAttemptMetric prometheus.Counter + carBlocksFetchedMetric prometheus.Counter + carParamsMetric *prometheus.CounterVec + + bytesRangeStartMetric prometheus.Histogram + bytesRangeSizeMetric prometheus.Histogram +} + +func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGateway, error) { + var compiledOptions blocksBackendOptions + for _, o := range opts { + if err := o(&compiledOptions); err != nil { + return nil, err + } + } + + // Setup the [baseBackend] which takes care of some shared functionality, such + // as resolving /ipns links. + baseBackend, err := newBaseBackend(compiledOptions.vs, compiledOptions.ns) + if err != nil { + return nil, err + } + + var promReg prometheus.Registerer = prometheus.NewRegistry() + + return &GraphGateway{ + baseBackend: baseBackend, + fetcher: f, + metrics: registerGraphGatewayMetrics(promReg), + pc: dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { + if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { + return tlnkNd.LinkTargetNodePrototype(), nil + } + return basicnode.Prototype.Any, nil + }), + }, nil +} + +func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGatewayMetrics { + // How many CAR Fetch attempts we had? Need this to calculate % of various graph request types. + // We only count attempts here, because success/failure with/without retries are provided by caboose: + // - ipfs_caboose_fetch_duration_car_success_count + // - ipfs_caboose_fetch_duration_car_failure_count + // - ipfs_caboose_fetch_duration_car_peer_success_count + // - ipfs_caboose_fetch_duration_car_peer_failure_count + carFetchAttemptMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "car_fetch_attempts", + Help: "The number of times a CAR fetch was attempted by IPFSBackend.", + }) + registerer.MustRegister(carFetchAttemptMetric) + + contextAlreadyCancelledMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "car_fetch_context_already_cancelled", + Help: "The number of times context is already cancelled when a CAR fetch was attempted by IPFSBackend.", + }) + registerer.MustRegister(contextAlreadyCancelledMetric) + + // How many blocks were read via CARs? + // Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts. + carBlocksFetchedMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "car_blocks_fetched", + Help: "The number of blocks successfully read via CAR fetch.", + }) + registerer.MustRegister(carBlocksFetchedMetric) + + carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "car_fetch_params", + Help: "How many times specific CAR parameter was used during CAR data fetch.", + }, []string{"dagScope", "entityRanges"}) // we use 'ranges' instead of 'bytes' here because we only count the number of ranges present + registerer.MustRegister(carParamsMetric) + + bytesRangeStartMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "range_request_start", + Help: "Tracks where did the range request start.", + Buckets: prometheus.ExponentialBuckets(1024, 2, 24), // 1024 bytes to 8 GiB + }) + registerer.MustRegister(bytesRangeStartMetric) + + bytesRangeSizeMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "ipfs", + Subsystem: "gw_graph_backend", + Name: "range_request_size", + Help: "Tracks the size of range requests.", + Buckets: prometheus.ExponentialBuckets(256*1024, 2, 10), // From 256KiB to 100MiB + }) + registerer.MustRegister(bytesRangeSizeMetric) + + return &GraphGatewayMetrics{ + contextAlreadyCancelledMetric, + carFetchAttemptMetric, + carBlocksFetchedMetric, + carParamsMetric, + bytesRangeStartMetric, + bytesRangeSizeMetric, + } +} + +func (api *GraphGateway) fetchCAR(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error { + urlWithoutHost := contentPathToCarUrl(path, params).String() + + api.metrics.carFetchAttemptMetric.Inc() + var ipldError error + fetchErr := api.fetcher.Fetch(ctx, urlWithoutHost, func(resource string, reader io.Reader) error { + return checkRetryableError(&ipldError, func() error { + return cb(resource, reader) + }) + }) + + if ipldError != nil { + fetchErr = ipldError + } else if fetchErr != nil { + fetchErr = GatewayError(fetchErr) + } + + return fetchErr +} + +// resolvePathWithRootsAndBlock takes a path and linksystem and returns the set of non-terminal cids, the terminal cid, the remainder, and the block corresponding to the terminal cid +func resolvePathWithRootsAndBlock(ctx context.Context, fpath path.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { + pathRootCids, terminalCid, remainder, terminalBlk, err := resolvePathToLastWithRoots(ctx, fpath, unixFSLsys) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + + if terminalBlk == nil { + lctx := ipld.LinkContext{Ctx: ctx} + lnk := cidlink.Link{Cid: terminalCid} + blockData, err := unixFSLsys.LoadRaw(lctx, lnk) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + terminalBlk, err = blocks.NewBlockWithCid(blockData, terminalCid) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + } + + return pathRootCids, terminalCid, remainder, terminalBlk, err +} + +// resolvePathToLastWithRoots takes a path and linksystem and returns the set of non-terminal cids, the terminal cid, +// the remainder pathing, the last block loaded, and the last node loaded. +// +// Note: the block returned will be nil if the terminal element is a link or the path is just a CID +func resolvePathToLastWithRoots(ctx context.Context, fpath path.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { + c, p := fpath.RootCid(), fpath.Segments()[2:] + if len(p) == 0 { + return nil, c, nil, nil, nil + } + + unixFSLsys.NodeReifier = unixfsnode.Reify + defer func() { unixFSLsys.NodeReifier = nil }() + + var cids []cid.Cid + cids = append(cids, c) + + pc := dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { + if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { + return tlnkNd.LinkTargetNodePrototype(), nil + } + return basicnode.Prototype.Any, nil + }) + + loadNode := func(ctx context.Context, c cid.Cid) (blocks.Block, ipld.Node, error) { + lctx := ipld.LinkContext{Ctx: ctx} + rootLnk := cidlink.Link{Cid: c} + np, err := pc(rootLnk, lctx) + if err != nil { + return nil, nil, err + } + nd, blockData, err := unixFSLsys.LoadPlusRaw(lctx, rootLnk, np) + if err != nil { + return nil, nil, err + } + blk, err := blocks.NewBlockWithCid(blockData, c) + if err != nil { + return nil, nil, err + } + return blk, nd, nil + } + + nextBlk, nextNd, err := loadNode(ctx, c) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + + depth := 0 + for i, elem := range p { + nextNd, err = nextNd.LookupBySegment(ipld.ParsePathSegment(elem)) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + if nextNd.Kind() == ipld.Kind_Link { + depth = 0 + lnk, err := nextNd.AsLink() + if err != nil { + return nil, cid.Undef, nil, nil, err + } + cidLnk, ok := lnk.(cidlink.Link) + if !ok { + return nil, cid.Undef, nil, nil, fmt.Errorf("link is not a cidlink: %v", cidLnk) + } + cids = append(cids, cidLnk.Cid) + + if i < len(p)-1 { + nextBlk, nextNd, err = loadNode(ctx, cidLnk.Cid) + if err != nil { + return nil, cid.Undef, nil, nil, err + } + } + } else { + depth++ + } + } + + // if last node is not a link, just return it's cid, add path to remainder and return + if nextNd.Kind() != ipld.Kind_Link { + // return the cid and the remainder of the path + return cids[:len(cids)-1], cids[len(cids)-1], p[len(p)-depth:], nextBlk, nil + } + + return cids[:len(cids)-1], cids[len(cids)-1], nil, nil, nil +} + +func contentMetadataFromRootsAndRemainder(p path.Path, pathRoots []cid.Cid, terminalCid cid.Cid, remainder []string) ContentPathMetadata { + var rootCid cid.Cid + if len(pathRoots) > 0 { + rootCid = pathRoots[0] + } else { + rootCid = terminalCid + } + + md := ContentPathMetadata{ + PathSegmentRoots: pathRoots, + LastSegment: path.FromCid(rootCid), + LastSegmentRemainder: remainder, + } + return md +} + +var errNotUnixFS = fmt.Errorf("data was not unixfs") + +func (api *GraphGateway) Get(ctx context.Context, path path.ImmutablePath, byteRanges ...ByteRange) (ContentPathMetadata, *GetResponse, error) { + rangeCount := len(byteRanges) + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": strconv.Itoa(rangeCount)}).Inc() + + carParams := CarParams{Scope: DagScopeEntity} + + // fetch CAR with &bytes= to get minimal set of blocks for the request + // Note: majority of requests have 0 or max 1 ranges. if there are more ranges than one, + // that is a niche edge cache we don't prefetch as CAR and use fallback blockstore instead. + if rangeCount > 0 { + r := byteRanges[0] + carParams.Range = &DagByteRange{ + From: int64(r.From), + } + + // TODO: move to boxo or to loadRequestIntoSharedBlockstoreAndBlocksGateway after we pass params in a humane way + api.metrics.bytesRangeStartMetric.Observe(float64(r.From)) + + if r.To != nil { + carParams.Range.To = r.To + + // TODO: move to boxo or to loadRequestIntoSharedBlockstoreAndBlocksGateway after we pass params in a humane way + api.metrics.bytesRangeSizeMetric.Observe(float64(*r.To) - float64(r.From) + 1) + } + } + + md, terminalElem, err := fetchWithPartialRetries(ctx, path, carParams, loadTerminalEntity, api.metrics, api.fetchCAR) + if err != nil { + return ContentPathMetadata{}, nil, err + } + + var resp *GetResponse + + switch typedTerminalElem := terminalElem.(type) { + case *GetResponse: + resp = typedTerminalElem + case *backpressuredFile: + resp = NewGetResponseFromReader(typedTerminalElem, typedTerminalElem.size) + case *backpressuredHAMTDirIterNoRecursion: + ch := make(chan unixfs.LinkResult) + go func() { + defer close(ch) + for typedTerminalElem.Next() { + l := typedTerminalElem.Link() + select { + case ch <- l: + case <-ctx.Done(): + return + } + } + if err := typedTerminalElem.Err(); err != nil { + select { + case ch <- unixfs.LinkResult{Err: err}: + case <-ctx.Done(): + return + } + } + }() + resp = NewGetResponseFromDirectoryListing(typedTerminalElem.dagSize, ch, nil) + default: + return ContentPathMetadata{}, nil, fmt.Errorf("invalid data type") + } + + return md, resp, nil + +} + +// loadTerminalEntity returns either a [*GetResponse], [*backpressuredFile], or [*backpressuredHAMTDirIterNoRecursion] +func loadTerminalEntity(ctx context.Context, c cid.Cid, blk blocks.Block, lsys *ipld.LinkSystem, params CarParams, getLsys lsysGetter) (interface{}, error) { + var err error + if lsys == nil { + lsys, err = getLsys(ctx, c, params) + if err != nil { + return nil, err + } + } + + lctx := ipld.LinkContext{Ctx: ctx} + + if c.Type() != uint64(multicodec.DagPb) { + var blockData []byte + + if blk != nil { + blockData = blk.RawData() + } else { + blockData, err = lsys.LoadRaw(lctx, cidlink.Link{Cid: c}) + if err != nil { + return nil, err + } + } + + f := files.NewBytesFile(blockData) + if params.Range != nil && params.Range.From != 0 { + if _, err := f.Seek(params.Range.From, io.SeekStart); err != nil { + return nil, err + } + } + + return NewGetResponseFromReader(f, int64(len(blockData))), nil + } + + blockData, pbn, ufsFieldData, fieldNum, err := loadUnixFSBase(ctx, c, blk, lsys) + if err != nil { + return nil, err + } + + switch fieldNum { + case ufsData.Data_Symlink: + if !ufsFieldData.FieldData().Exists() { + return nil, fmt.Errorf("invalid UnixFS symlink object") + } + lnkTarget := string(ufsFieldData.FieldData().Must().Bytes()) + f := NewGetResponseFromSymlink(files.NewLinkFile(lnkTarget, nil).(*files.Symlink), int64(len(lnkTarget))) + return f, nil + case ufsData.Data_Metadata: + return nil, fmt.Errorf("UnixFS Metadata unsupported") + case ufsData.Data_HAMTShard, ufsData.Data_Directory: + blk, err := blocks.NewBlockWithCid(blockData, c) + if err != nil { + return nil, fmt.Errorf("could not create block: %w", err) + } + dirRootNd, err := merkledag.ProtoNodeConverter(blk, pbn) + if err != nil { + return nil, fmt.Errorf("could not create dag-pb universal block from UnixFS directory root: %w", err) + } + pn, ok := dirRootNd.(*merkledag.ProtoNode) + if !ok { + return nil, fmt.Errorf("could not create dag-pb node from UnixFS directory root: %w", err) + } + + dirDagSize, err := pn.Size() + if err != nil { + return nil, fmt.Errorf("could not get cumulative size from dag-pb node: %w", err) + } + + switch fieldNum { + case ufsData.Data_Directory: + ch := make(chan unixfs.LinkResult, pbn.Links.Length()) + defer close(ch) + iter := pbn.Links.Iterator() + for !iter.Done() { + _, v := iter.Next() + c := v.Hash.Link().(cidlink.Link).Cid + var name string + var size int64 + if v.Name.Exists() { + name = v.Name.Must().String() + } + if v.Tsize.Exists() { + size = v.Tsize.Must().Int() + } + lnk := unixfs.LinkResult{Link: &format.Link{ + Name: name, + Size: uint64(size), + Cid: c, + }} + ch <- lnk + } + return NewGetResponseFromDirectoryListing(dirDagSize, ch, nil), nil + case ufsData.Data_HAMTShard: + dirNd, err := unixfsnode.Reify(lctx, pbn, lsys) + if err != nil { + return nil, fmt.Errorf("could not reify sharded directory: %w", err) + } + + d := &backpressuredHAMTDirIterNoRecursion{ + dagSize: dirDagSize, + linksItr: dirNd.MapIterator(), + dirCid: c, + lsys: lsys, + getLsys: getLsys, + ctx: ctx, + closed: make(chan error), + hasClosed: false, + } + return d, nil + default: + return nil, fmt.Errorf("not a basic or HAMT directory: should be unreachable") + } + case ufsData.Data_Raw, ufsData.Data_File: + nd, err := unixfsnode.Reify(lctx, pbn, lsys) + if err != nil { + return nil, err + } + + fnd, ok := nd.(datamodel.LargeBytesNode) + if !ok { + return nil, fmt.Errorf("could not process file since it did not present as large bytes") + } + f, err := fnd.AsLargeBytes() + if err != nil { + return nil, err + } + + fileSize, err := f.Seek(0, io.SeekEnd) + if err != nil { + return nil, fmt.Errorf("unable to get UnixFS file size: %w", err) + } + + from := int64(0) + var byteRange DagByteRange + if params.Range != nil { + from = params.Range.From + byteRange = *params.Range + } + _, err = f.Seek(from, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("unable to get reset UnixFS file reader: %w", err) + } + + return &backpressuredFile{ctx: ctx, fileCid: c, byteRange: byteRange, size: fileSize, f: f, getLsys: getLsys, closed: make(chan error)}, nil + default: + return nil, fmt.Errorf("unknown UnixFS field type") + } +} + +type backpressuredHAMTDirIterNoRecursion struct { + dagSize uint64 + linksItr ipld.MapIterator + dirCid cid.Cid + + lsys *ipld.LinkSystem + getLsys lsysGetter + ctx context.Context + + curLnk unixfs.LinkResult + curProcessed int + + closed chan error + hasClosed bool + err error +} + +func (it *backpressuredHAMTDirIterNoRecursion) AwaitClose() <-chan error { + return it.closed +} + +func (it *backpressuredHAMTDirIterNoRecursion) Link() unixfs.LinkResult { + return it.curLnk +} + +func (it *backpressuredHAMTDirIterNoRecursion) Next() bool { + defer func() { + if it.linksItr.Done() || it.err != nil { + if !it.hasClosed { + it.hasClosed = true + close(it.closed) + } + } + }() + + if it.err != nil { + return false + } + + iter := it.linksItr + if iter.Done() { + return false + } + + /* + Since there is no way to make a graph request for part of a HAMT during errors we can either fill in the HAMT with + block requests, or we can re-request the HAMT and skip over the parts we already have. + + Here we choose the latter, however in the event of a re-request we request the entity rather than the entire DAG as + a compromise between more requests and over-fetching data. + */ + + var err error + for { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + + retry, processedErr := isRetryableError(err) + if !retry { + it.err = processedErr + return false + } + + var nd ipld.Node + if err != nil { + var lsys *ipld.LinkSystem + lsys, err = it.getLsys(it.ctx, it.dirCid, CarParams{Scope: DagScopeEntity}) + if err != nil { + continue + } + + _, pbn, ufsFieldData, _, ufsBaseErr := loadUnixFSBase(it.ctx, it.dirCid, nil, lsys) + if ufsBaseErr != nil { + err = ufsBaseErr + continue + } + + nd, err = hamt.NewUnixFSHAMTShard(it.ctx, pbn, ufsFieldData, lsys) + if err != nil { + err = fmt.Errorf("could not reify sharded directory: %w", err) + continue + } + + iter = nd.MapIterator() + for i := 0; i < it.curProcessed; i++ { + _, _, err = iter.Next() + if err != nil { + continue + } + } + + it.linksItr = iter + } + + var k, v ipld.Node + k, v, err = iter.Next() + if err != nil { + retry, processedErr = isRetryableError(err) + if retry { + err = processedErr + continue + } + it.err = processedErr + return false + } + + var name string + name, err = k.AsString() + if err != nil { + it.err = err + return false + } + + var lnk ipld.Link + lnk, err = v.AsLink() + if err != nil { + it.err = err + return false + } + + cl, ok := lnk.(cidlink.Link) + if !ok { + it.err = fmt.Errorf("link not a cidlink") + return false + } + + c := cl.Cid + + pbLnk, ok := v.(*ufsiter.IterLink) + if !ok { + it.err = fmt.Errorf("HAMT value is not a dag-pb link") + return false + } + + cumulativeDagSize := uint64(0) + if pbLnk.Substrate.Tsize.Exists() { + cumulativeDagSize = uint64(pbLnk.Substrate.Tsize.Must().Int()) + } + + it.curLnk = unixfs.LinkResult{ + Link: &format.Link{ + Name: name, + Size: cumulativeDagSize, + Cid: c, + }, + } + it.curProcessed++ + break + } + + return true +} + +func (it *backpressuredHAMTDirIterNoRecursion) Err() error { + return it.err +} + +var _ AwaitCloser = (*backpressuredHAMTDirIterNoRecursion)(nil) + +func (api *GraphGateway) GetAll(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.Node, error) { + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() + return fetchWithPartialRetries(ctx, path, CarParams{Scope: DagScopeAll}, loadTerminalUnixFSElementWithRecursiveDirectories, api.metrics, api.fetchCAR) +} + +type loadTerminalElement[T any] func(ctx context.Context, c cid.Cid, blk blocks.Block, lsys *ipld.LinkSystem, params CarParams, getLsys lsysGetter) (T, error) +type fetchCarFn = func(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error + +type terminalPathType[T any] struct { + resp T + err error + md ContentPathMetadata +} + +type nextReq struct { + c cid.Cid + params CarParams +} + +func fetchWithPartialRetries[T any](ctx context.Context, p path.ImmutablePath, initialParams CarParams, resolveTerminalElementFn loadTerminalElement[T], metrics *GraphGatewayMetrics, fetchCAR fetchCarFn) (ContentPathMetadata, T, error) { + var zeroReturnType T + + terminalPathElementCh := make(chan terminalPathType[T], 1) + + go func() { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + hasSentAsyncData := false + var closeCh <-chan error + + sendRequest := make(chan nextReq, 1) + sendResponse := make(chan *ipld.LinkSystem, 1) + getLsys := func(ctx context.Context, c cid.Cid, params CarParams) (*ipld.LinkSystem, error) { + select { + case sendRequest <- nextReq{c: c, params: params}: + case <-ctx.Done(): + return nil, ctx.Err() + } + + select { + case lsys := <-sendResponse: + return lsys, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + params := initialParams + + err := fetchCAR(cctx, p, params, func(resource string, reader io.Reader) error { + gb, err := carToLinearBlockGetter(cctx, reader, metrics) + if err != nil { + return err + } + + lsys := getLinksystem(gb) + + if hasSentAsyncData { + _, _, _, _, err = resolvePathToLastWithRoots(cctx, p, lsys) + if err != nil { + return err + } + + select { + case sendResponse <- lsys: + case <-cctx.Done(): + return cctx.Err() + } + } else { + // First resolve the path since we always need to. + pathRootCids, terminalCid, remainder, terminalBlk, err := resolvePathWithRootsAndBlock(cctx, p, lsys) + if err != nil { + return err + } + md := contentMetadataFromRootsAndRemainder(p, pathRootCids, terminalCid, remainder) + + if len(remainder) > 0 { + terminalPathElementCh <- terminalPathType[T]{err: errNotUnixFS} + return nil + } + + if hasSentAsyncData { + select { + case sendResponse <- lsys: + case <-ctx.Done(): + return ctx.Err() + } + } + + nd, err := resolveTerminalElementFn(cctx, terminalCid, terminalBlk, lsys, params, getLsys) + if err != nil { + return err + } + + ndAc, ok := any(nd).(AwaitCloser) + if !ok { + terminalPathElementCh <- terminalPathType[T]{ + resp: nd, + md: md, + } + return nil + } + + hasSentAsyncData = true + terminalPathElementCh <- terminalPathType[T]{ + resp: nd, + md: md, + } + + closeCh = ndAc.AwaitClose() + } + + select { + case closeErr := <-closeCh: + return closeErr + case req := <-sendRequest: + // set path and params for next iteration + p = path.FromCid(req.c) + params = req.params + remainderUrl := contentPathToCarUrl(p, params).String() + return fmt.Errorf("received partial response, still need %s", remainderUrl) + case <-cctx.Done(): + return cctx.Err() + } + }) + + if !hasSentAsyncData && err != nil { + terminalPathElementCh <- terminalPathType[T]{err: err} + return + } + + if err != nil { + lsys := getLinksystem(func(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + return nil, multierror.Append(ErrFetcherUnexpectedEOF, format.ErrNotFound{Cid: cid}) + }) + for { + select { + case <-closeCh: + return + case <-sendRequest: + case sendResponse <- lsys: + case <-cctx.Done(): + return + } + } + } + }() + + select { + case t := <-terminalPathElementCh: + if t.err != nil { + return ContentPathMetadata{}, zeroReturnType, t.err + } + return t.md, t.resp, nil + case <-ctx.Done(): + return ContentPathMetadata{}, zeroReturnType, ctx.Err() + } +} + +func (api *GraphGateway) GetBlock(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, files.File, error) { + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() + + var md ContentPathMetadata + var f files.File + // TODO: if path is `/ipfs/cid`, we should use ?format=raw + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { + gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) + if err != nil { + return err + } + lsys := getLinksystem(gb) + + // First resolve the path since we always need to. + pathRoots, terminalCid, remainder, terminalBlk, err := resolvePathToLastWithRoots(ctx, p, lsys) + if err != nil { + return err + } + + var blockData []byte + if terminalBlk != nil { + blockData = terminalBlk.RawData() + } else { + lctx := ipld.LinkContext{Ctx: ctx} + lnk := cidlink.Link{Cid: terminalCid} + blockData, err = lsys.LoadRaw(lctx, lnk) + if err != nil { + return err + } + } + + md = contentMetadataFromRootsAndRemainder(p, pathRoots, terminalCid, remainder) + + f = files.NewBytesFile(blockData) + return nil + }) + + if err != nil { + return ContentPathMetadata{}, nil, err + } + + return md, f, nil +} + +func (api *GraphGateway) Head(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, *HeadResponse, error) { + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": "1"}).Inc() + + // TODO: we probably want to move this either to boxo, or at least to loadRequestIntoSharedBlockstoreAndBlocksGateway + api.metrics.bytesRangeStartMetric.Observe(0) + api.metrics.bytesRangeSizeMetric.Observe(3071) + + var md ContentPathMetadata + var n *HeadResponse + // TODO: fallback to dynamic fetches in case we haven't requested enough data + rangeTo := int64(3071) + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeEntity, Range: &DagByteRange{From: 0, To: &rangeTo}}, func(resource string, reader io.Reader) error { + gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) + if err != nil { + return err + } + lsys := getLinksystem(gb) + + // First resolve the path since we always need to. + pathRoots, terminalCid, remainder, terminalBlk, err := resolvePathWithRootsAndBlock(ctx, p, lsys) + if err != nil { + return err + } + + md = contentMetadataFromRootsAndRemainder(p, pathRoots, terminalCid, remainder) + + lctx := ipld.LinkContext{Ctx: ctx} + pathTerminalCidLink := cidlink.Link{Cid: terminalCid} + + // Load the block at the root of the terminal path element + dataBytes := terminalBlk.RawData() + + // It's not UnixFS if there is a remainder or it's not dag-pb + if len(remainder) > 0 || terminalCid.Type() != uint64(multicodec.DagPb) { + n = NewHeadResponseForFile(files.NewBytesFile(dataBytes), int64(len(dataBytes))) + return nil + } + + // Let's figure out if the terminal element is valid UnixFS and if so what kind + np, err := api.pc(pathTerminalCidLink, lctx) + if err != nil { + return err + } + + nodeDecoder, err := lsys.DecoderChooser(pathTerminalCidLink) + if err != nil { + return err + } + + nb := np.NewBuilder() + err = nodeDecoder(nb, bytes.NewReader(dataBytes)) + if err != nil { + return err + } + lastCidNode := nb.Build() + + if pbn, ok := lastCidNode.(dagpb.PBNode); !ok { + // This shouldn't be possible since we already checked for dag-pb usage + return fmt.Errorf("node was not go-codec-dagpb node") + } else if !pbn.FieldData().Exists() { + // If it's not valid UnixFS then just return the block bytes + n = NewHeadResponseForFile(files.NewBytesFile(dataBytes), int64(len(dataBytes))) + return nil + } else if unixfsFieldData, decodeErr := ufsData.DecodeUnixFSData(pbn.Data.Must().Bytes()); decodeErr != nil { + // If it's not valid UnixFS then just return the block bytes + n = NewHeadResponseForFile(files.NewBytesFile(dataBytes), int64(len(dataBytes))) + return nil + } else { + switch fieldNum := unixfsFieldData.FieldDataType().Int(); fieldNum { + case ufsData.Data_Directory, ufsData.Data_HAMTShard: + dirRootNd, err := merkledag.ProtoNodeConverter(terminalBlk, lastCidNode) + if err != nil { + return fmt.Errorf("could not create dag-pb universal block from UnixFS directory root: %w", err) + } + pn, ok := dirRootNd.(*merkledag.ProtoNode) + if !ok { + return fmt.Errorf("could not create dag-pb node from UnixFS directory root: %w", err) + } + + sz, err := pn.Size() + if err != nil { + return fmt.Errorf("could not get cumulative size from dag-pb node: %w", err) + } + + n = NewHeadResponseForDirectory(int64(sz)) + return nil + case ufsData.Data_Symlink: + fd := unixfsFieldData.FieldData() + if fd.Exists() { + n = NewHeadResponseForSymlink(int64(len(fd.Must().Bytes()))) + return nil + } + // If there is no target then it's invalid so just return the block + NewHeadResponseForFile(files.NewBytesFile(dataBytes), int64(len(dataBytes))) + return nil + case ufsData.Data_Metadata: + n = NewHeadResponseForFile(files.NewBytesFile(dataBytes), int64(len(dataBytes))) + return nil + case ufsData.Data_Raw, ufsData.Data_File: + ufsNode, err := unixfsnode.Reify(lctx, pbn, lsys) + if err != nil { + return err + } + fileNode, ok := ufsNode.(datamodel.LargeBytesNode) + if !ok { + return fmt.Errorf("data not a large bytes node despite being UnixFS bytes") + } + f, err := fileNode.AsLargeBytes() + if err != nil { + return err + } + + fileSize, err := f.Seek(0, io.SeekEnd) + if err != nil { + return fmt.Errorf("unable to get UnixFS file size: %w", err) + } + _, err = f.Seek(0, io.SeekStart) + if err != nil { + return fmt.Errorf("unable to get reset UnixFS file reader: %w", err) + } + + out, err := io.ReadAll(io.LimitReader(f, 3072)) + if errors.Is(err, io.EOF) { + n = NewHeadResponseForFile(files.NewBytesFile(out), fileSize) + return nil + } + return err + } + } + return nil + }) + + if err != nil { + return ContentPathMetadata{}, nil, err + } + + return md, n, nil +} + +func (api *GraphGateway) ResolvePath(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, error) { + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() + + var md ContentPathMetadata + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { + gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) + if err != nil { + return err + } + lsys := getLinksystem(gb) + + // First resolve the path since we always need to. + pathRoots, terminalCid, remainder, _, err := resolvePathToLastWithRoots(ctx, p, lsys) + if err != nil { + return err + } + + md = contentMetadataFromRootsAndRemainder(p, pathRoots, terminalCid, remainder) + + return nil + }) + + if err != nil { + return ContentPathMetadata{}, err + } + + return md, nil +} + +func (api *GraphGateway) GetCAR(ctx context.Context, p path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) { + numRanges := "0" + if params.Range != nil { + numRanges = "1" + } + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": string(params.Scope), "entityRanges": numRanges}).Inc() + rootCid, err := getRootCid(p) + if err != nil { + return ContentPathMetadata{}, nil, err + } + + switch params.Order { + case DagOrderUnspecified, DagOrderUnknown, DagOrderDFS: + default: + return ContentPathMetadata{}, nil, fmt.Errorf("unsupported dag order %q", params.Order) + } + + r, w := io.Pipe() + go func() { + numBlocksSent := 0 + var cw storage.WritableCar + var blockBuffer []blocks.Block + err = api.fetchCAR(ctx, p, params, func(resource string, reader io.Reader) error { + numBlocksThisCall := 0 + gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) + if err != nil { + return err + } + teeBlock := func(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, err := gb(ctx, c) + if err != nil { + return nil, err + } + if numBlocksThisCall >= numBlocksSent { + if cw == nil { + blockBuffer = append(blockBuffer, blk) + } else { + err = cw.Put(ctx, blk.Cid().KeyString(), blk.RawData()) + if err != nil { + return nil, fmt.Errorf("error writing car block: %w", err) + } + } + numBlocksSent++ + } + numBlocksThisCall++ + return blk, nil + } + l := getLinksystem(teeBlock) + + // First resolve the path since we always need to. + _, terminalCid, remainder, _, err := resolvePathWithRootsAndBlock(ctx, p, l) + if err != nil { + return err + } + if len(remainder) > 0 { + return nil + } + + if cw == nil { + cw, err = storage.NewWritable(w, []cid.Cid{terminalCid}, carv2.WriteAsCarV1(true), carv2.AllowDuplicatePuts(params.Duplicates.Bool())) + if err != nil { + // io.PipeWriter.CloseWithError always returns nil. + _ = w.CloseWithError(err) + return nil + } + for _, blk := range blockBuffer { + err = cw.Put(ctx, blk.Cid().KeyString(), blk.RawData()) + if err != nil { + _ = w.CloseWithError(fmt.Errorf("error writing car block: %w", err)) + return nil + } + } + blockBuffer = nil + } + + err = walkGatewaySimpleSelector(ctx, terminalCid, remainder, params, l) + if err != nil { + return err + } + return nil + }) + + _ = w.CloseWithError(err) + }() + + return ContentPathMetadata{ + PathSegmentRoots: []cid.Cid{rootCid}, + LastSegment: path.FromCid(rootCid), + ContentType: "", + }, r, nil +} + +func getRootCid(imPath path.ImmutablePath) (cid.Cid, error) { + imPathStr := imPath.String() + if !strings.HasPrefix(imPathStr, "/ipfs/") { + return cid.Undef, fmt.Errorf("path does not have /ipfs/ prefix") + } + + firstSegment, _, _ := strings.Cut(imPathStr[6:], "/") + rootCid, err := cid.Decode(firstSegment) + if err != nil { + return cid.Undef, err + } + + return rootCid, nil +} + +func (api *GraphGateway) IsCached(ctx context.Context, path path.Path) bool { + return false +} + +var _ IPFSBackend = (*GraphGateway)(nil) + +func checkRetryableError(e *error, fn func() error) error { + err := fn() + retry, processedErr := isRetryableError(err) + if retry { + return processedErr + } + *e = processedErr + return nil +} + +func isRetryableError(err error) (bool, error) { + if errors.Is(err, ErrFetcherUnexpectedEOF) { + return false, err + } + + if format.IsNotFound(err) { + return true, err + } + initialErr := err + + // Checks if err is of a type that does not implement the .Is interface and + // cannot be directly compared to. Therefore, errors.Is cannot be used. + for { + _, ok := err.(*resolver.ErrNoLink) + if ok { + return false, err + } + + _, ok = err.(datamodel.ErrWrongKind) + if ok { + return false, err + } + + _, ok = err.(datamodel.ErrNotExists) + if ok { + return false, err + } + + errNoSuchField, ok := err.(schema.ErrNoSuchField) + if ok { + // Convert into a more general error type so the gateway code can know what this means + // TODO: Have either a more generally usable error type system for IPLD errors (e.g. a base type indicating that data cannot exist) + // or at least have one that is specific to the gateway consumer and part of the Backend contract instead of this being implicit + err = datamodel.ErrNotExists{Segment: errNoSuchField.Field} + return false, err + } + + err = errors.Unwrap(err) + if err == nil { + return true, initialErr + } + } +} diff --git a/gateway/backend_remote_files.go b/gateway/backend_remote_files.go new file mode 100644 index 000000000..740b3f2e2 --- /dev/null +++ b/gateway/backend_remote_files.go @@ -0,0 +1,533 @@ +package gateway + +import ( + "bytes" + "context" + "fmt" + "io" + + "github.com/ipfs/boxo/files" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-unixfsnode" + ufsData "github.com/ipfs/go-unixfsnode/data" + "github.com/ipfs/go-unixfsnode/hamt" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/schema" + "github.com/multiformats/go-multicodec" +) + +type AwaitCloser interface { + AwaitClose() <-chan error +} + +type backpressuredFile struct { + size int64 + f io.ReadSeeker + getLsys lsysGetter + + ctx context.Context + fileCid cid.Cid + byteRange DagByteRange + retErr error + + closed chan error +} + +func (b *backpressuredFile) AwaitClose() <-chan error { + return b.closed +} + +func (b *backpressuredFile) Close() error { + close(b.closed) + return nil +} + +func (b *backpressuredFile) Size() (int64, error) { + return b.size, nil +} + +func (b *backpressuredFile) Read(p []byte) (n int, err error) { + if b.retErr == nil { + n, err = b.f.Read(p) + if err == nil || err == io.EOF { + return n, err + } + + if n > 0 { + b.retErr = err + return n, nil + } + } else { + err = b.retErr + } + + from, seekErr := b.f.Seek(0, io.SeekCurrent) + if seekErr != nil { + // Return the seek error since by this point seeking failures like this should be impossible + return 0, seekErr + } + + // we had an error while reading so attempt to reset the underlying reader + for { + if b.ctx.Err() != nil { + return 0, b.ctx.Err() + } + + retry, processedErr := isRetryableError(err) + if !retry { + return 0, processedErr + } + + var nd files.Node + nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(b.ctx, b.fileCid, nil, nil, CarParams{Scope: DagScopeEntity, Range: &DagByteRange{From: from, To: b.byteRange.To}}, b.getLsys) + if err != nil { + continue + } + + f, ok := nd.(files.File) + if !ok { + return 0, fmt.Errorf("not a file, should be unreachable") + } + + b.f = f + break + } + + // now that we've reset the reader try reading again + return b.Read(p) +} + +func (b *backpressuredFile) Seek(offset int64, whence int) (int64, error) { + return b.f.Seek(offset, whence) +} + +var _ files.File = (*backpressuredFile)(nil) +var _ AwaitCloser = (*backpressuredFile)(nil) + +type singleUseDirectory struct { + dirIter files.DirIterator + closed chan error +} + +func (b *singleUseDirectory) AwaitClose() <-chan error { + return b.closed +} + +func (b *singleUseDirectory) Close() error { + close(b.closed) + return nil +} + +func (b *singleUseDirectory) Size() (int64, error) { + //TODO implement me + panic("implement me") +} + +func (b *singleUseDirectory) Entries() files.DirIterator { + return b.dirIter +} + +var _ files.Directory = (*singleUseDirectory)(nil) +var _ AwaitCloser = (*singleUseDirectory)(nil) + +type backpressuredFlatDirIter struct { + linksItr *dagpb.PBLinks__Itr + lsys *ipld.LinkSystem + getLsys lsysGetter + ctx context.Context + + curName string + curFile files.Node + + err error +} + +func (it *backpressuredFlatDirIter) Name() string { + return it.curName +} + +func (it *backpressuredFlatDirIter) Node() files.Node { + return it.curFile +} + +func (it *backpressuredFlatDirIter) Next() bool { + if it.err != nil { + return false + } + + iter := it.linksItr + if iter.Done() { + return false + } + + _, v := iter.Next() + c := v.Hash.Link().(cidlink.Link).Cid + var name string + if v.Name.Exists() { + name = v.Name.Must().String() + } + + var nd files.Node + var err error + params := CarParams{Scope: DagScopeAll} + for { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + if err != nil { + it.lsys, err = it.getLsys(it.ctx, c, params) + continue + } + nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(it.ctx, c, nil, it.lsys, params, it.getLsys) + if err != nil { + if ctxErr := it.ctx.Err(); ctxErr != nil { + continue + } + retry, processedErr := isRetryableError(err) + if retry { + err = processedErr + continue + } + it.err = processedErr + return false + } + break + } + + it.curName = name + it.curFile = nd + return true +} + +func (it *backpressuredFlatDirIter) Err() error { + return it.err +} + +var _ files.DirIterator = (*backpressuredFlatDirIter)(nil) + +type backpressuredHAMTDirIter struct { + linksItr ipld.MapIterator + dirCid cid.Cid + + lsys *ipld.LinkSystem + getLsys lsysGetter + ctx context.Context + + curName string + curFile files.Node + curProcessed int + + err error +} + +func (it *backpressuredHAMTDirIter) Name() string { + return it.curName +} + +func (it *backpressuredHAMTDirIter) Node() files.Node { + return it.curFile +} + +func (it *backpressuredHAMTDirIter) Next() bool { + if it.err != nil { + return false + } + + iter := it.linksItr + if iter.Done() { + return false + } + + /* + Since there is no way to make a graph request for part of a HAMT during errors we can either fill in the HAMT with + block requests, or we can re-request the HAMT and skip over the parts we already have. + + Here we choose the latter, however in the event of a re-request we request the entity rather than the entire DAG as + a compromise between more requests and over-fetching data. + */ + + var err error + for { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + + retry, processedErr := isRetryableError(err) + if !retry { + it.err = processedErr + return false + } + + var nd ipld.Node + if err != nil { + var lsys *ipld.LinkSystem + lsys, err = it.getLsys(it.ctx, it.dirCid, CarParams{Scope: DagScopeEntity}) + if err != nil { + continue + } + + _, pbn, ufsFieldData, _, ufsBaseErr := loadUnixFSBase(it.ctx, it.dirCid, nil, lsys) + if ufsBaseErr != nil { + err = ufsBaseErr + continue + } + + nd, err = hamt.NewUnixFSHAMTShard(it.ctx, pbn, ufsFieldData, lsys) + if err != nil { + err = fmt.Errorf("could not reify sharded directory: %w", err) + continue + } + + iter = nd.MapIterator() + for i := 0; i < it.curProcessed; i++ { + _, _, err = iter.Next() + if err != nil { + continue + } + } + + it.linksItr = iter + } + + var k, v ipld.Node + k, v, err = iter.Next() + if err != nil { + retry, processedErr = isRetryableError(err) + if retry { + err = processedErr + continue + } + it.err = processedErr + return false + } + + var name string + name, err = k.AsString() + if err != nil { + it.err = err + return false + } + var lnk ipld.Link + lnk, err = v.AsLink() + if err != nil { + it.err = err + return false + } + + cl, ok := lnk.(cidlink.Link) + if !ok { + it.err = fmt.Errorf("link not a cidlink") + return false + } + + c := cl.Cid + params := CarParams{Scope: DagScopeAll} + var childNd files.Node + for { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + + if err != nil { + retry, processedErr = isRetryableError(err) + if !retry { + it.err = processedErr + return false + } + + it.lsys, err = it.getLsys(it.ctx, c, params) + continue + } + + childNd, err = loadTerminalUnixFSElementWithRecursiveDirectories(it.ctx, c, nil, it.lsys, params, it.getLsys) + if err != nil { + continue + } + break + } + + it.curName = name + it.curFile = childNd + it.curProcessed++ + break + } + + return true +} + +func (it *backpressuredHAMTDirIter) Err() error { + return it.err +} + +var _ files.DirIterator = (*backpressuredHAMTDirIter)(nil) + +/* +1. Run traversal to get the top-level response +2. Response can do a callback for another response +*/ + +type lsysGetter = func(ctx context.Context, c cid.Cid, params CarParams) (*ipld.LinkSystem, error) + +func loadUnixFSBase(ctx context.Context, c cid.Cid, blk blocks.Block, lsys *ipld.LinkSystem) ([]byte, dagpb.PBNode, ufsData.UnixFSData, int64, error) { + lctx := ipld.LinkContext{Ctx: ctx} + pathTerminalCidLink := cidlink.Link{Cid: c} + + var blockData []byte + var err error + + if blk != nil { + blockData = blk.RawData() + } else { + blockData, err = lsys.LoadRaw(lctx, pathTerminalCidLink) + if err != nil { + return nil, nil, nil, 0, err + } + } + + if c.Type() == uint64(multicodec.Raw) { + return blockData, nil, nil, 0, nil + } + + // decode the terminal block into a node + pc := dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { + if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { + return tlnkNd.LinkTargetNodePrototype(), nil + } + return basicnode.Prototype.Any, nil + }) + + np, err := pc(pathTerminalCidLink, lctx) + if err != nil { + return nil, nil, nil, 0, err + } + + decoder, err := lsys.DecoderChooser(pathTerminalCidLink) + if err != nil { + return nil, nil, nil, 0, err + } + nb := np.NewBuilder() + if err := decoder(nb, bytes.NewReader(blockData)); err != nil { + return nil, nil, nil, 0, err + } + lastCidNode := nb.Build() + + if pbn, ok := lastCidNode.(dagpb.PBNode); !ok { + // If it's not valid dag-pb then we're done + return nil, nil, nil, 0, errNotUnixFS + } else if !pbn.FieldData().Exists() { + // If it's not valid UnixFS then we're done + return nil, nil, nil, 0, errNotUnixFS + } else if unixfsFieldData, decodeErr := ufsData.DecodeUnixFSData(pbn.Data.Must().Bytes()); decodeErr != nil { + return nil, nil, nil, 0, errNotUnixFS + } else { + switch fieldNum := unixfsFieldData.FieldDataType().Int(); fieldNum { + case ufsData.Data_Symlink, ufsData.Data_Metadata, ufsData.Data_Raw, ufsData.Data_File, ufsData.Data_Directory, ufsData.Data_HAMTShard: + return nil, pbn, unixfsFieldData, fieldNum, nil + default: + return nil, nil, nil, 0, errNotUnixFS + } + } +} + +func loadTerminalUnixFSElementWithRecursiveDirectories(ctx context.Context, c cid.Cid, blk blocks.Block, lsys *ipld.LinkSystem, params CarParams, getLsys lsysGetter) (files.Node, error) { + var err error + if lsys == nil { + lsys, err = getLsys(ctx, c, params) + if err != nil { + return nil, err + } + } + + lctx := ipld.LinkContext{Ctx: ctx} + blockData, pbn, ufsFieldData, fieldNum, err := loadUnixFSBase(ctx, c, blk, lsys) + if err != nil { + return nil, err + } + + if c.Type() == uint64(multicodec.Raw) { + return files.NewBytesFile(blockData), nil + } + + switch fieldNum { + case ufsData.Data_Symlink: + if !ufsFieldData.FieldData().Exists() { + return nil, fmt.Errorf("invalid UnixFS symlink object") + } + lnkTarget := string(ufsFieldData.FieldData().Must().Bytes()) + f := files.NewLinkFile(lnkTarget, nil) + return f, nil + case ufsData.Data_Metadata: + return nil, fmt.Errorf("UnixFS Metadata unsupported") + case ufsData.Data_HAMTShard, ufsData.Data_Directory: + switch fieldNum { + case ufsData.Data_Directory: + d := &singleUseDirectory{&backpressuredFlatDirIter{ + ctx: ctx, + linksItr: pbn.Links.Iterator(), + lsys: lsys, + getLsys: getLsys, + }, make(chan error)} + return d, nil + case ufsData.Data_HAMTShard: + dirNd, err := unixfsnode.Reify(lctx, pbn, lsys) + if err != nil { + return nil, fmt.Errorf("could not reify sharded directory: %w", err) + } + + d := &singleUseDirectory{ + &backpressuredHAMTDirIter{ + linksItr: dirNd.MapIterator(), + dirCid: c, + lsys: lsys, + getLsys: getLsys, + ctx: ctx, + }, make(chan error), + } + return d, nil + default: + return nil, fmt.Errorf("not a basic or HAMT directory: should be unreachable") + } + case ufsData.Data_Raw, ufsData.Data_File: + nd, err := unixfsnode.Reify(lctx, pbn, lsys) + if err != nil { + return nil, err + } + + fnd, ok := nd.(datamodel.LargeBytesNode) + if !ok { + return nil, fmt.Errorf("could not process file since it did not present as large bytes") + } + f, err := fnd.AsLargeBytes() + if err != nil { + return nil, err + } + + fileSize, err := f.Seek(0, io.SeekEnd) + if err != nil { + return nil, fmt.Errorf("unable to get UnixFS file size: %w", err) + } + + from := int64(0) + var byteRange DagByteRange + if params.Range != nil { + byteRange = *params.Range + from = params.Range.From + } + _, err = f.Seek(from, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("unable to get reset UnixFS file reader: %w", err) + } + + return &backpressuredFile{ctx: ctx, fileCid: c, byteRange: byteRange, size: fileSize, f: f, getLsys: getLsys, closed: make(chan error)}, nil + default: + return nil, fmt.Errorf("unknown UnixFS field type") + } +} diff --git a/go.mod b/go.mod index 022798a77..9da15bc2a 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.5.0 github.com/gorilla/mux v1.8.1 + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/ipfs/bbloom v0.0.4 github.com/ipfs/go-bitfield v1.1.0 @@ -30,6 +31,7 @@ require ( github.com/ipfs/go-metrics-interface v0.0.1 github.com/ipfs/go-peertaskqueue v0.8.1 github.com/ipfs/go-unixfsnode v1.9.0 + github.com/ipld/go-car v0.6.2 github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-prime v0.21.0 @@ -104,14 +106,19 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/huin/goupnp v1.3.0 // indirect + github.com/ipfs/go-blockservice v0.5.0 // indirect + github.com/ipfs/go-ipfs-blockstore v1.3.0 // indirect + github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect + github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect github.com/ipfs/go-ipld-cbor v0.1.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect + github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-unixfs v0.4.5 // indirect + github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/klauspost/compress v1.17.4 // indirect diff --git a/go.sum b/go.sum index 933c78cec..51122743d 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,7 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 h1:dHLYa5D8/Ta0aLR2XcPsrkpAgGeFs6thhMcQK0oQ0n8= github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -167,33 +168,42 @@ github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= +github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs= github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM= github.com/ipfs/go-blockservice v0.5.0 h1:B2mwhhhVQl2ntW2EIpaWPwSCxSuqr5fFA93Ms4bYLEY= +github.com/ipfs/go-blockservice v0.5.0/go.mod h1:W6brZ5k20AehbmERplmERn8o2Ni3ZZubvAxaIUeaT6w= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q= github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA= +github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ipfs-blockstore v1.3.0 h1:m2EXaWgwTzAfsmt5UdJ7Is6l4gJcaM/A12XwJyvYvMM= +github.com/ipfs/go-ipfs-blockstore v1.3.0/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7NapWLY8= +github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= +github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE= github.com/ipfs/go-ipfs-pq v0.0.3/go.mod h1:btNw5hsHBpRcSSgZtiNm/SLj5gYIZ18AKtv3kERkRb4= github.com/ipfs/go-ipfs-redirects-file v0.1.1 h1:Io++k0Vf/wK+tfnhEh63Yte1oQK5VGT2hIEYpD0Rzx8= github.com/ipfs/go-ipfs-redirects-file v0.1.1/go.mod h1:tAwRjCV0RjLTjH8DR/AU7VYvfQECg+lpUy2Mdzv7gyk= +github.com/ipfs/go-ipfs-routing v0.3.0 h1:9W/W3N+g+y4ZDeffSgqhgo7BsBSJwPMcyssET9OWevc= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= github.com/ipfs/go-ipfs-util v0.0.3/go.mod h1:LHzG1a0Ig4G+iZ26UUOMjHd+lfM84LZCrn17xAKWBvs= @@ -209,6 +219,7 @@ github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Ax github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/ipfs/go-merkledag v0.11.0 h1:DgzwK5hprESOzS4O1t/wi6JDpyVQdvm9Bs59N/jqfBY= +github.com/ipfs/go-merkledag v0.11.0/go.mod h1:Q4f/1ezvBiJV0YCIXvt51W/9/kqJGH4I1LsA7+djsM4= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= @@ -218,6 +229,9 @@ github.com/ipfs/go-unixfs v0.4.5/go.mod h1:BIznJNvt/gEx/ooRMI4Us9K8+qeGO7vx1ohnb github.com/ipfs/go-unixfsnode v1.9.0 h1:ubEhQhr22sPAKO2DNsyVBW7YB/zA8Zkif25aBvz8rc8= github.com/ipfs/go-unixfsnode v1.9.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= +github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= +github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= +github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= @@ -248,6 +262,7 @@ github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZY github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -702,6 +717,7 @@ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=