Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check modules and prospectors settings when reload is off #5053

Merged
merged 3 commits into from
Sep 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix wrong MySQL CRUD queries timelion visualization {pull}4857[4857]
- Add new metrics to CPU metricsset {pull}4969[4969]
- Fix a memory allocation issue where more memory was allocated than needed in the windows-perfmon metricset. {issue}5035[5035]
- Don't start metricbeat if external modules config is wrong and reload is disabled {pull}5053[5053]

*Packetbeat*

Expand All @@ -71,6 +72,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add PostgreSQL module with slowlog support. {pull}4763[4763]
- Add Kafka log module. {pull}4885[4885]
- Add support for `/var/log/containers/` log path in `add_kubernetes_metadata` processor. {pull}4981[4981]
- Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]

*Heartbeat*
Expand Down
8 changes: 8 additions & 0 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.prospectorsReloader = cfgfile.NewReloader(configProspectors)
runnerFactory := prospector.NewRunnerFactory(c.out, r, c.beatDone)
if err := c.prospectorsReloader.Check(runnerFactory); err != nil {
return err
}

go func() {
c.prospectorsReloader.Run(runnerFactory)
}()
Expand All @@ -69,6 +73,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.modulesReloader = cfgfile.NewReloader(configModules)
modulesFactory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, c.beatDone)
if err := c.modulesReloader.Check(modulesFactory); err != nil {
return err
}

go func() {
c.modulesReloader.Run(modulesFactory)
}()
Expand Down
3 changes: 1 addition & 2 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg

{% if reload or reload_path -%}
filebeat.config.{{ reload_type|default("prospectors") }}:
enabled: true
path: {{ reload_path }}
{% if reload -%}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false") }}
{% endif -%}
{% endif -%}

Expand Down
32 changes: 32 additions & 0 deletions filebeat/tests/system/test_reload_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,35 @@ def test_load_configs(self):
assert output[0]["message"] == "Hello 1"
assert output[1]["message"] == "Hello 2"
proc.check_kill_and_wait()

def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/wrong_module.yml"
moduleConfig = """
- module: test
test:
enabled: true
wrong_field: error
prospector:
scan_frequency: 1s
"""
with open(config_path, 'w') as f:
f.write(moduleConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("No paths were defined for prospector accessing"),
max_timeout=10)

assert exit_code == 1
83 changes: 67 additions & 16 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"sync"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down Expand Up @@ -54,6 +57,7 @@ type Runner interface {
type Reloader struct {
registry *Registry
config DynamicConfig
path string
done chan struct{}
wg sync.WaitGroup
}
Expand All @@ -63,13 +67,56 @@ func NewReloader(cfg *common.Config) *Reloader {
config := DefaultDynamicConfig
cfg.Unpack(&config)

path := config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

return &Reloader{
registry: NewRegistry(),
config: config,
path: path,
done: make(chan struct{}),
}
}

// Check configs are valid (only if reload is disabled)
func (rl *Reloader) Check(runnerFactory RunnerFactory) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is quite similar to the existing config reload code in Run function. I would extract anything that could be extracted into smaller functions. So those could be reused it in both Check and Run. Thus, code duplication could be eliminated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do some refactoring, thanks!

// If config reload is enabled we ignore errors (as they may be fixed afterwards)
if rl.config.Reload.Enabled {
return nil
}

debugf("Checking module configs from: %s", rl.path)
gw := NewGlobWatcher(rl.path)

files, _, err := gw.Scan()
if err != nil {
return errors.Wrap(err, "fetching config files")
}

// Load all config objects
configs, err := rl.loadConfigs(files)
if err != nil {
return err
}

debugf("Number of module configs found: %v", len(configs))

// Initialize modules
for _, c := range configs {
// Only add configs to startList which are enabled
if !c.Enabled() {
continue
}
_, err := runnerFactory.Create(c)
if err != nil {
return err
}
}
return nil
}

// Run runs the reloader
func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Info("Config reloader started")
Expand All @@ -80,12 +127,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
// Stop all running modules when method finishes
defer rl.stopRunners(rl.registry.CopyList())

path := rl.config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

gw := NewGlobWatcher(path)
gw := NewGlobWatcher(rl.path)

// If reloading is disable, config files should be loaded immidiately
if !rl.config.Reload.Enabled {
Expand Down Expand Up @@ -118,16 +160,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}

// Load all config objects
configs := []*common.Config{}
for _, file := range files {
c, err := LoadList(file)
if err != nil {
logp.Err("Error loading config: %s", err)
continue
}

configs = append(configs, c...)
}
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

Expand Down Expand Up @@ -183,6 +216,24 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}
}

func (rl *Reloader) loadConfigs(files []string) ([]*common.Config, error) {
// Load all config objects
configs := []*common.Config{}
var errs multierror.Errors
for _, file := range files {
c, err := LoadList(file)
if err != nil {
errs = append(errs, err)
logp.Err("Error loading config: %s", err)
continue
}

configs = append(configs, c...)
}

return configs, errs.Err()
}

// Stop stops the reloader and waits for all modules to properly stop
func (rl *Reloader) Stop() {
close(rl.done)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
moduleReloader := cfgfile.NewReloader(bt.config.ConfigModules)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

if err := moduleReloader.Check(factory); err != nil {
return err
}

go moduleReloader.Run(factory)
wg.Add(1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ metricbeat.modules:
{% endif -%}
{%- endfor %}

{% if reload -%}
{% if reload or reload_path -%}
metricbeat.config.modules:
path: {{ reload_path|default("${path.config}/modules.d/*.yml") }}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false")}}
{% endif -%}

# Disable random start delay for metricsets.
Expand Down
29 changes: 29 additions & 0 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,32 @@ def test_start_stop(self):
assert lines == self.output_lines()

proc.check_kill_and_wait()

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/system.yml"
systemConfig = """
- module: system
metricsets: ["wrong_metricset"]
period: 1s
"""
with open(config_path, 'w') as f:
f.write(systemConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("metricset not found"),
max_timeout=10)

assert exit_code == 1