-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Filebeat] backwards compatibility for set processor #20908
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -127,6 +127,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string | |||||
return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) | ||||||
} | ||||||
|
||||||
err = modifySetProcessor(esClient.GetVersion(), pipelineID, content) | ||||||
if err != nil { | ||||||
return fmt.Errorf("failed to modify set processor in pipeline: %v", err) | ||||||
} | ||||||
|
||||||
body, err := esClient.LoadJSON(path, content) | ||||||
if err != nil { | ||||||
return interpretError(err, body) | ||||||
|
@@ -232,3 +237,47 @@ func interpretError(initialErr error, body []byte) error { | |||||
|
||||||
return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) | ||||||
} | ||||||
|
||||||
// modifySetProcessor replaces ignore_empty_value option with an if statement | ||||||
// so ES less than 7.9 will still work | ||||||
func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { | ||||||
flagVersion := common.MustNewVersion("7.9.0") | ||||||
if !esVersion.LessThan(flagVersion) { | ||||||
return nil | ||||||
} | ||||||
|
||||||
p, ok := content["processors"] | ||||||
if !ok { | ||||||
return nil | ||||||
} | ||||||
processors, ok := p.([]interface{}) | ||||||
if !ok { | ||||||
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) | ||||||
} | ||||||
|
||||||
for _, p := range processors { | ||||||
processor, ok := p.(map[string]interface{}) | ||||||
if !ok { | ||||||
continue | ||||||
} | ||||||
if options, ok := processor["set"].(map[string]interface{}); ok { | ||||||
iev, ok := options["ignore_empty_value"].(bool) | ||||||
if !ok || !iev { | ||||||
continue | ||||||
} | ||||||
val, ok := options["value"].(string) | ||||||
if !ok { | ||||||
continue | ||||||
} | ||||||
newIf := strings.ReplaceAll(val, "{", "") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about |
||||||
newIf = strings.ReplaceAll(newIf, "}", "") | ||||||
newIf = strings.TrimSpace(newIf) | ||||||
newIf = strings.ReplaceAll(newIf, ".", "?.") | ||||||
newIf = "ctx?." + newIf + " != null" | ||||||
logp.Debug("modules", "in pipeline %s replacing unsupported 'ignore_empty_value' with if %s in set processor", pipelineID, newIf) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
delete(options, "ignore_empty_value") | ||||||
options["if"] = newIf | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -215,3 +215,106 @@ func TestSetEcsProcessors(t *testing.T) { | |
}) | ||
} | ||
} | ||
|
||
func TestModifySetProcessor(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
esVersion *common.Version | ||
content map[string]interface{} | ||
expected map[string]interface{} | ||
isErrExpected bool | ||
}{ | ||
{ | ||
name: "ES < 7.9.0", | ||
esVersion: common.MustNewVersion("7.8.0"), | ||
content: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"ignore_empty_value": true, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about a test case for when the pipeline already has an |
||
}, | ||
}, | ||
}}, | ||
expected: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"if": "ctx?.panw?.panos?.ruleset != null", | ||
}, | ||
}, | ||
}, | ||
}, | ||
isErrExpected: false, | ||
}, | ||
{ | ||
name: "ES == 7.9.0", | ||
esVersion: common.MustNewVersion("7.9.0"), | ||
content: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"ignore_empty_value": true, | ||
}, | ||
}, | ||
}}, | ||
expected: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"ignore_empty_value": true, | ||
}, | ||
}, | ||
}, | ||
}, | ||
isErrExpected: false, | ||
}, | ||
{ | ||
name: "ES > 7.9.0", | ||
esVersion: common.MustNewVersion("8.0.0"), | ||
content: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"ignore_empty_value": true, | ||
}, | ||
}, | ||
}}, | ||
expected: map[string]interface{}{ | ||
"processors": []interface{}{ | ||
map[string]interface{}{ | ||
"set": map[string]interface{}{ | ||
"field": "rule.name", | ||
"value": "{{panw.panos.ruleset}}", | ||
"ignore_empty_value": true, | ||
}, | ||
}, | ||
}, | ||
}, | ||
isErrExpected: false, | ||
}, | ||
} | ||
|
||
for _, test := range cases { | ||
test := test | ||
t.Run(test.name, func(t *testing.T) { | ||
t.Parallel() | ||
err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content) | ||
if test.isErrExpected { | ||
assert.Error(t, err) | ||
} else { | ||
assert.NoError(t, err) | ||
assert.Equal(t, test.expected, test.content) | ||
} | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.