Skip to content

Commit

Permalink
Check goroutines in places related to recent found leaks (#12106)
Browse files Browse the repository at this point in the history
Follows the strategy used in #10850 to check for goroutine leaks
on tests in some places related to the leaks investigated as part
of #9302.

Several things here:
* New helper to reuse the goroutine checker, it also prints now the
  goroutines dump on failure.
* Add goroutine checks for autodiscover tests
* Add goroutine checks for CloseOnSignal and SubOutlet
  tests (they detect the issues solved by #11263)
* Add goroutine checks for log input tests (they detect issues solved
  by #12125 and #12164)
  • Loading branch information
jsoriano committed May 24, 2019
1 parent 874e01f commit f841521
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777],{pull}12065[12065],{pull}12067[12067],{pull}12160[12160]
- Update urllib3 version to 1.24.2 {pull}11930[11930]
- Add libbeat/common/cleanup package. {pull}12134[12134]
- New helper to check for leaked goroutines on tests. {pull}12106[12106]
- Only Load minimal template if no fields are provided. {pull}12103[12103]
- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
86 changes: 86 additions & 0 deletions filebeat/channel/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/tests/resources"
)

type dummyOutletter struct {
closed bool
c chan struct{}
}

func (o *dummyOutletter) OnEvent(event *util.Data) bool {
return true
}

func (o *dummyOutletter) Close() error {
o.closed = true
close(o.c)
return nil
}

func (o *dummyOutletter) Done() <-chan struct{} {
return o.c
}

func TestCloseOnSignal(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
sig := make(chan struct{})
CloseOnSignal(o, sig)
close(sig)
}

func TestCloseOnSignalClosed(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
sig := make(chan struct{})
c := CloseOnSignal(o, sig)
c.Close()
}

func TestSubOutlet(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
so := SubOutlet(o)
so.Close()
assert.False(t, o.closed)
}

func TestCloseOnSignalSubOutlet(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
c := CloseOnSignal(SubOutlet(o), make(chan struct{}))
o.Close()
c.Close()
}
10 changes: 1 addition & 9 deletions filebeat/input/log/input_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.

// +build !windows
// +build !windows,!integration

package log

import (
"testing"

"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common/match"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -163,10 +162,3 @@ func TestInit(t *testing.T) {
assert.Equal(t, test.count, p.states.Count())
}
}

// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }
179 changes: 179 additions & 0 deletions filebeat/input/log/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@
package log

import (
"io/ioutil"
"os"
"path"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/tests/resources"
)

func TestInputFileExclude(t *testing.T) {
Expand Down Expand Up @@ -81,6 +89,139 @@ func TestIsCleanInactive(t *testing.T) {
}
}

func TestInputLifecycle(t *testing.T) {
cases := []struct {
title string
closer func(input.Context, *Input)
}{
{
title: "explicitly closed",
closer: func(_ input.Context, input *Input) {
input.Wait()
},
},
{
title: "context done",
closer: func(ctx input.Context, _ *Input) {
close(ctx.Done)
},
},
{
title: "beat context done",
closer: func(ctx input.Context, _ *Input) {
close(ctx.Done)
close(ctx.BeatDone)
},
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}
testInputLifecycle(t, context, c.closer)
})
}
}

// TestInputLifecycle performs blackbock testing of the log input
func testInputLifecycle(t *testing.T, context input.Context, closer func(input.Context, *Input)) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Prepare a log file
tmpdir, err := ioutil.TempDir(os.TempDir(), "input-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
logs := []byte("some log line\nother log line\n")
err = ioutil.WriteFile(path.Join(tmpdir, "some.log"), logs, 0644)
assert.NoError(t, err)

// Setup the input
config, _ := common.NewConfigFrom(common.MapStr{
"paths": path.Join(tmpdir, "*.log"),
"close_eof": true,
})

events := make(chan *util.Data, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
}

input, err := NewInput(config, connector, context)
if err != nil {
t.Error(err)
return
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
done := make(chan struct{})
for {
select {
case event := <-events:
if state := event.GetState(); state.Finished {
assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read")
go func() {
closer(context, input.(*Input))
close(done)
}()
}
case <-done:
return
case <-timeout:
t.Fatal("timeout waiting for closed state")
}
}
}

func TestNewInputDone(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

config, _ := common.NewConfigFrom(common.MapStr{
"paths": path.Join(os.TempDir(), "logs", "*.log"),
})

connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return TestOutlet{}, nil
}

context := input.Context{
Done: make(chan struct{}),
}

_, err := NewInput(config, connector, context)
assert.NoError(t, err)

close(context.Done)
}

func TestNewInputError(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

config := common.NewConfig()

connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return TestOutlet{}, nil
}

context := input.Context{}

_, err := NewInput(config, connector, context)
assert.Error(t, err)
}

func TestMatchesMeta(t *testing.T) {
tests := []struct {
Input *Input
Expand Down Expand Up @@ -146,3 +287,41 @@ func (t TestFileInfo) Mode() os.FileMode { return 0 }
func (t TestFileInfo) ModTime() time.Time { return t.time }
func (t TestFileInfo) IsDir() bool { return false }
func (t TestFileInfo) Sys() interface{} { return nil }

type eventCapturer struct {
closed bool
c chan struct{}
closeOnce sync.Once
events chan *util.Data
}

func NewEventCapturer(events chan *util.Data) channel.Outleter {
return &eventCapturer{
c: make(chan struct{}),
events: events,
}
}

func (o *eventCapturer) OnEvent(event *util.Data) bool {
o.events <- event
return true
}

func (o *eventCapturer) Close() error {
o.closeOnce.Do(func() {
o.closed = true
close(o.c)
})
return nil
}

func (o *eventCapturer) Done() <-chan struct{} {
return o.c
}

// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }
10 changes: 10 additions & 0 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/tests/resources"
)

type mockRunner struct {
Expand Down Expand Up @@ -135,6 +136,9 @@ func TestNilAutodiscover(t *testing.T) {
}

func TestAutodiscover(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Expand Down Expand Up @@ -255,6 +259,9 @@ func TestAutodiscover(t *testing.T) {
}

func TestAutodiscoverHash(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)

Expand Down Expand Up @@ -319,6 +326,9 @@ func TestAutodiscoverHash(t *testing.T) {
}

func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Expand Down
Loading

0 comments on commit f841521

Please sign in to comment.