Skip to content

Commit

Permalink
Merge pull request #29 from libp2p/feat/persist_peerstore
Browse files Browse the repository at this point in the history
Persist peerstore via Datastore
  • Loading branch information
bigs committed Aug 27, 2018
2 parents 72e0087 + 7660dfa commit 791a2fa
Show file tree
Hide file tree
Showing 6 changed files with 702 additions and 101 deletions.
116 changes: 75 additions & 41 deletions p2p/host/peerstore/addr_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"sync"
"time"

peer "github.com/libp2p/go-libp2p-peer"
addr "github.com/libp2p/go-libp2p-peerstore/addr"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore/addr"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -60,7 +60,7 @@ type AddrManager struct {
addrmu sync.Mutex // guards addrs
addrs map[peer.ID]addrSlice

addrSubs map[peer.ID][]*addrSub
subManager *AddrSubManager
}

// ensures the AddrManager is initialized.
Expand All @@ -69,8 +69,8 @@ func (mgr *AddrManager) init() {
if mgr.addrs == nil {
mgr.addrs = make(map[peer.ID]addrSlice)
}
if mgr.addrSubs == nil {
mgr.addrSubs = make(map[peer.ID][]*addrSub)
if mgr.subManager == nil {
mgr.subManager = NewAddrSubManager()
}
}

Expand Down Expand Up @@ -114,8 +114,6 @@ func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
amap[string(ea.Addr.Bytes())] = ea
}

subs := mgr.addrSubs[p]

// only expand ttls
exp := time.Now().Add(ttl)
for _, addr := range addrs {
Expand All @@ -129,9 +127,7 @@ func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
if !found || exp.After(a.Expires) {
amap[addrstr] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}

for _, sub := range subs {
sub.pubAddr(addr)
}
mgr.subManager.BroadcastAddr(p, addr)
}
}
newAddrs := make([]expiringAddr, 0, len(amap))
Expand Down Expand Up @@ -161,8 +157,6 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
amap[string(ea.Addr.Bytes())] = ea
}

subs := mgr.addrSubs[p]

exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
Expand All @@ -175,9 +169,7 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
if ttl > 0 {
amap[addrs] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}

for _, sub := range subs {
sub.pubAddr(addr)
}
mgr.subManager.BroadcastAddr(p, addr)
} else {
delete(amap, addrs)
}
Expand Down Expand Up @@ -248,7 +240,7 @@ func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr {
return good
}

// ClearAddresses removes all previously stored addresses
// ClearAddrs removes all previously stored addresses
func (mgr *AddrManager) ClearAddrs(p peer.ID) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
Expand All @@ -257,57 +249,85 @@ func (mgr *AddrManager) ClearAddrs(p peer.ID) {
delete(mgr.addrs, p)
}

func (mgr *AddrManager) removeSub(p peer.ID, s *addrSub) {
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
subs := mgr.addrSubs[p]
mgr.init()

baseaddrslice := mgr.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
}

return mgr.subManager.AddrStream(ctx, p, initial)
}

// An abstracted, pub-sub manager for address streams. Extracted from
// AddrManager in order to support additional implementations.
type AddrSubManager struct {
mu sync.RWMutex
subs map[peer.ID][]*addrSub
}

// NewAddrSubManager initializes an AddrSubManager.
func NewAddrSubManager() *AddrSubManager {
return &AddrSubManager{
subs: make(map[peer.ID][]*addrSub),
}
}

// Used internally by the address stream coroutine to remove a subscription
// from the manager.
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
subs := mgr.subs[p]
if len(subs) == 1 {
if subs[0] != s {
return
}
delete(mgr.addrSubs, p)
delete(mgr.subs, p)
return
}
for i, v := range subs {
if v == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
mgr.addrSubs[p] = subs[:len(subs)-1]
mgr.subs[p] = subs[:len(subs)-1]
return
}
}
}

type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}
// BroadcastAddr broadcasts a new address to all subscribed streams.
func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()

func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
if subs, ok := mgr.subs[p]; ok {
for _, sub := range subs {
sub.pubAddr(addr)
}
}
}

func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
mgr.init()

// AddrStream creates a new subscription for a given peer ID, pre-populating the
// channel with any addresses we might already have on file.
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}

out := make(chan ma.Multiaddr)

mgr.addrSubs[p] = append(mgr.addrSubs[p], sub)

baseaddrslice := mgr.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
mgr.mu.Lock()
if _, ok := mgr.subs[p]; ok {
mgr.subs[p] = append(mgr.subs[p], sub)
} else {
mgr.subs[p] = []*addrSub{sub}
}
mgr.mu.Unlock()

sort.Sort(addr.AddrList(initial))

Expand Down Expand Up @@ -360,3 +380,17 @@ func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mul

return out
}

type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}

func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}
Loading

0 comments on commit 791a2fa

Please sign in to comment.