Skip to content

Commit

Permalink
Add _skip_empty to modify & flag to re filter (#680)
Browse files Browse the repository at this point in the history
* Add _skip_empty to modify & flag to re filter

* Fix review comments

* Fix

* gen-doc
  • Loading branch information
kirillov6 authored Oct 3, 2024
1 parent c0f5cea commit dd71bb8
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 28 deletions.
48 changes: 34 additions & 14 deletions cfg/substitution/regex_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

type RegexFilter struct {
re *regexp.Regexp
limit int
groups []int
separator []byte
re *regexp.Regexp
limit int
groups []int
separator []byte
emptyOnNotMatched bool

buf []byte
}
Expand All @@ -24,6 +25,9 @@ func (r *RegexFilter) Apply(src []byte, dst []byte) []byte {
}
indexes := r.re.FindAllSubmatchIndex(src, r.limit)
if len(indexes) == 0 {
if r.emptyOnNotMatched {
return []byte("")
}
return dst
}
r.buf = r.buf[:0]
Expand Down Expand Up @@ -57,9 +61,10 @@ func (r *RegexFilter) setBuffer(buf []byte) {

// compareArgs is used for testing. Checks filter args values.
func (r *RegexFilter) compareArgs(args []any) error {
wantArgsCnt := 4
if len(args) != wantArgsCnt {
return fmt.Errorf("wrong regex filter amount of args, want=%d got=%d", wantArgsCnt, len(args))
wantArgsCntMin := 4
wantArgsCntMax := 5
if len(args) < wantArgsCntMin || len(args) > wantArgsCntMax {
return fmt.Errorf("wrong regex filter amount of args, want=[%d:%d] got=%d", wantArgsCntMin, wantArgsCntMax, len(args))
}
wantRe := args[0].(string)
gotRe := r.re.String()
Expand All @@ -86,24 +91,33 @@ func (r *RegexFilter) compareArgs(args []any) error {
if wantSeparator != gotSeparator {
return fmt.Errorf("wrong regex filter separator, want=%q got=%q", wantSeparator, gotSeparator)
}
if len(args) > wantArgsCntMin {
wantEmptyOnNotMatched := args[4].(bool)
gotEmptyOnNotMatched := r.emptyOnNotMatched
if wantEmptyOnNotMatched != gotEmptyOnNotMatched {
return fmt.Errorf("wrong regex filter flag 'emptyOnNotMatched', want=%v got=%v", wantEmptyOnNotMatched, gotEmptyOnNotMatched)
}
}
return nil
}

func parseRegexFilter(data string, offset int, logger *zap.Logger) (FieldFilter, int, error) {
expArgsCnt := 4
expArgsCntMin := 4
expArgsCntMax := 5
filterEndPos := -1
args, argsEndPos, err := parseFilterArgs(data[len(regexFilterPrefix):])
if err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse filter args: %w", err)
}
filterEndPos = argsEndPos + len(regexFilterPrefix) + offset
if len(args) != expArgsCnt {
return nil, filterEndPos, fmt.Errorf("invalid args for regexp filter, exptected %d, got %d", expArgsCnt, len(args))
if len(args) < expArgsCntMin || len(args) > expArgsCntMax {
return nil, filterEndPos, fmt.Errorf("invalid args for regexp filter, exptected from %d to %d, got %d", expArgsCntMin, expArgsCntMax, len(args))
}
var reStr string
var limit int
var groups []int
var separator string
var emptyOnNotMatched bool
if err := json.Unmarshal([]byte(args[0]), &reStr); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter regexp string: %w", err)
}
Expand All @@ -118,11 +132,17 @@ func parseRegexFilter(data string, offset int, logger *zap.Logger) (FieldFilter,
if err := json.Unmarshal([]byte(args[3]), &separator); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter separator: %w", err)
}
if len(args) > expArgsCntMin {
if err := json.Unmarshal([]byte(args[4]), &emptyOnNotMatched); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter flag 'emptyOnNotMatched': %w", err)
}
}
filter := &RegexFilter{
re: re,
limit: limit,
groups: groups,
separator: []byte(separator),
re: re,
limit: limit,
groups: groups,
separator: []byte(separator),
emptyOnNotMatched: emptyOnNotMatched,
}
return filter, filterEndPos, nil
}
30 changes: 29 additions & 1 deletion cfg/substitution/substitution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestParseFieldWithFilter(t *testing.T) {
},
{
name: "with_two_substitutions_one_filter",
substitution: `days till world end ${prediction.days|re("(\\d),(test.+)",-1,[1,2]," , ")}. Hello, ${name|re("(\\w+)",1,[1],",")}`,
substitution: `days till world end ${prediction.days|re("(\\d),(test.+)",-1,[1,2]," , ")}. Hello, ${name|re("(\\w+)",1,[1],",",true)}`,
data: [][]string{
{"days till world end "},
{"prediction", "days"},
Expand All @@ -167,6 +167,7 @@ func TestParseFieldWithFilter(t *testing.T) {
1,
[]int{1},
",",
true,
},
},
},
Expand All @@ -187,6 +188,16 @@ func TestParseFieldWithFilter(t *testing.T) {
substitution: `test ${field|re()} test2`,
wantErr: true,
},
{
name: "err_invalid_args_count_min",
substitution: `test ${field|re("invalid", -1, [1,2])} test2`,
wantErr: true,
},
{
name: "err_invalid_args_count_max",
substitution: `test ${field|re("invalid", -1, [1,2], "|", 1, 2)} test2`,
wantErr: true,
},
{
name: "err_re_filter_invalid_args_invalid_first_arg",
substitution: `test ${field|re('(invalid)',-1,[1,],"|")} test2`,
Expand All @@ -207,6 +218,11 @@ func TestParseFieldWithFilter(t *testing.T) {
substitution: `test ${field|re("(invalid)",-1,[1],'invalid')} test2`,
wantErr: true,
},
{
name: "err_re_filter_invalid_args_invalid_fifth_arg",
substitution: `test ${field|re("(invalid)",-1,[1],"|",100)} test2`,
wantErr: true,
},
{
name: "err_invalid_args_no_last_bracket",
substitution: `test ${field|re('invalid'} test2`,
Expand Down Expand Up @@ -332,6 +348,18 @@ func TestRegexFilterApply(t *testing.T) {
data: `this is some text re1 re2 re3 re4 end`,
want: "re1|re2",
},
{
name: "ok_re_filter_empty_on_not_matched_false",
substitution: `${field|re("(re\\d)",1,[1],"|")}`,
data: `this is some text`,
want: "this is some text",
},
{
name: "ok_re_filter_empty_on_not_matched_true",
substitution: `${field|re("(re\\d)",1,[1],"|",true)}`,
data: `this is some text`,
want: "",
},
{
name: "ok_single_trim_filter_trim_all",
substitution: `${field|trim("all","\\n")}`,
Expand Down
12 changes: 11 additions & 1 deletion plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ pipelines:
## modify
It modifies the content of a field or adds a new field. It works only with strings.
You can provide an unlimited number of config parameters. Each parameter is handled as `cfg.FieldSelector`:`cfg.Substitution`.
When `_skip_empty` is set to `true`, the field won't be modified/added if the substitution result is empty.

> Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues.

Expand Down Expand Up @@ -468,10 +469,11 @@ and its result is formed into a value to be put in modified field.

Currently available filters are:

+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences,
+ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences,
takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`.
Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means
at most `limit` occurrences are extracted.
The optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches.

+ `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides,
`left` - trim only left, `right` - trim only right.
Expand Down Expand Up @@ -504,6 +506,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}`

Example #4:

Data: `{"message":"message without matching re"}`

Substitution: `extracted: ${message|re("test",1,[1],",",true)}`

Result: `{"message":"message without matching re","extracted":""}`

Example #5:

Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}`

Substitution: `message: ${message|trim("right","\n")}`
Expand Down
12 changes: 11 additions & 1 deletion plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ pipelines:
## modify
It modifies the content of a field or adds a new field. It works only with strings.
You can provide an unlimited number of config parameters. Each parameter is handled as `cfg.FieldSelector`:`cfg.Substitution`.
When `_skip_empty` is set to `true`, the field won't be modified/added if the substitution result is empty.

> Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues.

Expand Down Expand Up @@ -299,10 +300,11 @@ and its result is formed into a value to be put in modified field.

Currently available filters are:

+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences,
+ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences,
takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`.
Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means
at most `limit` occurrences are extracted.
The optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches.

+ `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides,
`left` - trim only left, `right` - trim only right.
Expand Down Expand Up @@ -335,6 +337,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}`

Example #4:

Data: `{"message":"message without matching re"}`

Substitution: `extracted: ${message|re("test",1,[1],",",true)}`

Result: `{"message":"message without matching re","extracted":""}`

Example #5:

Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}`

Substitution: `message: ${message|trim("right","\n")}`
Expand Down
12 changes: 11 additions & 1 deletion plugin/action/modify/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Modify plugin
It modifies the content of a field or adds a new field. It works only with strings.
You can provide an unlimited number of config parameters. Each parameter is handled as `cfg.FieldSelector`:`cfg.Substitution`.
When `_skip_empty` is set to `true`, the field won't be modified/added if the substitution result is empty.

> Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues.
Expand Down Expand Up @@ -39,10 +40,11 @@ and its result is formed into a value to be put in modified field.

Currently available filters are:

+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences,
+ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences,
takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`.
Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means
at most `limit` occurrences are extracted.
The optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches.

+ `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides,
`left` - trim only left, `right` - trim only right.
Expand Down Expand Up @@ -75,6 +77,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}`

Example #4:

Data: `{"message":"message without matching re"}`

Substitution: `extracted: ${message|re("test",1,[1],",",true)}`

Result: `{"message":"message without matching re","extracted":""}`

Example #5:

Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}`

Substitution: `message: ${message|trim("right","\n")}`
Expand Down
39 changes: 31 additions & 8 deletions plugin/action/modify/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"go.uber.org/zap"
)

const filterBufInitSize = 1024

/*{ introduction
It modifies the content of a field or adds a new field. It works only with strings.
You can provide an unlimited number of config parameters. Each parameter is handled as `cfg.FieldSelector`:`cfg.Substitution`.
When `_skip_empty` is set to `true`, the field won't be modified/added if the substitution result is empty.
> Note: When used to add new nested fields, each child field is added step by step, which can cause performance issues.
Expand Down Expand Up @@ -51,10 +50,11 @@ and its result is formed into a value to be put in modified field.
Currently available filters are:
+ `regex filter` - `re(regex string, limit int, groups []int, separator string)`, filters data using `regex`, extracts `limit` occurrences,
+ `regex filter` - `re(regex string, limit int, groups []int, separator string[, emptyOnNotMatched bool])`, filters data using `regex`, extracts `limit` occurrences,
takes regex groups listed in `groups` list, and if there are more than one extracted element concatenates result using `separator`.
Negative value of `limit` means all occurrences are extracted, `limit` 0 means no occurrences are extracted, `limit` greater than 0 means
at most `limit` occurrences are extracted.
The optional flag `emptyOnNotMatched` makes the filter return an empty string if the regex has no matches.
+ `trim filter` - `trim(mode string, cutset string)`, trims data by the `cutset` substring. Available modes are `all` - trim both sides,
`left` - trim only left, `right` - trim only right.
Expand Down Expand Up @@ -87,6 +87,14 @@ Result: `{"message:"service=service-test-1 exec took 200ms","took":"200ms"}`
Example #4:
Data: `{"message":"message without matching re"}`
Substitution: `extracted: ${message|re("test",1,[1],",",true)}`
Result: `{"message":"message without matching re","extracted":""}`
Example #5:
Data: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}\n"}`
Substitution: `message: ${message|trim("right","\n")}`
Expand All @@ -95,17 +103,24 @@ Result: `{"message:"{\"service\":\"service-test-1\",\"took\":\"200ms\"}"}`
}*/

const (
// filterBufInitSize is the initial capacity of the byte buffer allocated in
// Start and handed to substitution parsing for use by field filters.
filterBufInitSize = 1024

// skipEmptyKey is a reserved config key: it is not parsed as a field
// substitution; its value ("true" enables) controls whether empty results
// are skipped instead of being written to the event.
skipEmptyKey = "_skip_empty"
)

// fieldOp pairs a target field with the substitution ops used to build its value.
type fieldOp struct {
// field is the path passed to pipeline.CreateNestedField when the result is written.
field []string
// ops are the substitution ops for this field; presumably the result of
// substitution.ParseSubstitution for the configured value — TODO confirm in Start.
ops []substitution.SubstitutionOp
}

type Plugin struct {
config *Config
logger *zap.Logger
fieldOps []fieldOp
buf []byte
fieldBuf []byte
config *Config
logger *zap.Logger
fieldOps []fieldOp
skipEmpty bool
buf []byte
fieldBuf []byte
}

type Config map[string]string
Expand All @@ -128,6 +143,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP

filtersBuf := make([]byte, 0, filterBufInitSize)
for key, value := range *p.config {
if key == skipEmptyKey {
p.skipEmpty = value == "true"
continue
}

// if there are field filters in substitutions, they will have single buffer for all
// substitution ops in this plugin
ops, err := substitution.ParseSubstitution(value, filtersBuf, p.logger)
Expand Down Expand Up @@ -169,6 +189,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}
}

if p.skipEmpty && len(p.buf) == 0 {
continue
}
pipeline.CreateNestedField(event.Root, fo.field).MutateToBytesCopy(event.Root, p.buf)
}

Expand Down
Loading

0 comments on commit dd71bb8

Please sign in to comment.