Skip to content

Commit

Permalink
Merge pull request #27 from libp2p/feat/content-routing
Browse files Browse the repository at this point in the history
Discovery based Content Routing
  • Loading branch information
aschmahmann committed Oct 16, 2019
2 parents 83826f4 + 702512d commit 77e85c3
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 1 deletion.
28 changes: 28 additions & 0 deletions p2p/discovery/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"github.com/libp2p/go-libp2p-core/discovery"
"time"

cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -83,3 +84,30 @@ func nsToCid(ns string) (cid.Cid, error) {

return cid.NewCidV1(cid.Raw, h), nil
}

func NewDiscoveryRouting(disc discovery.Discovery, opts ...discovery.Option) *DiscoveryRouting {
return &DiscoveryRouting{disc, opts}
}

type DiscoveryRouting struct {
discovery.Discovery
opts []discovery.Option
}

func (r *DiscoveryRouting) Provide(ctx context.Context, c cid.Cid, bcast bool) error {
if !bcast {
return nil
}

_, err := r.Advertise(ctx, cidToNs(c), r.opts...)
return err
}

func (r *DiscoveryRouting) FindProvidersAsync(ctx context.Context, c cid.Cid, limit int) <-chan peer.AddrInfo {
ch, _ := r.FindPeers(ctx, cidToNs(c), append([]discovery.Option{discovery.Limit(limit)}, r.opts...)...)
return ch
}

func cidToNs(c cid.Cid) string {
return "/provider/" + c.String()
}
145 changes: 144 additions & 1 deletion p2p/discovery/routing/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"sync"
"testing"
"time"

cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
Expand Down Expand Up @@ -69,6 +71,107 @@ func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit
return ch
}

type mockDiscoveryServer struct {
mx sync.Mutex
db map[string]map[peer.ID]*discoveryRegistration
}

type discoveryRegistration struct {
info peer.AddrInfo
expiration time.Time
}

func newDiscoveryServer() *mockDiscoveryServer {
return &mockDiscoveryServer{
db: make(map[string]map[peer.ID]*discoveryRegistration),
}
}

func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) {
s.mx.Lock()
defer s.mx.Unlock()

peers, ok := s.db[ns]
if !ok {
peers = make(map[peer.ID]*discoveryRegistration)
s.db[ns] = peers
}
peers[info.ID] = &discoveryRegistration{info, time.Now().Add(ttl)}
return ttl, nil
}

func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) {
s.mx.Lock()
defer s.mx.Unlock()

peers, ok := s.db[ns]
if !ok || len(peers) == 0 {
emptyCh := make(chan peer.AddrInfo)
close(emptyCh)
return emptyCh, nil
}

count := len(peers)
if limit != 0 && count > limit {
count = limit
}

iterTime := time.Now()
ch := make(chan peer.AddrInfo, count)
numSent := 0
for p, reg := range peers {
if numSent == count {
break
}
if iterTime.After(reg.expiration) {
delete(peers, p)
continue
}

numSent++
ch <- reg.info
}
close(ch)

return ch, nil
}

func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool {
s.mx.Lock()
defer s.mx.Unlock()

if peers, ok := s.db[ns]; ok {
_, ok := peers[pid]
return ok
}
return false
}

type mockDiscoveryClient struct {
host host.Host
server *mockDiscoveryServer
}

func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}

return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl)
}

func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}

return d.server.FindPeers(ns, options.Limit)
}

func TestRoutingDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -102,3 +205,43 @@ func TestRoutingDiscovery(t *testing.T) {
t.Fatalf("Unexpected peer: %s", pi.ID)
}
}

func TestDiscoveryRouting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))

dserver := newDiscoveryServer()
d1 := &mockDiscoveryClient{h1, dserver}
d2 := &mockDiscoveryClient{h2, dserver}

r1 := NewDiscoveryRouting(d1, discovery.TTL(time.Hour))
r2 := NewDiscoveryRouting(d2, discovery.TTL(time.Hour))

c, err := nsToCid("/test")
if err != nil {
t.Fatal(err)
}

if err := r1.Provide(ctx, c, true); err != nil {
t.Fatal(err)
}

pch := r2.FindProvidersAsync(ctx, c, 20)

var allAIs []peer.AddrInfo
for ai := range pch {
allAIs = append(allAIs, ai)
}

if len(allAIs) != 1 {
t.Fatalf("Expected 1 peer, got %d", len(allAIs))
}

ai := allAIs[0]
if ai.ID != h1.ID() {
t.Fatalf("Unexpected peer: %s", ai.ID)
}
}

0 comments on commit 77e85c3

Please sign in to comment.