From 610e998c121e9453363a0f429c5f8d197eb1350d Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 20 Oct 2020 12:29:00 -0400 Subject: [PATCH] [Elastic Agent] Fix missing elastic_agent event data (#21994) * Fix fields. * Remove from monitoring decorator. * Add changelog. * Fix tests. * Fix tests. * Fix import. --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/local_mode.go | 2 +- .../pkg/agent/application/managed_mode.go | 2 +- .../agent/application/monitoring_decorator.go | 1 - .../pkg/agent/application/stream.go | 8 +++-- .../pkg/agent/operation/common_test.go | 4 ++- .../pkg/agent/operation/monitoring.go | 30 +++++++++++++++++++ .../pkg/agent/operation/monitoring_test.go | 4 ++- .../pkg/agent/operation/operator.go | 4 +++ .../testdata/enabled_output_true-filebeat.yml | 8 ++--- .../testdata/enabled_true-filebeat.yml | 8 ++--- .../testdata/single_config-filebeat.yml | 16 +++++----- .../testdata/single_config-metricbeat.yml | 24 +++++++-------- .../pkg/agent/transpiler/rules.go | 8 ++--- .../pkg/agent/transpiler/rules_test.go | 16 +++++----- 15 files changed, 88 insertions(+), 48 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index fa0198a6628..3882ba19712 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -20,6 +20,7 @@ - Use local temp instead of system one {pull}21883[21883] - Rename monitoring index from `elastic.agent` to `elastic_agent` {pull}21932[21932] - Fix issue with named pipes on Windows 7 {pull}21931[21931] +- Fix missing elastic_agent event data {pull}21994[21994] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index b58e260cab6..f0c4153f474 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -100,7 +100,7 @@ func newLocal( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(localApplication.bgContext, cfg.Settings, localApplication.srv, reporter, monitor)) + router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index e38685741c3..fa31215f75d 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -154,7 +154,7 @@ func newManaged( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(managedApplication.bgContext, cfg.Settings, managedApplication.srv, combinedReporter, monitor)) + router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } diff --git a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go index 3fc49ef17d3..920b1a4b5bf 100644 --- a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go +++ b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go @@ -94,7 +94,6 @@ func getMonitoringRule(outputName string) *transpiler.RuleList { return transpiler.NewRuleList( transpiler.Copy(monitoringOutputSelector, outputKey), transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey), - transpiler.InjectAgentInfo(), transpiler.Filter(monitoringKey, programsKey, outputKey), ) } diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index 784038e77ab..2d372ef4387 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -7,6 +7,7 @@ package application import ( "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -40,10 +41,10 @@ func (b *operatorStream) Shutdown() { b.configHandler.Shutdown() } -func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { +func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags - operator, err := newOperator(ctx, log, id, cfg, srv, r, m) + operator, err := newOperator(ctx, log, agentInfo, id, cfg, srv, r, m) if err != nil { return nil, err } @@ -55,7 +56,7 @@ func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv * } } -func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) { +func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) { fetcher := downloader.NewDownloader(log, config.DownloadConfig, false) allowEmptyPgp, pgp := release.PGP() verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false) @@ -81,6 +82,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config return operation.NewOperator( ctx, log, + agentInfo, id, config, fetcher, diff --git a/x-pack/elastic-agent/pkg/agent/operation/common_test.go b/x-pack/elastic-agent/pkg/agent/operation/common_test.go index e9d40bece87..ea16cfe77b8 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/common_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/common_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver" @@ -48,6 +49,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a } l := getLogger() + agentInfo, _ := info.NewAgentInfo() fetcher := &DummyDownloader{} verifier := &DummyVerifier{} @@ -67,7 +69,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a t.Fatal(err) } - operator, err := NewOperator(context.Background(), l, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor()) + operator, err := NewOperator(context.Background(), l, agentInfo, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor()) if err != nil { t.Fatal(err) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 74d542d58e9..1959cd52818 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -206,6 +206,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i }, }, }, + { + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": o.agentInfo.AgentID(), + "version": o.agentInfo.Version(), + "snapshot": o.agentInfo.Snapshot(), + }, + }, + }, }, }, } @@ -240,6 +250,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i }, }, }, + { + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": o.agentInfo.AgentID(), + "version": o.agentInfo.Version(), + "snapshot": o.agentInfo.Snapshot(), + }, + }, + }, }, }) } @@ -290,6 +310,16 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string }, }, }, + { + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": o.agentInfo.AgentID(), + "version": o.agentInfo.Version(), + "snapshot": o.agentInfo.Snapshot(), + }, + }, + }, }, }) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index eef904096f7..3ca6a5f6b14 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver" @@ -112,6 +113,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M } l := getLogger() + agentInfo, _ := info.NewAgentInfo() fetcher := &DummyDownloader{} verifier := &DummyVerifier{} @@ -128,7 +130,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M } ctx := context.Background() - operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m) + operator, err := NewOperator(ctx, l, agentInfo, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m) if err != nil { t.Fatal(err) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index b4938278821..1a39e73500e 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -43,6 +44,7 @@ type Operator struct { bgContext context.Context pipelineID string logger *logger.Logger + agentInfo *info.AgentInfo config *configuration.SettingsConfig handlers map[string]handleFunc stateResolver *stateresolver.StateResolver @@ -66,6 +68,7 @@ type Operator struct { func NewOperator( ctx context.Context, logger *logger.Logger, + agentInfo *info.AgentInfo, pipelineID string, config *configuration.SettingsConfig, fetcher download.Downloader, @@ -85,6 +88,7 @@ func NewOperator( config: config, pipelineID: pipelineID, logger: logger, + agentInfo: agentInfo, downloader: fetcher, verifier: verifier, installer: installer, diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml index 38b251d95dc..82a47adc999 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml @@ -17,11 +17,11 @@ filebeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false output: elasticsearch: enabled: true diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml index 6e768db6aa4..1406a2dff65 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml @@ -18,11 +18,11 @@ filebeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false output: elasticsearch: hosts: diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml index 01ee955e4ec..524d6451f28 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml @@ -19,11 +19,11 @@ filebeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false - type: log paths: - /var/log/hello3.log @@ -43,11 +43,11 @@ filebeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false output: elasticsearch: hosts: diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml index d09e80accf1..2889e7605eb 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml @@ -16,11 +16,11 @@ metricbeat: fields: dataset: docker.status - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false - module: docker metricsets: [info] index: metrics-generic-default @@ -37,11 +37,11 @@ metricbeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false - module: apache metricsets: [info] index: metrics-generic-testing @@ -61,11 +61,11 @@ metricbeat: fields: dataset: generic - add_fields: - target: "elastic" + target: "elastic_agent" fields: - agent.id: agent-id - agent.version: 8.0.0 - agent.snapshot: false + id: agent-id + version: 8.0.0 + snapshot: false output: elasticsearch: hosts: [127.0.0.1:9200, 127.0.0.1:9300] diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/rules.go b/x-pack/elastic-agent/pkg/agent/transpiler/rules.go index 29ff1786d1e..42acd53d21a 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/rules.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/rules.go @@ -715,11 +715,11 @@ func (r *InjectAgentInfoRule) Apply(agentInfo AgentInfo, ast *AST) error { // elastic.agent processorMap := &Dict{value: make([]Node, 0)} - processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic"}}) + processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic_agent"}}) processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{ - &Key{name: "agent.id", value: &StrVal{value: agentInfo.AgentID()}}, - &Key{name: "agent.version", value: &StrVal{value: agentInfo.Version()}}, - &Key{name: "agent.snapshot", value: &BoolVal{value: agentInfo.Snapshot()}}, + &Key{name: "id", value: &StrVal{value: agentInfo.AgentID()}}, + &Key{name: "version", value: &StrVal{value: agentInfo.Version()}}, + &Key{name: "snapshot", value: &BoolVal{value: agentInfo.Snapshot()}}, }}}) addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}} processorsList.value = mergeStrategy("").InjectItem(processorsList.value, addFieldsMap) diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go b/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go index d92ba0de985..0fb59107844 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go @@ -184,11 +184,11 @@ inputs: type: file processors: - add_fields: - target: elastic + target: elastic_agent fields: - agent.id: agent-id - agent.snapshot: false - agent.version: 8.0.0 + id: agent-id + snapshot: false + version: 8.0.0 - name: With processors type: file processors: @@ -197,11 +197,11 @@ inputs: fields: data: more - add_fields: - target: elastic + target: elastic_agent fields: - agent.id: agent-id - agent.snapshot: false - agent.version: 8.0.0 + id: agent-id + snapshot: false + version: 8.0.0 `, rule: &RuleList{ Rules: []Rule{