diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 037dd3f402e..4fb86cd9e18 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -52,15 +52,14 @@ var validModuleNames = regexp.MustCompile("[^a-zA-Z0-9\\_\\-]+") type logHints struct { config *config registry *fileset.ModuleRegistry + log *logp.Logger } // NewLogHints builds a log hints builder func NewLogHints(cfg *common.Config) (autodiscover.Builder, error) { config := defaultConfig() - err := cfg.Unpack(&config) - - if err != nil { - return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err) + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err) } moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false) @@ -68,39 +67,33 @@ func NewLogHints(cfg *common.Config) (autodiscover.Builder, error) { return nil, err } - return &logHints{&config, moduleRegistry}, nil + return &logHints{&config, moduleRegistry, logp.NewLogger("hints.builder")}, nil } // Create config based on input hints in the bus event func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config { var hints common.MapStr - hIface, ok := event["hints"] - if ok { - hints, _ = hIface.(common.MapStr) - } - - inputConfig := l.getInputsConfigs(hints) - - // If default config is disabled return nothing unless it's explicty enabled - if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) { - logp.Debug("hints.builder", "default config is disabled: %+v", event) - return []*common.Config{} + if hintsIfc, found := event["hints"]; found { + hints, _ = hintsIfc.(common.MapStr) } - // If explictly disabled, return nothing - if builder.IsDisabled(hints, l.config.Key) { - logp.Debug("hints.builder", "logs disabled by hint: %+v", event) - return []*common.Config{} + // Hint must be explicitly enabled when default_config sets enabled=false. + if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) || + builder.IsDisabled(hints, l.config.Key) { + l.log.Debugw("Hints config is not enabled.", "autodiscover.event", event) + return nil } - if inputConfig != nil { - configs := []*common.Config{} + if inputConfig := l.getInputsConfigs(hints); inputConfig != nil { + var configs []*common.Config for _, cfg := range inputConfig { if config, err := common.NewConfigFrom(cfg); err == nil { configs = append(configs, config) + } else { + l.log.Warnw("Failed to create config from input.", "error", err) } } - logp.Debug("hints.builder", "generated config %+v", configs) + l.log.Debugf("Generated %d input configs from hint.", len(configs)) // Apply information in event to the template to generate the final config return template.ApplyConfigTemplate(event, configs) } diff --git a/filebeat/autodiscover/builder/hints/logs_test.go b/filebeat/autodiscover/builder/hints/logs_test.go index 0dadfe54798..e00ec39920e 100644 --- a/filebeat/autodiscover/builder/hints/logs_test.go +++ b/filebeat/autodiscover/builder/hints/logs_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/bus" @@ -692,9 +693,9 @@ func TestGenerateHints(t *testing.T) { for _, test := range tests { // Configure path for modules access abs, _ := filepath.Abs("../../..") - err := paths.InitPaths(&paths.Path{ + require.NoError(t, paths.InitPaths(&paths.Path{ Home: abs, - }) + })) l, err := NewLogHints(test.config) if err != nil { @@ -927,9 +928,9 @@ func TestGenerateHintsWithPaths(t *testing.T) { // Configure path for modules access abs, _ := filepath.Abs("../../..") - err := paths.InitPaths(&paths.Path{ + require.NoError(t, paths.InitPaths(&paths.Path{ Home: abs, - }) + })) l, err := NewLogHints(cfg) if err != nil { @@ -937,7 +938,7 @@ func TestGenerateHintsWithPaths(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, test.len, len(cfgs), test.msg) + require.Equal(t, test.len, len(cfgs), test.msg) if test.len != 0 { config := common.MapStr{} err := cfgs[0].Unpack(&config) diff --git a/filebeat/docs/autodiscover-hints.asciidoc b/filebeat/docs/autodiscover-hints.asciidoc index b63fb6979bd..55b8f4adb5f 100644 --- a/filebeat/docs/autodiscover-hints.asciidoc +++ b/filebeat/docs/autodiscover-hints.asciidoc @@ -259,7 +259,9 @@ You can label Docker containers with useful info to decode logs structured as JS [float] ==== Nomad -Nomad autodiscover provider supports hints using the https://www.nomadproject.io/docs/job-specification/meta.html[`meta` stanza]. To enable it just set `hints.enabled`: +Nomad autodiscover provider supports hints using the +https://www.nomadproject.io/docs/job-specification/meta.html[`meta` stanza]. To +enable it just set `hints.enabled`: [source,yaml] ----- @@ -269,7 +271,8 @@ filebeat.autodiscover: hints.enabled: true ----- -You can configure the default config that will be launched when a new job is seen, like this: +You can configure the default config that will be launched when a new job is +seen, like this: [source,yaml] ----- @@ -278,13 +281,13 @@ filebeat.autodiscover: - type: nomad hints.enabled: true hints.default_config: - type: nomad + type: log paths: - - /var/lib/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.* + - /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.* ----- -You can also disable default settings entirely, so only Jobs annotated like `co.elastic.logs/enabled: true` -will be retrieved: +You can also disable the default config such that only logs from jobs explicitly +annotated with `"co.elastic.logs/enabled" = "true"` will be collected: [source,yaml] ----- @@ -292,17 +295,67 @@ filebeat.autodiscover: providers: - type: nomad hints.enabled: true - hints.default_config.enabled: false + hints.default_config: + enabled: false + type: log + paths: + - /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.* ----- -You can annotate Nomad Jobs using the `meta` stanza with useful info to spin up {beatname_uc} inputs -or modules: +You can annotate Nomad Jobs using the `meta` stanza with useful info to spin up +{beatname_uc} inputs or modules: [source,hcl] ----- meta { - "co.elastic.logs/multiline.pattern" = "^\[" - "co.elastic.logs/multiline.negate" = true - "co.elastic.logs/multiline.match" = after + "co.elastic.logs/enabled" = "true" + "co.elastic.logs/multiline.pattern" = "^\\[" + "co.elastic.logs/multiline.negate" = "true" + "co.elastic.logs/multiline.match" = "after" } ----- + +If you are using autodiscover then in most cases you will want to use the +<> processor to enrich events with +Nomad metadata. This example configures {{beatname_uc}} to connect to the local +Nomad agent over HTTPS and adds the Nomad allocation ID to all events from the +input. Later in the pipeline the `add_nomad_metadata` processor will use that ID +to enrich the event. + +[source,yaml] +----- +filebeat.autodiscover: + providers: + - type: nomad + address: https://localhost:4646 + hints.enabled: true + hints.default_config: + enabled: false <1> + type: log + paths: + - /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.* + processors: + - add_fields: <2> + target: nomad + fields: + allocation.id: ${data.nomad.allocation.id} + +processors: + - add_nomad_metadata: <3> + when.has_fields.fields: [nomad.allocation.id] + address: https://localhost:4646 + default_indexers.enabled: false + default_matchers.enabled: false + indexers: + - allocation_uuid: + matchers: + - fields: + lookup_fields: + - 'nomad.allocation.id' +----- +<1> The default config is disabled meaning any task without the +`"co.elastic.logs/enabled" = "true"` metadata will be ignored. +<2> The `add_fields` processor populates the `nomad.allocation.id` field with +the Nomad allocation UUID. +<3> The `add_nomad_metadata` processor is configured at the global level so +that it is only instantiated one time which saves resources. diff --git a/filebeat/docs/autodiscover-nomad-config.asciidoc b/filebeat/docs/autodiscover-nomad-config.asciidoc index 2e72ae7fae7..b04255223ab 100644 --- a/filebeat/docs/autodiscover-nomad-config.asciidoc +++ b/filebeat/docs/autodiscover-nomad-config.asciidoc @@ -43,6 +43,6 @@ filebeat.autodiscover: - /var/lib/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.* ------------------------------------------------------------------------------------- -WARNING: The `docker` input is currently not supported. Nomad doesn't expose the container id -associated with the allocation. Without the container id, there is no way of generating the proper +WARNING: The `docker` input is currently not supported. Nomad doesn't expose the container ID +associated with the allocation. Without the container ID, there is no way of generating the proper path for reading the container's logs. diff --git a/heartbeat/tests/system/test_autodiscovery.py b/heartbeat/tests/system/test_autodiscovery.py index 7f5060ace81..cbc1b679981 100644 --- a/heartbeat/tests/system/test_autodiscovery.py +++ b/heartbeat/tests/system/test_autodiscovery.py @@ -40,7 +40,7 @@ def test_docker(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains( - re.compile('autodiscover.+Got a start event:', re.I))) + re.compile('autodiscover.+Got a start event', re.I))) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index f36faa6b8f2..8c6977e8362 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -169,7 +169,7 @@ func (a *Autodiscover) worker() { } func (a *Autodiscover) handleStart(event bus.Event) bool { - a.logger.Debugf("Got a start event: %v", event) + a.logger.Debugw("Got a start event.", "autodiscover.event", event) eventID := getID(event) if eventID == "" { diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index faaa112c65f..6f78973e6fb 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -29,6 +29,8 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) +const logName = "autodiscover.builder" + // GetContainerID returns the id of a container func GetContainerID(container common.MapStr) string { id, _ := container["id"].(string) @@ -92,7 +94,7 @@ func GetProcessors(hints common.MapStr, key string) []common.MapStr { if str, ok := value.(string); ok { cfg := common.MapStr{} if err := json.Unmarshal([]byte(str), &cfg); err != nil { - logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err) + logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err) continue } proc[key] = cfg @@ -124,7 +126,7 @@ func GetConfigs(hints common.MapStr, key, name string) []common.MapStr { var configs []common.MapStr for _, key := range nums { - rawCfg, _ := raw[key] + rawCfg := raw[key] if config, ok := rawCfg.(common.MapStr); ok { configs = append(configs, config) } @@ -159,15 +161,15 @@ func GetHintAsConfigs(hints common.MapStr, key string) []common.MapStr { if str[0] != '[' { cfg := common.MapStr{} if err := json.Unmarshal([]byte(str), &cfg); err != nil { - logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err) + logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err) return nil } return []common.MapStr{cfg} } - cfg := []common.MapStr{} + var cfg []common.MapStr if err := json.Unmarshal([]byte(str), &cfg); err != nil { - logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err) + logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err) return nil } return cfg @@ -175,7 +177,7 @@ func GetHintAsConfigs(hints common.MapStr, key string) []common.MapStr { return nil } -// IsEnabled will return true when 'enabled' is **explicity** set to true +// IsEnabled will return true when 'enabled' is **explicitly** set to true. func IsEnabled(hints common.MapStr, key string) bool { if value, err := hints.GetValue(fmt.Sprintf("%s.enabled", key)); err == nil { enabled, _ := strconv.ParseBool(value.(string)) @@ -185,14 +187,16 @@ func IsEnabled(hints common.MapStr, key string) bool { return false } -// IsDisabled will return true when 'enabled' key is **explicity** set to false +// IsDisabled will return true when 'enabled' is **explicitly** set to false. func IsDisabled(hints common.MapStr, key string) bool { if value, err := hints.GetValue(fmt.Sprintf("%s.enabled", key)); err == nil { enabled, err := strconv.ParseBool(value.(string)) - if err == nil { - logp.Debug("autodiscover.builder", "error parsing 'enabled' hint from: %+v", hints) - return !enabled + if err != nil { + logp.NewLogger(logName).Debugw("Error parsing 'enabled' hint.", + "error", err, "autodiscover.hints", hints) + return false } + return !enabled } // keep reading disable (deprecated) for backwards compatibility @@ -271,7 +275,7 @@ func GetHintsAsList(hints common.MapStr, key string) []common.MapStr { var configs []common.MapStr for _, key := range nums { - rawCfg, _ := raw[key] + rawCfg := raw[key] if config, ok := rawCfg.(common.MapStr); ok { configs = append(configs, config) } diff --git a/libbeat/autodiscover/builder/plugin.go b/libbeat/autodiscover/builder/plugin.go index 83abccc2097..8ad423da13f 100644 --- a/libbeat/autodiscover/builder/plugin.go +++ b/libbeat/autodiscover/builder/plugin.go @@ -29,7 +29,7 @@ type builderPlugin struct { builder autodiscover.BuilderConstructor } -var pluginKey = "libbeat.autodiscover.builder" +const pluginKey = "libbeat.autodiscover.builder" // Plugin accepts a BuilderConstructor to be registered as a plugin func Plugin(name string, b autodiscover.BuilderConstructor) map[string][]interface{} { diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index 91940680cb6..79be3823a31 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -481,7 +481,21 @@ The `nomad` autodiscover provider has the following configuration settings: `namespace`:: (Optional) Namespace to use. If not provided the `default` namespace is used. -`secret_id`:: (Optional) SecretID to use if ACL is enabled in Nomad. +`secret_id`:: (Optional) SecretID to use if ACL is enabled in Nomad. This is an +example ACL policy to apply to the token. + +[source,hcl] +---- +namespace "*" { + policy = "read" +} +node { + policy = "read" +} +agent { + policy = "read" +} +---- `node`:: (Optional) Specify the node to scope {beatname_lc} to in case it cannot be accurately detected when `node` scope is used. @@ -495,6 +509,7 @@ The `nomad` autodiscover provider has the following configuration settings: `allow_stale`:: (Optional) allows any Nomad server (non-leader) to service a read. This normally means that the local node where filebeat is allocated will service filebeat's requests. + Defaults to `true`. include::../../{beatname_lc}/docs/autodiscover-nomad-config.asciidoc[] diff --git a/x-pack/libbeat/autodiscover/providers/nomad/config.go b/x-pack/libbeat/autodiscover/providers/nomad/config.go index 1a611aceee3..5ce13556d91 100644 --- a/x-pack/libbeat/autodiscover/providers/nomad/config.go +++ b/x-pack/libbeat/autodiscover/providers/nomad/config.go @@ -41,9 +41,6 @@ type Config struct { func defaultConfig() *Config { return &Config{ Address: "http://127.0.0.1:4646", - Region: "", - Namespace: "", - SecretID: "", Scope: ScopeNode, allowStale: true, waitTime: 15 * time.Second, @@ -53,7 +50,7 @@ func defaultConfig() *Config { } } -// Validate ensures correctness of config +// Validate ensures correctness of config. func (c *Config) Validate() error { // Make sure that prefix doesn't ends with a '.' if c.Prefix[len(c.Prefix)-1] == '.' && c.Prefix != "." { diff --git a/x-pack/libbeat/autodiscover/providers/nomad/nomad.go b/x-pack/libbeat/autodiscover/providers/nomad/nomad.go index 6b3d9bc70fa..17284c49fd7 100644 --- a/x-pack/libbeat/autodiscover/providers/nomad/nomad.go +++ b/x-pack/libbeat/autodiscover/providers/nomad/nomad.go @@ -50,12 +50,10 @@ func AutodiscoverBuilder( c *common.Config, keystore keystore.Keystore, ) (autodiscover.Provider, error) { - cfgwarn.Experimental("The nomad autodiscover is experimental") + cfgwarn.Experimental("The nomad autodiscover provider is experimental.") config := defaultConfig() - - err := c.Unpack(&config) - if err != nil { + if err := c.Unpack(&config); err != nil { return nil, err } @@ -130,18 +128,18 @@ func AutodiscoverBuilder( watcher.AddEventHandler(nomad.ResourceEventHandlerFuncs{ AddFunc: func(obj nomad.Resource) { - logger.Debugf("Watcher Allocation add: %+v", obj.ID) + logger.Debugw("Nomad allocation added", "nomad.allocation.id", obj.ID) p.emit(&obj, "start") }, UpdateFunc: func(obj nomad.Resource) { - logger.Debugf("nomad", "Watcher Allocation update: %+v", obj.ID) + logger.Debugw("Nomad allocation updated", "nomad.allocation.id", obj.ID) p.emit(&obj, "stop") // We have a CleanupTimeout grace period (defaults to 15s) to wait for the stop event // to be processed time.AfterFunc(config.CleanupTimeout, func() { p.emit(&obj, "start") }) }, DeleteFunc: func(obj nomad.Resource) { - logger.Debugf("Watcher Allocation delete: %+v", obj.ID) + logger.Debugw("Nomad allocation deleted", "nomad.allocation.id", obj.ID) p.emit(&obj, "stop") }, }) @@ -152,7 +150,7 @@ func AutodiscoverBuilder( // Start for Runner interface. func (p *Provider) Start() { if err := p.watcher.Start(); err != nil { - p.logger.Errorf("Error starting nomad autodiscover provider: %s", err) + p.logger.Errorw("Error starting nomad autodiscover provider", "error", err) } } @@ -175,7 +173,7 @@ func (p *Provider) emit(obj *nomad.Resource, flag string) { // the NodeID host, err := p.metagen.AllocationNodeName(obj.NodeID) if err != nil { - p.logger.Errorf("Error fetching node information: %s", err) + p.logger.Errorw("Error fetching node information", "error", err) } // If we cannot get a host, we assume that the allocation was stopped @@ -189,7 +187,7 @@ func (p *Provider) emit(obj *nomad.Resource, flag string) { // common metadata from the entire allocation allocMeta := p.metagen.ResourceMetadata(*obj) - // job metadatata merged with the task metadata + // job metadata merged with the task metadata tasks := p.metagen.GroupMeta(obj.Job) // emit per-task separated events @@ -225,7 +223,7 @@ func (p *Provider) publish(event bus.Event) { // Call all appenders to append any extra configuration p.appenders.Append(event) - p.logger.Debugf("nomad", "Publishing event: %+v", event) + p.logger.Debugw("Publishing nomad autodiscover event.", "autodiscover.event", event) p.bus.Publish(event) } @@ -275,12 +273,9 @@ func (p *Provider) generateHints(event bus.Event) bus.Event { cname := builder.GetContainerName(container) hints := builder.GenerateHints(tasks, cname, p.config.Prefix) - - p.logger.Debugf("Generated hints from %+v %+v", tasks, hints) - if len(hints) != 0 { + if len(hints) > 0 { e["hints"] = hints } - p.logger.Debugf("nomad", "Generated builder event %+v", e) prefix := strings.Split(p.config.Prefix, ".")[0] tasks.Delete(prefix) diff --git a/x-pack/libbeat/common/nomad/metadata.go b/x-pack/libbeat/common/nomad/metadata.go index b0648b6e3e7..51e2fe93ebc 100644 --- a/x-pack/libbeat/common/nomad/metadata.go +++ b/x-pack/libbeat/common/nomad/metadata.go @@ -82,7 +82,7 @@ func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr { // Returns an array of per-task metadata aggregating the group metadata into the // task metadata func (g *metaGenerator) GroupMeta(job *Job) []common.MapStr { - tasksMeta := []common.MapStr{} + var tasksMeta []common.MapStr for _, group := range job.TaskGroups { meta := make(map[string]string, len(job.Meta)) @@ -131,7 +131,7 @@ func (g *metaGenerator) GroupMeta(job *Job) []common.MapStr { // Returns per-task metadata func (g *metaGenerator) tasksMeta(group *TaskGroup) []common.MapStr { - tasks := []common.MapStr{} + var tasks []common.MapStr for _, task := range group.Tasks { var svcNames, svcTags, svcCanaryTags []string @@ -201,7 +201,7 @@ func generateMapSubset(input common.MapStr, keys []string, dedot bool) common.Ma } // AllocationNodeName returns Name of the node where the task is allocated. It -// does one additional API call to circunvent the empty NodeName property of +// does one additional API call to circumvent the empty NodeName property of // older Nomad versions (up to v0.8) func (g *metaGenerator) AllocationNodeName(id string) (string, error) { if name, ok := g.nodesCache[id]; ok { diff --git a/x-pack/libbeat/common/nomad/watcher.go b/x-pack/libbeat/common/nomad/watcher.go index eba77e764ad..31cb5590ff9 100644 --- a/x-pack/libbeat/common/nomad/watcher.go +++ b/x-pack/libbeat/common/nomad/watcher.go @@ -45,7 +45,7 @@ type WatchOptions struct { SyncTimeout time.Duration // Node is used for filtering events Node string - // Namespace is used for filtering events on specified namespacesx, + // Namespace is used for filtering events on specified namespaces. Namespace string // RefreshInterval is the time interval that the Nomad API will be queried RefreshInterval time.Duration @@ -103,7 +103,7 @@ func (w *watcher) AddEventHandler(h ResourceEventHandlerFuncs) { // Sync the allocations on the given node and update the local metadata func (w *watcher) sync() error { - w.logger.Debugf("Syncing allocations and metadata") + w.logger.Debug("Syncing allocations and metadata") w.logger.Debugf("Starting with WaitIndex=%v", w.waitIndex) queryOpts := &api.QueryOptions{ @@ -114,21 +114,20 @@ func (w *watcher) sync() error { allocations, meta, err := w.getAllocations(queryOpts) if err != nil { - return fmt.Errorf("listing allocations: %w", err) + return fmt.Errorf("failed listing allocations: %w", err) } - w.logger.Debugf("Found %d allocations", len(allocations)) - remoteWaitIndex := meta.LastIndex localWaitIndex := queryOpts.WaitIndex // Only emit updated metadata if the WaitIndex have changed if remoteWaitIndex <= localWaitIndex { w.logger.Debugf("Allocations index is unchanged remoteWaitIndex=%v == localWaitIndex=%v", - fmt.Sprint(remoteWaitIndex), fmt.Sprint(localWaitIndex)) + remoteWaitIndex, localWaitIndex) return nil } + w.logger.Debugf("Found %d allocations", len(allocations)) for _, alloc := range allocations { // the allocation has not changed since last seen, ignore if w.waitIndex > alloc.AllocModifyIndex { @@ -184,7 +183,7 @@ func (w *watcher) sync() error { func (w *watcher) watch() { // Failures counter, do exponential backoff on retries var failures uint - logp.Info("Nomad: %s", "Watching API for resource events") + w.logger.Info("Watching API for resource events") ticker := time.NewTicker(w.options.RefreshInterval) defer ticker.Stop() @@ -193,9 +192,8 @@ func (w *watcher) watch() { case <-w.done: return case <-ticker.C: - err := w.sync() - if err != nil { - logp.Err("Nomad: Error while watching for allocation changes %v", err) + if err := w.sync(); err != nil { + w.logger.Warnw("Error while watching for Nomad allocation changes. Backing off and continuing.", "error", err) backoff(failures) failures++ } @@ -215,11 +213,14 @@ func (w *watcher) getAllocations(queryOpts *api.QueryOptions) ([]*api.Allocation if err != nil { return nil, meta, err } + var allocations []*api.Allocation for _, stub := range stubs { allocation, _, err := w.client.Allocations().Info(stub.ID, queryOpts) if err != nil { - w.logger.Warnf("Failed to get details of allocation '%s'", stub.ID) + w.logger.Warnw("Failed to get details of an allocation.", + "nomad.allocation.id", stub.ID) + continue } allocations = append(allocations, allocation) } @@ -232,12 +233,11 @@ func (w *watcher) fetchNodeID() (string, error) { AllowStale: w.options.AllowStale, } - // Fetch the nodeId from the node name, used to filter the allocations - // If for some reason the NodeID changes filebeat will have to be restarted + // Fetch the nodeId from the node name, used to filter the allocations. + // If for some reason the NodeID changes filebeat will have to be restarted. nodes, _, err := w.client.Nodes().List(queryOpts) - if err != nil { - w.logger.Fatalf("Nomad: Fetching node list err %s", err.Error()) + w.logger.Errorw("Failed fetching Nomad node list.", "error", err) return "", err } diff --git a/x-pack/libbeat/processors/add_nomad_metadata/docs/add_nomad_metadata.asciidoc b/x-pack/libbeat/processors/add_nomad_metadata/docs/add_nomad_metadata.asciidoc index 1ab6b8cab03..31c4d2ede20 100644 --- a/x-pack/libbeat/processors/add_nomad_metadata/docs/add_nomad_metadata.asciidoc +++ b/x-pack/libbeat/processors/add_nomad_metadata/docs/add_nomad_metadata.asciidoc @@ -32,7 +32,21 @@ uses `http://127.0.0.1:4646` by default. in this namespace will be annotated. `region`:: (Optional) Region to watch. If set, only events for allocations in this region will be annotated. -`secretID`:: (Optional) SecretID to use when connecting with the agent API. +`secret_id`:: (Optional) SecretID to use when connecting with the agent API. +This is an example ACL policy to apply to the token. + +[source,hcl] +---- +namespace "*" { + policy = "read" +} +node { + policy = "read" +} +agent { + policy = "read" +} +---- `refresh_interval`:: (Optional) Interval used to updated the cached metadata. It defaults to 30 seconds. `cleanup_timeout`:: (Optional) After an allocation has been removed, time to