Skip to content

Commit

Permalink
Fix config appender registration (#9873)
Browse files Browse the repository at this point in the history
* Fix config appender registration

Autodiscover config wasn't regsistered, this change updates the code,
adds integration tests and makes sure that the appender is correctly
registered and working.

* Add missing appender type in tests
  • Loading branch information
exekias committed Jan 9, 2019
1 parent febcc31 commit 370ecd4
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 93 deletions.
79 changes: 33 additions & 46 deletions libbeat/autodiscover/appenders/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package config
import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -38,51 +40,38 @@ type config struct {
Config *common.Config `config:"config"`
}

type configs []config

type configMap struct {
type configAppender struct {
condition conditions.Condition
config common.MapStr
}

type configAppender struct {
configMaps []configMap
}

// NewConfigAppender creates a configAppender that can append templatized configs into built configs
func NewConfigAppender(cfg *common.Config) (autodiscover.Appender, error) {
cfgwarn.Beta("The config appender is beta")

confs := configs{}
err := cfg.Unpack(&confs)
config := config{}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("unable to unpack config appender due to error: %+v", err)
}

var configMaps []configMap
for _, conf := range confs {
var cond conditions.Condition

if conf.ConditionConfig != nil {
cond, err = conditions.NewCondition(conf.ConditionConfig)
if err != nil {
logp.Warn("unable to create condition due to error: %+v", err)
continue
}
}
cm := configMap{condition: cond}
var cond conditions.Condition

// Unpack the config
cf := common.MapStr{}
err = conf.Config.Unpack(&cf)
if config.ConditionConfig != nil {
cond, err = conditions.NewCondition(config.ConditionConfig)
if err != nil {
logp.Warn("unable to unpack config due to error: %+v", err)
continue
return nil, errors.Wrap(err, "unable to create condition due to error")
}
cm.config = cf
configMaps = append(configMaps, cm)
}
return &configAppender{configMaps: configMaps}, nil

// Unpack the config
cf := common.MapStr{}
err = config.Config.Unpack(&cf)
if err != nil {
return nil, errors.Wrap(err, "unable to unpack config due to error")
}

return &configAppender{condition: cond, config: cf}, nil
}

// Append adds configuration into configs built by builds/templates. It applies conditions to filter out
Expand All @@ -99,25 +88,23 @@ func (c *configAppender) Append(event bus.Event) {
if !ok {
return
}
for _, configMap := range c.configMaps {
if configMap.condition == nil || configMap.condition.Check(common.MapStr(event)) == true {
// Merge the template with all the configs
for _, cfg := range cfgs {
cf := common.MapStr{}
err := cfg.Unpack(&cf)
if err != nil {
logp.Debug("config", "unable to unpack config due to error: %v", err)
continue
}
err = cfg.Merge(&configMap.config)
if err != nil {
logp.Debug("config", "unable to merge configs due to error: %v", err)
}
if c.condition == nil || c.condition.Check(common.MapStr(event)) == true {
// Merge the template with all the configs
for _, cfg := range cfgs {
cf := common.MapStr{}
err := cfg.Unpack(&cf)
if err != nil {
logp.Debug("config", "unable to unpack config due to error: %v", err)
continue
}
err = cfg.Merge(&c.config)
if err != nil {
logp.Debug("config", "unable to merge configs due to error: %v", err)
}

// Apply the template
template.ApplyConfigTemplate(event, cfgs)
}

// Apply the template
template.ApplyConfigTemplate(event, cfgs)
}

// Replace old config with newly appended configs
Expand Down
79 changes: 35 additions & 44 deletions libbeat/autodiscover/appenders/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,96 +28,87 @@ import (

func TestGenerateAppender(t *testing.T) {
tests := []struct {
name string
eventConfig common.MapStr
event bus.Event
result common.MapStr
config string
}{
// Appender without a condition should apply the config regardless
{
name: "Appender without a condition should apply the config regardless",
event: bus.Event{},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test2": "foo",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
`,
config:
test1: foo`,
},
// Appender with a condition check that fails. Only appender with no condition should pass
{
name: "Appender with a condition check that fails",
event: bus.Event{
"foo": "bar",
"field": "notbar",
},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test": "bar",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
condition.equals:
"foo": "bar1"
`,
config:
test2: foo
condition.equals:
field: bar`,
},
// Appender with a condition check that passes. It should get appended
{
name: "Appender with a condition check that passes. It should get appended",
event: bus.Event{
"foo": "bar",
"field": "bar",
},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test2": "foo",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
condition.equals:
"foo": "bar"
`,
config:
test2: foo
condition.equals:
field: bar`,
},
}
for _, test := range tests {
config, err := common.NewConfigWithYAML([]byte(test.config), "")
if err != nil {
t.Fatal(err)
}
t.Run(test.name, func(t *testing.T) {
config, err := common.NewConfigWithYAML([]byte(test.config), "")
if err != nil {
t.Fatal(err)
}

appender, err := NewConfigAppender(config)
assert.Nil(t, err)
assert.NotNil(t, appender)
appender, err := NewConfigAppender(config)
assert.Nil(t, err)
assert.NotNil(t, appender)

eveConfig, err := common.NewConfigFrom(&test.eventConfig)
assert.Nil(t, err)
eveConfig, err := common.NewConfigFrom(&test.eventConfig)
assert.Nil(t, err)

test.event["config"] = []*common.Config{eveConfig}
appender.Append(test.event)
test.event["config"] = []*common.Config{eveConfig}
appender.Append(test.event)

cfgs, _ := test.event["config"].([]*common.Config)
assert.Equal(t, len(cfgs), 1)
cfgs, _ := test.event["config"].([]*common.Config)
assert.Equal(t, len(cfgs), 1)

out := common.MapStr{}
cfgs[0].Unpack(&out)
out := common.MapStr{}
cfgs[0].Unpack(&out)

assert.Equal(t, out, test.result)
assert.Equal(t, out, test.result)
})

}
}
4 changes: 1 addition & 3 deletions libbeat/autodiscover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package autodiscover

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/conditions"
)

// Config settings for Autodiscover
Expand All @@ -39,6 +38,5 @@ type BuilderConfig struct {

// AppenderConfig settings
type AppenderConfig struct {
Type string `config:"type"`
ConditionConfig *conditions.Config `config:"condition"`
Type string `config:"type"`
}
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package instance

import (
_ "github.com/elastic/beats/libbeat/autodiscover/appenders/config" // Register autodiscover appenders
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker" // Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/jolokia"
_ "github.com/elastic/beats/libbeat/autodiscover/providers/kubernetes"
Expand Down
48 changes: 48 additions & 0 deletions metricbeat/tests/system/test_autodiscover.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,51 @@ def test_docker_labels(self):
# Check metadata is added
assert output[0]['docker']['container']['image'] == 'memcached:latest'
assert 'name' in output[0]['docker']['container']

@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_config_appender(self):
"""
Test config appenders correctly updates configs
"""
import docker
docker_client = docker.from_env()

self.render_config_template(
autodiscover={
'docker': {
'hints.enabled': 'true',
'appenders': '''
- type: config
condition:
equals.docker.container.image: memcached:latest
config:
fields:
foo: bar
''',
},
},
)

proc = self.start_beat()
docker_client.images.pull('memcached:latest')
labels = {
'co.elastic.metrics/module': 'memcached',
'co.elastic.metrics/period': '1s',
'co.elastic.metrics/hosts': "'${data.host}:11211'",
}
container = docker_client.containers.run('memcached:latest', labels=labels, detach=True)

self.wait_until(lambda: self.log_contains('Starting runner: memcached'))

self.wait_until(lambda: self.output_count(lambda x: x >= 1))
container.stop()

self.wait_until(lambda: self.log_contains('Stopping runner: memcached'))

output = self.read_output_json()
proc.check_kill_and_wait()

# Check field is added
assert output[0]['fields']['foo'] == 'bar'

0 comments on commit 370ecd4

Please sign in to comment.