Skip to content

Commit

Permalink
Merge pull request #564 from nats-io/fix_563
Browse files Browse the repository at this point in the history
[FIXED] Added jitter in the reconnect logic
  • Loading branch information
kozlovic committed May 13, 2020
2 parents 3f06f2e + b2be4bb commit 1b0a85a
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 53 deletions.
179 changes: 129 additions & 50 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,21 @@ import (

// Default Constants
const (
Version = "1.9.2"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
Version = "1.9.3"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
)

const (
Expand Down Expand Up @@ -127,15 +129,17 @@ func init() {
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
}
}

Expand Down Expand Up @@ -182,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error)
// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string

// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
//
// The attempts argument is a counter that the reconnect loop increments
// each time it has gone through the whole server list, just before it
// invokes the handler. The returned duration is how long the library
// waits before starting over with the list.
type ReconnectDelayHandler func(attempts int) time.Duration

// asyncCB is used to preserve order for async callbacks.
type asyncCB struct {
f func()
Expand Down Expand Up @@ -258,6 +268,24 @@ type Options struct {
// to a server that we were already connected to previously.
ReconnectWait time.Duration

// CustomReconnectDelayCB is invoked after the library tried every
// URL in the server list and failed to reconnect. It passes to the
// user the current number of attempts. This function returns the
// amount of time the library will sleep before attempting to reconnect
// again. It is strongly recommended that this value contains some
// jitter to prevent all connections from attempting to reconnect at the same time.
CustomReconnectDelayCB ReconnectDelayHandler

// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitter time.Duration

// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitterTLS time.Duration

// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration

Expand Down Expand Up @@ -411,6 +439,7 @@ type Conn struct {
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}

// New style response handler
respSub string // The wildcard subject
Expand Down Expand Up @@ -488,13 +517,12 @@ type Statistics struct {

// Tracks individual backend servers.
type srv struct {
url *url.URL
didConnect bool
reconnects int
lastAttempt time.Time
lastErr error
isImplicit bool
tlsName string
url *url.URL
didConnect bool
reconnects int
lastErr error
isImplicit bool
tlsName string
}

type serverInfo struct {
Expand Down Expand Up @@ -672,6 +700,24 @@ func MaxReconnects(max int) Option {
}
}

// ReconnectJitter is an Option to set the upper bounds of the random delay
// added to ReconnectWait: jitter is stored in the ReconnectJitter option and
// jitterForTLS in the ReconnectJitterTLS option.
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
	applyJitter := func(opts *Options) error {
		opts.ReconnectJitter, opts.ReconnectJitterTLS = jitter, jitterForTLS
		return nil
	}
	return applyJitter
}

// CustomReconnectDelay is an Option that stores cb in the
// CustomReconnectDelayCB option.
// See the CustomReconnectDelayCB option for more details.
func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
	setDelayCB := func(opts *Options) error {
		opts.CustomReconnectDelayCB = cb
		return nil
	}
	return setDelayCB
}

// PingInterval is an Option to set the period for client ping commands.
func PingInterval(t time.Duration) Option {
return func(o *Options) error {
Expand Down Expand Up @@ -1254,8 +1300,6 @@ func (nc *Conn) createConn() (err error) {
}
if _, cur := nc.currentServer(); cur == nil {
return ErrNoServers
} else {
cur.lastAttempt = time.Now()
}

// We will auto-expand host names if they resolve to multiple IPs
Expand Down Expand Up @@ -1398,6 +1442,7 @@ func (nc *Conn) setup() {
nc.pongs = make([]chan struct{}, 0, 8)

nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})

// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
Expand Down Expand Up @@ -1820,33 +1865,63 @@ func (nc *Conn) doReconnect(err error) {
// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false
var rt *time.Timer
// Channel used to kick routine out of sleep when conn is closed.
rqch := nc.rqch
// Counter that is increased when the whole list of servers has been tried.
var wlf int

var jitter time.Duration
var rw time.Duration
// If a custom reconnect delay handler is set, this takes precedence.
crd := nc.Opts.CustomReconnectDelayCB
if crd == nil {
rw = nc.Opts.ReconnectWait
// TODO: since we sleep only after the whole list has been tried, we can't
// rely on individual *srv to know if it is a TLS or non-TLS url.
// We have to pick which type of jitter to use, for now, we use these hints:
jitter = nc.Opts.ReconnectJitter
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
jitter = nc.Opts.ReconnectJitterTLS
}
}

for len(nc.srvPool) > 0 {
for i := 0; len(nc.srvPool) > 0; {
cur, err := nc.selectNextServer()
if err != nil {
nc.err = err
break
}

sleepTime := int64(0)

// Sleep appropriate amount of time before the
// connection attempt if connecting to same server
// we just got disconnected from..
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
}

// On Windows, createConn() will take more than a second when no
// server is running at that address. So it could be that the
// time elapsed between reconnect attempts is always > than
// the set option. Release the lock to give a chance to a parallel
// nc.Close() to break the loop.
doSleep := i+1 >= len(nc.srvPool)
nc.mu.Unlock()
if sleepTime <= 0 {

if !doSleep {
i++
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
runtime.Gosched()
} else {
time.Sleep(time.Duration(sleepTime))
i = 0
var st time.Duration
if crd != nil {
wlf++
st = crd(wlf)
} else {
st = rw
if jitter > 0 {
st += time.Duration(rand.Int63n(int64(jitter)))
}
}
if rt == nil {
rt = time.NewTimer(st)
} else {
rt.Reset(st)
}
select {
case <-rqch:
rt.Stop()
case <-rt.C:
}
}
// If the readLoop, etc.. go routines were started, wait for them to complete.
if waitForGoRoutines {
Expand Down Expand Up @@ -3663,9 +3738,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {

// Kick the Go routines so they fall out.
nc.kickFlusher()
nc.mu.Unlock()

nc.mu.Lock()
// If the reconnect routine is waiting on its timer between two
// reconnect attempts, closing this channel will kick it out.
if nc.rqch != nil {
close(nc.rqch)
nc.rqch = nil
}

// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()
Expand Down
Loading

0 comments on commit 1b0a85a

Please sign in to comment.