From 5f4a073de1b7210da93460774162a23d392537a8 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Sep 2020 08:53:03 +0200 Subject: [PATCH] Add backwards compatibility for append allow_duplicates (#21159) (cherry picked from commit e51494f729c8869d62aa2ef3e4cba0738d6ca425) --- CHANGELOG.next.asciidoc | 1 + filebeat/fileset/pipelines.go | 74 +++++++++++ filebeat/fileset/pipelines_test.go | 202 +++++++++++++++++++++++++++++ 3 files changed, 277 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8ab50e22981..94ca691a3a7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -292,6 +292,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] - Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943] - Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] +- Provide backwards compatibility for the `append` processor when Elasticsearch is less than 7.10.0. {pull}21159[21159] *Heartbeat* diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 3d89e607ec6..fe7eb86c884 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -132,6 +132,10 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return fmt.Errorf("failed to modify set processor in pipeline: %v", err) } + if err := modifyAppendProcessor(esClient.GetVersion(), pipelineID, content); err != nil { + return fmt.Errorf("failed to modify append processor in pipeline: %v", err) + } + body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) @@ -291,3 +295,73 @@ func modifySetProcessor(esVersion common.Version, pipelineID string, content map } return nil } + +// modifyAppendProcessor replaces allow_duplicates option with an if statement +// so ES less than 7.10 will still work +func modifyAppendProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + flagVersion := common.MustNewVersion("7.10.0") + if !esVersion.LessThan(flagVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + for _, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["append"].(map[string]interface{}); ok { + allow, ok := options["allow_duplicates"].(bool) + if !ok { + // don't have allow_duplicates, nothing to do + continue + } + + logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID) + delete(options, "allow_duplicates") + if allow { + // it was set to true, nothing else to do after removing the option + continue + } + + currIf, _ := options["if"].(string) + if strings.Contains(strings.ToLower(currIf), "contains") { + // if it has a contains statement, we assume it is checking for duplicates already + continue + } + field, ok := options["field"].(string) + if !ok { + continue + } + val, ok := options["value"].(string) + if !ok { + continue + } + + field = strings.ReplaceAll(field, ".", "?.") + + val = strings.TrimLeft(val, "{ ") + val = strings.TrimRight(val, "} ") + val = strings.ReplaceAll(val, ".", "?.") + + if currIf == "" { + // if there is not a previous if we add a value sanity check + currIf = fmt.Sprintf("ctx?.%s != null", val) + } + + newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) + + logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf) + options["if"] = newIf + } + } + return nil +} diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 65a10212b6b..7c617034f10 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -392,3 +392,205 @@ func TestModifySetProcessor(t *testing.T) { }) } } + +func TestModifyAppendProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.10.0: set to true", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: set to false", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.10.0", + esVersion: common.MustNewVersion("7.10.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.10.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if with contains", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := modifyAppendProcessor(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +}