Skip to content

Commit

Permalink
log error when parsing config block and disabled input on filebeat (e…
Browse files Browse the repository at this point in the history
…lastic#30534)

- log error when parsing config block and disabled input on filebeat
- adjust tests and add an optional error message for 'wait_until' on libbeat python tests
  • Loading branch information
AndersonQ authored Mar 2, 2022
1 parent 71d9685 commit b25fdf6
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452]
- Update docker/distribution dependency library to fix a security issues concerning OCI Manifest Type Confusion Issue. {pull}30462[30462]
- Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]

*Auditbeat*

Expand Down
31 changes: 19 additions & 12 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,28 @@ func (c *crawler) Start(
) error {
log := c.log

log.Infof("Loading Inputs: %v", len(c.inputConfigs))
log.Infof("Loading Inputs: %d", len(c.inputConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig)
if err != nil {
return fmt.Errorf("starting input failed: %+v", err)
return fmt.Errorf("starting input failed: %w", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %+v", err)
return fmt.Errorf("creating input reloader failed: %w", err)
}

}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %+v", err)
return fmt.Errorf("creating module reloader failed: %w", err)
}

}

if c.inputReloader != nil {
Expand All @@ -105,7 +103,7 @@ func (c *crawler) Start(
}()
}

log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))
log.Infof("Loading and starting Inputs completed. Enabled inputs: %d", len(c.inputs))

return nil
}
Expand All @@ -114,23 +112,32 @@ func (c *crawler) startInput(
pipeline beat.PipelineConnector,
config *common.Config,
) error {
// TODO: Either use debug or remove it after https://github.com/elastic/beats/pull/30534
// is fixed.
c.log.Infof("starting input, keys present on the config: %v",
config.FlattenedKeys())

if !config.Enabled() {
c.log.Infof("input disabled, skipping it")
return nil
}

var h map[string]interface{}
config.Unpack(&h)
err := config.Unpack(&h)
if err != nil {
return fmt.Errorf("could not unpack config: %w", err)
}
id, err := hashstructure.Hash(h, nil)
if err != nil {
return fmt.Errorf("can not compute id from configuration: %v", err)
return fmt.Errorf("can not compute id from configuration: %w", err)
}
if _, ok := c.inputs[id]; ok {
return fmt.Errorf("input with same ID already exists: %v", id)
return fmt.Errorf("input with same ID already exists: %d", id)
}

runner, err := c.inputsFactory.Create(pipeline, config)
if err != nil {
return fmt.Errorf("error while initializing input: %v", err)
return fmt.Errorf("error while initializing input: %w", err)
}
if inputRunner, ok := runner.(*input.Runner); ok {
inputRunner.Once = c.once
Expand All @@ -155,7 +162,7 @@ func (c *crawler) Stop() {
}()
}

logp.Info("Stopping %v inputs", len(c.inputs))
logp.Info("Stopping %d inputs", len(c.inputs))
// Stop inputs in parallel
for id, p := range c.inputs {
id, p := id, p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (h *Harvester) Run() error {
}
}(h.state.Source)

logger.Info("Harvester started for file.")
logger.Infof("Harvester started for paths: %v", h.config.Paths)

h.doneWg.Add(1)
go func() {
Expand Down
10 changes: 6 additions & 4 deletions filebeat/tests/system/test_stdin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ def test_stdin(self):

proc = self.start_beat()

msg = "Harvester started"
self.wait_until(
lambda: self.log_contains(
"Harvester started for file."),
max_timeout=10)
lambda: self.log_contains(msg),
max_timeout=10,
err_msg=f"did not find '{msg}' in the logs")

iterations1 = 5
for n in range(0, iterations1):
Expand Down Expand Up @@ -102,4 +103,5 @@ def test_stdin_is_exclusive(self):

filebeat = self.start_beat()
filebeat.check_wait(exit_code=1)
assert self.log_contains("Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp")
msg = "Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp"
assert self.log_contains(msg), f"did not find '{msg}' in the logs"
4 changes: 2 additions & 2 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def setUp(self):
# running tests in parallel
pass

def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond", err_msg=""):
"""
TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
Waits until the cond function returns true,
Expand All @@ -427,7 +427,7 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
while not cond():
if datetime.now() - start > timedelta(seconds=max_timeout):
raise WaitTimeoutError(
f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds.")
f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
time.sleep(poll_interval)

def wait_until_output_has_key(self, key: str, max_timeout=15):
Expand Down
6 changes: 4 additions & 2 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (cm *Manager) OnConfig(s string) {
if errs := cm.apply(blocks); errs != nil {
// `cm.apply` already logs the errors; currently allow beat to run degraded
cm.updateStatusWithError(err)
cm.logger.Errorf("failed applying config blocks: %v", err)
return
}

Expand Down Expand Up @@ -256,8 +257,8 @@ func (cm *Manager) apply(blocks ConfigBlocks) error {
}

// Unset missing configs
for name := range missing {
if missing[name] {
for name, isMissing := range missing {
if isMissing {
if err := cm.reload(name, []*ConfigBlock{}); err != nil {
errors = multierror.Append(errors, err)
}
Expand Down Expand Up @@ -319,6 +320,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
continue
}

Expand Down

0 comments on commit b25fdf6

Please sign in to comment.