Skip to content

Commit

Permalink
[FIXED] Added jitter in the reconnect logic
Browse files Browse the repository at this point in the history
Where the library previously waited for the full reconnect wait interval,
it now waits for a random duration bounded by that value.
This prevents the thundering-herd reconnect problem, especially with
TLS.

Resolves #563

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed May 1, 2020
1 parent c48e770 commit 9027138
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 3 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module github.com/nats-io/nats.go

require (
github.com/golang/protobuf v1.4.0
github.com/nats-io/jwt v0.3.2
github.com/nats-io/nats-server/v2 v2.1.6
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.21.0
)

go 1.13
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
github.com/nats-io/nats-server/v2 v2.1.6 h1:qAaHZaS8pRRNQLFaiBA1rq5WynyEGp9DFgmMfoaiXGY=
github.com/nats-io/nats-server/v2 v2.1.6/go.mod h1:BL1NOtaBQ5/y97djERRVWNouMW7GT3gxnmbE/eC8u8A=
github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
Expand All @@ -12,4 +26,12 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
18 changes: 15 additions & 3 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ type Conn struct {
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}

// New style response handler
respSub string // The wildcard subject
Expand Down Expand Up @@ -1396,6 +1397,7 @@ func (nc *Conn) setup() {
nc.pongs = make([]chan struct{}, 0, 8)

nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})

// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
Expand Down Expand Up @@ -1818,6 +1820,8 @@ func (nc *Conn) doReconnect(err error) {
// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false
rt := time.NewTimer(0)
rqch := nc.rqch

for len(nc.srvPool) > 0 {
cur, err := nc.selectNextServer()
Expand All @@ -1844,7 +1848,11 @@ func (nc *Conn) doReconnect(err error) {
if sleepTime <= 0 {
runtime.Gosched()
} else {
time.Sleep(time.Duration(sleepTime))
rt.Reset(time.Duration(rand.Int63n(sleepTime)))
select {
case <-rqch:
case <-rt.C:
}
}
// If the readLoop, etc.. go routines were started, wait for them to complete.
if waitForGoRoutines {
Expand Down Expand Up @@ -3655,9 +3663,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {

// Kick the Go routines so they fall out.
nc.kickFlusher()
nc.mu.Unlock()

nc.mu.Lock()
// If the reconnect timer is waiting between a reconnect attempt,
// this will kick it out.
if nc.rqch != nil {
close(nc.rqch)
nc.rqch = nil
}

// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()
Expand Down
72 changes: 72 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2389,3 +2389,75 @@ func TestNoPanicOnSrvPoolSizeChanging(t *testing.T) {
}
wg.Wait()
}

// TestReconnectWaitRandom exercises the reconnect wait in two phases:
//
//  1. With a short ReconnectWait, it repeatedly drops the connection and
//     measures how long a Flush takes to complete after the disconnect. Each
//     measurement must stay close to the configured wait, and the average
//     must be clearly below it.
//  2. With a very long ReconnectWait, it drops the connection, closes it
//     while the reconnect loop is expected to be waiting, and verifies that
//     no doReconnect go routine is left running.
func TestReconnectWaitRandom(t *testing.T) {
	s := RunServerOnPort(TEST_PORT)
	defer s.Shutdown()

	dch := make(chan bool, 1)
	nc, err := Connect(s.ClientURL(),
		ReconnectWait(100*time.Millisecond),
		DisconnectErrHandler(func(_ *Conn, err error) {
			dch <- true
		}),
	)
	if err != nil {
		t.Fatalf("Error during connect: %v", err)
	}
	defer nc.Close()

	nr := 25
	total := time.Duration(0)
	for i := 0; i < nr; i++ {
		// Close the underlying connection to force a disconnect.
		nc.mu.Lock()
		nc.conn.Close()
		nc.mu.Unlock()
		if err := WaitTime(dch, time.Second); err != nil {
			t.Fatal(err.Error())
		}
		start := time.Now()
		nc.Flush()
		// The time will count for the reconnect + the ping pong, so it could
		// be that we are a bit over the 100ms max.
		dur := time.Since(start)
		total += dur
		if dur > 150*time.Millisecond {
			t.Fatalf("Waited too long: %v", dur)
		}
	}
	if avg := total / time.Duration(nr); avg >= 95*time.Millisecond {
		t.Fatalf("Average of reconnect suspiciously high: %v", avg)
	}
	nc.Close()

	// Use a long reconnect wait. This connection gets its own notification
	// channel so that a value possibly still buffered in dch by the first
	// connection cannot be mistaken for this connection's disconnect.
	dch2 := make(chan bool, 1)
	nc, err = Connect(s.ClientURL(),
		ReconnectWait(100*time.Second),
		DisconnectErrHandler(func(_ *Conn, err error) {
			dch2 <- true
		}),
	)
	if err != nil {
		t.Fatalf("Error during connect: %v", err)
	}
	defer nc.Close()

	// Cause a disconnect
	nc.mu.Lock()
	nc.conn.Close()
	nc.mu.Unlock()
	if err := WaitTime(dch2, time.Second); err != nil {
		t.Fatal(err.Error())
	}
	// Wait a bit for the reconnect loop to go into wait mode.
	time.Sleep(50 * time.Millisecond)
	// Now close and expect the reconnect go routine to return..
	nc.Close()

	// Poll for the go routine to exit rather than sleeping a fixed amount,
	// so that a slow scheduler does not cause a spurious failure.
	buf := make([]byte, 100000)
	deadline := time.Now().Add(time.Second)
	for {
		n := runtime.Stack(buf, true)
		if n == len(buf) {
			// The dump filled the buffer and may have been truncated, which
			// could hide a doReconnect frame. Retry with a larger buffer.
			buf = make([]byte, 2*len(buf))
			continue
		}
		if !strings.Contains(string(buf[:n]), "doReconnect") {
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("doReconnect go routine still running:\n%s", buf[:n])
		}
		time.Sleep(10 * time.Millisecond)
	}
}

0 comments on commit 9027138

Please sign in to comment.