Skip to content

Commit

Permalink
[Filebeat] backwards compatibility for set processor (elastic#20908) (e…
Browse files Browse the repository at this point in the history
…lastic#21084)

- "ignore_empty_value" option for the set processor only works on
Elasticsearch >= 7.9.0.  This change removes that option and replaces
it with an if statement if pipeline is loaded on an earlier version of
elasticsearch.

(cherry picked from commit b6162c4)

Co-authored-by: Lee Hinman <57081003+leehinman@users.noreply.github.com>
  • Loading branch information
andrewkroh and leehinman committed Sep 15, 2020
1 parent b0a9bb1 commit d15f604
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847]
- Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834]
- Fixed typo in log message. {pull}17897[17897]
- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]
- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943]

Expand Down
59 changes: 59 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err)
}

err = modifySetProcessor(esClient.GetVersion(), pipelineID, content)
if err != nil {
return fmt.Errorf("failed to modify set processor in pipeline: %v", err)
}

body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
Expand Down Expand Up @@ -232,3 +237,57 @@ func interpretError(initialErr error, body []byte) error {

return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// modifySetProcessor replaces ignore_empty_value option with an if statement
// so ES less than 7.9 will still work
func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error {
flagVersion := common.MustNewVersion("7.9.0")
if !esVersion.LessThan(flagVersion) {
return nil
}

p, ok := content["processors"]
if !ok {
return nil
}
processors, ok := p.([]interface{})
if !ok {
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p)
}

for _, p := range processors {
processor, ok := p.(map[string]interface{})
if !ok {
continue
}
if options, ok := processor["set"].(map[string]interface{}); ok {
_, ok := options["ignore_empty_value"].(bool)
if !ok {
// don't have ignore_empty_value nothing to do
continue
}

logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID)
delete(options, "ignore_empty_value")

_, ok = options["if"].(string)
if ok {
// assume if check is sufficient
continue
}
val, ok := options["value"].(string)
if !ok {
continue
}

newIf := strings.TrimLeft(val, "{ ")
newIf = strings.TrimRight(newIf, "} ")
newIf = strings.ReplaceAll(newIf, ".", "?.")
newIf = "ctx?." + newIf + " != null"

logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf)
options["if"] = newIf
}
}
return nil
}
177 changes: 177 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,180 @@ func TestSetEcsProcessors(t *testing.T) {
})
}
}

func TestModifySetProcessor(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.9.0",
esVersion: common.MustNewVersion("7.8.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.9.0",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
},
},
isErrExpected: false,
},
{
name: "ES > 7.9.0",
esVersion: common.MustNewVersion("8.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
},
},
isErrExpected: false,
},
{
name: "existing if",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
}},
isErrExpected: false,
},
{
name: "ignore_empty_value is false",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": false,
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
}},
isErrExpected: false,
},
{
name: "no value",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"ignore_empty_value": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
},
},
}},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content)
if test.isErrExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.content, test.name)
}
})
}
}

0 comments on commit d15f604

Please sign in to comment.