Skip to content

Commit

Permalink
Merge pull request #1 from ipfs/feat/addr-stream
Browse files Browse the repository at this point in the history
Add peerstore method to subscribe to new addresses
  • Loading branch information
whyrusleeping committed May 31, 2016
2 parents 5f44cd0 + 53bbe68 commit 379ce35
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 0 deletions.
62 changes: 62 additions & 0 deletions p2p/host/peerstore/addr/sorting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package addr

import (
"bytes"

ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
mafmt "github.com/whyrusleeping/mafmt"
)

func isFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}

type AddrList []ma.Multiaddr

func (al AddrList) Len() int {
return len(al)
}

func (al AddrList) Swap(i, j int) {
al[i], al[j] = al[j], al[i]
}

func (al AddrList) Less(i, j int) bool {
a := al[i]
b := al[j]

// dial localhost addresses next, they should fail immediately
lba := manet.IsIPLoopback(a)
lbb := manet.IsIPLoopback(b)
if lba {
if !lbb {
return true
}
}

// dial utp and similar 'non-fd-consuming' addresses first
fda := isFDCostlyTransport(a)
fdb := isFDCostlyTransport(b)
if !fda {
if fdb {
return true
}

// if neither consume fd's, assume equal ordering
return false
}

// if 'b' doesnt take a file descriptor
if !fdb {
return false
}

// if 'b' is loopback and both take file descriptors
if lbb {
return false
}

// for the rest, just sort by bytes
return bytes.Compare(a.Bytes(), b.Bytes()) > 0
}
32 changes: 32 additions & 0 deletions p2p/host/peerstore/addr/sorting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package addr

import (
"sort"
"testing"
)

func TestAddressSorting(t *testing.T) {
u1 := newAddrOrFatal(t, "/ip4/152.12.23.53/udp/1234/utp")
u2l := newAddrOrFatal(t, "/ip4/127.0.0.1/udp/1234/utp")
local := newAddrOrFatal(t, "/ip4/127.0.0.1/tcp/1234")
norm := newAddrOrFatal(t, "/ip4/6.5.4.3/tcp/1234")

l := AddrList{local, u1, u2l, norm}
sort.Sort(l)

if !l[0].Equal(u2l) {
t.Fatal("expected utp local addr to be sorted first: ", l[0])
}

if !l[1].Equal(u1) {
t.Fatal("expected utp addr to be sorted second")
}

if !l[2].Equal(local) {
t.Fatal("expected tcp localhost addr thid")
}

if !l[3].Equal(norm) {
t.Fatal("expected normal addr last")
}
}
116 changes: 116 additions & 0 deletions p2p/host/peerstore/addr_manager.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package peer

import (
"sort"
"sync"
"time"

addr "github.com/ipfs/go-libp2p-peer/addr"
ma "github.com/jbenet/go-multiaddr"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -51,6 +54,8 @@ type addrSet map[string]expiringAddr
type AddrManager struct {
addrmu sync.Mutex // guards addrs
addrs map[ID]addrSet

addrSubs map[ID][]*addrSub
}

// ensures the AddrManager is initialized.
Expand All @@ -59,6 +64,9 @@ func (mgr *AddrManager) init() {
if mgr.addrs == nil {
mgr.addrs = make(map[ID]addrSet)
}
if mgr.addrSubs == nil {
mgr.addrSubs = make(map[ID][]*addrSub)
}
}

func (mgr *AddrManager) Peers() []ID {
Expand Down Expand Up @@ -101,6 +109,8 @@ func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
mgr.addrs[p] = amap
}

subs := mgr.addrSubs[p]

// only expand ttls
exp := time.Now().Add(ttl)
for _, addr := range addrs {
Expand All @@ -113,6 +123,10 @@ func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
a, found := amap[addrstr]
if !found || exp.After(a.TTL) {
amap[addrstr] = expiringAddr{Addr: addr, TTL: exp}

for _, sub := range subs {
sub.pubAddr(addr)
}
}
}
}
Expand All @@ -137,6 +151,8 @@ func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
mgr.addrs[p] = amap
}

subs := mgr.addrSubs[p]

exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
Expand All @@ -148,6 +164,10 @@ func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)

if ttl > 0 {
amap[addrs] = expiringAddr{Addr: addr, TTL: exp}

for _, sub := range subs {
sub.pubAddr(addr)
}
} else {
delete(amap, addrs)
}
Expand Down Expand Up @@ -195,3 +215,99 @@ func (mgr *AddrManager) ClearAddrs(p ID) {

mgr.addrs[p] = make(addrSet) // clear what was there before
}

func (mgr *AddrManager) removeSub(p ID, s *addrSub) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
subs := mgr.addrSubs[p]
var filtered []*addrSub
for _, v := range subs {
if v != s {
filtered = append(filtered, v)
}
}
mgr.addrSubs[p] = filtered
}

type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}

func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}

func (mgr *AddrManager) AddrStream(ctx context.Context, p ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
mgr.init()

sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}

out := make(chan ma.Multiaddr)

mgr.addrSubs[p] = append(mgr.addrSubs[p], sub)

baseaddrset := mgr.addrs[p]
var initial []ma.Multiaddr
for _, a := range baseaddrset {
initial = append(initial, a.Addr)
}

sort.Sort(addr.AddrList(initial))

go func(buffer []ma.Multiaddr) {
defer close(out)

sent := make(map[string]bool)
var outch chan ma.Multiaddr

for _, a := range buffer {
sent[a.String()] = true
}

var next ma.Multiaddr
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
outch = out
}

for {
select {
case outch <- next:
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
} else {
outch = nil
next = nil
}
case naddr := <-sub.pubch:
if sent[naddr.String()] {
continue
}

sent[naddr.String()] = true
if next == nil {
next = naddr
outch = out
} else {
buffer = append(buffer, naddr)
}
case <-ctx.Done():
mgr.removeSub(p, sub)
return
}
}

}(initial)

return out
}
6 changes: 6 additions & 0 deletions p2p/host/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
//ds "github.com/jbenet/go-datastore"
//dssync "github.com/jbenet/go-datastore/sync"
ma "github.com/jbenet/go-multiaddr"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -61,6 +62,11 @@ type AddrBook interface {
// Addresses returns all known (and valid) addresses for a given
Addrs(p ID) []ma.Multiaddr

// AddrStream returns a channel that gets all addresses for a given
// peer sent on it. If new addresses are added after the call is made
// they will be sent along through the channel as well.
AddrStream(context.Context, ID) <-chan ma.Multiaddr

// ClearAddresses removes all previously stored addresses
ClearAddrs(p ID)
}
Expand Down
Loading

0 comments on commit 379ce35

Please sign in to comment.