Skip to content

Commit

Permalink
[Elastic Agent] Fix agent composable processor promotion to fix dupli…
Browse files Browse the repository at this point in the history
…cates (#22344)

* Fix agent composable processor promotion to fix duplicates.

* Add changelog entry.
  • Loading branch information
blakerouse authored Nov 2, 2020
1 parent 1aed77e commit 3d4cc25
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 3 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Fix missing elastic_agent event data {pull}21994[21994]
- Ensure shell wrapper path exists before writing wrapper on install {pull}22144[22144]
- Fix deb/rpm packaging for Elastic Agent {pull}22153[22153]
- Fix composable input processor promotion to fix duplicates {pull}22344[22344]

==== New features

Expand Down
9 changes: 6 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi
if !ok {
return nil, fmt.Errorf("inputs must be an array")
}
nodes := []transpiler.Node{}
nodes := []*transpiler.Dict{}
nodesMap := map[string]*transpiler.Dict{}
for _, vars := range varsArray {
for _, node := range l.Value().([]transpiler.Node) {
Expand All @@ -202,7 +202,6 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi
continue
}
dict = n.(*transpiler.Dict)
dict = promoteProcessors(dict)
hash := string(dict.Hash())
_, exists := nodesMap[hash]
if !exists {
Expand All @@ -211,7 +210,11 @@ func renderInputs(inputs transpiler.Node, varsArray []*transpiler.Vars) (transpi
}
}
}
return transpiler.NewList(nodes), nil
nInputs := []transpiler.Node{}
for _, node := range nodes {
nInputs = append(nInputs, promoteProcessors(node))
}
return transpiler.NewList(nInputs), nil
}

func promoteProcessors(dict *transpiler.Dict) *transpiler.Dict {
Expand Down
70 changes: 70 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,76 @@ func TestRenderInputs(t *testing.T) {
}),
},
},
"same var result with different processors": {
input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{
transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("type", transpiler.NewStrVal("logfile")),
transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{
transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{
transpiler.NewStrVal("/var/log/${var1.name}.log"),
})),
}),
})),
}),
})),
expected: transpiler.NewList([]transpiler.Node{
transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("type", transpiler.NewStrVal("logfile")),
transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{
transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{
transpiler.NewStrVal("/var/log/value1.log"),
})),
}),
})),
transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{
transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{
transpiler.NewKey("custom", transpiler.NewStrVal("value1")),
})),
transpiler.NewKey("to", transpiler.NewStrVal("dynamic")),
})),
}),
})),
}),
}),
varsArray: []*transpiler.Vars{
mustMakeVarsP(map[string]interface{}{
"var1": map[string]interface{}{
"name": "value1",
},
},
"var1",
[]map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": map[string]interface{}{
"custom": "value1",
},
"to": "dynamic",
},
},
}),
mustMakeVarsP(map[string]interface{}{
"var1": map[string]interface{}{
"name": "value1",
},
},
"var1",
[]map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": map[string]interface{}{
"custom": "value2",
},
"to": "dynamic",
},
},
}),
},
},
}

for name, test := range testcases {
Expand Down

0 comments on commit 3d4cc25

Please sign in to comment.