From f08fb99fb4cd8b1336f29cf87c7747d2b1424385 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Fri, 8 Mar 2019 16:52:24 +0100 Subject: [PATCH] Beats event processing and default fields (#10801) This change moves the generation of the event processing into its distinct package, such that the actual publisher pipeline will not define any processors anymore. A new instance of a publisher pipeline must not add fields on its own. This change converts the event processing pipeline into the 'Supporter' pattern, which is already used for Index Management. As different beats ask for slightly different behavior in the event processing (e.g. normalize, default builtins and so on), the `processing.Support` can be used for customizations. (cherry picked from commit 83dfb2f7b7b411cb266682572a6691c221f903b8) --- CHANGELOG-developer.next.asciidoc | 3 + filebeat/channel/factory.go | 16 +- heartbeat/monitors/task.go | 8 +- journalbeat/input/input.go | 10 +- libbeat/beat/pipeline.go | 50 +- libbeat/cmd/instance/beat.go | 14 + libbeat/cmd/instance/settings.go | 3 + .../report/elasticsearch/elasticsearch.go | 9 +- libbeat/publisher/pipeline/module.go | 20 +- libbeat/publisher/pipeline/pipeline.go | 86 +--- libbeat/publisher/pipeline/processor.go | 327 ------------- libbeat/publisher/pipeline/processor_test.go | 433 ------------------ libbeat/publisher/pipeline/stress/run.go | 11 +- libbeat/publisher/processing/default.go | 325 +++++++++++++ libbeat/publisher/processing/default_test.go | 364 +++++++++++++++ libbeat/publisher/processing/processing.go | 38 ++ libbeat/publisher/processing/processors.go | 200 ++++++++ metricbeat/mb/module/connector.go | 8 +- packetbeat/beater/packetbeat.go | 6 +- packetbeat/publish/publish.go | 6 +- winlogbeat/beater/eventlogger.go | 10 +- x-pack/functionbeat/beater/functionbeat.go | 8 +- 22 files changed, 1052 insertions(+), 903 deletions(-) delete mode 100644 libbeat/publisher/pipeline/processor.go delete mode 100644 
libbeat/publisher/pipeline/processor_test.go create mode 100644 libbeat/publisher/processing/default.go create mode 100644 libbeat/publisher/processing/default_test.go create mode 100644 libbeat/publisher/processing/processing.go create mode 100644 libbeat/publisher/processing/processors.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index f4270e0c02e..0dc3843ba52 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -23,10 +23,13 @@ The list below covers the major changes between 7.0.0-beta1 and master only. be used to create an index selector. {pull}10347[10347] - Remove support for loading dashboards to Elasticsearch 5. {pull}10451[10451] - Remove support for deprecated `GenRootCmd` methods. {pull}10721[10721] +- Remove SkipNormalization, SkipAgentMetadata, SkipAddHostName. {pull}10801[10801] {pull}10769[10769] ==== Bugfixes + - Align default index between elasticsearch and logstash and kafka output. {pull}10841[10841] - Fix duplication check for `append_fields` option. {pull}10959[10959] ==== Added +- Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. 
{pull}10801[10801] diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index c938aafe8bd..c295d1a4056 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c } client, err := p.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: config.EventMetadata, - DynamicFields: dynFields, - Meta: meta, - Fields: fields, - Processor: processors, - Events: f.eventer, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: config.EventMetadata, + DynamicFields: dynFields, + Meta: meta, + Fields: fields, + Processor: processors, + }, + Events: f.eventer, }) if err != nil { return nil, err diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 3ee62eff235..43e4a8056fa 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -139,9 +139,11 @@ func (t *configuredJob) Start() { } t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ - EventMetadata: t.config.EventMetadata, - Processor: t.processors, - Fields: fields, + Processing: beat.ProcessingConfig{ + EventMetadata: t.config.EventMetadata, + Processor: t.processors, + Fields: fields, + }, }) if err != nil { logp.Err("could not start monitor: %v", err) diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 70d9bcb6874..7f1941271dc 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -125,10 +125,12 @@ func New( func (i *Input) Run() { var err error i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: i.eventMeta, - Meta: nil, - Processor: i.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: i.eventMeta, + Meta: nil, + Processor: i.processors, + }, ACKCount: func(n int) { i.logger.Infof("journalbeat successfully 
published %d events", n) }, diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 3db7baae6d2..5743e1cc92a 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -46,22 +46,7 @@ type Client interface { type ClientConfig struct { PublishMode PublishMode - // EventMetadata configures additional fields/tags to be added to published events. - EventMetadata common.EventMetadata - - // Meta provides additional meta data to be added to the Meta field in the beat.Event - // structure. - Meta common.MapStr - - // Fields provides additional 'global' fields to be added to every event - Fields common.MapStr - - // DynamicFields provides additional fields to be added to every event, supporting live updates - DynamicFields *common.MapStrPointer - - // Processors passes additional processor to the client, to be executed before - // the pipeline processors. - Processor ProcessorList + Processing ProcessingConfig // WaitClose sets the maximum duration to wait on ACK, if client still has events // active non-acknowledged events in the publisher pipeline. @@ -72,14 +57,6 @@ type ClientConfig struct { // Events configures callbacks for common client callbacks Events ClientEventer - // By default events are normalized within processor pipeline, - // if the normalization step should be skipped set this to true. - SkipNormalization bool - - // By default events are decorated with agent metadata. - // To skip adding that metadata set this to true. - SkipAgentMetadata bool - // ACK handler strategies. // Note: ack handlers are run in another go-routine owned by the publisher pipeline. // They should not block for to long, to not block the internal buffers for @@ -101,6 +78,31 @@ type ClientConfig struct { ACKLastEvent func(interface{}) } +// ProcessingConfig provides additional event processing settings a client can +// pass to the publisher pipeline on Connect. 
+type ProcessingConfig struct { + // EventMetadata configures additional fields/tags to be added to published events. + EventMetadata common.EventMetadata + + // Meta provides additional meta data to be added to the Meta field in the beat.Event + // structure. + Meta common.MapStr + + // Fields provides additional 'global' fields to be added to every event + Fields common.MapStr + + // DynamicFields provides additional fields to be added to every event, supporting live updates + DynamicFields *common.MapStrPointer + + // Processors passes additional processor to the client, to be executed before + // the pipeline processors. + Processor ProcessorList + + // Private contains additional information to be passed to the processing + // pipeline builder. + Private interface{} +} + // ClientEventer provides access to internal client events. type ClientEventer interface { Closing() // Closing indicates the client is being shutdown next diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 44615ccef33..9beb5a33bda 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -62,6 +62,7 @@ import ( "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/plugin" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/version" sysinfo "github.com/elastic/go-sysinfo" @@ -78,6 +79,8 @@ type Beat struct { keystore keystore.Keystore index idxmgmt.Supporter + + processing processing.Supporter } type beatConfig struct { @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Logger: logp.L().Named("publisher"), }, b.Config.Pipeline, + b.processing, b.makeOutputFactory(b.Config.Output), ) @@ -594,6 +598,16 @@ func (b *Beat) configure(settings Settings) error { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) } b.index, err = imFactory(nil, 
b.Beat.Info, b.RawConfig) + if err != nil { + return err + } + + processingFactory := settings.Processing + if processingFactory == nil { + processingFactory = processing.MakeDefaultBeatSupport(true) + } + b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) + return err } diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index 9b2bfd34a05..765206fac8a 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/monitoring/report" + "github.com/elastic/beats/libbeat/publisher/processing" ) // Settings contains basic settings for any beat to pass into GenRootCmd @@ -40,4 +41,6 @@ type Settings struct { // load custom index manager. The config object will be the Beats root configuration. IndexManagement idxmgmt.SupportFactory ILM ilm.SupportFactory + + Processing processing.SupportFactory } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index c53d6263962..46bd2104f5c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/beats/libbeat/publisher/queue/memqueue" ) @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) + processing, err := processing.MakeDefaultSupport(true)(beat, log, 
common.NewConfig()) + if err != nil { + return nil, err + } + pipeline, err := pipeline.New( beat, pipeline.Monitors{ Metrics: monitoring, - Logger: logp.NewLogger(selector), + Logger: log, }, queueFactory, outputs.Group{ @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, + Processors: processing, }) if err != nil { return nil, err diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 2c854c13676..8b5a66c63c6 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -60,6 +60,7 @@ func Load( beatInfo beat.Info, monitors Monitors, config Config, + processors processing.Supporter, makeOutput func(outputs.Observer) (string, outputs.Group, error), ) (*Pipeline, error) { log := monitors.Logger @@ -71,28 +72,11 @@ func Load( log.Info("Dry run mode. 
All output types except the file based one are disabled.") } - processors, err := processors.New(config.Processors) - if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) - } - name := beatInfo.Name settings := Settings{ WaitClose: 0, WaitCloseMode: NoWaitOnClose, - Disabled: publishDisabled, Processors: processors, - Annotations: Annotations{ - Event: config.EventMetadata, - Builtin: common.MapStr{ - "host": common.MapStr{ - "name": name, - }, - "ecs": common.MapStr{ - "version": "1.0.0-beta2", - }, - }, - }, } queueBuilder, err := createQueueBuilder(config.Queue, monitors) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index fbe49510e0d..e8289e28ba5 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -31,8 +31,8 @@ import ( "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -73,21 +73,7 @@ type Pipeline struct { ackBuilder ackBuilder eventSema *sema - processors pipelineProcessors -} - -type pipelineProcessors struct { - // The pipeline its processor settings for - // constructing the clients complete processor - // pipeline on connect. - builtinMeta common.MapStr - fields common.MapStr - tags []string - - processors beat.Processor - - disabled bool // disabled is set if outputs have been disabled via CLI - alwaysCopy bool + processors processing.Supporter } // Settings is used to pass additional settings to a newly created pipeline instance. 
@@ -98,19 +84,7 @@ type Settings struct { WaitCloseMode WaitCloseMode - Annotations Annotations - Processors *processors.Processors - - Disabled bool -} - -// Annotations configures additional metadata to be adde to every single event -// being published. The meta data will be added before executing the configured -// processors, so all processors configured with the pipeline or client will see -// the same/complete event. -type Annotations struct { - Event common.EventMetadata - Builtin common.MapStr + Processors processing.Supporter } // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. @@ -172,17 +146,13 @@ func New( monitors.Logger = logp.NewLogger("publish") } - annotations := settings.Annotations - processors := settings.Processors - log := monitors.Logger - disabledOutput := settings.Disabled p := &Pipeline{ beatInfo: beat, monitors: monitors, observer: nilObserver, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, - processors: makePipelineProcessors(log, annotations, processors, disabledOutput), + processors: settings.Processors, } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) @@ -347,7 +317,10 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - processors := newProcessorPipeline(p.beatInfo, p.monitors, p.processors, cfg) + processors, err := p.createEventProcessing(cfg.Processing, publishDisabled) + if err != nil { + return nil, err + } acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // Cancel events from queue if acker is configured @@ -389,6 +362,13 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return client, nil } +func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { + if p.processors == nil { + return nil, nil + } + return p.processors.Create(cfg, noPublish) +} + func (e *pipelineEventer) OnACK(n int) { 
e.observer.queueACKed(n) @@ -414,42 +394,6 @@ func (e *waitCloser) wait() { e.events.Wait() } -func makePipelineProcessors( - log *logp.Logger, - annotations Annotations, - processors *processors.Processors, - disabled bool, -) pipelineProcessors { - p := pipelineProcessors{ - disabled: disabled, - } - - hasProcessors := processors != nil && len(processors.List) > 0 - if hasProcessors { - tmp := newProgram("global", log) - for _, p := range processors.List { - tmp.add(p) - } - p.processors = tmp - } - - if meta := annotations.Builtin; meta != nil { - p.builtinMeta = meta - } - - if em := annotations.Event; len(em.Fields) > 0 { - fields := common.MapStr{} - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - p.fields = fields - } - - if t := annotations.Event.Tags; len(t) > 0 { - p.tags = t - } - - return p -} - // OutputReloader returns a reloadable object for the output section of this pipeline func (p *Pipeline) OutputReloader() OutputReloader { return p.output diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go deleted file mode 100644 index 81834f7bdff..00000000000 --- a/libbeat/publisher/pipeline/processor.go +++ /dev/null @@ -1,327 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "fmt" - "strings" - "sync" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs/codec/json" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/processors/actions" -) - -type program struct { - log *logp.Logger - title string - list []beat.Processor -} - -type processorFn struct { - name string - fn func(event *beat.Event) (*beat.Event, error) -} - -// newProcessorPipeline prepares the processor pipeline, merging -// post processing, event annotations and actual configured processors. -// The pipeline generated ensure the client and pipeline processors -// will see the complete events with all meta data applied. -// -// Pipeline (C=client, P=pipeline) -// -// 1. (P) generalize/normalize event -// 2. (C) add Meta from client Config to event.Meta -// 3. (C) add Fields from client config to event.Fields -// 4. (P) add pipeline fields + tags -// 5. (C) add client fields + tags -// 6. (C) client processors list -// 7. (P) add beats metadata -// 8. (P) pipeline processors list -// 9. (P) (if publish/debug enabled) log event -// 10. 
(P) (if output disabled) dropEvent -func newProcessorPipeline( - info beat.Info, - monitors Monitors, - global pipelineProcessors, - config beat.ClientConfig, -) beat.Processor { - var ( - // pipeline processors - processors = &program{ - title: "processPipeline", - log: monitors.Logger, - } - - // client fields and metadata - clientMeta = config.Meta - localProcessors = makeClientProcessors(monitors, config) - ) - - needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil - - if !config.SkipNormalization { - // setup 1: generalize/normalize output (P) - processors.add(generalizeProcessor) - } - - // setup 2: add Meta from client config (C) - if m := clientMeta; len(m) > 0 { - processors.add(clientEventMeta(m, needsCopy)) - } - - // setup 4, 5: pipeline tags + client tags - var tags []string - tags = append(tags, global.tags...) - tags = append(tags, config.EventMetadata.Tags...) - if len(tags) > 0 { - processors.add(actions.NewAddTags("tags", tags)) - } - - // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata - fields := config.Fields.Clone() - fields.DeepUpdate(global.fields.Clone()) - if em := config.EventMetadata; len(em.Fields) > 0 { - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - } - - if len(fields) > 0 { - // Enforce a copy of fields if dynamic fields are configured or agent - // metadata will be merged into the fields. - // With dynamic fields potentially changing at any time, we need to copy, - // so we do not change shared structures be accident. 
- fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["agent"] != nil - processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) - } - - if config.DynamicFields != nil { - checkCopy := func(m common.MapStr) bool { - return needsCopy || hasKey(m, "agent") - } - processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy)) - } - - // setup 5: client processor list - processors.add(localProcessors) - - // setup 6: add beats and host metadata - if meta := global.builtinMeta; len(meta) > 0 { - processors.add(actions.NewAddFields(meta, needsCopy)) - } - - // setup 7: add agent metadata - if !config.SkipAgentMetadata { - needsCopy := global.alwaysCopy || global.processors != nil - processors.add(actions.NewAddFields(createAgentFields(info), needsCopy)) - } - - // setup 8: pipeline processors list - processors.add(global.processors) - - // setup 9: debug print final event (P) - if logp.IsDebug("publish") { - processors.add(debugPrintProcessor(info, monitors)) - } - - // setup 10: drop all events if outputs are disabled (P) - if global.disabled { - processors.add(dropDisabledProcessor) - } - - return processors -} - -func newProgram(title string, log *logp.Logger) *program { - return &program{ - title: title, - log: log, - } -} - -func (p *program) add(processor processors.Processor) { - if processor != nil { - p.list = append(p.list, processor) - } -} - -func (p *program) String() string { - var s []string - for _, p := range p.list { - s = append(s, p.String()) - } - - str := strings.Join(s, ", ") - if p.title == "" { - return str - } - return fmt.Sprintf("%v{%v}", p.title, str) -} - -func (p *program) Run(event *beat.Event) (*beat.Event, error) { - if p == nil || len(p.list) == 0 { - return event, nil - } - - for _, sub := range p.list { - var err error - - event, err = sub.Run(event) - if err != nil { - // XXX: We don't drop the event, but continue filtering here iff the most - // recent processor did return an event. 
- // We want processors having this kind of implicit behavior - // on errors? - - p.log.Debugf("Fail to apply processor %s: %s", p, err) - } - - if event == nil { - return nil, err - } - } - - return event, nil -} - -func newProcessor(name string, fn func(*beat.Event) (*beat.Event, error)) *processorFn { - return &processorFn{name: name, fn: fn} -} - -func newAnnotateProcessor(name string, fn func(*beat.Event)) *processorFn { - return newProcessor(name, func(event *beat.Event) (*beat.Event, error) { - fn(event) - return event, nil - }) -} - -func (p *processorFn) String() string { return p.name } -func (p *processorFn) Run(e *beat.Event) (*beat.Event, error) { return p.fn(e) } - -var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { - - // Filter out empty events. Empty events are still reported by ACK callbacks. - if len(event.Fields) == 0 { - return nil, nil - } - - fields := common.ConvertToGenericEvent(event.Fields) - if fields == nil { - logp.Err("fail to convert to generic event") - return nil, nil - } - - event.Fields = fields - return event, nil -}) - -var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) { - return nil, nil -}) - -func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn { - fn := func(event *beat.Event) { addMeta(event, meta) } - if needsCopy { - fn = func(event *beat.Event) { addMeta(event, meta.Clone()) } - } - return newAnnotateProcessor("@metadata", fn) -} - -func addMeta(event *beat.Event, meta common.MapStr) { - if event.Meta == nil { - event.Meta = meta - } else { - event.Meta.Clone() - event.Meta.DeepUpdate(meta) - } -} - -func makeAddDynMetaProcessor( - name string, - meta *common.MapStrPointer, - checkCopy func(m common.MapStr) bool, -) *processorFn { - return newAnnotateProcessor(name, func(event *beat.Event) { - dynFields := meta.Get() - if checkCopy(dynFields) { - dynFields = dynFields.Clone() - } - - 
event.Fields.DeepUpdate(dynFields) - }) -} - -func createAgentFields(info beat.Info) common.MapStr { - metadata := common.MapStr{ - "type": info.Beat, - "ephemeral_id": info.EphemeralID.String(), - "hostname": info.Hostname, - "id": info.ID.String(), - "version": info.Version, - } - if info.Name != info.Hostname { - metadata.Put("name", info.Name) - } - - return common.MapStr{"agent": metadata} -} - -func debugPrintProcessor(info beat.Info, monitors Monitors) *processorFn { - // ensure only one go-routine is using the encoder (in case - // beat.Client is shared between multiple go-routines by accident) - var mux sync.Mutex - - encoder := json.New(info.Version, json.Config{ - Pretty: true, - EscapeHTML: false, - }) - log := monitors.Logger - return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { - mux.Lock() - defer mux.Unlock() - - b, err := encoder.Encode(info.Beat, event) - if err != nil { - return event, nil - } - - log.Debugf("Publish event: %s", b) - return event, nil - }) -} - -func makeClientProcessors( - monitors Monitors, - config beat.ClientConfig, -) processors.Processor { - procs := config.Processor - if procs == nil || len(procs.All()) == 0 { - return nil - } - - p := newProgram("client", monitors.Logger) - p.list = procs.All() - return p -} - -func hasKey(m common.MapStr, key string) bool { - _, exists := m[key] - return exists -} diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go deleted file mode 100644 index 07b643d0c81..00000000000 --- a/libbeat/publisher/pipeline/processor_test.go +++ /dev/null @@ -1,433 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. 
licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "sync" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -func TestProcessors(t *testing.T) { - defaultInfo := beat.Info{} - - type local struct { - config beat.ClientConfig - events []common.MapStr - expected []common.MapStr - includeAgentMetadata bool - } - - tests := []struct { - name string - global pipelineProcessors - local []local - info *beat.Info - }{ - { - name: "user global fields and tags", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "no normalization", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{SkipNormalization: true}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "user": nil, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "add agent metadata", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, 
"agent": common.MapStr{"foo": "bar"}}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "type": "test", - "version": "0.1", - "foo": "bar", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "add agent metadata with custom host.name", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "other.test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "name": "other.test.host.name", - "type": "test", - "version": "0.1", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "beat local fields", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - }, - }, - { - 
name: "beat local and user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user global fields overwrite beat local fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, "shared": "global"}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "beat local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "local": 2}}, - }, - }, - }, - - { - name: "beat local fields + user global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - { - name: "user local fields and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: 
common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields (under root) and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields overwrite user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0, "shared": "global"}, - tags: []string{"global"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - FieldsUnderRoot: true, - Tags: []string{"local"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - { - "value": "abc", - "global": 0, "local": 1, "shared": "local", - "tags": []string{"global", "local"}, - }, - }, - }, - }, - }, - { - name: "user local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"fields": common.MapStr{"global": 0}}, - }, - local: 
[]local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated (fields with root)", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - } - - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - monitors := Monitors{ - Logger: logp.NewLogger("test processors"), - } - - // create processor pipelines - programs := make([]beat.Processor, len(test.local)) - info := defaultInfo - if test.info != nil { - info = *test.info - } - for i, local := range test.local { - local.config.SkipAgentMetadata = !local.includeAgentMetadata - programs[i] = newProcessorPipeline(info, monitors, test.global, local.config) - } - - // run processor pipelines in parallel - var ( - wg sync.WaitGroup - mux sync.Mutex - results = make([][]common.MapStr, len(programs)) - ) - for id, local := range test.local { - wg.Add(1) - id, program, local := id, programs[id], 
local - go func() { - defer wg.Done() - - actual := make([]common.MapStr, len(local.events)) - for i, event := range local.events { - out, _ := program.Run(&beat.Event{ - Timestamp: time.Now(), - Fields: event, - }) - actual[i] = out.Fields - } - - mux.Lock() - defer mux.Unlock() - results[id] = actual - }() - } - wg.Wait() - - // validate - for i, local := range test.local { - assert.Equal(t, local.expected, results[i]) - } - }) - } -} diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 3ebce0351f1..902d303e94f 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" ) type config struct { @@ -58,13 +59,21 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } + log := logp.L() + + processing, err := processing.MakeDefaultSupport(false)(info, log, cfg) + if err != nil { + return err + } + pipeline, err := pipeline.Load(info, pipeline.Monitors{ Metrics: nil, Telemetry: nil, - Logger: logp.L(), + Logger: log, }, config.Pipeline, + processing, func(stat outputs.Observer) (string, outputs.Group, error) { cfg := config.Output out, err := outputs.Load(nil, info, stat, cfg.Name(), cfg.Config()) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go new file mode 100644 index 00000000000..547593be95d --- /dev/null +++ b/libbeat/publisher/processing/default.go @@ -0,0 +1,325 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"fmt"
+
+	"github.com/elastic/ecs/code/go/ecs"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/processors"
+	"github.com/elastic/beats/libbeat/processors/actions"
+)
+
+// builder is used to create the event processing pipeline in Beats. The
+// builder orders and merges global and local (per client) event annotation
+// settings, with the configured event processors into one common event
+// processor for use with the publisher pipeline.
+// See also: (*builder).Create
+type builder struct {
+	info beat.Info
+	log  *logp.Logger
+
+	skipNormalize bool
+
+	// global pipeline fields and tags configurations
+	modifiers   []modifier
+	builtinMeta common.MapStr
+	fields      common.MapStr
+	tags        []string
+
+	// global pipeline processors
+	processors *group
+
+	drop       bool // drop is set if outputs have been disabled via CLI. NOTE(review): never assigned in this file; Create takes drop as an argument
+	alwaysCopy bool // if set, Create always copies configured fields/meta before adding them to an event
+}
+
+type modifier interface {
+	// BuiltinFields defines global fields to be added to every event.
+	BuiltinFields(beat.Info) common.MapStr
+
+	// ClientFields defines connection local fields to be added to each event
+	// of a pipeline client.
+	ClientFields(beat.Info, beat.ProcessingConfig) common.MapStr
+}
+
+type builtinModifier func(beat.Info) common.MapStr
+
+// MakeDefaultBeatSupport creates a new SupportFactory based on MakeDefaultSupport.
+// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields
+// to each event.
+func MakeDefaultBeatSupport(normalize bool) SupportFactory {
+	return MakeDefaultSupport(normalize, WithECS, WithHost, WithBeatMeta("agent"))
+}
+
+// MakeDefaultObserverSupport creates a new SupportFactory based on MakeDefaultSupport.
+// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields
+// to each event.
+func MakeDefaultObserverSupport(normalize bool) SupportFactory {
+	return MakeDefaultSupport(normalize, WithECS, WithBeatMeta("observer"))
+}
+
+// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline.
+// If normalize is set, events will be normalized first before being presented
+// to the actual processors.
+// The Supporter will apply the global `fields`, `fields_under_root`, `tags`
+// and `processors` settings to the event processing pipeline to be generated.
+// Use WithFields, WithBeatMeta, and other modifiers to declare the builtin fields to be added
+// to each event. Builtin fields can be modified using global `processors`, and `fields` only.
+func MakeDefaultSupport(
+	normalize bool,
+	modifiers ...modifier,
+) SupportFactory {
+	return func(info beat.Info, log *logp.Logger, beatCfg *common.Config) (Supporter, error) {
+		cfg := struct {
+			common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
+			Processors           processors.PluginConfig `config:"processors"`
+		}{}
+		if err := beatCfg.Unpack(&cfg); err != nil {
+			return nil, err
+		}
+
+		processors, err := processors.New(cfg.Processors)
+		if err != nil {
+			return nil, fmt.Errorf("error initializing processors: %v", err)
+		}
+
+		return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize), nil
+	}
+}
+
+// WithFields creates a modifier with the given default builtin fields.
+func WithFields(fields common.MapStr) modifier {
+	return builtinModifier(func(_ beat.Info) common.MapStr {
+		return fields
+	})
+}
+
+// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline.
+var WithECS modifier = WithFields(common.MapStr{
+	"ecs": common.MapStr{
+		"version": ecs.Version,
+	},
+})
+
+// WithHost modifier adds `host.name` builtin fields to a processing pipeline.
+var WithHost modifier = builtinModifier(func(info beat.Info) common.MapStr {
+	return common.MapStr{
+		"host": common.MapStr{
+			"name": info.Name,
+		},
+	}
+})
+
+// WithBeatMeta adds beat meta information as builtin fields to a processing pipeline.
+// The `key` parameter defines the field to be used.
+func WithBeatMeta(key string) modifier {
+	return builtinModifier(func(info beat.Info) common.MapStr {
+		metadata := common.MapStr{
+			"type":         info.Beat,
+			"ephemeral_id": info.EphemeralID.String(),
+			"hostname":     info.Hostname,
+			"id":           info.ID.String(),
+			"version":      info.Version,
+		}
+		if info.Name != info.Hostname {
+			metadata.Put("name", info.Name)
+		}
+		return common.MapStr{key: metadata}
+	})
+}
+
+// newBuilder captures the global settings (processors, builtins, fields, tags) shared by all clients.
+func newBuilder(
+	info beat.Info,
+	log *logp.Logger,
+	processors *processors.Processors,
+	eventMeta common.EventMetadata,
+	modifiers []modifier,
+	skipNormalize bool,
+) *builder {
+	b := &builder{
+		info:          info, // read by Create: passed to ClientFields modifiers and the debug printer
+		skipNormalize: skipNormalize,
+		modifiers:     modifiers,
+		log:           log,
+	}
+
+	if processors != nil && len(processors.List) > 0 {
+		tmp := newGroup("global", log)
+		for _, p := range processors.List {
+			tmp.add(p)
+		}
+		b.processors = tmp
+	}
+
+	builtin := common.MapStr{}
+	for _, mod := range modifiers {
+		if m := mod.BuiltinFields(info); len(m) > 0 {
+			builtin.DeepUpdate(m.Clone())
+		}
+	}
+	if len(builtin) > 0 {
+		b.builtinMeta = builtin
+	}
+
+	if fields := eventMeta.Fields; len(fields) > 0 {
+		b.fields = common.MapStr{}
+		common.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot)
+	}
+
+	if t := eventMeta.Tags; len(t) > 0 {
+		b.tags = t
+	}
+
+	return b
+}
+
+// Create combines the builder configuration with the client settings
+// in order to build the event processing pipeline.
+//
+// Processing order (C=client, P=pipeline)
+//  1. (P) generalize/normalize event
+//  2. (C) add Meta from client Config to event.Meta
+//  3. (C) add Fields from client config to event.Fields
+//  4. (P) add pipeline fields + tags
+//  5. (C) add client fields + tags
+//  6. (C) client processors list
+//  7. (P) add builtins
+//  8. (P) pipeline processors list
+//  9. (P) (if publish/debug enabled) log event
+// 10. (P) (if output disabled) dropEvent
+func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
+	var (
+		// pipeline processors
+		processors = newGroup("processPipeline", b.log)
+
+		// client fields and metadata
+		clientMeta      = cfg.Meta
+		localProcessors = makeClientProcessors(b.log, cfg)
+	)
+
+	needsCopy := b.alwaysCopy || localProcessors != nil || b.processors != nil
+
+	builtin := b.builtinMeta
+	var clientFields common.MapStr
+	for _, mod := range b.modifiers {
+		if m := mod.ClientFields(b.info, cfg); len(m) > 0 {
+			if clientFields == nil {
+				clientFields = common.MapStr{}
+			}
+			clientFields.DeepUpdate(m.Clone())
+		}
+	}
+	if len(clientFields) > 0 {
+		tmp := builtin.Clone()
+		tmp.DeepUpdate(clientFields)
+		builtin = tmp
+	}
+
+	if !b.skipNormalize {
+		// setup 1: generalize/normalize output (P)
+		processors.add(generalizeProcessor)
+	}
+
+	// setup 2: add Meta from client config (C)
+	if m := clientMeta; len(m) > 0 {
+		processors.add(clientEventMeta(m, needsCopy))
+	}
+
+	// setup 4, 5: pipeline tags + client tags
+	var tags []string
+	tags = append(tags, b.tags...)
+	tags = append(tags, cfg.EventMetadata.Tags...)
+	if len(tags) > 0 {
+		processors.add(actions.NewAddTags("tags", tags))
+	}
+
+	// setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata
+	fields := cfg.Fields.Clone()
+	fields.DeepUpdate(b.fields.Clone())
+	if em := cfg.EventMetadata; len(em.Fields) > 0 {
+		common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
+	}
+
+	if len(fields) > 0 {
+		// Enforce a copy of fields if dynamic fields are configured or agent
+		// metadata will be merged into the fields.
+		// With dynamic fields potentially changing at any time, we need to copy,
+		// so we do not change shared structures by accident.
+		fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin)
+		processors.add(actions.NewAddFields(fields, fieldsNeedsCopy))
+	}
+
+	if cfg.DynamicFields != nil {
+		checkCopy := func(m common.MapStr) bool {
+			return needsCopy || hasKeyAnyOf(m, builtin)
+		}
+		processors.add(makeAddDynMetaProcessor("dynamicFields", cfg.DynamicFields, checkCopy))
+	}
+
+	// setup 6: client processor list (C)
+	processors.add(localProcessors)
+
+	// setup 7: add builtins, e.g. beat and host metadata (P)
+	if meta := builtin; len(meta) > 0 {
+		processors.add(actions.NewAddFields(meta, needsCopy))
+	}
+
+	// setup 8: pipeline processors list (P)
+	processors.add(b.processors) // NOTE(review): may be a nil *group; group.Run tolerates a nil receiver
+
+	// setup 9: debug print final event (P)
+	if b.log.IsDebug() {
+		processors.add(debugPrintProcessor(b.info, b.log))
+	}
+
+	// setup 10: drop all events if outputs are disabled (P)
+	if drop {
+		processors.add(dropDisabledProcessor)
+	}
+
+	return processors, nil
+}
+
+// makeClientProcessors wraps the client's processor list into a group; it returns nil if the list is empty.
+func makeClientProcessors(
+	log *logp.Logger,
+	cfg beat.ProcessingConfig,
+) processors.Processor {
+	procs := cfg.Processor
+	if procs == nil || len(procs.All()) == 0 {
+		return nil
+	}
+
+	p := newGroup("client", log)
+	p.list = procs.All()
+	return p
+}
+
+func (b builtinModifier) BuiltinFields(info beat.Info) common.MapStr {
+	return b(info)
+}
+
+func (b builtinModifier) ClientFields(_ beat.Info, _ beat.ProcessingConfig) common.MapStr {
+	return nil
+}
diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go
new file mode 100644
index 00000000000..5f49b2b3829
--- /dev/null
+++ b/libbeat/publisher/processing/default_test.go
@@ -0,0 +1,364 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processing + +import ( + "encoding/json" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors/actions" + "github.com/elastic/ecs/code/go/ecs" +) + +func TestProcessorsConfigs(t *testing.T) { + defaultInfo := beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "test.host.name", + Version: "0.1", + } + + ecsFields := common.MapStr{"version": ecs.Version} + + cases := map[string]struct { + factory SupportFactory + global string + local beat.ProcessingConfig + drop bool + event string + want common.MapStr + wantMeta common.MapStr + infoMod func(beat.Info) beat.Info + }{ + "user global fields and tags": { + global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}", + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": uint64(1), + "tags": []string{"tag"}, + }, + }, + "beat local fields": { + global: "", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": 1}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + 
"value": "abc", + "local": 1, + }, + }, + "beat local and user global fields": { + global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": 1}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": uint64(1), + "local": 1, + "tags": []string{"tag"}, + }, + }, + "user global fields overwrite beat local fields": { + global: "{fields: {global: a, shared: global}, fields_under_root: true}", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": "b", "shared": "local"}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "local": "b", + "global": "a", + "shared": "global", + }, + }, + "user local fields and tags": { + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": "a"}, + Tags: []string{"tag"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "fields": common.MapStr{ + "local": "a", + }, + "tags": []string{"tag"}, + }, + }, + "user local fields (under root) and tags": { + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": "a"}, + FieldsUnderRoot: true, + Tags: []string{"tag"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "local": "a", + "tags": []string{"tag"}, + }, + }, + "user local fields overwrite user global fields": { + global: `{fields: {global: a, shared: global}, fields_under_root: true, tags: [global]}`, + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{ + "local": "a", + "shared": "local", + }, + FieldsUnderRoot: true, + Tags: []string{"local"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": "a", + "local": "a", + "shared": "local", + "tags": []string{"global", "local"}, + }, + }, + "with client metadata": { + local: beat.ProcessingConfig{ + Meta: 
common.MapStr{"index": "test"}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{"value": "abc"}, + wantMeta: common.MapStr{"index": "test"}, + }, + "with client processor": { + local: beat.ProcessingConfig{ + Processor: func() beat.ProcessorList { + g := newGroup("test", logp.L()) + g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) + return g + }(), + }, + event: `{"value": "abc"}`, + want: common.MapStr{"value": "abc", "custom": "value"}, + }, + "with beat default fields": { + factory: MakeDefaultBeatSupport(true), + global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": "abc"}`, + want: common.MapStr{ + "ecs": ecsFields, + "host": common.MapStr{ + "name": "test.host.name", + }, + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + "with beat default fields and custom name": { + factory: MakeDefaultBeatSupport(true), + global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": "abc"}`, + infoMod: func(info beat.Info) beat.Info { + info.Name = "other.test.host.name" + return info + }, + want: common.MapStr{ + "ecs": ecsFields, + "host": common.MapStr{ + "name": "other.test.host.name", + }, + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "name": "other.test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + "with observer default fields": { + factory: MakeDefaultObserverSupport(false), + global: `{fields: {global: a, observer.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": 
"abc"}`, + want: common.MapStr{ + "ecs": ecsFields, + "observer": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + cfg, err := common.NewConfigWithYAML([]byte(test.global), "test") + require.NoError(t, err) + + info := defaultInfo + if test.infoMod != nil { + info = test.infoMod(info) + } + + factory := test.factory + if factory == nil { + factory = MakeDefaultSupport(true) + } + + support, err := factory(info, logp.L(), cfg) + require.NoError(t, err) + + prog, err := support.Create(test.local, test.drop) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{ + Timestamp: time.Now(), + Fields: fromJSON(test.event), + }) + require.NoError(t, err) + + // validate + assert.Equal(t, test.want, actual.Fields) + assert.Equal(t, test.wantMeta, actual.Meta) + }) + } +} + +func TestNormalization(t *testing.T) { + cases := map[string]struct { + normalize bool + in common.MapStr + mod common.MapStr + want common.MapStr + }{ + "no sharing if normalized": { + normalize: true, + in: common.MapStr{"a": "b"}, + mod: common.MapStr{"change": "x"}, + want: common.MapStr{"a": "b"}, + }, + "data sharing if not normalized": { + normalize: false, + in: common.MapStr{"a": "b"}, + mod: common.MapStr{"change": "x"}, + want: common.MapStr{"a": "b", "change": "x"}, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + prog, err := s.Create(beat.ProcessingConfig{}, false) + require.NoError(t, err) + + fields := test.in.Clone() + actual, err := 
prog.Run(&beat.Event{Fields: fields}) + require.NoError(t, err) + require.NotNil(t, actual) + + fields.DeepUpdate(test.mod) + assert.Equal(t, test.want, actual.Fields) + }) + } +} + +func TestAlwaysDrop(t *testing.T) { + s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + prog, err := s.Create(beat.ProcessingConfig{}, true) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{}) + require.NoError(t, err) + assert.Nil(t, actual) +} + +func TestDynamicFields(t *testing.T) { + factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + dynFields := common.NewMapStrPointer(common.MapStr{}) + prog, err := factory.Create(beat.ProcessingConfig{ + DynamicFields: &dynFields, + }, false) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) + require.NoError(t, err) + assert.Equal(t, common.MapStr{"hello": "world"}, actual.Fields) + + dynFields.Set(common.MapStr{"dyn": "field"}) + actual, err = prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) + require.NoError(t, err) + assert.Equal(t, common.MapStr{"hello": "world", "dyn": "field"}, actual.Fields) +} + +func fromJSON(in string) common.MapStr { + var tmp common.MapStr + err := json.Unmarshal([]byte(in), &tmp) + if err != nil { + panic(err) + } + return tmp +} diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go new file mode 100644 index 00000000000..ff1be24fc45 --- /dev/null +++ b/libbeat/publisher/processing/processing.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+// SupportFactory creates a new processing Supporter that can be used with
+// the publisher pipeline. The factory receives the beat info, a logger and the
+// global configuration, in order to configure the shared global event processing.
+type SupportFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error)
+
+// Supporter is used to create an event processing pipeline. It is used by the
+// publisher pipeline when a client connects to the pipeline. Create merges
+// the global configuration with the client's local configuration (cfg) into
+// one common event processor.
+// If `drop` is set, then the processor generated must always drop all events.
+type Supporter interface {
+	Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error)
+}
diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go
new file mode 100644
index 00000000000..38a77bdd948
--- /dev/null
+++ b/libbeat/publisher/processing/processors.go
@@ -0,0 +1,200 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. 
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/outputs/codec/json"
+	"github.com/elastic/beats/libbeat/processors"
+)
+
+type group struct {
+	log   *logp.Logger     // used by Run to report processor errors at debug level
+	title string           // prefix printed by String; omitted if empty
+	list  []beat.Processor // processors applied in order by Run
+}
+
+type processorFn struct {
+	name string                                       // returned by String
+	fn   func(event *beat.Event) (*beat.Event, error) // invoked by Run
+}
+
+var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
+	// Filter out empty events. Empty events are still reported by ACK callbacks.
+	if len(event.Fields) == 0 {
+		return nil, nil
+	}
+
+	fields := common.ConvertToGenericEvent(event.Fields)
+	if fields == nil {
+		logp.Err("fail to convert to generic event")
+		return nil, nil
+	}
+
+	event.Fields = fields
+	return event, nil
+})
+
+var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) {
+	return nil, nil // a nil event makes group.Run stop and return nil
+})
+
+func newGroup(title string, log *logp.Logger) *group {
+	return &group{
+		title: title,
+		log:   log,
+	}
+}
+
+func (p *group) add(processor processors.Processor) {
+	if processor != nil { // only filters nil interfaces; a nil *group passed in is still appended
+		p.list = append(p.list, processor)
+	}
+}
+
+func (p *group) String() string {
+	var s []string
+	for _, p := range p.list { // note: p shadows the receiver inside the loop
+		s = append(s, p.String())
+	}
+
+	str := strings.Join(s, ", ")
+	if p.title == "" {
+		return str
+	}
+	return fmt.Sprintf("%v{%v}", p.title, str)
+}
+
+func (p *group) All() []beat.Processor {
+	return p.list
+}
+
+func (p *group) Run(event *beat.Event) (*beat.Event, error) {
+	if p == nil || len(p.list) == 0 { // a nil group is a no-op and passes the event through
+		return event, nil
+	}
+
+	for _, sub := range p.list {
+		var err error
+
+		event, err = sub.Run(event)
+		if err != nil {
+			// XXX: We don't drop the event, but continue filtering here if the most
+			//      recent processor did return an event.
+			//      Do we want processors having this kind of implicit behavior
+			//      on errors?
+
+			p.log.Debugf("Fail to apply processor %s: %s", p, err)
+		}
+
+		if event == nil {
+			return nil, err
+		}
+	}
+
+	return event, nil
+}
+
+func newProcessor(name string, fn func(*beat.Event) (*beat.Event, error)) *processorFn {
+	return &processorFn{name: name, fn: fn}
+}
+
+func newAnnotateProcessor(name string, fn func(*beat.Event)) *processorFn {
+	return newProcessor(name, func(event *beat.Event) (*beat.Event, error) {
+		fn(event) // annotators only modify the event in place; they never drop it or fail
+		return event, nil
+	})
+}
+
+func (p *processorFn) String() string                         { return p.name }
+func (p *processorFn) Run(e *beat.Event) (*beat.Event, error) { return p.fn(e) }
+
+func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn {
+	fn := func(event *beat.Event) { addMeta(event, meta) }
+	if needsCopy {
+		fn = func(event *beat.Event) { addMeta(event, meta.Clone()) }
+	}
+	return newAnnotateProcessor("@metadata", fn)
+}
+
+func addMeta(event *beat.Event, meta common.MapStr) {
+	if event.Meta == nil {
+		event.Meta = meta
+	} else {
+		event.Meta = event.Meta.Clone() // keep the copy: update a private map, not the caller's original
+		event.Meta.DeepUpdate(meta)
+	}
+}
+
+func makeAddDynMetaProcessor(
+	name string,
+	meta *common.MapStrPointer,
+	checkCopy func(m common.MapStr) bool,
+) *processorFn {
+	return newAnnotateProcessor(name, func(event *beat.Event) {
+		dynFields := meta.Get() // read on every event, so updates to the pointer take effect
+		if checkCopy(dynFields) {
+			dynFields = dynFields.Clone()
+		}
+
+		event.Fields.DeepUpdate(dynFields)
+	})
+}
+
+func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn {
+	// ensure only one go-routine is using the encoder (in case
+	// beat.Client is shared between multiple go-routines by accident)
+	var mux sync.Mutex
+
+	encoder := json.New(info.Version, json.Config{
+		Pretty:     true,
+		EscapeHTML: false,
+	})
+	return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) {
+		mux.Lock()
+		defer mux.Unlock()
+
+		b, err := encoder.Encode(info.Beat, event)
+		if err != nil {
+			return event, nil // best effort: a failed debug encoding must not affect the event
+		}
+
+		log.Debugf("Publish event: %s", b)
+		return event, nil
+	})
+}
+
+func hasKey(m
common.MapStr, key string) bool {
+	_, exists := m[key]
+	return exists
+}
+
+// hasKeyAnyOf reports whether m contains at least one of the top-level keys of
+// builtin. Only key presence is checked; values are never compared.
+func hasKeyAnyOf(m, builtin common.MapStr) bool {
+	for name := range builtin {
+		if _, found := m[name]; found {
+			return true
+		}
+	}
+	return false
+}
diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 2da153d98ea..d00e524a555 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -58,8 +58,10 @@ func NewConnector(pipeline beat.Pipeline, c *common.Config, dynFields *common.Ma func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ - EventMetadata: c.eventMeta, - Processor: c.processors, - DynamicFields: c.dynamicFields, + Processing: beat.ProcessingConfig{ + EventMetadata: c.eventMeta, + Processor: c.processors, + DynamicFields: c.dynamicFields, + }, }) } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index becc53cf65b..a4e812ad66e 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -178,8 +178,10 @@ func (pb *packetbeat) setupFlows() error { } client, err := pb.pipeline.ConnectWith(beat.ClientConfig{ - EventMetadata: config.Flows.EventMetadata, - Processor: processors, + Processing: beat.ProcessingConfig{ + EventMetadata: config.Flows.EventMetadata, + Processor: processors, + }, }) if err != nil { return err diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 0b34715e1b9..db264852036 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -97,8 +97,10 @@ func (p *TransactionPublisher) CreateReporter( } clientConfig := beat.ClientConfig{ - EventMetadata: meta.Event, - Processor: processors, + Processing: beat.ProcessingConfig{ + EventMetadata: meta.Event, + Processor: processors, + }, } if p.canDrop { clientConfig.PublishMode = beat.DropIfFull diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go index 59c9ecb1f90..0eb9ec8c13b 100644 --- 
a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -64,10 +64,12 @@ func newEventLogger( func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) { api := e.source.Name() return pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: e.eventMeta, - Meta: nil, // TODO: configure modules/ES ingest pipeline? - Processor: e.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: e.eventMeta, + Meta: nil, // TODO: configure modules/ES ingest pipeline? + Processor: e.processors, + }, ACKCount: func(n int) { addPublished(api, n) logp.Info("EventLog[%s] successfully published %d events", api, n) diff --git a/x-pack/functionbeat/beater/functionbeat.go b/x-pack/functionbeat/beater/functionbeat.go index 61b61cf0344..b62e8cd704d 100644 --- a/x-pack/functionbeat/beater/functionbeat.go +++ b/x-pack/functionbeat/beater/functionbeat.go @@ -143,9 +143,11 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea } client, err := core.NewSyncClient(log, pipeline, beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - Processor: processors, - EventMetadata: c.EventMetadata, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + Processor: processors, + EventMetadata: c.EventMetadata, + }, }) if err != nil {