Skip to content

Commit

Permalink
GODRIVER-3156 Detect and discard closed idle connections. (#1815) [re…
Browse files Browse the repository at this point in the history
…lease/1.17] (#1841)

Co-authored-by: Matt Dale <9760375+matthewdale@users.noreply.github.com>
  • Loading branch information
1 parent b473d1b commit b45e5d9
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 169 deletions.
61 changes: 51 additions & 10 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -55,7 +56,7 @@ type connection struct {
nc net.Conn // When nil, the connection is closed.
addr address.Address
idleTimeout time.Duration
idleDeadline atomic.Value // Stores a time.Time
idleStart atomic.Value // Stores a time.Time
readTimeout time.Duration
writeTimeout time.Duration
desc description.Server
Expand Down Expand Up @@ -561,25 +562,65 @@ func (c *connection) close() error {
return err
}

// closed returns true if the connection has been closed by the driver.
func (c *connection) closed() bool {
return atomic.LoadInt64(&c.state) == connDisconnected
}

// isAlive returns true if the connection is alive and ready to be used for an
// operation.
//
// Note that the liveness check can be slow (at least 1ms), so isAlive only
// checks the liveness of the connection if it's been idle for at least 10
// seconds. For frequently in-use connections, a network error during an
// operation will be the first indication of a dead connection.
func (c *connection) isAlive() bool {
if c.nc == nil {
return false
}

// If the connection has been idle for less than 10 seconds, skip the
// liveness check.
//
// The 10-seconds idle bypass is based on the liveness check implementation
// in the Python Driver. That implementation uses 1 second as the idle
// threshold, but we chose to be more conservative in the Go Driver because
// this is new behavior with unknown side-effects. See
// https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985
idleStart, ok := c.idleStart.Load().(time.Time)
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
return true
}

// Set a 1ms read deadline and attempt to read 1 byte from the connection.
// Expect it to block for 1ms then return a deadline exceeded error. If it
// returns any other error, the connection is not usable, so return false.
// If it doesn't return an error and actually reads data, the connection is
// also not usable, so return false.
//
// Note that we don't need to un-set the read deadline because the "read"
// and "write" methods always reset the deadlines.
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
if err != nil {
return false
}
var b [1]byte
_, err = c.nc.Read(b[:])
return errors.Is(err, os.ErrDeadlineExceeded)
}

func (c *connection) idleTimeoutExpired() bool {
now := time.Now()
if c.idleTimeout > 0 {
idleDeadline, ok := c.idleDeadline.Load().(time.Time)
if ok && now.After(idleDeadline) {
return true
}
if c.idleTimeout == 0 {
return false
}

return false
idleStart, ok := c.idleStart.Load().(time.Time)
return ok && idleStart.Add(c.idleTimeout).Before(time.Now())
}

func (c *connection) bumpIdleDeadline() {
func (c *connection) bumpIdleStart() {
if c.idleTimeout > 0 {
c.idleDeadline.Store(time.Now().Add(c.idleTimeout))
c.idleStart.Store(time.Now())
}
}

Expand Down
87 changes: 85 additions & 2 deletions x/mongo/driver/topology/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/google/go-cmp/cmp"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/mongo/driver"
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestConnection(t *testing.T) {

want := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A}
err := conn.writeWireMessage(context.Background(), want)
noerr(t, err)
require.NoError(t, err)
got := tnc.buf
if !cmp.Equal(got, want) {
t.Errorf("writeWireMessage did not write the proper bytes. got %v; want %v", got, want)
Expand Down Expand Up @@ -624,7 +625,7 @@ func TestConnection(t *testing.T) {
conn.cancellationListener = listener

got, err := conn.readWireMessage(context.Background())
noerr(t, err)
require.NoError(t, err)
if !cmp.Equal(got, want) {
t.Errorf("did not read full wire message. got %v; want %v", got, want)
}
Expand Down Expand Up @@ -1251,3 +1252,85 @@ func (tcl *testCancellationListener) assertCalledOnce(t *testing.T) {
assert.Equal(t, 1, tcl.numListen, "expected Listen to be called once, got %d", tcl.numListen)
assert.Equal(t, 1, tcl.numStopListening, "expected StopListening to be called once, got %d", tcl.numListen)
}

func TestConnection_IsAlive(t *testing.T) {
t.Parallel()

t.Run("uninitialized", func(t *testing.T) {
t.Parallel()

conn := newConnection("")
assert.False(t,
conn.isAlive(),
"expected isAlive for an uninitialized connection to always return false")
})

t.Run("connection open", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.True(t,
conn.isAlive(),
"expected isAlive for an open connection to return true")
})

t.Run("connection closed", func(t *testing.T) {
t.Parallel()

conns := make(chan net.Conn)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
conns <- nc
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

// Close the connection before calling isAlive.
nc := <-conns
err = nc.Close()
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for a closed connection to return false")
})

t.Run("connection reads data", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Write some data to the connection before calling isAlive.
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
require.NoError(t, err)

// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for an open connection that reads data to return false")
})
}
23 changes: 14 additions & 9 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,11 @@ type reason struct {
// connectionPerished checks if a given connection is perished and should be removed from the pool.
func connectionPerished(conn *connection) (reason, bool) {
switch {
case conn.closed():
// A connection would only be closed if it encountered a network error during an operation and closed itself.
case conn.closed() || !conn.isAlive():
// A connection would only be closed if it encountered a network error
// during an operation and closed itself. If a connection is not alive
// (e.g. the connection was closed by the server-side), it's also
// considered a network error.
return reason{
loggerConn: logger.ReasonConnClosedError,
event: event.ReasonError,
Expand Down Expand Up @@ -898,13 +901,15 @@ func (p *pool) checkInNoEvent(conn *connection) error {
return nil
}

// Bump the connection idle deadline here because we're about to make the connection "available".
// The idle deadline is used to determine when a connection has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been "available" in the
// idle connections stack for more than the configured duration (maxIdleTimeMS). Set it before
// we call connectionPerished(), which checks the idle deadline, because a newly "available"
// connection should never be perished due to max idle time.
conn.bumpIdleDeadline()
// Bump the connection idle start time here because we're about to make the
// connection "available". The idle start time is used to determine how long
// a connection has been idle and when it has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been
// "available" in the idle connections stack for more than the configured
// duration (maxIdleTimeMS). Set it before we call connectionPerished(),
// which checks the idle deadline, because a newly "available" connection
// should never be perished due to max idle time.
conn.bumpIdleStart()

r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
Expand Down
Loading

0 comments on commit b45e5d9

Please sign in to comment.