Skip to content

Commit

Permalink
Allow ConcatFields for nodes of type object (#10959) (#11094)
Browse files Browse the repository at this point in the history
Checks if given field conflicts with current fields. Fixes an error where `append_fields` could not add valid field.
  • Loading branch information
simitt authored Mar 5, 2019
1 parent 4573965 commit 9d5f9a9
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ The list below covers the major changes between 6.6.0 and 6.6 head only.
==== Breaking changes

==== Bugfixes
- Fix duplication check for `append_fields` option. {pull}10959[10959]

==== Added
63 changes: 63 additions & 0 deletions libbeat/common/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"strings"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/go-ucfg/yaml"
)

Expand Down Expand Up @@ -216,3 +219,63 @@ func (f Fields) getKeys(namespace string) []string {

return keys
}

// ConcatFields concatenates two Fields lists into a new list.
// The operation fails if the input definitions define the same keys.
func ConcatFields(a, b Fields) (Fields, error) {
if len(b) == 0 {
return a, nil
}
if len(a) == 0 {
return b, nil
}

// check for duplicates
if err := a.conflicts(b); err != nil {
return nil, err
}

// concat a+b into new array
fields := make(Fields, 0, len(a)+len(b))
return append(append(fields, a...), b...), nil
}

func (f Fields) conflicts(fields Fields) error {
var errs multierror.Errors
for _, key := range fields.GetKeys() {
keys := strings.Split(key, ".")
if err := f.canConcat(key, keys); err != nil {
errs = append(errs, err)
}
}
return errs.Err()
}

// canConcat checks if the given string can be concatenated to the existing fields f
// a key cannot be concatenated if
// - f has a node with name key
// - f has a leaf with key's parent name and the leaf's type is not `object`
func (f Fields) canConcat(k string, keys []string) error {
if len(keys) == 0 {
return nil
}
key := keys[0]
keys = keys[1:]
for _, field := range f {
if field.Name != key {
continue
}
// last key to compare
if len(keys) == 0 {
return errors.Errorf("fields contain key <%s>", k)
}
// last field to compare, only valid if it is of type object
if len(field.Fields) == 0 {
if field.Type != "object" {
return errors.Errorf("fields contain non object node conflicting with key <%s>", k)
}
}
return field.Fields.canConcat(k, keys)
}
return nil
}
240 changes: 240 additions & 0 deletions libbeat/common/field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,66 @@
package common

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/go-ucfg/yaml"
)

func TestFieldsHasNode(t *testing.T) {
tests := map[string]struct {
node string
fields Fields
hasNode bool
}{
"empty fields": {
node: "a.b",
fields: Fields{},
hasNode: false,
},
"no node": {
node: "",
fields: Fields{Field{Name: "a"}},
hasNode: false,
},
"key not in fields, but node in fields": {
node: "a.b.c",
fields: Fields{
Field{Name: "a", Fields: Fields{Field{Name: "b"}}},
},
hasNode: true,
},
"last node in fields": {
node: "a.b.c",
fields: Fields{
Field{Name: "a", Fields: Fields{
Field{Name: "b", Fields: Fields{
Field{Name: "c"},
}}}},
},
hasNode: true,
},
"node in fields": {
node: "a.b",
fields: Fields{
Field{Name: "a", Fields: Fields{
Field{Name: "b", Fields: Fields{
Field{Name: "c"},
}}}},
},
hasNode: true,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.hasNode, test.fields.HasNode(test.node))
})
}
}

func TestFieldsHasKey(t *testing.T) {
tests := []struct {
key string
Expand Down Expand Up @@ -189,3 +242,190 @@ func TestGetKeys(t *testing.T) {
assert.Equal(t, test.keys, test.fields.GetKeys())
}
}

func TestFieldConcat(t *testing.T) {
tests := map[string]struct {
a, b Fields
want Fields
err string
}{
"empty lists": {},
"first list only": {
a: Fields{{Name: "a"}},
want: Fields{{Name: "a"}},
},
"second list only": {
b: Fields{{Name: "a"}},
want: Fields{{Name: "a"}},
},
"concat": {
a: Fields{{Name: "a"}},
b: Fields{{Name: "b"}},
want: Fields{{Name: "a"}, {Name: "b"}},
},
"duplicates fail": {
a: Fields{{Name: "a"}},
b: Fields{{Name: "a"}},
err: "1 error: fields contain key <a>",
},
"nested with common prefix": {
a: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}},
}},
b: Fields{{
Name: "a",
Fields: Fields{{Name: "c"}},
}},
want: Fields{
{Name: "a", Fields: Fields{{Name: "b"}}},
{Name: "a", Fields: Fields{{Name: "c"}}},
},
},
"deep nested with common prefix": {
a: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}},
}},
b: Fields{{
Name: "a",
Fields: Fields{{Name: "c", Fields: Fields{
{Name: "d"},
}}},
}},
want: Fields{
{Name: "a", Fields: Fields{{Name: "b"}}},
{Name: "a", Fields: Fields{{Name: "c", Fields: Fields{{Name: "d"}}}}},
},
},
"nested duplicates fail": {
a: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}, {Name: "c"}},
}},
b: Fields{{
Name: "a",
Fields: Fields{{Name: "c"}},
}},
err: "1 error: fields contain key <a.c>",
},
"a is prefix of b": {
a: Fields{{Name: "a"}},
b: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}},
}},
err: "1 error: fields contain non object node conflicting with key <a.b>",
},
"a is object and prefix of b": {
a: Fields{{Name: "a", Type: "object"}},
b: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}},
}},
want: Fields{
{Name: "a", Type: "object"},
{Name: "a", Fields: Fields{{Name: "b"}}},
},
},
"b is prefix of a": {
a: Fields{{
Name: "a",
Fields: Fields{{Name: "b"}},
}},
b: Fields{{Name: "a"}},
err: "1 error: fields contain key <a>",
},
"multiple errors": {
a: Fields{
{Name: "a", Fields: Fields{{Name: "b"}}},
{Name: "foo", Fields: Fields{{Name: "b"}}},
{Name: "bar", Type: "object"},
},
b: Fields{
{Name: "bar", Fields: Fields{{Name: "foo"}}},
{Name: "a"},
{Name: "foo", Fields: Fields{{Name: "b", Fields: Fields{{Name: "c"}}}}},
},

err: "2 errors: fields contain key <a>; fields contain non object node conflicting with key <foo.b.c>",
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
fs, err := ConcatFields(test.a, test.b)
if test.err == "" {
assert.NoError(t, err)
assert.Equal(t, test.want, fs)
return
}
if assert.Error(t, err) {
assert.Equal(t, test.err, err.Error())
}
})
}
}

func TestFieldsCanConcat(t *testing.T) {
tests := map[string]struct {
key string
fields Fields
err string
}{
"empty fields": {
key: "a.b",
fields: Fields{},
},
"no key": {
key: "",
fields: Fields{Field{Name: "a"}},
},
"key not in fields, but parent node in fields": {
key: "a.b.c",
fields: Fields{
Field{Name: "a", Fields: Fields{Field{Name: "b"}}},
},
err: "fields contain non object node conflicting with key <a.b.c>",
},
"key not in fields, but parent node in fields and of type object": {
key: "a.b.c",
fields: Fields{
Field{Name: "a", Fields: Fields{Field{Name: "b", Type: "object"}}},
},
},
"last node in fields": {
key: "a.b.c",
fields: Fields{
Field{Name: "a", Fields: Fields{
Field{Name: "b", Fields: Fields{
Field{Name: "c"},
}}}},
},
err: "fields contain key <a.b.c>",
},
"node in fields": {
key: "a.b",
fields: Fields{
Field{Name: "a", Fields: Fields{
Field{Name: "b", Fields: Fields{
Field{Name: "c"},
}}}},
},
err: "fields contain key <a.b>",
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
err := test.fields.canConcat(test.key, strings.Split(test.key, "."))
if test.err == "" {
assert.Nil(t, err)
return
}
if assert.Error(t, err) {
assert.Equal(t, test.err, err.Error())
}
})
}
}
18 changes: 1 addition & 17 deletions libbeat/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *Template) load(fields common.Fields) (common.MapStr, error) {
var err error
if len(t.config.AppendFields) > 0 {
cfgwarn.Experimental("append_fields is used.")
fields, err = appendFields(fields, t.config.AppendFields)
fields, err = common.ConcatFields(fields, t.config.AppendFields)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -256,22 +256,6 @@ func (t *Template) Generate(properties common.MapStr, dynamicTemplates []common.
return basicStructure
}

func appendFields(fields, appendFields common.Fields) (common.Fields, error) {
if len(appendFields) > 0 {
appendFieldKeys := appendFields.GetKeys()

// Append is only allowed to add fields, not overwrite
for _, key := range appendFieldKeys {
if fields.HasNode(key) {
return nil, fmt.Errorf("append_fields contains an already existing key: %s", key)
}
}
// Appends fields to existing fields
fields = append(fields, appendFields...)
}
return fields, nil
}

func loadYamlByte(data []byte) (common.Fields, error) {

var keys []common.Field
Expand Down
2 changes: 1 addition & 1 deletion libbeat/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestAppendFields(t *testing.T) {
}

for _, test := range tests {
_, err := appendFields(test.fields, test.appendFields)
_, err := common.ConcatFields(test.fields, test.appendFields)
if test.error {
assert.Error(t, err)
} else {
Expand Down

0 comments on commit 9d5f9a9

Please sign in to comment.