Skip to content

Commit

Permalink
misc(p2p/peerstat): remove pruneDeadline + update peer scoring mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jul 10, 2024
1 parent 10c442b commit 39232eb
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 94 deletions.
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
Expand Down Expand Up @@ -69,6 +75,8 @@ require (
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
Expand All @@ -85,6 +93,7 @@ require (
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pion/webrtc/v3 v3.2.40 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand All @@ -93,7 +102,10 @@ require (
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.44.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.21.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8Nz
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down
40 changes: 14 additions & 26 deletions p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ type peerStat struct {
sync.RWMutex
peerID peer.ID
// score is the average speed per single request
peerScore float32
// pruneDeadline specifies when disconnected peer will be removed if
// it does not return online.
pruneDeadline time.Time
peerScore int
}

// updateStats recalculates peer.score by averaging the last score
Expand All @@ -26,33 +23,28 @@ type peerStat struct {
// by dividing the amount by time, so the result score will represent how many bytes
// were retrieved in 1 millisecond. This value will then be averaged relative to the
// previous peerScore.
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
p.Lock()
defer p.Unlock()
func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
if amount == 0 && duration == 0 {
// decrease peerScore by 20% of the peer that failed the request by any reason.
// NOTE: peerScore will not be decreased if the score is less than 100.
p.peerScore -= p.peerScore / 100 * 20
return p.peerScore
}

averageSpeed := float32(amount)
if duration != 0 {
averageSpeed /= float32(duration.Milliseconds())
}
if p.peerScore == 0.0 {
p.peerScore = averageSpeed
return
p.peerScore = int(averageSpeed * 100)
return p.peerScore
}
p.peerScore = (p.peerScore + averageSpeed) / 2
}

// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
// NOTE: decreasing peerScore in one session will not affect its position in queue in another
// session(as we can have multiple sessions running concurrently).
// TODO(vgonkivs): to figure out the better scoring increments/decrements
func (p *peerStat) decreaseScore() {
p.Lock()
defer p.Unlock()

p.peerScore -= p.peerScore / 100 * 20
p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
return p.peerScore
}

// score reads a peer's latest score from the queue
func (p *peerStat) score() float32 {
func (p *peerStat) score() int {
p.RLock()
defer p.RUnlock()
return p.peerScore
Expand Down Expand Up @@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
// in case if there are no peer available in current session, it blocks until
// the peer will be pushed in.
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
// As we discussed with @Wondertan there could be 2 possible solutions:
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
select {
case <-ctx.Done():
return &peerStat{}
Expand Down
4 changes: 2 additions & 2 deletions p2p/peer_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ func Test_StatDecreaseScore(t *testing.T) {
peerScore: 100,
}
// will decrease score by 20%
pStats.decreaseScore()
require.Equal(t, pStats.score(), float32(80.0))
pStats.updateStats(0, 0)
require.Equal(t, pStats.score(), 80)
}
75 changes: 14 additions & 61 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,67 +9,21 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
testpeer "github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPeerTracker_GC(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

h := createMocknet(t, 1)

gcCycle = time.Millisecond * 200

connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)

pidstore := newDummyPIDStore()
p := newPeerTracker(h[0], connGater, pidstore, nil)

maxAwaitingTime = time.Millisecond

peerlist := generateRandomPeerlist(t, 10)
for i := 0; i < 10; i++ {
p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5}
}

peerlist = generateRandomPeerlist(t, 4)
pid1 := peerlist[2]
pid2 := peerlist[3]

p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()}
p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)}
assert.True(t, len(p.trackedPeers) > 0)
assert.True(t, len(p.disconnectedPeers) > 0)

go p.track()
go p.gc()

time.Sleep(time.Millisecond * 500)

err = p.stop(ctx)
require.NoError(t, err)

require.Len(t, p.trackedPeers, 10)
require.Nil(t, p.disconnectedPeers[pid1])

// ensure good peers were dumped to store
peers, err := pidstore.Load(ctx)
require.NoError(t, err)
require.Equal(t, 10, len(peers))
}

func TestPeerTracker_BlockPeer(t *testing.T) {
h := createMocknet(t, 2)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
p := newPeerTracker(h[0], connGater, nil, nil)
maxAwaitingTime = time.Millisecond
p := newPeerTracker(h[0], connGater, "private", nil, nil)
p.blockPeer(h[1].ID(), errors.New("test"))
require.Len(t, connGater.ListBlockedPeers(), 1)
require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID())
Expand All @@ -82,26 +36,25 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)

// mn := createMocknet(t, 10)
mn, err := mocknet.FullMeshConnected(10)
require.NoError(t, err)
hosts := make([]host.Host, 10)

for i := range hosts {
hosts[i], err = libp2p.New()
require.NoError(t, err)
hosts[i].SetStreamHandler(protocolID("private"), nil)
}

// store peers to peerstore
prevSeen := make([]peer.ID, 9)
for i, peer := range mn.Hosts()[1:] {
for i, peer := range hosts[1:] {
hosts[0].Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL)
prevSeen[i] = peer.ID()

// disconnect so they're not already connected on attempt to
// connect
err = mn.DisconnectPeers(mn.Hosts()[i].ID(), peer.ID())
require.NoError(t, err)
}
pidstore := newDummyPIDStore()
// only store 7 peers to pidstore, and use 2 as trusted
err = pidstore.Put(ctx, prevSeen[2:])
require.NoError(t, err)

tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil)
tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)

go tracker.track()

Expand Down
4 changes: 2 additions & 2 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *session[H]) doRequest(
switch err {
case header.ErrNotFound, errEmptyResponse:
logFn = log.Debugw
stat.decreaseScore()
s.peerTracker.updateScore(stat, 0, 0)
default:
s.peerTracker.blockPeer(stat.peerID, err)
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *session[H]) doRequest(
span.SetStatus(codes.Ok, "")

// update peer stats
stat.updateStats(size, took)
s.peerTracker.updateScore(stat, size, took)

// ensure that we received the correct amount of headers.
if remainingHeaders > 0 {
Expand Down
4 changes: 2 additions & 2 deletions p2p/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test_Validate(t *testing.T) {
ses := newSession(
context.Background(),
nil,
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
"", time.Second, nil,
withValidation(head),
)
Expand All @@ -45,7 +45,7 @@ func Test_ValidateFails(t *testing.T) {
ses := newSession(
context.Background(),
nil,
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
"", time.Second, nil,
withValidation(head),
)
Expand Down

0 comments on commit 39232eb

Please sign in to comment.