Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Added jitter in the reconnect logic #564

Merged
merged 2 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 123 additions & 41 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,21 @@ import (

// Default Constants
const (
Version = "1.9.2"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
Version = "1.9.3"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
)

const (
Expand Down Expand Up @@ -127,15 +129,17 @@ func init() {
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
}
}

Expand Down Expand Up @@ -182,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error)
// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string

// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
type ReconnectDelayHandler func(attempts int) time.Duration

// asyncCB is used to preserve order for async callbacks.
type asyncCB struct {
f func()
Expand Down Expand Up @@ -258,6 +268,24 @@ type Options struct {
// to a server that we were already connected to previously.
ReconnectWait time.Duration

// CustomReconnectDelayCB is invoked after the library tried every
// URL in the server list and failed to reconnect. It passes to the
// user the current number of attempts. This function returns the
// amount of time the library will sleep before attempting to reconnect
// again. It is strongly recommended that this value contains some
// jitter to prevent all connections to attempt reconnecting at the same time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sweet 👍

CustomReconnectDelayCB ReconnectDelayHandler

// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitter time.Duration

// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitterTLS time.Duration

// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration

Expand Down Expand Up @@ -411,6 +439,7 @@ type Conn struct {
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}

// New style response handler
respSub string // The wildcard subject
Expand Down Expand Up @@ -672,6 +701,24 @@ func MaxReconnects(max int) Option {
}
}

// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
return func(o *Options) error {
o.ReconnectJitter = jitter
o.ReconnectJitterTLS = jitterForTLS
return nil
}
}

// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.
// See CustomReconnectDelayCB Option for more details.
func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
return func(o *Options) error {
o.CustomReconnectDelayCB = cb
return nil
}
}

// PingInterval is an Option to set the period for client ping commands.
func PingInterval(t time.Duration) Option {
return func(o *Options) error {
Expand Down Expand Up @@ -1396,6 +1443,7 @@ func (nc *Conn) setup() {
nc.pongs = make([]chan struct{}, 0, 8)

nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})

// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
Expand Down Expand Up @@ -1818,33 +1866,63 @@ func (nc *Conn) doReconnect(err error) {
// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false
var rt *time.Timer
// Channel used to kick routine out of sleep when conn is closed.
rqch := nc.rqch
// Counter that is increased when the whole list of servers has been tried.
var wlf int

var jitter time.Duration
var rw time.Duration
// If a custom reconnect delay handler is set, this takes precedence.
crd := nc.Opts.CustomReconnectDelayCB
if crd == nil {
rw = nc.Opts.ReconnectWait
// TODO: since we sleep only after the whole list has been tried, we can't
// rely on individual *srv to know if it is a TLS or non-TLS url.
// We have to pick which type of jitter to use, for now, we use these hints:
jitter = nc.Opts.ReconnectJitter
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
jitter = nc.Opts.ReconnectJitterTLS
}
}

for len(nc.srvPool) > 0 {
for i := 0; len(nc.srvPool) > 0; {
cur, err := nc.selectNextServer()
if err != nil {
nc.err = err
break
}

sleepTime := int64(0)

// Sleep appropriate amount of time before the
// connection attempt if connecting to same server
// we just got disconnected from..
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
}

// On Windows, createConn() will take more than a second when no
// server is running at that address. So it could be that the
// time elapsed between reconnect attempts is always > than
// the set option. Release the lock to give a chance to a parallel
// nc.Close() to break the loop.
doSleep := i+1 >= len(nc.srvPool)
nc.mu.Unlock()
if sleepTime <= 0 {

if !doSleep {
i++
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
runtime.Gosched()
} else {
time.Sleep(time.Duration(sleepTime))
i = 0
var st time.Duration
if crd != nil {
wlf++
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we count wlf regardless of crd? Noop now but we may use it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will move it out of the condition when we actually need it, will be easy to do.

st = crd(wlf)
} else {
st = rw
if jitter > 0 {
st += time.Duration(rand.Int63n(int64(jitter)))
}
}
if rt == nil {
rt = time.NewTimer(st)
} else {
rt.Reset(st)
}
select {
case <-rqch:
rt.Stop()
case <-rt.C:
}
}
// If the readLoop, etc.. go routines were started, wait for them to complete.
if waitForGoRoutines {
Expand Down Expand Up @@ -3655,9 +3733,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {

// Kick the Go routines so they fall out.
nc.kickFlusher()
nc.mu.Unlock()

nc.mu.Lock()
// If the reconnect timer is waiting between a reconnect attempt,
// this will kick it out.
if nc.rqch != nil {
close(nc.rqch)
nc.rqch = nil
}

// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()
Expand Down
Loading