Skip to content

Commit

Permalink
libbeat: add processors "decode_duration", "move_fields" (#31301)
Browse files Browse the repository at this point in the history
Co-authored-by: hxms <hxms@live.cn>
Co-authored-by: DeDe Morton <dede.morton@elastic.co>
  • Loading branch information
3 people committed Nov 16, 2022
1 parent e05ce90 commit dc1a0ca
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 0 deletions.
2 changes: 2 additions & 0 deletions libbeat/cmd/instance/imports_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
_ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/communityid"
_ "github.com/elastic/beats/v7/libbeat/processors/convert"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_duration"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_xml"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_xml_wineventlog"
_ "github.com/elastic/beats/v7/libbeat/processors/dissect"
_ "github.com/elastic/beats/v7/libbeat/processors/dns"
_ "github.com/elastic/beats/v7/libbeat/processors/extract_array"
_ "github.com/elastic/beats/v7/libbeat/processors/fingerprint"
_ "github.com/elastic/beats/v7/libbeat/processors/move_fields"
_ "github.com/elastic/beats/v7/libbeat/processors/ratelimit"
_ "github.com/elastic/beats/v7/libbeat/processors/registered_domain"
_ "github.com/elastic/beats/v7/libbeat/processors/script"
Expand Down
12 changes: 12 additions & 0 deletions libbeat/docs/processors-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ endif::[]
ifndef::no_decode_csv_fields_processor[]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
ifndef::no_decode_duration_processor[]
* <<decode-duration,`decode_duration`>>
endif::[]
ifndef::no_decode_json_fields_processor[]
* <<decode-json-fields,`decode_json_fields`>>
endif::[]
Expand Down Expand Up @@ -95,6 +98,9 @@ endif::[]
ifndef::no_include_fields_processor[]
* <<include-fields,`include_fields`>>
endif::[]
ifndef::no_move_fields_processor[]
* <<move-fields,`move-fields`>>
endif::[]
ifndef::no_include_rate_limit_processor[]
* <<rate-limit,`rate_limit`>>
endif::[]
Expand Down Expand Up @@ -189,6 +195,9 @@ endif::[]
ifndef::no_decode_csv_fields_processor[]
include::{libbeat-processors-dir}/decode_csv_fields/docs/decode_csv_fields.asciidoc[]
endif::[]
ifndef::no_include_decode_duration_processor[]
include::{libbeat-processors-dir}/decode_duration/docs/decode_duration.asciidoc[]
endif::[]
ifndef::no_decode_json_fields_processor[]
include::{libbeat-processors-dir}/actions/docs/decode_json_fields.asciidoc[]
endif::[]
Expand Down Expand Up @@ -225,6 +234,9 @@ endif::[]
ifndef::no_include_fields_processor[]
include::{libbeat-processors-dir}/actions/docs/include_fields.asciidoc[]
endif::[]
ifndef::no_include_move_fields_processor[]
include::{libbeat-processors-dir}/move_fields/docs/move_fields.asciidoc[]
endif::[]
ifndef::no_include_rate_limit_processor[]
include::{libbeat-processors-dir}/ratelimit/docs/rate_limit.asciidoc[]
endif::[]
Expand Down
95 changes: 95 additions & 0 deletions libbeat/processors/decode_duration/decode_duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package decode_duration

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/elastic-agent-libs/config"
)

func init() {
processors.RegisterPlugin("decode_duration",
checks.ConfigChecked(NewDecodeDuration,
checks.RequireFields("field", "format")))
jsprocessor.RegisterPlugin("DecodeDuration", NewDecodeDuration)
}

type decodeDurationConfig struct {
Field string `config:"field"`
Format string `config:"format"`
}

type decodeDuration struct {
config decodeDurationConfig
}

func (u decodeDuration) Run(event *beat.Event) (*beat.Event, error) {
fields := event.Fields
fieldName := u.config.Field
x, err := fields.GetValue(fieldName)
if err != nil {
return event, fmt.Errorf("field '%s' not present on event", fieldName)
}
durationString, ok := x.(string)
if !ok {
return event, fmt.Errorf("field '%s' is not a string, value: '%#v'", fieldName, x)
}
d, err := time.ParseDuration(durationString)
if err != nil {
return event, nil
}
switch u.config.Format {
case "milliseconds":
// keep the result is type float64
x = float64(d.Milliseconds())
case "seconds":
x = d.Seconds()
case "minutes":
x = d.Minutes()
case "hours":
x = d.Hours()
default:
x = float64(d.Milliseconds())
}
if _, err = fields.Put(fieldName, x); err != nil {
return event, fmt.Errorf("put field '%s' back to event failed: %w", fieldName, err)
}
return event, nil
}

func (u decodeDuration) String() string {
return "decode_duration"
}

func NewDecodeDuration(c *config.C) (processors.Processor, error) {
fc := decodeDurationConfig{}
err := c.Unpack(&fc)
if err != nil {
return nil, fmt.Errorf("failed to unpack decode duration config: %w", err)
}

return &decodeDuration{
config: fc,
}, nil
}
73 changes: 73 additions & 0 deletions libbeat/processors/decode_duration/decode_duration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package decode_duration

import (
"fmt"
"math"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestDecodeDuration(t *testing.T) {
cases := []struct {
Duration time.Duration
Format string
Result float64
}{
{time.Second + time.Millisecond, "", 1001},
{time.Second + time.Millisecond, "milliseconds", 1001},
{time.Second + time.Millisecond, "seconds", 1.001},
{3 * time.Second, "minutes", 0.05},
{3 * time.Minute, "hours", 0.05},
}

for _, testCase := range cases {
t.Run(fmt.Sprintf("%s format as %s", testCase.Duration, testCase.Format), func(t *testing.T) {
evt := &beat.Event{Fields: mapstr.M{}}
c := &decodeDuration{
config: decodeDurationConfig{
Field: "duration",
Format: testCase.Format,
},
}
if _, err := evt.PutValue("duration", testCase.Duration.String()); err != nil {
t.Fatal(err)
}
evt, err := c.Run(evt)
if err != nil {
t.Fatal(err)
}
d, err := evt.GetValue("duration")
if err != nil {
t.Fatal(err)
}
floatD, ok := d.(float64)
if !ok {
t.Fatal("result value is not duration")
}
floatD = math.Round(floatD*math.Pow10(6)) / math.Pow10(6)
if floatD != testCase.Result {
t.Fatalf("test case except: %f, actual: %f", testCase.Result, floatD)
}
})
}
}
26 changes: 26 additions & 0 deletions libbeat/processors/decode_duration/docs/decode_duration.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[[decode-duration]]
=== Decode duration

++++
<titleabbrev>decode_duration</titleabbrev>
++++

The `decode_duration` processor decodes a Go-style duration string into a specific `format`.

For more information about the Go `time.Duration` string style, refer to the https://pkg.go.dev/time#Duration[Go documentation].

.Decode-Duration options
[options="header"]
|======
| Name | Required | Default | Description |
| `field` | yes | | Which field of event needs to be decoded as `time.Duration` |
| `format` | yes | `milliseconds` | Supported formats: `milliseconds`/`seconds`/`minutes`/`hours` |
|======

[source,yaml]
----
processors:
- decode_duration:
field: "app.rpc.cost"
format: "milliseconds"
----
101 changes: 101 additions & 0 deletions libbeat/processors/move_fields/docs/move_fields.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
[[move-fields]]
=== Move fields

++++
<titleabbrev>move_fields</titleabbrev>
++++

The `move_fields` processor moves event fields from one object into another. It can also rearrange fields or add a prefix to fields.

The processor extracts fields from `from`, then uses `fields` and `exclude` as filters to choose which fields to move into the `to` field.

For example, given the following event:

[source,json]
----
{
"app": {
"method": "a",
"elapsed_time": 100,
"user_id": 100,
"message": "i'm a message"
}
}
----

To move `method` and `elapsed_time` into another object, use this configuration:

[source,yaml]
----
processors:
- move_fields:
from: "app"
fields: ["method", "elapsed_time"],
to: "rpc."
----

Your final event will be:

[source,json]
----
{
"app": {
"user_id": 100,
"message": "i'm a message",
"rpc": {
"method": "a",
"elapsed_time": 100
}
}
}
----


To add a prefix to the whole event:

[source,json]
----
{
"app": { "method": "a"},
"cost": 100
}
----

Use this configuration:

[source,yaml]
----
processors:
- move_fields:
to: "my_prefix_"
----

Your final event will be:

[source,json]
----
{
"my_prefix_app": { "method": "a"},
"my_prefix_cost": 100
}
----

.Move-fields options
[options="header"]
|======
| Name | Required | Default | Description |
| `from` | no | | Which field you want extract. This field and any nested fields will be moved into `to` unless they are filtered out. If empty, indicates event root. |
| `fields` | no | | Which fields to extract from `from` and move to `to`. An empty list indicates all fields. |
| `ignore_missing` | no | false | Ignore "not found" errors when extracting fields. |
| `exclude` | no | | A list of fields to exclude and not move. |
| `to` | yes | | These fields extract from `from` destination field prefix the `to` will base on fields root. |
|======

[source,yaml]
----
processors:
- move_fields:
from: "app"
fields: [ "method", "elapsed_time" ]
to: "rpc."
----
Loading

0 comments on commit dc1a0ca

Please sign in to comment.