Skip to content

Commit

Permalink
Merge pull request #47 from ipfs/just-cid
Browse files Browse the repository at this point in the history
feat: add check using only a CID
  • Loading branch information
2color committed Aug 30, 2024
2 parents e968620 + 233eab6 commit 6c45882
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 80 deletions.
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,38 @@ Note that the `multiaddr` can be:

### Check results

The server performs several checks given a CID. The results of the check are expressed by the `output` type:
The server performs several checks depending on whether you also pass a **multiaddr** or just a **cid**.

#### Results when only a `cid` is passed

The results of the check are expressed by the `cidCheckOutput` type:

```go
type cidCheckOutput *[]providerOutput

type providerOutput struct {
ID string
ConnectionError string
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}
```

The `providerOutput` type contains the following fields:

- `ID`: The peer ID of the provider.
- `ConnectionError`: An error message if the connection to the provider failed.
- `Addrs`: The multiaddrs of the provider from the DHT.
- `ConnectionMaddrs`: The multiaddrs that were used to connect to the provider.
- `DataAvailableOverBitswap`: The result of the Bitswap check.

#### Results when a `multiaddr` and a `cid` are passed

The results of the check are expressed by the `peerCheckOutput` type:

```go
type output struct {
type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
Expand Down
189 changes: 145 additions & 44 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"net/url"
"sync"
"time"

Expand All @@ -20,10 +18,12 @@ import (
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)

type kademlia interface {
Expand All @@ -36,8 +36,13 @@ type daemon struct {
dht kademlia
dhtMessenger *dhtpb.ProtocolMessenger
createTestHost func() (host.Host, error)
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
if err != nil {
Expand All @@ -49,13 +54,17 @@ func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
return nil, err
}

// Create a custom registry for all prometheus metrics
promRegistry := prometheus.NewRegistry()

h, err := libp2p.New(
libp2p.DefaultMuxers,
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
libp2p.ConnectionManager(c),
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.ResourceManager(rm),
libp2p.EnableHolePunching(),
libp2p.PrometheusRegisterer(promRegistry),
libp2p.UserAgent(userAgent),
)
if err != nil {
Expand Down Expand Up @@ -88,15 +97,20 @@ func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
return nil, err
}

return &daemon{h: h, dht: d, dhtMessenger: pm, createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil
return &daemon{
h: h,
dht: d,
dhtMessenger: pm,
promRegistry: promRegistry,
createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil
}

func (d *daemon) mustStart() {
Expand All @@ -109,18 +123,101 @@ func (d *daemon) mustStart() {

}

func (d *daemon) runCheck(query url.Values) (*output, error) {
maStr := query.Get("multiaddr")
cidStr := query.Get("cid")
type cidCheckOutput *[]providerOutput

if maStr == "" {
return nil, errors.New("missing 'multiaddr' argument")
type providerOutput struct {
ID string
ConnectionError string
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability
func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) {
cid, err := cid.Decode(cidStr)
if err != nil {
return nil, err
}

if cidStr == "" {
return nil, errors.New("missing 'cid' argument")
out := make([]providerOutput, 0, MaxProvidersCount)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount)

var wg sync.WaitGroup
var mu sync.Mutex

for provider := range provsCh {
wg.Add(1)
go func(provider peer.AddrInfo) {
defer wg.Done()

addrs := []string{}
if len(provider.Addrs) > 0 {
for _, addr := range provider.Addrs {
if manet.IsPublicAddr(addr) { // only return public addrs
addrs = append(addrs, addr.String())
}
}
}

provOutput := providerOutput{
ID: provider.ID.String(),
Addrs: addrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
}

testHost, err := d.createTestHost()
if err != nil {
log.Printf("Error creating test host: %v\n", err)
return
}
defer testHost.Close()

// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
defer dialCancel()

testHost.Connect(dialCtx, provider)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")

if connErr != nil {
provOutput.ConnectionError = connErr.Error()
} else {
// since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter
p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String())
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr)

for _, c := range testHost.Network().ConnsToPeer(provider.ID) {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String())
}
}

mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}

// Wait for all goroutines to finish
wg.Wait()

return &out, nil
}

type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
return nil, err
Expand All @@ -139,12 +236,11 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {
return nil, err
}

ctx := context.Background()
out := &output{}
out := &peerCheckOutput{}

connectionFailed := false

out.CidInDHT = providerRecordInDHT(ctx, d.dht, c, ai.ID)
out.CidInDHT = providerRecordForPeerInDHT(ctx, d.dht, c, ai.ID)

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
Expand Down Expand Up @@ -174,15 +270,28 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {

if !connectionFailed {
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

// we call NewStream instead of Connect to force NAT hole punching
// See https://github.com/libp2p/go-libp2p/issues/2714
testHost.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.RecentlyConnectedAddrTTL)
testHost.Connect(dialCtx, *ai)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
if connErr != nil {
out.ConnectionError = fmt.Sprintf("error dialing to peer: %s", connErr.Error())
log.Printf("Error connecting to peer %s: %v", ai.ID, connErr)
ids, ok := testHost.(interface{ IDService() identify.IDService })
if ok {
log.Printf("Own observed addrs: %v", ids.IDService().OwnObservedAddrs())
}

// Log all open connections
for _, conn := range testHost.Network().Conns() {
log.Printf("Open connection: Peer ID: %s, Remote Addr: %s, Local Addr: %s",
conn.RemotePeer(),
conn.RemoteMultiaddr(),
conn.LocalMultiaddr(),
)
}
out.ConnectionError = connErr.Error()
connectionFailed = true
}
}
Expand All @@ -203,8 +312,15 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {
return out, nil
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiaddr.Multiaddr) BitswapCheckOutput {
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the temporary peer: %s", c, ma, host.ID())
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the peer: %s", c, ma, host.ID())
out := BitswapCheckOutput{}
start := time.Now()

Expand All @@ -224,21 +340,6 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd
return out
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

type output struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMessenger, p peer.ID) (map[string]int, error) {
closestPeers, err := d.GetClosestPeers(ctx, string(p))
if err != nil {
Expand Down Expand Up @@ -282,7 +383,7 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
return addrMap, nil
}

func providerRecordInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
func providerRecordForPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.FindProvidersAsync(queryCtx, c, 0)
Expand Down
33 changes: 33 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multihash"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -61,6 +63,7 @@ func TestBasicIntegration(t *testing.T) {
require.NoError(t, err)

d := &daemon{
promRegistry: prometheus.NewRegistry(),
h: queryHost,
dht: queryDHT,
dhtMessenger: pm,
Expand Down Expand Up @@ -157,4 +160,34 @@ func TestBasicIntegration(t *testing.T) {
obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsFalse()
obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})

t.Run("Data found on reachable peer with just cid", func(t *testing.T) {
testData := []byte(t.Name())
mh, err := multihash.Sum(testData, multihash.SHA2_256, -1)
require.NoError(t, err)
testCid := cid.NewCidV1(cid.Raw, mh)
testBlock, err := blocks.NewBlockWithCid(testData, testCid)
require.NoError(t, err)
err = bstore.Put(ctx, testBlock)
require.NoError(t, err)
err = dhtClient.Provide(ctx, testCid, true)
require.NoError(t, err)

res := test.QueryCid(t, "http://localhost:1234", testCid.String())

res.Length().IsEqual(1)
res.Value(0).Object().Value("ID").String().IsEqual(h.ID().String())
res.Value(0).Object().Value("ConnectionError").String().IsEmpty()
testHostAddrs := h.Addrs()
for _, addr := range testHostAddrs {
if manet.IsPublicAddr(addr) {
res.Value(0).Object().Value("Addrs").Array().ContainsAny(addr.String())
}
}

res.Value(0).Object().Value("ConnectionMaddrs").Array()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Error").String().IsEmpty()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsTrue()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})
}
Loading

0 comments on commit 6c45882

Please sign in to comment.