From 3d4cc254acc6f5f403bf1dded95f78cd11213d6b Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 2 Nov 2020 07:50:07 -0600 Subject: [PATCH] [Elastic Agent] Fix agent composable processor promotion to fix duplicates (#22344) * Fix agent composable processor promotion to fix duplicates. * Add changelog entry. --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/emitter.go | 9 ++- .../pkg/agent/application/emitter_test.go | 70 +++++++++++++++++++ 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 9a032035068..a8bf8fe9b7e 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -25,6 +25,7 @@ - Fix missing elastic_agent event data {pull}21994[21994] - Ensure shell wrapper path exists before writing wrapper on install {pull}22144[22144] - Fix deb/rpm packaging for Elastic Agent {pull}22153[22153] +- Fix composable input processor promotion to fix duplicates {pull}22344[22344] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter.go b/x-pack/elastic-agent/pkg/agent/application/emitter.go index 07f8e1f460b..5faa6814e77 100644 --- a/x-pack/elastic-agent/pkg/agent/application/emitter.go +++ b/x-pack/elastic-agent/pkg/agent/application/emitter.go @@ -180,7 +180,7 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi if !ok { return nil, fmt.Errorf("inputs must be an array") } - nodes := []transpiler.Node{} + nodes := []*transpiler.Dict{} nodesMap := map[string]*transpiler.Dict{} for _, vars := range varsArray { for _, node := range l.Value().([]transpiler.Node) { @@ -202,7 +202,6 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi continue } dict = n.(*transpiler.Dict) - dict = promoteProcessors(dict) hash := string(dict.Hash()) _, exists := nodesMap[hash] if !exists { @@ -211,7 +210,11 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi } } } - return transpiler.NewList(nodes), nil + nInputs := []transpiler.Node{} + for _, node := range nodes { + nInputs = append(nInputs, promoteProcessors(node)) + } + return transpiler.NewList(nInputs), nil } func promoteProcessors(dict *transpiler.Dict) *transpiler.Dict { diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter_test.go b/x-pack/elastic-agent/pkg/agent/application/emitter_test.go index b2286552f9f..d65ba9f074d 100644 --- a/x-pack/elastic-agent/pkg/agent/application/emitter_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/emitter_test.go @@ -654,6 +654,76 @@ func TestRenderInputs(t *testing.T) { }), }, }, + "same var result with different processors": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/${var1.name}.log"), + })), + }), + })), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value1.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("custom", transpiler.NewStrVal("value1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("dynamic")), + })), + }), + })), + }), + }), + varsArray: []*transpiler.Vars{ + mustMakeVarsP(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + "var1", + []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "custom": "value1", + }, + "to": "dynamic", + }, + }, + }), + mustMakeVarsP(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + "var1", + []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "custom": "value2", + }, + "to": "dynamic", + }, + }, + }), + }, + }, } for name, test := range testcases {