Skip to content

Commit

Permalink
Check modules and prospectors settings when reload is off
Browse files Browse the repository at this point in the history
This change ensures that we check configs when loading them through
`filebeat.config.modules`, `filebeat.config.prospectors` or `metricbeat.config.modules`.

It will error and exit if settings are wrong and **reload is disabled**
  • Loading branch information
exekias committed Aug 29, 2017
1 parent 753cf9e commit 222ee2c
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix wrong MySQL CRUD queries timelion visualization {pull}4857[4857]
- Add new metrics to CPU metricsset {pull}4969[4969]
- Fix a memory allocation issue where more memory was allocated than needed in the windows-perfmon metricset. {issue}5035[5035]
- Don't start metricbeat if external modules config is wrong and reload is disabled {pull}5053[5053]

*Packetbeat*

Expand All @@ -68,6 +69,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add PostgreSQL module with slowlog support. {pull}4763[4763]
- Add Kafka log module. {pull}4885[4885]
- Add support for `/var/log/containers/` log path in `add_kubernetes_metadata` processor. {pull}4981[4981]
- Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053]

*Heartbeat*

Expand Down
8 changes: 8 additions & 0 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.prospectorsReloader = cfgfile.NewReloader(configProspectors)
registrarContext := prospector.NewRegistrarContext(c.out, r, c.beatDone)
if err := c.prospectorsReloader.Check(registrarContext); err != nil {
return err
}

go func() {
c.prospectorsReloader.Run(registrarContext)
}()
Expand All @@ -69,6 +73,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.modulesReloader = cfgfile.NewReloader(configModules)
modulesFactory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, c.beatDone)
if err := c.modulesReloader.Check(modulesFactory); err != nil {
return err
}

go func() {
c.modulesReloader.Run(modulesFactory)
}()
Expand Down
3 changes: 1 addition & 2 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg

{% if reload or reload_path -%}
filebeat.config.{{ reload_type|default("prospectors") }}:
enabled: true
path: {{ reload_path }}
{% if reload -%}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false") }}
{% endif -%}
{% endif -%}

Expand Down
32 changes: 32 additions & 0 deletions filebeat/tests/system/test_reload_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,35 @@ def test_load_configs(self):
assert output[0]["message"] == "Hello 1"
assert output[1]["message"] == "Hello 2"
proc.check_kill_and_wait()

def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/wrong_module.yml"
moduleConfig = """
- module: test
test:
enabled: true
wrong_field: error
prospector:
scan_frequency: 1s
"""
with open(config_path, 'w') as f:
f.write(moduleConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("No paths were defined for prospector accessing"),
max_timeout=10)

assert exit_code == 1
57 changes: 51 additions & 6 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down Expand Up @@ -54,6 +56,7 @@ type Runner interface {
type Reloader struct {
registry *Registry
config DynamicConfig
path string
done chan struct{}
wg sync.WaitGroup
}
Expand All @@ -63,13 +66,60 @@ func NewReloader(cfg *common.Config) *Reloader {
config := DefaultDynamicConfig
cfg.Unpack(&config)

path := config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

return &Reloader{
registry: NewRegistry(),
config: config,
path: path,
done: make(chan struct{}),
}
}

// Check configs are valid (only if reload is disabled)
func (rl *Reloader) Check(runnerFactory RunnerFactory) error {
// If config reload is enabled we ignore errors (as they may be fixed afterwards)
if rl.config.Reload.Enabled {
return nil
}

debugf("Checking module configs from: %s", rl.path)
gw := NewGlobWatcher(rl.path)

files, _, err := gw.Scan()
if err != nil {
return errors.Wrap(err, "fetching config files")
}

// Load all config objects
configs := []*common.Config{}
for _, file := range files {
c, err := LoadList(file)
if err != nil {
return errors.Wrap(err, "loading config")
}
configs = append(configs, c...)
}

debugf("Number of module configs found: %v", len(configs))

// Initialize modules
for _, c := range configs {
// Only add configs to startList which are enabled
if !c.Enabled() {
continue
}
_, err := runnerFactory.Create(c)
if err != nil {
return err
}
}
return nil
}

// Run runs the reloader
func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Info("Config reloader started")
Expand All @@ -80,12 +130,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
// Stop all running modules when method finishes
defer rl.stopRunners(rl.registry.CopyList())

path := rl.config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

gw := NewGlobWatcher(path)
gw := NewGlobWatcher(rl.path)

// If reloading is disable, config files should be loaded immidiately
if !rl.config.Reload.Enabled {
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
moduleReloader := cfgfile.NewReloader(bt.config.ConfigModules)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

if err := moduleReloader.Check(factory); err != nil {
return err
}

go moduleReloader.Run(factory)
wg.Add(1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ metricbeat.modules:
{% endif -%}
{%- endfor %}

{% if reload -%}
{% if reload or reload_path -%}
metricbeat.config.modules:
path: {{ reload_path|default("${path.config}/modules.d/*.yml") }}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false")}}
{% endif -%}

# Disable random start delay for metricsets.
Expand Down
29 changes: 29 additions & 0 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,32 @@ def test_start_stop(self):
assert lines == self.output_lines()

proc.check_kill_and_wait()

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/system.yml"
systemConfig = """
- module: system
metricsets: ["wrong_metricset"]
period: 1s
"""
with open(config_path, 'w') as f:
f.write(systemConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("metricset not found"),
max_timeout=10)

assert exit_code == 1

0 comments on commit 222ee2c

Please sign in to comment.