Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connmgr: use clock interface #1720

Merged
merged 1 commit into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -27,6 +28,8 @@ var log = logging.Logger("connmgr")
type BasicConnMgr struct {
*decayer

clock clock.Clock

cfg *config
segments segments

Expand Down Expand Up @@ -74,15 +77,15 @@ func (ss *segments) countPeers() (count int) {
return count
}

func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
func (s *segment) tagInfoFor(p peer.ID, now time.Time) *peerInfo {
pi, ok := s.peers[p]
if ok {
return pi
}
// create a temporary peer to buffer early tags before the Connected notification arrives.
pi = &peerInfo{
id: p,
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
firstSeen: now, // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
Expand All @@ -102,6 +105,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
lowWater: low,
gracePeriod: time.Minute,
silencePeriod: 10 * time.Second,
clock: clock.New(),
}
for _, o := range opts {
if err := o(cfg); err != nil {
Expand All @@ -116,6 +120,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {

cm := &BasicConnMgr{
cfg: cfg,
clock: cfg.clock,
protected: make(map[peer.ID]map[string]struct{}, 16),
segments: func() (ret segments) {
for i := range ret {
Expand Down Expand Up @@ -167,7 +172,7 @@ func (cm *BasicConnMgr) memoryEmergency() {

// finally, update the last trim time.
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrim = cm.clock.Now()
cm.lastTrimMu.Unlock()
}

Expand Down Expand Up @@ -311,7 +316,7 @@ func (cm *BasicConnMgr) background() {
interval = cm.cfg.silencePeriod
}

ticker := time.NewTicker(interval)
ticker := cm.clock.Ticker(interval)
defer ticker.Stop()

for {
Expand All @@ -336,7 +341,7 @@ func (cm *BasicConnMgr) doTrim() {
if count == atomic.LoadUint64(&cm.trimCount) {
cm.trim()
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrim = cm.clock.Now()
cm.lastTrimMu.Unlock()
atomic.AddUint64(&cm.trimCount, 1)
}
Expand Down Expand Up @@ -427,7 +432,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {

candidates := make(peerInfos, 0, cm.segments.countPeers())
var ncandidates int
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
gracePeriodStart := cm.clock.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
for _, s := range cm.segments {
Expand Down Expand Up @@ -529,7 +534,7 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
s.Lock()
defer s.Unlock()

pi := s.tagInfoFor(p)
pi := s.tagInfoFor(p, cm.clock.Now())

// Update the total value of the peer.
pi.value += val - pi.tags[tag]
Expand Down Expand Up @@ -559,7 +564,7 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
s.Lock()
defer s.Unlock()

pi := s.tagInfoFor(p)
pi := s.tagInfoFor(p, cm.clock.Now())

oldval := pi.tags[tag]
newval := upsert(oldval)
Expand Down Expand Up @@ -629,7 +634,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
if !ok {
pinfo = &peerInfo{
id: id,
firstSeen: time.Now(),
firstSeen: cm.clock.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
Expand All @@ -640,7 +645,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
// Connected notification arrived: flip the temporary flag, and update the firstSeen
// timestamp to the real one.
pinfo.temp = false
pinfo.firstSeen = time.Now()
pinfo.firstSeen = cm.clock.Now()
}

_, ok = pinfo.conns[c]
Expand All @@ -649,7 +654,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
return
}

pinfo.conns[c] = time.Now()
pinfo.conns[c] = cm.clock.Now()
atomic.AddInt32(&cm.connCount, 1)
}

Expand Down
13 changes: 8 additions & 5 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -411,7 +412,8 @@ func TestDisconnected(t *testing.T) {

func TestGracePeriod(t *testing.T) {
const gp = 100 * time.Millisecond
cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour))
mockClock := clock.NewMock()
cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour), WithClock(mockClock))
require.NoError(t, err)
defer cm.Close()

Expand All @@ -425,7 +427,7 @@ func TestGracePeriod(t *testing.T) {
conns = append(conns, rc)
not.Connected(nil, rc)

time.Sleep(2 * gp)
mockClock.Add(2 * gp)

if rc.(*tconn).isClosed() {
t.Fatal("expected conn to remain open")
Expand All @@ -447,7 +449,7 @@ func TestGracePeriod(t *testing.T) {
}
}

time.Sleep(200 * time.Millisecond)
mockClock.Add(200 * time.Millisecond)

cm.TrimOpenConns(context.Background())

Expand All @@ -465,7 +467,8 @@ func TestGracePeriod(t *testing.T) {

// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
cm, err := NewConnManager(10, 20, WithGracePeriod(0))
mockClock := clock.NewMock()
cm, err := NewConnManager(10, 20, WithGracePeriod(0), WithClock(mockClock))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
Expand All @@ -480,7 +483,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
}

// wait for a few seconds
time.Sleep(time.Second * 3)
mockClock.Add(3 * time.Second)

// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/connmgr/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (d *decayer) process() {
s := d.mgr.segments.get(peer)
s.Lock()

p := s.tagInfoFor(peer)
p := s.tagInfoFor(peer, d.clock.Now())
v, ok := p.decaying[tag]
if !ok {
v = &connmgr.DecayingValue{
Expand All @@ -244,7 +244,7 @@ func (d *decayer) process() {
s := d.mgr.segments.get(rm.peer)
s.Lock()

p := s.tagInfoFor(rm.peer)
p := s.tagInfoFor(rm.peer, d.clock.Now())
v, ok := p.decaying[rm.tag]
if !ok {
s.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions p2p/net/connmgr/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package connmgr
import (
"errors"
"time"

"github.com/benbjohnson/clock"
)

// config is the configuration struct for the basic connection manager.
Expand All @@ -13,6 +15,7 @@ type config struct {
silencePeriod time.Duration
decayer *DecayerCfg
emergencyTrim bool
clock clock.Clock
}

// Option represents an option for the basic connection manager.
Expand All @@ -26,6 +29,14 @@ func DecayerConfig(opts *DecayerCfg) Option {
}
}

// WithClock sets the internal clock impl
func WithClock(c clock.Clock) Option {
return func(cfg *config) error {
cfg.clock = c
return nil
}
}

// WithGracePeriod sets the grace period.
// The grace period is the time a newly opened connection is given before it becomes
// subject to pruning.
Expand Down