Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added perPeerRateLimit env config #1580

Merged
merged 10 commits into from
Jun 14, 2022
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Config struct {

EnableHolePunching bool
HolePunchingOptions []holepunch.Option

FDLimit int
PerPeerLimit int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no way for a libp2p user to directly set values in this config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, forgot to add, update again.

}

func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
Expand Down Expand Up @@ -151,6 +154,12 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
if cfg.ResourceManager != nil {
opts = append(opts, swarm.WithResourceManager(cfg.ResourceManager))
}
if cfg.FDLimit != 0 {
opts = append(opts, swarm.WithFDLimit(cfg.FDLimit))
}
if cfg.PerPeerLimit != 0 {
opts = append(opts, swarm.WithPerPeerLimit(cfg.PerPeerLimit))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
}
Expand Down
14 changes: 14 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,17 @@ func WithDialTimeout(t time.Duration) Option {
return nil
}
}

func WithPerPeerLimit(perPeerLimit int) Option {
return func(cfg *Config) error {
cfg.PerPeerLimit = perPeerLimit
return nil
}
}

func WithFDLimit(fdLimit int) Option {
return func(cfg *Config) error {
cfg.FDLimit = fdLimit
return nil
}
}
12 changes: 0 additions & 12 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package swarm

import (
"context"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -47,16 +45,6 @@ type dialLimiter struct {

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)

func newDialLimiter(df dialfunc) *dialLimiter {
fd := ConcurrentFdDials
if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
if n, err := strconv.ParseInt(env, 10, 32); err == nil {
fd = int(n)
}
}
return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
fdLimit: fdLimit,
Expand Down
22 changes: 21 additions & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ func WithResourceManager(m network.ResourceManager) Option {
}
}

func WithPerPeerLimit(perPeerLimit int) Option {
return func(s *Swarm) error {
s.perPeerLimit = perPeerLimit
return nil
}
}

func WithFDLimit(fdLimit int) Option {
return func(s *Swarm) error {
s.fdLimit = fdLimit
return nil
}
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
Expand Down Expand Up @@ -141,6 +155,10 @@ type Swarm struct {
ctxCancel context.CancelFunc

bwc metrics.Reporter

// dial limiter
perPeerLimit int
fdLimit int
}

// NewSwarm constructs a Swarm.
Expand All @@ -153,6 +171,8 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
ctxCancel: cancel,
dialTimeout: defaultDialTimeout,
dialTimeoutLocal: defaultDialTimeoutLocal,
perPeerLimit: DefaultPerPeerRateLimit,
fdLimit: ConcurrentFdDials,
}

s.conns.m = make(map[peer.ID][]*Conn)
Expand All @@ -170,7 +190,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
}

s.dsync = newDialSync(s.dialWorkerLoop)
s.limiter = newDialLimiter(s.dialAddr)
s.limiter = newDialLimiterWithParams(s.dialAddr, s.fdLimit, s.perPeerLimit)
s.backf.init(s.ctx)
return s, nil
}
Expand Down