Skip to content

Commit

Permalink
[release-branch.go1.13] net/http: fix wantConnQueue memory leaks in T…
Browse files Browse the repository at this point in the history
…ransport

I'm trying to keep the code changes minimal for backporting to Go 1.13,
so it is still possible for a handful of entries to leak,
but the leaks are now O(1) instead of O(N) in the steady state.

Longer-term, I think it would be a good idea to coalesce idleMu with
connsPerHostMu and clear entries out of both queues as soon as their
goroutines are done waiting.

Cherry-picked from CL 191964.

Updates #33849
Updates #33850
Fixes #33878

Change-Id: Ia66bc64671eb1014369f2d3a01debfc023b44281
Reviewed-on: https://go-review.googlesource.com/c/go/+/191964
Run-TryBot: Bryan C. Mills <bcmills@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
(cherry picked from commit 94bf9a8)
Reviewed-on: https://go-review.googlesource.com/c/go/+/191967
  • Loading branch information
Bryan C. Mills committed Aug 27, 2019
1 parent 633d0c9 commit 44a66ac
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 1 deletion.
28 changes: 27 additions & 1 deletion src/net/http/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
Expand Down Expand Up @@ -1137,7 +1138,7 @@ func (q *wantConnQueue) pushBack(w *wantConn) {
q.tail = append(q.tail, w)
}

// popFront removes and returns the w at the front of the queue.
// popFront removes and returns the wantConn at the front of the queue.
func (q *wantConnQueue) popFront() *wantConn {
if q.headPos >= len(q.head) {
if len(q.tail) == 0 {
Expand All @@ -1152,6 +1153,30 @@ func (q *wantConnQueue) popFront() *wantConn {
return w
}

// peekFront returns the wantConn at the front of the queue without removing it.
func (q *wantConnQueue) peekFront() *wantConn {
if q.headPos < len(q.head) {
return q.head[q.headPos]
}
if len(q.tail) > 0 {
return q.tail[0]
}
return nil
}

// cleanFront pops any wantConns that are no longer waiting from the head of the
// queue, reporting whether any were popped.
func (q *wantConnQueue) cleanFront() (cleaned bool) {
for {
w := q.peekFront()
if w == nil || w.waiting() {
return cleaned
}
q.popFront()
cleaned = true
}
}

// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
Expand Down Expand Up @@ -1261,6 +1286,7 @@ func (t *Transport) queueForDial(w *wantConn) {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
Expand Down
170 changes: 170 additions & 0 deletions src/net/http/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,176 @@ func TestTransportPersistConnLeakShortBody(t *testing.T) {
}
}

// A countedConn is a net.Conn that decrements an atomic counter when finalized.
type countedConn struct {
net.Conn
}

// A countingDialer dials connections and counts the number that remain reachable.
type countingDialer struct {
dialer net.Dialer
mu sync.Mutex
total, live int64
}

func (d *countingDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := d.dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}

counted := new(countedConn)
counted.Conn = conn

d.mu.Lock()
defer d.mu.Unlock()
d.total++
d.live++

runtime.SetFinalizer(counted, d.decrement)
return counted, nil
}

func (d *countingDialer) decrement(*countedConn) {
d.mu.Lock()
defer d.mu.Unlock()
d.live--
}

func (d *countingDialer) Read() (total, live int64) {
d.mu.Lock()
defer d.mu.Unlock()
return d.total, d.live
}

func TestTransportPersistConnLeakNeverIdle(t *testing.T) {
defer afterTest(t)

ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
// Close every connection so that it cannot be kept alive.
conn, _, err := w.(Hijacker).Hijack()
if err != nil {
t.Errorf("Hijack failed unexpectedly: %v", err)
return
}
conn.Close()
}))
defer ts.Close()

var d countingDialer
c := ts.Client()
c.Transport.(*Transport).DialContext = d.DialContext

body := []byte("Hello")
for i := 0; ; i++ {
total, live := d.Read()
if live < total {
break
}
if i >= 1<<12 {
t.Fatalf("Count of live client net.Conns (%d) not lower than total (%d) after %d Do / GC iterations.", live, total, i)
}

req, err := NewRequest("POST", ts.URL, bytes.NewReader(body))
if err != nil {
t.Fatal(err)
}
_, err = c.Do(req)
if err == nil {
t.Fatal("expected broken connection")
}

runtime.GC()
}
}

type countedContext struct {
context.Context
}

type contextCounter struct {
mu sync.Mutex
live int64
}

func (cc *contextCounter) Track(ctx context.Context) context.Context {
counted := new(countedContext)
counted.Context = ctx
cc.mu.Lock()
defer cc.mu.Unlock()
cc.live++
runtime.SetFinalizer(counted, cc.decrement)
return counted
}

func (cc *contextCounter) decrement(*countedContext) {
cc.mu.Lock()
defer cc.mu.Unlock()
cc.live--
}

func (cc *contextCounter) Read() (live int64) {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.live
}

func TestTransportPersistConnContextLeakMaxConnsPerHost(t *testing.T) {
defer afterTest(t)

ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
runtime.Gosched()
w.WriteHeader(StatusOK)
}))
defer ts.Close()

c := ts.Client()
c.Transport.(*Transport).MaxConnsPerHost = 1

ctx := context.Background()
body := []byte("Hello")
doPosts := func(cc *contextCounter) {
var wg sync.WaitGroup
for n := 64; n > 0; n-- {
wg.Add(1)
go func() {
defer wg.Done()

ctx := cc.Track(ctx)
req, err := NewRequest("POST", ts.URL, bytes.NewReader(body))
if err != nil {
t.Error(err)
}

_, err = c.Do(req.WithContext(ctx))
if err != nil {
t.Errorf("Do failed with error: %v", err)
}
}()
}
wg.Wait()
}

var initialCC contextCounter
doPosts(&initialCC)

// flushCC exists only to put pressure on the GC to finalize the initialCC
// contexts: the flushCC allocations should eventually displace the initialCC
// allocations.
var flushCC contextCounter
for i := 0; ; i++ {
live := initialCC.Read()
if live == 0 {
break
}
if i >= 100 {
t.Fatalf("%d Contexts still not finalized after %d GC cycles.", live, i)
}
doPosts(&flushCC)
runtime.GC()
}
}

// This used to crash; https://golang.org/issue/3266
func TestTransportIdleConnCrash(t *testing.T) {
defer afterTest(t)
Expand Down

0 comments on commit 44a66ac

Please sign in to comment.