From 9904be4833bc10c3492c4d666184cfe3aea73fe6 Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Fri, 12 Jun 2020 21:58:06 +0200 Subject: [PATCH] Restart PipelineClient worker on error --- client.go | 32 +++++++++++++++---------- client_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index 00db5257d2..2a7f298d56 100644 --- a/client.go +++ b/client.go @@ -2389,20 +2389,28 @@ func (c *pipelineConnClient) init() { c.chW = make(chan *pipelineWork, maxPendingRequests) } go func() { - if err := c.worker(); err != nil { - c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err) - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { - // Throttle client reconnections on temporary errors - time.Sleep(time.Second) + // Keep restarting the worker if it fails (connection errors for example). + for { + if err := c.worker(); err != nil { + c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err) + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + // Throttle client reconnections on temporary errors + time.Sleep(time.Second) + } + } else { + c.chLock.Lock() + stop := len(c.chR) == 0 && len(c.chW) == 0 + if !stop { + c.chR = nil + c.chW = nil + } + c.chLock.Unlock() + + if stop { + break + } } } - - c.chLock.Lock() - // Do not reset c.chW to nil, since it may contain - // pending requests, which could be served on the next - // connection to the host. - c.chR = nil - c.chLock.Unlock() }() } c.chLock.Unlock() diff --git a/client_test.go b/client_test.go index 63c7f7e82d..9f121373c3 100644 --- a/client_test.go +++ b/client_test.go @@ -20,6 +20,71 @@ import ( "github.com/valyala/fasthttp/fasthttputil" ) +func TestPipelineClientIssue832(t *testing.T) { + t.Parallel() + + ln, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + + req := AcquireRequest() + defer ReleaseRequest(req) + req.SetHost(ln.Addr().String()) + + res := AcquireResponse() + defer ReleaseResponse(res) + + client := PipelineClient{ + Addr: ln.Addr().String(), + DialDualStack: true, + ReadTimeout: time.Millisecond * 100, + Logger: &testLogger{}, // Ignore log output. + } + + defer func() { + if err := ln.Close(); err != nil { + t.Fatal(err) + } + }() + + attempts := 4 + go func() { + for i := 0; i < attempts; i++ { + c, err := ln.Accept() + if err != nil { + // "use of closed network connection" is returned by Accept() when the connection is closed. + if !strings.Contains(err.Error(), "use of closed network connection") { + t.Error(err) + } + } + if c != nil { + go func() { + time.Sleep(time.Second) + c.Close() + }() + } + } + }() + + done := make(chan int) + go func() { + defer close(done) + + for i := 0; i < attempts; i++ { + if err := client.Do(req, res); err == nil { + t.Error("error expected") + } + } + }() + + select { + case <-time.After(time.Second * 4): + t.Fatal("PipelineClient did not restart worker") + case <-done: + } +} + func TestClientInvalidURI(t *testing.T) { t.Parallel()