From e6bb5c9a7e67843b5cb024dc95fa5cdca9e76f16 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 2 Feb 2021 13:00:01 +0100 Subject: [PATCH] Fix leak caused by input runners created when checking their configuration (#23722) Stop input v1 runners created to check config. `CheckConfig` for v1 inputs actually calls the constructors of the inputs. In some cases, as in the log input, the constructor creates resources that are never released unless the runner is stopped. This causes goroutines leaks with autodiscover or other dynamic configurations. --- CHANGELOG.next.asciidoc | 1 + filebeat/fileset/factory.go | 48 ++++++++++---- filebeat/input/container/input_test.go | 36 ++++++++++ filebeat/input/docker/input_test.go | 34 ++++++++++ filebeat/input/inputtest/input.go | 65 +++++++++++++++++++ filebeat/input/kafka/input_test.go | 36 ++++++++++ filebeat/input/log/input_other_test.go | 7 +- filebeat/input/log/input_test.go | 29 ++------- filebeat/input/mqtt/input_test.go | 8 +++ filebeat/input/redis/input_test.go | 34 ++++++++++ filebeat/input/runnerfactory.go | 8 ++- filebeat/input/stdin/input_test.go | 34 ++++++++++ filebeat/input/syslog/input_test.go | 8 +++ filebeat/input/tcp/input_test.go | 9 +++ filebeat/input/udp/input_test.go | 34 ++++++++++ .../input/awscloudwatch/input_test.go | 13 +++- .../input/azureeventhub/input_test.go | 11 ++++ x-pack/filebeat/input/gcppubsub/input_test.go | 27 ++++++++ x-pack/filebeat/input/netflow/input_test.go | 19 ++++++ 19 files changed, 416 insertions(+), 45 deletions(-) create mode 100644 filebeat/input/container/input_test.go create mode 100644 filebeat/input/docker/input_test.go create mode 100644 filebeat/input/inputtest/input.go create mode 100644 filebeat/input/kafka/input_test.go create mode 100644 filebeat/input/redis/input_test.go create mode 100644 filebeat/input/stdin/input_test.go create mode 100644 filebeat/input/udp/input_test.go create mode 100644 x-pack/filebeat/input/gcppubsub/input_test.go create mode 100644 x-pack/filebeat/input/netflow/input_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce80d163deb..c05283738fc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Change the `event.created` in Netflow events to be the time the event was created by Filebeat to be consistent with ECS. {pull}23094[23094] - Update `filestream` reader offset when a line is skipped. {pull}23417[23417] +- Fix goroutines leak with some inputs in autodiscover. {pull}23722[23722] *Filebeat* diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 8a4d4fe09dc..b0656205321 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -18,7 +18,10 @@ package fileset import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -27,9 +30,6 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" - pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" - - "github.com/mitchellh/hashstructure" ) var ( @@ -77,15 +77,9 @@ func NewFactory( // Create creates a module based on a config func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { - // Start a registry of one module: - m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false) - if err != nil { - return nil, err - } - - pConfigs, err := m.GetInputConfigs() + m, pConfigs, err := f.createRegistry(c) if err != nil { - return nil, err + return nil, fmt.Errorf("could not create module registry for filesets: %w", err) } // Hash module ID @@ -116,8 +110,36 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru } func (f *Factory) CheckConfig(c *common.Config) error { - _, err := f.Create(pubpipeline.NewNilPipeline(), c) - return err + _, pConfigs, err := f.createRegistry(c) + if err != nil { + return fmt.Errorf("could not create module registry for filesets: %w", err) + } + + for _, pConfig := range pConfigs { + err = f.inputFactory.CheckConfig(pConfig) + if err != nil { + logp.Err("Error checking input configuration: %s", err) + return err + } + } + + return nil +} + +// createRegistry starts a registry for a set of filesets, it returns the registry and +// its input configurations +func (f *Factory) createRegistry(c *common.Config) (*ModuleRegistry, []*common.Config, error) { + m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false) + if err != nil { + return nil, nil, err + } + + pConfigs, err := m.GetInputConfigs() + if err != nil { + return nil, nil, err + } + + return m, pConfigs, err } func (p *inputsRunner) Start() { diff --git a/filebeat/input/container/input_test.go b/filebeat/input/container/input_test.go new file mode 100644 index 00000000000..8d898dd594a --- /dev/null +++ b/filebeat/input/container/input_test.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package container + +import ( + "os" + "path" + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "paths": path.Join(os.TempDir(), "logs", "*.log"), + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/docker/input_test.go b/filebeat/input/docker/input_test.go new file mode 100644 index 00000000000..bff4152a768 --- /dev/null +++ b/filebeat/input/docker/input_test.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package docker + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "containers.ids": "fad130edd3d2", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/inputtest/input.go b/filebeat/input/inputtest/input.go new file mode 100644 index 00000000000..56af21bd4d0 --- /dev/null +++ b/filebeat/input/inputtest/input.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputtest + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/tests/resources" +) + +// Outlet is an empty outlet for testing. +type Outlet struct{} + +func (o Outlet) OnEvent(event beat.Event) bool { return true } +func (o Outlet) Close() error { return nil } +func (o Outlet) Done() <-chan struct{} { return nil } + +// Connector is a connector to a test empty outlet. +var Connector = channel.ConnectorFunc( + func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { + return Outlet{}, nil + }, +) + +// AssertNotStartedInputCanBeDone checks that the context of an input can be +// done before starting the input, and it doesn't leak goroutines. This is +// important to confirm that leaks don't happen with CheckConfig. +func AssertNotStartedInputCanBeDone(t *testing.T, factory input.Factory, configMap *common.MapStr) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + config, err := common.NewConfigFrom(configMap) + require.NoError(t, err) + + context := input.Context{ + Done: make(chan struct{}), + } + + _, err = factory(config, Connector, context) + assert.NoError(t, err) + + close(context.Done) +} diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go new file mode 100644 index 00000000000..e83c0e908a8 --- /dev/null +++ b/filebeat/input/kafka/input_test.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package kafka + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "hosts": "localhost:9092", + "topics": "messages", + "group_id": "filebeat", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 0910bd2b291..9324e7f2d04 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -22,10 +22,11 @@ package log import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/common/match" - - "github.com/stretchr/testify/assert" ) var matchTests = []struct { @@ -148,7 +149,7 @@ func TestInit(t *testing.T) { Paths: test.paths, }, states: file.NewStates(), - outlet: TestOutlet{}, + outlet: inputtest.Outlet{}, fileStateIdentifier: &file.MockIdentifier{}, } diff --git a/filebeat/input/log/input_test.go b/filebeat/input/log/input_test.go index 0eb7d1899d7..471aaf549f2 100644 --- a/filebeat/input/log/input_test.go +++ b/filebeat/input/log/input_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" @@ -185,25 +186,10 @@ func testInputLifecycle(t *testing.T, context input.Context, closer func(input.C } func TestNewInputDone(t *testing.T) { - goroutines := resources.NewGoroutinesChecker() - defer goroutines.Check(t) - - config, _ := common.NewConfigFrom(common.MapStr{ + config := common.MapStr{ "paths": path.Join(os.TempDir(), "logs", "*.log"), - }) - - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return TestOutlet{}, nil - }) - - context := input.Context{ - Done: make(chan struct{}), } - - _, err := NewInput(config, connector, context) - assert.NoError(t, err) - - close(context.Done) + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) } func TestNewInputError(t *testing.T) { @@ -213,7 +199,7 @@ func TestNewInputError(t *testing.T) { config := common.NewConfig() connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return TestOutlet{}, nil + return inputtest.Outlet{}, nil }) context := input.Context{} @@ -318,10 +304,3 @@ func (o *eventCapturer) Close() error { func (o *eventCapturer) Done() <-chan struct{} { return o.c } - -// TestOutlet is an empty outlet for testing -type TestOutlet struct{} - -func (o TestOutlet) OnEvent(event beat.Event) bool { return true } -func (o TestOutlet) Close() error { return nil } -func (o TestOutlet) Done() <-chan struct{} { return nil } diff --git a/filebeat/input/mqtt/input_test.go b/filebeat/input/mqtt/input_test.go index cc6bf4b05b7..a82ed2a4237 100644 --- a/filebeat/input/mqtt/input_test.go +++ b/filebeat/input/mqtt/input_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" finput "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/backoff" @@ -322,6 +323,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) { require.Equal(t, 1, mockedBackoff.resetCount) } +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "hosts": "tcp://:0", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} + func assertEventMatches(t *testing.T, expected mockedMessage, got beat.Event) { topic, err := got.GetValue("mqtt.topic") require.NoError(t, err) diff --git a/filebeat/input/redis/input_test.go b/filebeat/input/redis/input_test.go new file mode 100644 index 00000000000..b480c539efe --- /dev/null +++ b/filebeat/input/redis/input_test.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package redis + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "hosts": "localhost:3679", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index f4973e47948..f08ee90b660 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -58,10 +58,14 @@ func (r *RunnerFactory) Create( } func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { - _, err := r.Create(pipeline.NewNilPipeline(), cfg) + runner, err := r.Create(pipeline.NewNilPipeline(), cfg) if _, ok := err.(*common.ErrInputNotFinished); ok { // error is related to state, and hence config can be considered valid return nil } - return err + if err != nil { + return err + } + runner.Stop() + return nil } diff --git a/filebeat/input/stdin/input_test.go b/filebeat/input/stdin/input_test.go new file mode 100644 index 00000000000..73183a30887 --- /dev/null +++ b/filebeat/input/stdin/input_test.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package stdin + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "paths": "-", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/syslog/input_test.go b/filebeat/input/syslog/input_test.go index b6c29b74a51..78cd70d8362 100644 --- a/filebeat/input/syslog/input_test.go +++ b/filebeat/input/syslog/input_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -245,6 +246,13 @@ func TestParseAndCreateEvent(t *testing.T) { } } +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "protocol.tcp.host": "localhost:9000", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} + func dummyMetadata() inputsource.NetworkMetadata { ip := "127.0.0.1" parsedIP := net.ParseIP(ip) diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go index d820e068723..47733803077 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/tcp/input_test.go @@ -23,7 +23,9 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/filebeat/inputsource" + "github.com/elastic/beats/v7/libbeat/common" ) func TestCreateEvent(t *testing.T) { @@ -44,3 +46,10 @@ func TestCreateEvent(t *testing.T) { from, _ := event.GetValue("log.source.address") assert.Equal(t, ip, from) } + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "host": ":0", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/filebeat/input/udp/input_test.go b/filebeat/input/udp/input_test.go new file mode 100644 index 00000000000..8a6f1cad039 --- /dev/null +++ b/filebeat/input/udp/input_test.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package udp + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "hosts": ":0", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index 6694ad26067..223bed56798 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -9,13 +9,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/common" - - "github.com/stretchr/testify/assert" ) func TestGetStartPosition(t *testing.T) { @@ -154,3 +155,11 @@ func TestParseARN(t *testing.T) { assert.Equal(t, "us-east-1", regionName) assert.NoError(t, err) } + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "log_group_name": "some-group", + "region_name": "eu-west-1", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index 8a48593c1ca..8537e7529f5 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) @@ -109,6 +110,16 @@ func TestParseMultipleMessages(t *testing.T) { } } +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "connection_string": "Endpoint=sb://something", + "eventhub": "insights-operational-logs", + "storage_account": "someaccount", + "storage_account_key": "secret", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} + type stubOutleter struct { sync.Mutex cond *sync.Cond diff --git a/x-pack/filebeat/input/gcppubsub/input_test.go b/x-pack/filebeat/input/gcppubsub/input_test.go new file mode 100644 index 00000000000..e055ee39b97 --- /dev/null +++ b/x-pack/filebeat/input/gcppubsub/input_test.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package gcppubsub + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{ + "project_id": "some-project", + "topic": "sometopic", + "subscription.name": "subscription", + + // Provide some credentials to avoid trying to query GCP for them, + // what creates HTTP-related goroutines. + "credentials_json": "{}", + } + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +} diff --git a/x-pack/filebeat/input/netflow/input_test.go b/x-pack/filebeat/input/netflow/input_test.go new file mode 100644 index 00000000000..9e6e8360e72 --- /dev/null +++ b/x-pack/filebeat/input/netflow/input_test.go @@ -0,0 +1,19 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package netflow + +import ( + "testing" + + "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNewInputDone(t *testing.T) { + config := common.MapStr{} + inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +}