From 869c132e5188472ac63db47b9827f69cb194d487 Mon Sep 17 00:00:00 2001 From: Anuj Varma Date: Sun, 25 Apr 2021 12:48:12 -0700 Subject: [PATCH] Fix panic on writing to closed channel The docker daemon RingLogger implementation is written in such a way that it can call Close() on the underlying driver, without stopping to write to it. This can cause panics on fluent-golang-logger which has no checks on whether the channel was closed by the daemon. The contract on the logger.Logger interface in docker doesn't specify that it will make sure that the call to Close() and Log() will not compete so other logging driver implementations have similar semantics as this commit to account for that. Signed-off-by: Anuj Varma --- fluent/fluent.go | 27 +++++++++++++++++++-------- fluent/fluent_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index 902eb81..c2a4fb7 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -85,10 +85,12 @@ type msgToSend struct { type Fluent struct { Config - dialer dialer - stopRunning chan bool - pending chan *msgToSend - wg sync.WaitGroup + dialer dialer + stopRunning chan bool + pending chan *msgToSend + pendingMutex sync.RWMutex + chanClosed bool + wg sync.WaitGroup muconn sync.Mutex conn net.Conn @@ -143,10 +145,11 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.Async { f = &Fluent{ - Config: config, - dialer: d, - pending: make(chan *msgToSend, config.BufferLimit), - stopRunning: make(chan bool, 1), + Config: config, + dialer: d, + pending: make(chan *msgToSend, config.BufferLimit), + pendingMutex: sync.RWMutex{}, + stopRunning: make(chan bool, 1), } f.wg.Add(1) go f.run() @@ -325,6 +328,9 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { if f.Config.Async { + f.pendingMutex.Lock() + f.chanClosed = true + f.pendingMutex.Unlock() if f.Config.ForceStopAsyncSend { f.stopRunning <- true close(f.stopRunning) @@ -338,6 +344,11 @@ func (f *Fluent) Close() (err error) { // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { + f.pendingMutex.RLock() + defer f.pendingMutex.RUnlock() + if f.chanClosed { + return fmt.Errorf("fluent#appendBuffer: Logger already closed") + } select { case f.pending <- msg: default: diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 6db33e5..b54e4d4 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io/ioutil" "net" "reflect" @@ -572,6 +573,48 @@ func ackRespMsgp(t *testing.T, ack string) string { return buf.String() } +func TestNoPanicOnAsyncClose(t *testing.T) { + testcases := []struct { + name string + config Config + shouldError bool + }{ + { + name: "Channel closed before write", + config: Config{ + Async: true, + }, + shouldError: true, + }, + { + name: "Channel not closed at all", + config: Config{ + Async: true, + }, + shouldError: false, + }, + } + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + d := newTestDialer() + f, err := newWithDialer(testcase.config, d) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if testcase.shouldError { + f.Close() + } + e := f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) + if testcase.shouldError { + assert.Equal(t, fmt.Errorf("fluent#appendBuffer: Logger already closed"), e) + } else { + assert.Equal(t, nil, e) + } + }) + } +} + func TestCloseOnFailingAsyncReconnect(t *testing.T) { t.Skip("Broken tests")