Skip to content

Commit

Permalink
autorelay: remove old candidates
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jun 24, 2022
1 parent 445cad2 commit 23fb7e1
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 63 deletions.
132 changes: 83 additions & 49 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import (
const protoIDv2 = circuitv2_proto.ProtoIDv2Hop

// numRelays returns the number of distinct relays currently present in the
// host's advertised addresses. It delegates to usedRelays and reports the
// count.
func numRelays(h host.Host) int {
	return len(usedRelays(h))
}

func usedRelays(h host.Host) []peer.ID {
m := make(map[peer.ID]struct{})
for _, addr := range h.Addrs() {
addr, comp := ma.SplitLast(addr)
if comp.Protocol().Code != ma.P_CIRCUIT { // not a relay addr
Expand All @@ -37,9 +41,13 @@ func numRelays(h host.Host) int {
if err != nil {
panic(err)
}
peers[id] = struct{}{}
m[id] = struct{}{}
}
return len(peers)
peers := make([]peer.ID, 0, len(m))
for id := range m {
peers = append(peers, id)
}
return peers
}

func newPrivateNode(t *testing.T, opts ...autorelay.Option) host.Host {
Expand Down Expand Up @@ -207,52 +215,6 @@ func TestWaitForCandidates(t *testing.T) {
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
}

// TestMaxCandidateAge checks that the peer source callback is invoked a
// second time once the existing candidates exceed the configured maximum
// candidate age. Time is driven by a mock clock so no real waiting occurs.
func TestMaxCandidateAge(t *testing.T) {
const numCandidates = 3

// Precompute the candidates.
// Generating public-private key pairs might be too expensive to do it sync on CI.
candidates := make([]peer.AddrInfo, 0, 2*numCandidates)
for i := 0; i < 2*numCandidates; i++ {
r := newRelay(t)
t.Cleanup(func() { r.Close() })
candidates = append(candidates, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}

var counter int32 // to be used atomically
cl := clock.NewMock()
h := newPrivateNode(t,
autorelay.WithPeerSource(func(num int) <-chan peer.AddrInfo {
// Each invocation hands out the next batch of precomputed candidates.
// The callback must run at most twice for this test to pass.
c := atomic.AddInt32(&counter, 1)
require.LessOrEqual(t, int(c), 2, "expected the callback to only be called twice")
require.Equal(t, numCandidates, num)
peerChan := make(chan peer.AddrInfo, num)
defer close(peerChan)
for i := 0; i < num; i++ {
peerChan <- candidates[0]
candidates = candidates[1:]
}
return peerChan
}, time.Hour),
autorelay.WithMaxCandidates(numCandidates),
autorelay.WithNumRelays(1),
autorelay.WithMaxCandidateAge(time.Hour),
autorelay.WithBootDelay(0),
autorelay.WithClock(cl),
)
defer h.Close()

r1 := newRelay(t)
t.Cleanup(func() { r1.Close() })

// A relay is obtained from the first batch; the callback ran exactly once.
require.Eventually(t, func() bool { return numRelays(h) == 1 }, 5*time.Second, 50*time.Millisecond)
require.Equal(t, 1, int(atomic.LoadInt32(&counter)))
// 40 minutes is still below the 1-hour max candidate age: no second call yet.
cl.Add(40 * time.Minute)
require.Never(t, func() bool { return numRelays(h) > 1 }, 100*time.Millisecond, 25*time.Millisecond)
// Crossing the 1-hour age limit must trigger a second peer-source call.
cl.Add(30 * time.Minute)
require.Eventually(t, func() bool { return atomic.LoadInt32(&counter) == 2 }, time.Second, 25*time.Millisecond)
}

func TestBackoff(t *testing.T) {
const backoff = 20 * time.Second
cl := clock.NewMock()
Expand Down Expand Up @@ -358,3 +320,75 @@ func TestRelayV1(t *testing.T) {
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
})
}

// TestMaxAge verifies that relay candidates are garbage-collected once they
// exceed the configured maximum candidate age, and that the peer source is
// then queried again so a replacement relay can be drawn from fresh
// candidates. Time is advanced via a mock clock.
func TestMaxAge(t *testing.T) {
cl := clock.NewMock()

const num = 4
// Two batches of relays: batch 1 is served on the first peer-source call,
// batch 2 on the second call (after batch 1 has aged out).
peerChan1 := make(chan peer.AddrInfo, num)
peerChan2 := make(chan peer.AddrInfo, num)
relays1 := make([]host.Host, 0, num)
relays2IDs := make([]peer.ID, 0, num)
for i := 0; i < num; i++ {
r1 := newRelay(t)
t.Cleanup(func() { r1.Close() })
peerChan1 <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
relays1 = append(relays1, r1)
r2 := newRelay(t)
t.Cleanup(func() { r2.Close() })
peerChan2 <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()}
relays2IDs = append(relays2IDs, r2.ID())
}

var counter int
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
// First call gets batch 1, second call batch 2; any further call
// is unexpected and fails the test.
counter++
switch counter {
case 1:
return peerChan1
case 2:
return peerChan2
default:
t.Fatal("unexpected call to PeerSource")
}
return nil
}, time.Second),
autorelay.WithNumRelays(1),
autorelay.WithMaxCandidates(num),
autorelay.WithBootDelay(0),
autorelay.WithMaxCandidateAge(20*time.Minute),
autorelay.WithClock(cl),
)
defer h.Close()

// Wait until a relay from batch 1 is in use.
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
relays := usedRelays(h)
require.Len(t, relays, 1)

// Advance past the 20-minute max candidate age.
cl.Add(21 * time.Minute)
// by now the 3 relays should have been garbage collected
close(peerChan1)
time.Sleep(250 * time.Millisecond) // give autorelay some time to set the timer to call the peer source again
cl.Add(time.Second)
// Close the relay currently in use so autorelay must pick a replacement,
// which can now only come from batch 2.
var oldRelay peer.ID
for _, r := range relays1 {
if r.ID() == relays[0] {
oldRelay = r.ID()
t.Log("closing old relay", oldRelay)
r.Close()
}
}
require.NotEmpty(t, oldRelay)

require.Eventually(t, func() bool {
relays = usedRelays(h)
if len(relays) != 1 {
return false
}
return relays[0] != oldRelay
}, 3*time.Second, 100*time.Millisecond)

require.Len(t, relays, 1)
// The replacement must be one of the batch-2 relays.
require.Contains(t, relays2IDs, relays[0])
}
6 changes: 6 additions & 0 deletions p2p/host/autorelay/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func WithNumRelays(n int) Option {
// WithMaxCandidates sets the maximum number of relay candidates to keep.
// If the configured minimum number of candidates exceeds n, the minimum is
// clamped down to n so the invariant minCandidates <= maxCandidates holds.
func WithMaxCandidates(n int) Option {
	return func(c *config) error {
		c.maxCandidates = n
		// Restore the invariant if a larger minimum was set earlier.
		if n < c.minCandidates {
			c.minCandidates = n
		}
		return nil
	}
}
Expand All @@ -137,6 +140,9 @@ func WithMinCandidates(n int) Option {
if len(c.staticRelays) > 0 {
return errStaticRelaysMinCandidates
}
if n > c.maxCandidates {
n = c.maxCandidates
}
c.minCandidates = n
c.setMinCandidates = true
return nil
Expand Down
40 changes: 26 additions & 14 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
)

type candidate struct {
added time.Time
supportsRelayV2 bool
ai peer.AddrInfo
}
Expand All @@ -63,14 +64,14 @@ type relayFinder struct {

candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
lastCandidateAdded time.Time
candidates map[peer.ID]*candidate
backoff map[peer.ID]time.Time
handleNewCandidateTrigger chan struct{} // cap: 1
// Any time _something_ happens that might cause us to need new candidates.
// This could be
// * the disconnection of a relay
// * the failed attempt to obtain a reservation with a current candidate
// * a candidate is deleted due to its age
maybeRequestNewCandidates chan struct{} // cap: 1.

relayUpdated chan struct{}
Expand Down Expand Up @@ -134,6 +135,8 @@ func (rf *relayFinder) background(ctx context.Context) {
defer refreshTicker.Stop()
backoffTicker := rf.conf.clock.Ticker(rf.conf.backoff / 5)
defer backoffTicker.Stop()
oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5)
defer oldCandidateTicker.Stop()

for {
// when true, we need to identify push
Expand Down Expand Up @@ -182,6 +185,21 @@ func (rf *relayFinder) background(ctx context.Context) {
}
}
rf.candidateMx.Unlock()
case now := <-oldCandidateTicker.C:
var deleted bool
rf.candidateMx.Lock()
for id, cand := range rf.candidates {
fmt.Println(id, cand.added.Add(rf.conf.maxCandidateAge).Sub(now))
if !cand.added.Add(rf.conf.maxCandidateAge).After(now) {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)
}
}
rf.candidateMx.Unlock()
if deleted {
rf.notifyMaybeNeedNewCandidates()
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -216,14 +234,6 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
if numCandidates < rf.conf.minCandidates {
log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates)
timer.Reset(nextAllowedCallToPeerSource)
} else if !rf.lastCandidateAdded.IsZero() {
newestCandidateAge := now.Sub(rf.lastCandidateAdded)
log.Debugw("resetting timer. candidate will be too old in", "dur", rf.conf.maxCandidateAge-newestCandidateAge)
t := rf.conf.maxCandidateAge - newestCandidateAge
if t < nextAllowedCallToPeerSource {
t = nextAllowedCallToPeerSource
}
timer.Reset(t)
}
}

Expand Down Expand Up @@ -256,7 +266,6 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates)
continue
}
rf.lastCandidateAdded = rf.conf.clock.Now()
rf.refCount.Add(1)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -326,7 +335,11 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) {
return
}
log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuit v2", supportsV2)
rf.candidates[pi.ID] = &candidate{ai: pi, supportsRelayV2: supportsV2}
rf.candidates[pi.ID] = &candidate{
added: rf.conf.clock.Now(),
ai: pi,
supportsRelayV2: supportsV2,
}
rf.candidateMx.Unlock()

// Don't notify when we're using static relays. We need to process all entries first.
Expand Down Expand Up @@ -521,12 +534,11 @@ func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*ci
}
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
delete(rf.candidates, id)
if failed {
rf.backoff[id] = rf.conf.clock.Now()
delete(rf.candidates, id)
return nil, err
}
return rsvp, nil
return rsvp, err
}

func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) bool {
Expand Down

0 comments on commit 23fb7e1

Please sign in to comment.