diff --git a/fluent/fluent.go b/fluent/fluent.go index 902eb81..c2a4fb7 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -85,10 +85,12 @@ type msgToSend struct { type Fluent struct { Config - dialer dialer - stopRunning chan bool - pending chan *msgToSend - wg sync.WaitGroup + dialer dialer + stopRunning chan bool + pending chan *msgToSend + pendingMutex sync.RWMutex + chanClosed bool + wg sync.WaitGroup muconn sync.Mutex conn net.Conn @@ -143,10 +145,11 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.Async { f = &Fluent{ - Config: config, - dialer: d, - pending: make(chan *msgToSend, config.BufferLimit), - stopRunning: make(chan bool, 1), + Config: config, + dialer: d, + pending: make(chan *msgToSend, config.BufferLimit), + pendingMutex: sync.RWMutex{}, + stopRunning: make(chan bool, 1), } f.wg.Add(1) go f.run() @@ -325,6 +328,9 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { if f.Config.Async { + f.pendingMutex.Lock() + f.chanClosed = true + f.pendingMutex.Unlock() if f.Config.ForceStopAsyncSend { f.stopRunning <- true close(f.stopRunning) @@ -338,6 +344,11 @@ func (f *Fluent) Close() (err error) { // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { + f.pendingMutex.RLock() + defer f.pendingMutex.RUnlock() + if f.chanClosed { + return fmt.Errorf("fluent#appendBuffer: Logger already closed") + } select { case f.pending <- msg: default: diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 6db33e5..b54e4d4 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io/ioutil" "net" "reflect" @@ -572,6 +573,48 @@ func ackRespMsgp(t *testing.T, ack string) string { return buf.String() } +func TestNoPanicOnAsyncClose(t *testing.T) { + testcases := []struct { + name string + config Config + shouldError bool + }{ + { + name: "Channel closed before write", + config: Config{ + Async: true, + }, + shouldError: true, + }, + { + name: "Channel not closed at all", + config: Config{ + Async: true, + }, + shouldError: false, + }, + } + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + d := newTestDialer() + f, err := newWithDialer(testcase.config, d) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if testcase.shouldError { + f.Close() + } + e := f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) + if testcase.shouldError { + assert.Equal(t, fmt.Errorf("fluent#appendBuffer: Logger already closed"), e) + } else { + assert.Equal(t, nil, e) + } + }) + } +} + func TestCloseOnFailingAsyncReconnect(t *testing.T) { t.Skip("Broken tests")