From dd71bb8bfcba6fffa743fe7253374853fb4d0569 Mon Sep 17 00:00:00 2001 From: Yaroslav Kirillov Date: Thu, 3 Oct 2024 15:32:58 +0500 Subject: [PATCH] Add _skip_empty to modify & flag to re filter (#680) * Add _skip_empty to modify & flag to re filter * Fix review comments * Fix * gen-doc --- cfg/substitution/regex_filter.go | 48 ++++++++++++++++++-------- cfg/substitution/substitution_test.go | 30 +++++++++++++++- plugin/README.md | 12 ++++++- plugin/action/README.md | 12 ++++++- plugin/action/modify/README.md | 12 ++++++- plugin/action/modify/modify.go | 39 ++++++++++++++++----- plugin/action/modify/modify_test.go | 12 ++++++- plugin/output/clickhouse/clickhouse.go | 2 +- 8 files changed, 139 insertions(+), 28 deletions(-) diff --git a/cfg/substitution/regex_filter.go b/cfg/substitution/regex_filter.go index cb894a931..99c825d14 100644 --- a/cfg/substitution/regex_filter.go +++ b/cfg/substitution/regex_filter.go @@ -10,10 +10,11 @@ import ( ) type RegexFilter struct { - re *regexp.Regexp - limit int - groups []int - separator []byte + re *regexp.Regexp + limit int + groups []int + separator []byte + emptyOnNotMatched bool buf []byte } @@ -24,6 +25,9 @@ func (r *RegexFilter) Apply(src []byte, dst []byte) []byte { } indexes := r.re.FindAllSubmatchIndex(src, r.limit) if len(indexes) == 0 { + if r.emptyOnNotMatched { + return []byte("") + } return dst } r.buf = r.buf[:0] @@ -57,9 +61,10 @@ func (r *RegexFilter) setBuffer(buf []byte) { // compareArgs is used for testing. Checks filter args values. 
func (r *RegexFilter) compareArgs(args []any) error { - wantArgsCnt := 4 - if len(args) != wantArgsCnt { - return fmt.Errorf("wrong regex filter amount of args, want=%d got=%d", wantArgsCnt, len(args)) + wantArgsCntMin := 4 + wantArgsCntMax := 5 + if len(args) < wantArgsCntMin || len(args) > wantArgsCntMax { + return fmt.Errorf("wrong regex filter amount of args, want=[%d:%d] got=%d", wantArgsCntMin, wantArgsCntMax, len(args)) } wantRe := args[0].(string) gotRe := r.re.String() @@ -86,24 +91,33 @@ func (r *RegexFilter) compareArgs(args []any) error { if wantSeparator != gotSeparator { return fmt.Errorf("wrong regex filter separator, want=%q got=%q", wantSeparator, gotSeparator) } + if len(args) > wantArgsCntMin { + wantEmptyOnNotMatched := args[4].(bool) + gotEmptyOnNotMatched := r.emptyOnNotMatched + if wantEmptyOnNotMatched != gotEmptyOnNotMatched { + return fmt.Errorf("wrong regex filter flag 'emptyOnNotMatched', want=%v got=%v", wantEmptyOnNotMatched, gotEmptyOnNotMatched) + } + } return nil } func parseRegexFilter(data string, offset int, logger *zap.Logger) (FieldFilter, int, error) { - expArgsCnt := 4 + expArgsCntMin := 4 + expArgsCntMax := 5 filterEndPos := -1 args, argsEndPos, err := parseFilterArgs(data[len(regexFilterPrefix):]) if err != nil { return nil, filterEndPos, fmt.Errorf("failed to parse filter args: %w", err) } filterEndPos = argsEndPos + len(regexFilterPrefix) + offset - if len(args) != expArgsCnt { - return nil, filterEndPos, fmt.Errorf("invalid args for regexp filter, exptected %d, got %d", expArgsCnt, len(args)) + if len(args) < expArgsCntMin || len(args) > expArgsCntMax { + return nil, filterEndPos, fmt.Errorf("invalid args for regexp filter, exptected from %d to %d, got %d", expArgsCntMin, expArgsCntMax, len(args)) } var reStr string var limit int var groups []int var separator string + var emptyOnNotMatched bool if err := json.Unmarshal([]byte(args[0]), &reStr); err != nil { return nil, filterEndPos, fmt.Errorf("failed to parse regexp 
filter regexp string: %w", err) } @@ -118,11 +132,17 @@ func parseRegexFilter(data string, offset int, logger *zap.Logger) (FieldFilter, if err := json.Unmarshal([]byte(args[3]), &separator); err != nil { return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter separator: %w", err) } + if len(args) > expArgsCntMin { + if err := json.Unmarshal([]byte(args[4]), &emptyOnNotMatched); err != nil { + return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter flag 'emptyOnNotMatched': %w", err) + } + } filter := &RegexFilter{ - re: re, - limit: limit, - groups: groups, - separator: []byte(separator), + re: re, + limit: limit, + groups: groups, + separator: []byte(separator), + emptyOnNotMatched: emptyOnNotMatched, } return filter, filterEndPos, nil } diff --git a/cfg/substitution/substitution_test.go b/cfg/substitution/substitution_test.go index 35281e89e..07a9cf64d 100644 --- a/cfg/substitution/substitution_test.go +++ b/cfg/substitution/substitution_test.go @@ -143,7 +143,7 @@ func TestParseFieldWithFilter(t *testing.T) { }, { name: "with_two_substitutions_one_filter", - substitution: `days till world end ${prediction.days|re("(\\d),(test.+)",-1,[1,2]," , ")}. Hello, ${name|re("(\\w+)",1,[1],",")}`, + substitution: `days till world end ${prediction.days|re("(\\d),(test.+)",-1,[1,2]," , ")}. 
Hello, ${name|re("(\\w+)",1,[1],",",true)}`, data: [][]string{ {"days till world end "}, {"prediction", "days"}, @@ -167,6 +167,7 @@ func TestParseFieldWithFilter(t *testing.T) { 1, []int{1}, ",", + true, }, }, }, @@ -187,6 +188,16 @@ func TestParseFieldWithFilter(t *testing.T) { substitution: `test ${field|re()} test2`, wantErr: true, }, + { + name: "err_invalid_args_count_min", + substitution: `test ${field|re("invalid", -1, [1,2])} test2`, + wantErr: true, + }, + { + name: "err_invalid_args_count_max", + substitution: `test ${field|re("invalid", -1, [1,2], "|", 1, 2)} test2`, + wantErr: true, + }, { name: "err_re_filter_invalid_args_invalid_first_arg", substitution: `test ${field|re('(invalid)',-1,[1,],"|")} test2`, @@ -207,6 +218,11 @@ func TestParseFieldWithFilter(t *testing.T) { substitution: `test ${field|re("(invalid)",-1,[1],'invalid')} test2`, wantErr: true, }, + { + name: "err_re_filter_invalid_args_invalid_fifth_arg", + substitution: `test ${field|re("(invalid)",-1,[1],"|",100)} test2`, + wantErr: true, + }, { name: "err_invalid_args_no_last_bracket", substitution: `test ${field|re('invalid'} test2`, @@ -332,6 +348,18 @@ func TestRegexFilterApply(t *testing.T) { data: `this is some text re1 re2 re3 re4 end`, want: "re1|re2", }, + { + name: "ok_re_filter_empty_on_not_matched_false", + substitution: `${field|re("(re\\d)",1,[1],"|")}`, + data: `this is some text`, + want: "this is some text", + }, + { + name: "ok_re_filter_empty_on_not_matched_true", + substitution: `${field|re("(re\\d)",1,[1],"|",true)}`, + data: `this is some text`, + want: "", + }, { name: "ok_single_trim_filter_trim_all", substitution: `${field|trim("all","\\n")}`, diff --git a/plugin/README.md b/plugin/README.md index 352c6e357..4f94109fd 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -430,6 +430,7 @@ pipelines: ## modify It modifies the content for a field or add new field. It works only with strings. You can provide an unlimited number of config parameters. 
Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. +When `_skip_empty` is set to `true`, the field won't be modified/added if the resulting field value is empty. > Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues. @@ -468,10 +469,11 @@ and its result is formed into a value to be put in modified field. Currently available filters are: -+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences, ++ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences, takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`. Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means at most `limit` occurrences are extracted. +Optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches. + `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides, `left` - trim only left, `right` - trim only right. 
@@ -504,6 +506,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}` Example #4: +Data: `{"message":"message without matching re"}` + +Substitution: `extracted: ${message|re("test",1,[1],",",true)}` + +Result: `{"message":"message without matching re","extracted":""}` + +Example #5: + Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}` Substitution: `message: ${message|trim("right","\n")}` diff --git a/plugin/action/README.md b/plugin/action/README.md index 8cb543fe8..d88067d65 100755 --- a/plugin/action/README.md +++ b/plugin/action/README.md @@ -261,6 +261,7 @@ pipelines: ## modify It modifies the content for a field or add new field. It works only with strings. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. +When `_skip_empty` is set to `true`, the field won't be modified/added if the resulting field value is empty. > Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues. @@ -299,10 +300,11 @@ and its result is formed into a value to be put in modified field. Currently available filters are: -+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences, ++ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences, takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`. Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means at most `limit` occurrences are extracted. +Optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches. 
+ `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides, `left` - trim only left, `right` - trim only right. @@ -335,6 +337,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}` Example #4: +Data: `{"message":"message without matching re"}` + +Substitution: `extracted: ${message|re("test",1,[1],",",true)}` + +Result: `{"message":"message without matching re","extracted":""}` + +Example #5: + Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}` Substitution: `message: ${message|trim("right","\n")}` diff --git a/plugin/action/modify/README.md b/plugin/action/modify/README.md index 3495facbb..c75833a49 100755 --- a/plugin/action/modify/README.md +++ b/plugin/action/modify/README.md @@ -1,6 +1,7 @@ # Modify plugin It modifies the content for a field or add new field. It works only with strings. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. +When `_skip_empty` is set to `true`, the field won't be modified/added if the resulting field value is empty. > Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues. @@ -39,10 +40,11 @@ and its result is formed into a value to be put in modified field. Currently available filters are: -+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences, ++ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences, takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`. 
Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means at most `limit` occurrences are extracted. +Optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches. + `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides, `left` - trim only left, `right` - trim only right. @@ -75,6 +77,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}` Example #4: +Data: `{"message":"message without matching re"}` + +Substitution: `extracted: ${message|re("test",1,[1],",",true)}` + +Result: `{"message":"message without matching re","extracted":""}` + +Example #5: + Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}` Substitution: `message: ${message|trim("right","\n")}` diff --git a/plugin/action/modify/modify.go b/plugin/action/modify/modify.go index 1410bcf83..182c1c776 100644 --- a/plugin/action/modify/modify.go +++ b/plugin/action/modify/modify.go @@ -8,11 +8,10 @@ import ( "go.uber.org/zap" ) -const filterBufInitSize = 1024 - /*{ introduction It modifies the content for a field or add new field. It works only with strings. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. +When `_skip_empty` is set to `true`, the field won't be modified/added if the resulting field value is empty. > Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues. @@ -51,10 +50,11 @@ and its result is formed into a value to be put in modified field. 
Currently available filters are: -+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences, ++ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences, takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`. Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means at most `limit` occurrences are extracted. +Optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches. + `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides, `left` - trim only left, `right` - trim only right. @@ -87,6 +87,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}` Example #4: +Data: `{"message":"message without matching re"}` + +Substitution: `extracted: ${message|re("test",1,[1],",",true)}` + +Result: `{"message":"message without matching re","extracted":""}` + +Example #5: + Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}` Substitution: `message: ${message|trim("right","\n")}` @@ -95,17 +103,24 @@ Result: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}"}` }*/ +const ( + filterBufInitSize = 1024 + + skipEmptyKey = "_skip_empty" +) + type fieldOp struct { field []string ops []substitution.SubstitutionOp } type Plugin struct { - config *Config - logger *zap.Logger - fieldOps []fieldOp - buf []byte - fieldBuf []byte + config *Config + logger *zap.Logger + fieldOps []fieldOp + skipEmpty bool + buf []byte + fieldBuf []byte } type Config map[string]string @@ -128,6 +143,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP filtersBuf := 
make([]byte, 0, filterBufInitSize) for key, value := range *p.config { + if key == skipEmptyKey { + p.skipEmpty = value == "true" + continue + } + // if there are field filters in substitutions, they will have single buffer for all // substitution ops in this plugin ops, err := substitution.ParseSubstitution(value, filtersBuf, p.logger) @@ -169,6 +189,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { } } + if p.skipEmpty && len(p.buf) == 0 { + continue + } pipeline.CreateNestedField(event.Root, fo.field).MutateToBytesCopy(event.Root, p.buf) } diff --git a/plugin/action/modify/modify_test.go b/plugin/action/modify/modify_test.go index 103a8c726..05589bfbd 100644 --- a/plugin/action/modify/modify_test.go +++ b/plugin/action/modify/modify_test.go @@ -11,9 +11,11 @@ import ( func TestModify(t *testing.T) { config := test.NewConfig(&Config{ + "_skip_empty": "true", "new_field": "new_value", "my_object.field.subfield": "${existing_field}", "my_object.new_field.new_subfield": "new_subfield_value", + "not_exists": "${not_existing_field}", }, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} @@ -23,6 +25,7 @@ func TestModify(t *testing.T) { assert.Equal(t, "new_value", e.Root.Dig("new_field").AsString(), "wrong event field") assert.Equal(t, "existing_value", e.Root.Dig("my_object", "field", "subfield").AsString(), "wrong event field") assert.Equal(t, "new_subfield_value", e.Root.Dig("my_object", "new_field", "new_subfield").AsString(), "wrong event field") + assert.Nil(t, e.Root.Dig("not_exists"), "wrong event field") wg.Done() }) @@ -51,11 +54,18 @@ func TestModifyRegex(t *testing.T) { "substitution_field": "", }, }, + { + []byte(`{"existing_field":"not_matched_re"}`), + map[string]string{ + "new_field": "new_value", + "substitution_field": "", + }, + }, } config := test.NewConfig(&Config{ "new_field": "new_value", - "substitution_field": 
"${existing_field|re(\"(existing).*(value)\", -1, [1,2], \" | \")}", + "substitution_field": "${existing_field|re(\"(existing).*(value)\", -1, [1,2], \" | \", true)}", }, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 385101645..f1285393b 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -521,7 +521,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err for i := range p.instances { requestID := p.requestID.Inc() clickhouse := p.getInstance(requestID, i) - err := p.do(clickhouse, data.input) + err = p.do(clickhouse, data.input) if err == nil { return nil }