Skip to content

Commit

Permalink
Merge branch 'main' into chore/remove-npm-build
Browse files Browse the repository at this point in the history
  • Loading branch information
2color committed Aug 30, 2024
2 parents f4e1c9c + efc30fe commit 8f2c5a2
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 106 deletions.
45 changes: 36 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,44 @@ Note that the `multiaddr` can be:

### Check results

The server performs several checks given a CID. The results of the check are expressed by the `output` type:
The server performs several checks depending on whether you also pass a **multiaddr** or just a **cid**.

#### Results when only a `cid` is passed

The results of the check are expressed by the `cidCheckOutput` type:

```go
type output struct {
type cidCheckOutput *[]providerOutput

type providerOutput struct {
ID string
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs string[]
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}
```

The `providerOutput` type contains the following fields:

- `ID`: The peer ID of the provider.
- `ConnectionError`: An error message if the connection to the provider failed.
- `Addrs`: The multiaddrs of the provider from the DHT.
- `ConnectionMaddrs`: The multiaddrs that were used to connect to the provider.
- `DataAvailableOverBitswap`: The result of the Bitswap check.

#### Results when a `multiaddr` and a `cid` are passed

The results of the check are expressed by the `peerCheckOutput` type:

```go
type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

type BitswapCheckOutput struct {
Duration time.Duration
Expand All @@ -95,9 +123,9 @@ type BitswapCheckOutput struct {
}
```

1. Is the CID (really multihash) advertised in the DHT (or later IPNI)?
1. Is the CID (really multihash) advertised in the DHT by the Passed PeerID (or later IPNI)?

- `CidInDHT`
- `ProviderRecordFromPeerInDHT`

2. Are the peer's addresses discoverable (particularly useful if the announcements are DHT based, but also independently useful)

Expand All @@ -120,8 +148,7 @@ type BitswapCheckOutput struct {

The ipfs-check server is instrumented and exposes two Prometheus metrics endpoints:

- `/metrics/libp2p` exposes [go-libp2p metrics](https://blog.libp2p.io/2023-08-15-metrics-in-go-libp2p/).
- `/metrics/http` exposes http metrics for the check endpoint.
- `/metrics` exposes [go-libp2p metrics](https://blog.libp2p.io/2023-08-15-metrics-in-go-libp2p/) and http metrics for the check endpoint.

### Securing the metrics endpoints

Expand Down
190 changes: 136 additions & 54 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-check / All

"errors" imported and not used (compile)

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-check / All

"errors" imported and not used (compile)

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

"errors" imported and not used

Check failure on line 6 in daemon.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

"errors" imported and not used
"fmt"
"log"
"net/url"
"sync"
"time"

Expand All @@ -21,10 +20,11 @@ import (
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)

//go:embed web
Expand All @@ -40,8 +40,13 @@ type daemon struct {
dht kademlia
dhtMessenger *dhtpb.ProtocolMessenger
createTestHost func() (host.Host, error)
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
if err != nil {
Expand All @@ -53,13 +58,17 @@ func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
return nil, err
}

// Create a custom registry for all prometheus metrics
promRegistry := prometheus.NewRegistry()

h, err := libp2p.New(
libp2p.DefaultMuxers,
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
libp2p.ConnectionManager(c),
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.ResourceManager(rm),
libp2p.EnableHolePunching(),
libp2p.PrometheusRegisterer(promRegistry),
libp2p.UserAgent(userAgent),
)
if err != nil {
Expand Down Expand Up @@ -92,15 +101,20 @@ func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
return nil, err
}

return &daemon{h: h, dht: d, dhtMessenger: pm, createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil
return &daemon{
h: h,
dht: d,
dhtMessenger: pm,
promRegistry: promRegistry,
createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil
}

func (d *daemon) mustStart() {
Expand All @@ -116,18 +130,101 @@ func (d *daemon) mustStart() {
}
}

func (d *daemon) runCheck(query url.Values) (*output, error) {
maStr := query.Get("multiaddr")
cidStr := query.Get("cid")
type cidCheckOutput *[]providerOutput

type providerOutput struct {
ID string
ConnectionError string
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

if maStr == "" {
return nil, errors.New("missing 'multiaddr' argument")
// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability
func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) {
cid, err := cid.Decode(cidStr)
if err != nil {
return nil, err
}

if cidStr == "" {
return nil, errors.New("missing 'cid' argument")
out := make([]providerOutput, 0, MaxProvidersCount)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount)

var wg sync.WaitGroup
var mu sync.Mutex

for provider := range provsCh {
wg.Add(1)
go func(provider peer.AddrInfo) {
defer wg.Done()

addrs := []string{}
if len(provider.Addrs) > 0 {
for _, addr := range provider.Addrs {
if manet.IsPublicAddr(addr) { // only return public addrs
addrs = append(addrs, addr.String())
}
}
}

provOutput := providerOutput{
ID: provider.ID.String(),
Addrs: addrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
}

testHost, err := d.createTestHost()
if err != nil {
log.Printf("Error creating test host: %v\n", err)
return
}
defer testHost.Close()

// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
defer dialCancel()

testHost.Connect(dialCtx, provider)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")

if connErr != nil {
provOutput.ConnectionError = connErr.Error()
} else {
// since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter
p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String())
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr)

for _, c := range testHost.Network().ConnsToPeer(provider.ID) {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String())
}
}

mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}

// Wait for all goroutines to finish
wg.Wait()

return &out, nil
}

type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
return nil, err
Expand All @@ -146,12 +243,11 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {
return nil, err
}

ctx := context.Background()
out := &output{}
out := &peerCheckOutput{}

connectionFailed := false

out.CidInDHT = providerRecordInDHT(ctx, d.dht, c, ai.ID)
out.ProviderRecordFromPeerInDHT = ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
Expand Down Expand Up @@ -181,37 +277,38 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {

if !connectionFailed {
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

// we call NewStream instead of Connect to force NAT hole punching
// See https://github.com/libp2p/go-libp2p/issues/2714
testHost.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.RecentlyConnectedAddrTTL)
testHost.Connect(dialCtx, *ai)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
if connErr != nil {
out.ConnectionError = fmt.Sprintf("error dialing to peer: %s", connErr.Error())
connectionFailed = true
out.ConnectionError = connErr.Error()
return out, nil
}
}

if connectionFailed {
out.DataAvailableOverBitswap.Error = "could not connect to peer"
} else {
// If so is the data available over Bitswap?
out.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, c, ma)
// If so is the data available over Bitswap?
out.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, c, ma)

// Get the direct connection in case it was hole punched and we have both a limited connection
// directMaddr := getDirectMaddr()
for _, c := range testHost.Network().ConnsToPeer(ai.ID) {
out.ConnectionMaddrs = append(out.ConnectionMaddrs, c.RemoteMultiaddr().String())
}
// Get all connection maddrs to the peer (in case we hole punched, there will usually be two: limited relay and direct)
for _, c := range testHost.Network().ConnsToPeer(ai.ID) {
out.ConnectionMaddrs = append(out.ConnectionMaddrs, c.RemoteMultiaddr().String())
}

return out, nil
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiaddr.Multiaddr) BitswapCheckOutput {
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the temporary peer: %s", c, ma, host.ID())
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the peer: %s", c, ma, host.ID())
out := BitswapCheckOutput{}
start := time.Now()

Expand All @@ -231,21 +328,6 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd
return out
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

type output struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMessenger, p peer.ID) (map[string]int, error) {
closestPeers, err := d.GetClosestPeers(ctx, string(p))
if err != nil {
Expand Down Expand Up @@ -289,7 +371,7 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
return addrMap, nil
}

func providerRecordInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
func ProviderRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.FindProvidersAsync(queryCtx, c, 0)
Expand Down
Loading

0 comments on commit 8f2c5a2

Please sign in to comment.