Skip to content

Commit

Permalink
[libbeat] Allow per beat.Client control of event normalization (#33657)
Browse files Browse the repository at this point in the history
Control over the addition of the "generalizeEvent" processor into the publishing pipeline was
only available at the Beat level. This adds a new option that can be set by inputs when they
create their beat.Client.

This allows inputs to override the Beat's default behavior. My expected use case is to disable
event normalization for inputs that are known to only produce beat.Events containing the
standard data types expected by the processors and outputs (i.e. map[string]interface{}
containing primitives, slices, or other map[string]interface{}).

Inputs would want to disable the event normalization processor if they can because it adds
unnecessary processing (recurses over the fields and often allocates).

* lint / misspell - fix spelling
* lint / unused - remove `drop` field
* lint / errorlint - wrap error in fmt.Errorf
* lint / errcheck - add missing checks
  • Loading branch information
andrewkroh authored and chrisberkhout committed Jun 1, 2023
1 parent f3536b3 commit f9a6c18
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Added `.python-version` file {pull}32323[32323]
- Add support for multiple regions in GCP {pull}32964[32964]
- Use `T.TempDir` to create temporary test directory {pull}33082[33082]
- Add an option to disable event normalization when creating a `beat.Client`. {pull}33657[33657]

==== Deprecated

Expand Down
12 changes: 8 additions & 4 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ type ClientConfig struct {
// operations on ACKer are normally executed in different go routines. ACKers
// are required to be multi-threading safe.
type ACKer interface {
// AddEvent informs the ACKer that a new event has been send to the client.
// AddEvent informs the ACKer that a new event has been sent to the client.
// AddEvent is called after the processors have handled the event. If the
// event has been dropped by the processor `published` will be set to true.
// This allows the ACKer to do some bookeeping for dropped events.
// This allows the ACKer to do some bookkeeping for dropped events.
AddEvent(event Event, published bool)

// ACK Events from the output and pipeline queue are forwarded to ACKEvents.
Expand All @@ -83,7 +83,7 @@ type ACKer interface {
// Close informs the ACKer that the Client used to publish to the pipeline has been closed.
// No new events should be published anymore. The ACKEvents method still will be actively called
// as long as there are pending events for the client in the pipeline. The Close signal can be used
// to supress any ACK event propagation if required.
// to suppress any ACK event propagation if required.
// Close might be called from another go-routine than AddEvent and ACKEvents.
Close()
}
Expand Down Expand Up @@ -121,6 +121,10 @@ type ProcessingConfig struct {
// Disables the addition of host.name if it was enabled for the publisher.
DisableHost bool

// EventNormalization controls whether the event normalization processor
// is applied to events. If nil the Beat's default behavior prevails.
EventNormalization *bool

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
Expand Down Expand Up @@ -169,7 +173,7 @@ const (
// to update state keeping track of the sending status.
GuaranteedSend

// DropIfFull drops an event to be send if the pipeline is currently full.
// DropIfFull drops an event to be sent if the pipeline is currently full.
// This ensures a beats internals can continue processing if the pipeline has
// filled up. Useful if an event stream must be processed to keep internal
// state up-to-date.
Expand Down
25 changes: 16 additions & 9 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type builder struct {
// global pipeline processors
processors *group

drop bool // disabled is set if outputs have been disabled via CLI
alwaysCopy bool
}

Expand Down Expand Up @@ -109,7 +108,7 @@ func MakeDefaultSupport(

processors, err := processors.New(cfg.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
return nil, fmt.Errorf("error initializing processors: %w", err)
}

return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize, cfg.TimeSeries)
Expand Down Expand Up @@ -169,7 +168,7 @@ func WithObserverMeta() modifier {
"version": info.Version,
}
if info.Name != info.Hostname {
metadata.Put("name", info.Name)
metadata["name"] = info.Name
}
return mapstr.M{"observer": metadata}
})
Expand Down Expand Up @@ -212,9 +211,11 @@ func newBuilder(
b.builtinMeta = builtin
}

if fields := eventMeta.Fields; len(fields) > 0 {
if len(eventMeta.Fields) > 0 {
b.fields = mapstr.M{}
mapstr.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot)
if err := mapstr.MergeFields(b.fields, eventMeta.Fields.Clone(), eventMeta.FieldsUnderRoot); err != nil {
return nil, fmt.Errorf("failed merging event metadata into fields: %w", err)
}
}

if timeSeries {
Expand Down Expand Up @@ -268,7 +269,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
builtin := b.builtinMeta
if cfg.DisableHost {
tmp := builtin.Clone()
tmp.Delete("host")
delete(tmp, "host")
builtin = tmp
}

Expand All @@ -288,8 +289,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
builtin = tmp
}

if !b.skipNormalize {
// setup 1: generalize/normalize output (P)
// setup 1: generalize/normalize output (P)
if cfg.EventNormalization != nil {
if *cfg.EventNormalization {
processors.add(newGeneralizeProcessor(cfg.KeepNull))
}
} else if !b.skipNormalize {
processors.add(newGeneralizeProcessor(cfg.KeepNull))
}

Expand All @@ -310,7 +315,9 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
fields := cfg.Fields.Clone()
fields.DeepUpdate(b.fields.Clone())
if em := cfg.EventMetadata; len(em.Fields) > 0 {
mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot)
if err := mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot); err != nil {
return nil, fmt.Errorf("failed merging client event metadata into fields: %w", err)
}
}

if len(fields) > 0 {
Expand Down
37 changes: 37 additions & 0 deletions libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,43 @@ func TestProcessorsConfigs(t *testing.T) {
}
}

// TestEventNormalizationOverride verifies that the EventNormalization option
// in beat.ProcessingConfig overrides the "skipNormalize" setting that is
// specified in the builder (this is the default value set by the Beat).
//
// Every combination of builder default and per-client override runs as its
// own named subtest, so a failure identifies the offending combination and
// does not prevent the remaining combinations from being checked.
func TestEventNormalizationOverride(t *testing.T) {
	boolPtr := func(b bool) *bool { return &b }

	testCases := []struct {
		name                   string
		skipNormalize          bool
		normalizeOverride      *bool
		hasGeneralizeProcessor bool
	}{
		{name: "normalize_by_default_no_override", skipNormalize: false, normalizeOverride: nil, hasGeneralizeProcessor: true},
		{name: "normalize_by_default_override_false", skipNormalize: false, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false},
		{name: "normalize_by_default_override_true", skipNormalize: false, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true},
		{name: "skip_by_default_no_override", skipNormalize: true, normalizeOverride: nil, hasGeneralizeProcessor: false},
		{name: "skip_by_default_override_false", skipNormalize: true, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false},
		{name: "skip_by_default_override_true", skipNormalize: true, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			builder, err := newBuilder(beat.Info{}, logp.NewLogger(""), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false)
			require.NoError(t, err)

			processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false)
			require.NoError(t, err)

			// Use a checked assertion so an unexpected processor type fails
			// the subtest with a message instead of panicking the test binary.
			g, ok := processor.(*group)
			require.Truef(t, ok, "expected processor of type *group, got %T", processor)

			if !tc.hasGeneralizeProcessor {
				// With no other configuration the generalize processor is
				// the only one that could have been added.
				assert.Empty(t, g.list)
				return
			}

			// When present, the generalize processor must be the first one
			// in the group.
			if assert.NotEmpty(t, g.list) {
				assert.Equal(t, "generalizeEvent", g.list[0].String())
			}
		})
	}
}

func TestNormalization(t *testing.T) {
cases := map[string]struct {
normalize bool
Expand Down

0 comments on commit f9a6c18

Please sign in to comment.