Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jul 10, 2024
1 parent c5112c3 commit 7ae6615
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 60 deletions.
11 changes: 8 additions & 3 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

go ex.peerTracker.track()
err := ex.peerTracker.track()
if err != nil {
return err
}

// bootstrap the peerTracker with trusted peers as well as previously seen
// peers if provided.
Expand Down Expand Up @@ -150,9 +153,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
// their Head and verify against the given trusted header.
useTrackedPeers := !reqParams.TrustedHead.IsZero()
if useTrackedPeers {
trackedPeers := ex.peerTracker.getPeers(maxUntrustedHeadRequests)
trackedPeers := ex.peerTracker.peers(maxUntrustedHeadRequests)
if len(trackedPeers) > 0 {
peers = trackedPeers
peers = transform(trackedPeers, func(p *peerStat) peer.ID {
return p.peerID
})
log.Debugw("requesting head from tracked peers", "amount", len(peers))
}
}
Expand Down
10 changes: 10 additions & 0 deletions p2p/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,13 @@ func convertStatusCodeToError(code p2p_pb.StatusCode) error {
return fmt.Errorf("unknown status code %d", code)
}
}

// transform applies a provided function to each element of the input slice,
// producing a new slice with the results of the function.
func transform[T, U any](ts []T, f func(T) U) []U {
us := make([]U, len(ts))
for i := range ts {
us[i] = f(ts[i])
}
return us
}
106 changes: 52 additions & 54 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type peerTracker struct {

peerLk sync.RWMutex
// trackedPeers contains active peers that we can request to.
// we cache the peer once they disconnect,
// so we can guarantee that peerQueue will only contain active peers
trackedPeers map[libpeer.ID]struct{}

Expand Down Expand Up @@ -101,72 +100,59 @@ func (p *peerTracker) connectToPeer(ctx context.Context, peer libpeer.ID) {
}
}

func (p *peerTracker) track() {
defer func() {
p.done <- struct{}{}
}()
// track creates subscriptions for different types of libp2p.Events to efficiently handle peers.
func (p *peerTracker) track() error {
evtBus := p.host.EventBus()

connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
connSubs, err := evtBus.Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
return
return err
}

identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
identifySub, err := evtBus.Subscribe(&event.EvtPeerIdentificationCompleted{})
if err != nil {
log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err)
return
return err
}

protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
protocolSub, err := evtBus.Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err)
return
return err
}

for {
select {
case <-p.ctx.Done():
err = connSubs.Close()
errors.Join(err, identifySub.Close(), protocolSub.Close())
if err != nil {
log.Errorw("closing subscriptions", "err", err)
go func() {
for {
select {
case <-p.ctx.Done():
if err := closeSubscriptions(connSubs, identifySub, protocolSub); err != nil {
log.Errorw("closing subscriptions", "err", err)
}
p.done <- struct{}{}
return
case connSubscription := <-connSubs.Out():
ev := connSubscription.(event.EvtPeerConnectednessChanged)
if network.NotConnected == ev.Connectedness {
p.disconnected(ev.Peer)
}
case identSubscription := <-identifySub.Out():
ev := identSubscription.(event.EvtPeerIdentificationCompleted)
if slices.Contains(ev.Protocols, p.protocolID) {
p.connected(ev.Peer)
}
case protocolSubscription := <-protocolSub.Out():
ev := protocolSubscription.(event.EvtPeerProtocolsUpdated)
if slices.Contains(ev.Removed, p.protocolID) {
p.disconnected(ev.Peer)
}
if slices.Contains(ev.Added, p.protocolID) {
p.connected(ev.Peer)
}
}
return
case connSubscription := <-connSubs.Out():
ev := connSubscription.(event.EvtPeerConnectednessChanged)
if network.NotConnected == ev.Connectedness {
p.disconnected(ev.Peer)
}
case identSubscription := <-identifySub.Out():
ev := identSubscription.(event.EvtPeerIdentificationCompleted)
if slices.Contains(ev.Protocols, p.protocolID) {
p.connected(ev.Peer)
}
case protocolSubscription := <-protocolSub.Out():
ev := protocolSubscription.(event.EvtPeerProtocolsUpdated)
if slices.Contains(ev.Removed, p.protocolID) {
p.disconnected(ev.Peer)
break
}
p.connected(ev.Peer)
}
}
}

// getPeers returns the tracker's currently tracked peers up to the `max`.
func (p *peerTracker) getPeers(max int) []libpeer.ID {
p.peerLk.RLock()
defer p.peerLk.RUnlock()

peers := make([]libpeer.ID, 0, max)
for peer := range p.trackedPeers {
peers = append(peers, peer)
if len(peers) == max {
break
}
}
return peers
}()
return nil
}

func (p *peerTracker) connected(pID libpeer.ID) {
Expand Down Expand Up @@ -215,17 +201,21 @@ func (p *peerTracker) disconnected(pID libpeer.ID) {
p.metrics.peersDisconnected(1)
}

func (p *peerTracker) peers() []*peerStat {
// peers returns the tracker's currently tracked peers up to the `max`.
func (p *peerTracker) peers(max int) []*peerStat {
p.peerLk.RLock()
defer p.peerLk.RUnlock()

peers := make([]*peerStat, 0)
peers := make([]*peerStat, 0, max)
for peerID := range p.trackedPeers {
score := 0
if info := p.host.ConnManager().GetTagInfo(peerID); info != nil {
score = info.Tags[string(p.protocolID)]
}
peers = append(peers, &peerStat{peerID: peerID, peerScore: score})
if len(peers) == max {
break
}
}
return peers
}
Expand Down Expand Up @@ -296,3 +286,11 @@ func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Du
score := stats.updateStats(size, duration)
p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score)
}

func closeSubscriptions(subs ...event.Subscription) error {
var err error
for _, sub := range subs {
err = errors.Join(err, sub.Close())
}
return err
}
5 changes: 3 additions & 2 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
require.NoError(t, err)
tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)

go tracker.track()
err = tracker.track()
require.NoError(t, err)

err = tracker.bootstrap(prevSeen[:2])
require.NoError(t, err)

assert.Eventually(t, func() bool {
return len(tracker.getPeers(7)) > 0
return len(tracker.peers(7)) > 0
}, time.Millisecond*500, time.Millisecond*100)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newSession[H header.Header[H]](
metrics: metrics,
}

peers := peerTracker.peers()
peers := peerTracker.peers(len(peerTracker.trackedPeers))
if len(peers) == 0 {
return nil, errors.New("empty peer tracker")
}
Expand Down

0 comments on commit 7ae6615

Please sign in to comment.