diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 75c1c6ad7d0..d8c08d59e08 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -393,6 +393,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `kibana.log` pipeline when `event.duration` calculation becomes a Long. {issue}24556[24556] {pull}25675[25675] - Removed incorrect `http.request.referrer` field from `aws.elb` module. {issue}26435[26435] {pull}26441[26441] - Fix `threatintel.indicator.url.full` not being populated. {issue}26351[26351] {pull}26508[26508] +- Fix Elasticsearch compatibility for modules that use `type: ip` with `convert` processors. {issue}26629[26629] {pull}26676[26676] +- Fix Elasticsearch compatibility for modules that use the `network_direction` processor. {issue}26629[26629] {pull}26676[26676] +- Fix Elasticsearch compatibility for modules that use the `registered_domain` processor. {issue}26629[26629] {pull}26676[26676] *Heartbeat* diff --git a/filebeat/fileset/compatibility.go b/filebeat/fileset/compatibility.go index 210c93e1b4c..5e9c0cd91f1 100644 --- a/filebeat/fileset/compatibility.go +++ b/filebeat/fileset/compatibility.go @@ -30,9 +30,31 @@ import ( // processorCompatibility defines a processor's minimum version requirements or // a transformation to make it compatible. type processorCompatibility struct { - checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. - procType string // Elasticsearch Ingest Node processor type. - adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible. + checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. + procType string // Elasticsearch Ingest Node processor type. + adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible. +} + +type compatAction func(interface{}) (interface{}, error) + +func keepProcessor(original interface{}) (interface{}, error) { + return original, nil +} + +func dropProcessor(interface{}) (interface{}, error) { + return nil, nil +} + +func replaceProcessor(newProc interface{}) compatAction { + return func(interface{}) (interface{}, error) { + return newProc, nil + } +} + +func fail(err error) compatAction { + return func(interface{}) (interface{}, error) { + return nil, err + } } var processorCompatibilityChecks = []processorCompatibility{ @@ -70,9 +92,9 @@ var processorCompatibilityChecks = []processorCompatibility{ return esVersion.LessThan(common.MustNewVersion("7.0.0")) && !esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { config["ecs"] = true - return false, nil + return keepProcessor }, }, { @@ -80,16 +102,37 @@ var processorCompatibilityChecks = []processorCompatibility{ checkVersion: func(esVersion *common.Version) bool { return esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { - return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { + return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")) + }, + }, + { + procType: "convert", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.13.0")) + }, + adaptConfig: replaceConvertIP, + }, + { + procType: "network_direction", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.13.0")) + }, + adaptConfig: deleteProcessor, + }, + { + procType: "registered_domain", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.13.0")) }, + adaptConfig: deleteProcessor, }, } // adaptPipelineForCompatibility iterates over all processors in the pipeline // and adapts them for version of Elasticsearch used. Adapt can mean modifying // processor options or removing the processor. -func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error { +func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) { p, ok := content["processors"] if !ok { return errors.New("'processors' is missing from the pipeline definition") @@ -104,12 +147,12 @@ func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, nextProcessor: for i, obj := range processors { - processor, ok := obj.(map[string]interface{}) - if !ok { - return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) - } - for _, proc := range processorCompatibilityChecks { + processor, ok := obj.(map[string]interface{}) + if !ok { + return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) + } + configIfc, found := processor[proc.procType] if !found { continue @@ -123,16 +166,17 @@ nextProcessor: continue } - drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) + act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) + obj, err = act(obj) if err != nil { return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err) } - if drop { + if obj == nil { continue nextProcessor } } - filteredProcs = append(filteredProcs, processors[i]) + filteredProcs = append(filteredProcs, obj) } content["processors"] = filteredProcs @@ -141,14 +185,16 @@ nextProcessor: // deleteProcessor returns true to indicate that the processor should be deleted // in order to adapt the pipeline for backwards compatibility to Elasticsearch. -func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil } +func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction { + return dropProcessor +} // replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if // statement so ES less than 7.9 will work. -func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) { +func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction { _, ok := config["ignore_empty_value"].(bool) if !ok { - return false, nil + return keepProcessor } log.Debug("Removing unsupported 'ignore_empty_value' from set processor.") @@ -157,11 +203,11 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) _, ok = config["if"].(string) if ok { // assume if check is sufficient - return false, nil + return keepProcessor } val, ok := config["value"].(string) if !ok { - return false, nil + return keepProcessor } newIf := strings.TrimLeft(val, "{ ") @@ -171,15 +217,15 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) config["if"] = newIf - return false, nil + return keepProcessor } // replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement // so ES less than 7.10 will work. -func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) { +func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction { allow, ok := config["allow_duplicates"].(bool) if !ok { - return false, nil + return keepProcessor } log.Debug("Removing unsupported 'allow_duplicates' from append processor.") @@ -187,21 +233,21 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge if allow { // It was set to true, nothing else to do after removing the option. - return false, nil + return keepProcessor } currIf, _ := config["if"].(string) if strings.Contains(strings.ToLower(currIf), "contains") { // If it has a contains statement, we assume it is checking for duplicates already. - return false, nil + return keepProcessor } field, ok := config["field"].(string) if !ok { - return false, nil + return keepProcessor } val, ok := config["value"].(string) if !ok { - return false, nil + return keepProcessor } field = strings.ReplaceAll(field, ".", "?.") @@ -220,5 +266,33 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) config["if"] = newIf - return false, nil + return keepProcessor +} + +// replaceConvertIP replaces convert processors with type: ip with a grok expression that uses +// the IP pattern. +func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction { + wantedType, found := config["type"] + if !found || wantedType != "ip" { + return keepProcessor + } + log.Debug("processor input=", config) + delete(config, "type") + var srcIf, dstIf interface{} + if srcIf, found = config["field"]; !found { + return fail(errors.New("field option is required for convert processor")) + } + if dstIf, found = config["target_field"]; found { + delete(config, "target_field") + } else { + dstIf = srcIf + } + config["patterns"] = []string{ + fmt.Sprintf("^%%{IP:%s}$", dstIf), + } + grok := map[string]interface{}{ + "grok": config, + } + log.Debug("processor output=", grok) + return replaceProcessor(grok) } diff --git a/filebeat/fileset/compatibility_test.go b/filebeat/fileset/compatibility_test.go index bc089879082..e7fa8a1267a 100644 --- a/filebeat/fileset/compatibility_test.go +++ b/filebeat/fileset/compatibility_test.go @@ -641,3 +641,430 @@ func TestRemoveURIPartsProcessor(t *testing.T) { }) } } + +func TestRemoveNetworkDirectionProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.13.0", + esVersion: common.MustNewVersion("7.12.34"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "network_direction": map[string]interface{}{ + "internal_networks": []string{ + "loopback", + "private", + }, + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.13.0", + esVersion: common.MustNewVersion("7.13.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "network_direction": map[string]interface{}{ + "internal_networks": []string{ + "loopback", + "private", + }, + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "network_direction": map[string]interface{}{ + "internal_networks": []string{ + "loopback", + "private", + }, + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES > 7.13.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "network_direction": map[string]interface{}{ + "internal_networks": []string{ + "loopback", + "private", + }, + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "network_direction": map[string]interface{}{ + "internal_networks": []string{ + "loopback", + "private", + }, + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestReplaceConvertIPWithGrok(t *testing.T) { + logp.TestingSetup() + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES >= 7.13.0: keep processor", + esVersion: common.MustNewVersion("7.13.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "field": "foo", + "target_field": "bar", + "type": "ip", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "field": "foo", + "target_field": "bar", + "type": "ip", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.13.0: replace with grok", + esVersion: common.MustNewVersion("7.12.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "field": "foo", + "target_field": "bar", + "type": "ip", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "grok": map[string]interface{}{ + "field": "foo", + "patterns": []string{ + "^%{IP:bar}$", + }, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "implicit target", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "field": "foo", + "type": "ip", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "grok": map[string]interface{}{ + "field": "foo", + "patterns": []string{ + "^%{IP:foo}$", + }, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "missing field", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "type": "ip", + }, + }, + }, + }, + isErrExpected: true, + }, + { + name: "keep settings in grok", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "convert": map[string]interface{}{ + "field": "foo", + "target_field": "bar", + "type": "ip", + "ignore_missing": true, + "description": "foo bar", + "if": "condition", + "ignore_failure": false, + "tag": "myTag", + "on_failure": []interface{}{ + "foo", + map[string]interface{}{ + "bar": []int{1, 2, 3}, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "grok": map[string]interface{}{ + "field": "foo", + "patterns": []string{ + "^%{IP:bar}$", + }, + "ignore_missing": true, + "description": "foo bar", + "if": "condition", + "ignore_failure": false, + "tag": "myTag", + "on_failure": []interface{}{ + "foo", + map[string]interface{}{ + "bar": []int{1, 2, 3}, + }, + }, + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestRemoveRegisteredDomainProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.13.0", + esVersion: common.MustNewVersion("7.12.34"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + map[string]interface{}{ + "registered_domain": map[string]interface{}{ + "field": "foo", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.13.0", + esVersion: common.MustNewVersion("7.13.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "registered_domain": map[string]interface{}{ + "field": "foo", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "registered_domain": map[string]interface{}{ + "field": "foo", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES > 7.13.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "registered_domain": map[string]interface{}{ + "field": "foo", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "registered_domain": map[string]interface{}{ + "field": "foo", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +}