diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index c938aafe8bd..c295d1a4056 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c } client, err := p.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: config.EventMetadata, - DynamicFields: dynFields, - Meta: meta, - Fields: fields, - Processor: processors, - Events: f.eventer, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: config.EventMetadata, + DynamicFields: dynFields, + Meta: meta, + Fields: fields, + Processor: processors, + }, + Events: f.eventer, }) if err != nil { return nil, err diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 3ee62eff235..43e4a8056fa 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -139,9 +139,11 @@ func (t *configuredJob) Start() { } t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ - EventMetadata: t.config.EventMetadata, - Processor: t.processors, - Fields: fields, + Processing: beat.ProcessingConfig{ + EventMetadata: t.config.EventMetadata, + Processor: t.processors, + Fields: fields, + }, }) if err != nil { logp.Err("could not start monitor: %v", err) diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 70d9bcb6874..7f1941271dc 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -125,10 +125,12 @@ func New( func (i *Input) Run() { var err error i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: i.eventMeta, - Meta: nil, - Processor: i.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: i.eventMeta, + Meta: nil, + Processor: i.processors, + }, ACKCount: func(n int) { i.logger.Infof("journalbeat successfully published %d events", n) }, 
diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 3db7baae6d2..5743e1cc92a 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -46,22 +46,7 @@ type Client interface { type ClientConfig struct { PublishMode PublishMode - // EventMetadata configures additional fields/tags to be added to published events. - EventMetadata common.EventMetadata - - // Meta provides additional meta data to be added to the Meta field in the beat.Event - // structure. - Meta common.MapStr - - // Fields provides additional 'global' fields to be added to every event - Fields common.MapStr - - // DynamicFields provides additional fields to be added to every event, supporting live updates - DynamicFields *common.MapStrPointer - - // Processors passes additional processor to the client, to be executed before - // the pipeline processors. - Processor ProcessorList + Processing ProcessingConfig // WaitClose sets the maximum duration to wait on ACK, if client still has events // active non-acknowledged events in the publisher pipeline. @@ -72,14 +57,6 @@ type ClientConfig struct { // Events configures callbacks for common client callbacks Events ClientEventer - // By default events are normalized within processor pipeline, - // if the normalization step should be skipped set this to true. - SkipNormalization bool - - // By default events are decorated with agent metadata. - // To skip adding that metadata set this to true. - SkipAgentMetadata bool - // ACK handler strategies. // Note: ack handlers are run in another go-routine owned by the publisher pipeline. // They should not block for to long, to not block the internal buffers for @@ -101,6 +78,31 @@ type ClientConfig struct { ACKLastEvent func(interface{}) } +// ProcessingConfig provides additional event processing settings a client can +// pass to the publisher pipeline on Connect. +type ProcessingConfig struct { + // EventMetadata configures additional fields/tags to be added to published events. 
+ EventMetadata common.EventMetadata + + // Meta provides additional meta data to be added to the Meta field in the beat.Event + // structure. + Meta common.MapStr + + // Fields provides additional 'global' fields to be added to every event + Fields common.MapStr + + // DynamicFields provides additional fields to be added to every event, supporting live updates + DynamicFields *common.MapStrPointer + + // Processors passes additional processor to the client, to be executed before + // the pipeline processors. + Processor ProcessorList + + // Private contains additional information to be passed to the processing + // pipeline builder. + Private interface{} +} + // ClientEventer provides access to internal client events. type ClientEventer interface { Closing() // Closing indicates the client is being shutdown next diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 3ab2507486e..520623a235d 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -62,6 +62,7 @@ import ( "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/plugin" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/version" sysinfo "github.com/elastic/go-sysinfo" @@ -78,6 +79,8 @@ type Beat struct { keystore keystore.Keystore index idxmgmt.Supporter + + processing processing.Supporter } type beatConfig struct { @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Logger: logp.L().Named("publisher"), }, b.Config.Pipeline, + b.processing, b.makeOutputFactory(b.Config.Output), ) @@ -593,6 +597,16 @@ func (b *Beat) configure(settings Settings) error { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) } b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig) + if err != nil { + return err + } + + processingFactory := settings.Processing + if processingFactory 
== nil { + processingFactory = processing.MakeDefaultBeatSupport(true) + } + b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) + return err } diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index 9b2bfd34a05..765206fac8a 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/monitoring/report" + "github.com/elastic/beats/libbeat/publisher/processing" ) // Settings contains basic settings for any beat to pass into GenRootCmd @@ -40,4 +41,6 @@ type Settings struct { // load custom index manager. The config object will be the Beats root configuration. IndexManagement idxmgmt.SupportFactory ILM ilm.SupportFactory + + Processing processing.SupportFactory } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index c53d6263962..46bd2104f5c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/beats/libbeat/publisher/queue/memqueue" ) @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) + processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig()) + if err != nil { + return nil, err + } + pipeline, err := pipeline.New( beat, pipeline.Monitors{ Metrics: 
monitoring, - Logger: logp.NewLogger(selector), + Logger: log, }, queueFactory, outputs.Group{ @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, + Processors: processing, }) if err != nil { return nil, err diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 2c854c13676..8b5a66c63c6 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -60,6 +60,7 @@ func Load( beatInfo beat.Info, monitors Monitors, config Config, + processors processing.Supporter, makeOutput func(outputs.Observer) (string, outputs.Group, error), ) (*Pipeline, error) { log := monitors.Logger @@ -71,28 +72,11 @@ func Load( log.Info("Dry run mode. 
All output types except the file based one are disabled.") } - processors, err := processors.New(config.Processors) - if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) - } - name := beatInfo.Name settings := Settings{ WaitClose: 0, WaitCloseMode: NoWaitOnClose, - Disabled: publishDisabled, Processors: processors, - Annotations: Annotations{ - Event: config.EventMetadata, - Builtin: common.MapStr{ - "host": common.MapStr{ - "name": name, - }, - "ecs": common.MapStr{ - "version": "1.0.0-beta2", - }, - }, - }, } queueBuilder, err := createQueueBuilder(config.Queue, monitors) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index fbe49510e0d..e8289e28ba5 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -31,8 +31,8 @@ import ( "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -73,21 +73,7 @@ type Pipeline struct { ackBuilder ackBuilder eventSema *sema - processors pipelineProcessors -} - -type pipelineProcessors struct { - // The pipeline its processor settings for - // constructing the clients complete processor - // pipeline on connect. - builtinMeta common.MapStr - fields common.MapStr - tags []string - - processors beat.Processor - - disabled bool // disabled is set if outputs have been disabled via CLI - alwaysCopy bool + processors processing.Supporter } // Settings is used to pass additional settings to a newly created pipeline instance. 
@@ -98,19 +84,7 @@ type Settings struct { WaitCloseMode WaitCloseMode - Annotations Annotations - Processors *processors.Processors - - Disabled bool -} - -// Annotations configures additional metadata to be adde to every single event -// being published. The meta data will be added before executing the configured -// processors, so all processors configured with the pipeline or client will see -// the same/complete event. -type Annotations struct { - Event common.EventMetadata - Builtin common.MapStr + Processors processing.Supporter } // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. @@ -172,17 +146,13 @@ func New( monitors.Logger = logp.NewLogger("publish") } - annotations := settings.Annotations - processors := settings.Processors - log := monitors.Logger - disabledOutput := settings.Disabled p := &Pipeline{ beatInfo: beat, monitors: monitors, observer: nilObserver, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, - processors: makePipelineProcessors(log, annotations, processors, disabledOutput), + processors: settings.Processors, } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) @@ -347,7 +317,10 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - processors := newProcessorPipeline(p.beatInfo, p.monitors, p.processors, cfg) + processors, err := p.createEventProcessing(cfg.Processing, publishDisabled) + if err != nil { + return nil, err + } acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // Cancel events from queue if acker is configured @@ -389,6 +362,13 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return client, nil } +func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { + if p.processors == nil { + return nil, nil + } + return p.processors.Create(cfg, noPublish) +} + func (e *pipelineEventer) OnACK(n int) { 
e.observer.queueACKed(n) @@ -414,42 +394,6 @@ func (e *waitCloser) wait() { e.events.Wait() } -func makePipelineProcessors( - log *logp.Logger, - annotations Annotations, - processors *processors.Processors, - disabled bool, -) pipelineProcessors { - p := pipelineProcessors{ - disabled: disabled, - } - - hasProcessors := processors != nil && len(processors.List) > 0 - if hasProcessors { - tmp := newProgram("global", log) - for _, p := range processors.List { - tmp.add(p) - } - p.processors = tmp - } - - if meta := annotations.Builtin; meta != nil { - p.builtinMeta = meta - } - - if em := annotations.Event; len(em.Fields) > 0 { - fields := common.MapStr{} - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - p.fields = fields - } - - if t := annotations.Event.Tags; len(t) > 0 { - p.tags = t - } - - return p -} - // OutputReloader returns a reloadable object for the output section of this pipeline func (p *Pipeline) OutputReloader() OutputReloader { return p.output diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go deleted file mode 100644 index 81834f7bdff..00000000000 --- a/libbeat/publisher/pipeline/processor.go +++ /dev/null @@ -1,327 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "fmt" - "strings" - "sync" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs/codec/json" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/processors/actions" -) - -type program struct { - log *logp.Logger - title string - list []beat.Processor -} - -type processorFn struct { - name string - fn func(event *beat.Event) (*beat.Event, error) -} - -// newProcessorPipeline prepares the processor pipeline, merging -// post processing, event annotations and actual configured processors. -// The pipeline generated ensure the client and pipeline processors -// will see the complete events with all meta data applied. -// -// Pipeline (C=client, P=pipeline) -// -// 1. (P) generalize/normalize event -// 2. (C) add Meta from client Config to event.Meta -// 3. (C) add Fields from client config to event.Fields -// 4. (P) add pipeline fields + tags -// 5. (C) add client fields + tags -// 6. (C) client processors list -// 7. (P) add beats metadata -// 8. (P) pipeline processors list -// 9. (P) (if publish/debug enabled) log event -// 10. 
(P) (if output disabled) dropEvent -func newProcessorPipeline( - info beat.Info, - monitors Monitors, - global pipelineProcessors, - config beat.ClientConfig, -) beat.Processor { - var ( - // pipeline processors - processors = &program{ - title: "processPipeline", - log: monitors.Logger, - } - - // client fields and metadata - clientMeta = config.Meta - localProcessors = makeClientProcessors(monitors, config) - ) - - needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil - - if !config.SkipNormalization { - // setup 1: generalize/normalize output (P) - processors.add(generalizeProcessor) - } - - // setup 2: add Meta from client config (C) - if m := clientMeta; len(m) > 0 { - processors.add(clientEventMeta(m, needsCopy)) - } - - // setup 4, 5: pipeline tags + client tags - var tags []string - tags = append(tags, global.tags...) - tags = append(tags, config.EventMetadata.Tags...) - if len(tags) > 0 { - processors.add(actions.NewAddTags("tags", tags)) - } - - // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata - fields := config.Fields.Clone() - fields.DeepUpdate(global.fields.Clone()) - if em := config.EventMetadata; len(em.Fields) > 0 { - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - } - - if len(fields) > 0 { - // Enforce a copy of fields if dynamic fields are configured or agent - // metadata will be merged into the fields. - // With dynamic fields potentially changing at any time, we need to copy, - // so we do not change shared structures be accident. 
- fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["agent"] != nil - processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) - } - - if config.DynamicFields != nil { - checkCopy := func(m common.MapStr) bool { - return needsCopy || hasKey(m, "agent") - } - processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy)) - } - - // setup 5: client processor list - processors.add(localProcessors) - - // setup 6: add beats and host metadata - if meta := global.builtinMeta; len(meta) > 0 { - processors.add(actions.NewAddFields(meta, needsCopy)) - } - - // setup 7: add agent metadata - if !config.SkipAgentMetadata { - needsCopy := global.alwaysCopy || global.processors != nil - processors.add(actions.NewAddFields(createAgentFields(info), needsCopy)) - } - - // setup 8: pipeline processors list - processors.add(global.processors) - - // setup 9: debug print final event (P) - if logp.IsDebug("publish") { - processors.add(debugPrintProcessor(info, monitors)) - } - - // setup 10: drop all events if outputs are disabled (P) - if global.disabled { - processors.add(dropDisabledProcessor) - } - - return processors -} - -func newProgram(title string, log *logp.Logger) *program { - return &program{ - title: title, - log: log, - } -} - -func (p *program) add(processor processors.Processor) { - if processor != nil { - p.list = append(p.list, processor) - } -} - -func (p *program) String() string { - var s []string - for _, p := range p.list { - s = append(s, p.String()) - } - - str := strings.Join(s, ", ") - if p.title == "" { - return str - } - return fmt.Sprintf("%v{%v}", p.title, str) -} - -func (p *program) Run(event *beat.Event) (*beat.Event, error) { - if p == nil || len(p.list) == 0 { - return event, nil - } - - for _, sub := range p.list { - var err error - - event, err = sub.Run(event) - if err != nil { - // XXX: We don't drop the event, but continue filtering here iff the most - // recent processor did return an event. 
- // We want processors having this kind of implicit behavior - // on errors? - - p.log.Debugf("Fail to apply processor %s: %s", p, err) - } - - if event == nil { - return nil, err - } - } - - return event, nil -} - -func newProcessor(name string, fn func(*beat.Event) (*beat.Event, error)) *processorFn { - return &processorFn{name: name, fn: fn} -} - -func newAnnotateProcessor(name string, fn func(*beat.Event)) *processorFn { - return newProcessor(name, func(event *beat.Event) (*beat.Event, error) { - fn(event) - return event, nil - }) -} - -func (p *processorFn) String() string { return p.name } -func (p *processorFn) Run(e *beat.Event) (*beat.Event, error) { return p.fn(e) } - -var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { - - // Filter out empty events. Empty events are still reported by ACK callbacks. - if len(event.Fields) == 0 { - return nil, nil - } - - fields := common.ConvertToGenericEvent(event.Fields) - if fields == nil { - logp.Err("fail to convert to generic event") - return nil, nil - } - - event.Fields = fields - return event, nil -}) - -var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) { - return nil, nil -}) - -func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn { - fn := func(event *beat.Event) { addMeta(event, meta) } - if needsCopy { - fn = func(event *beat.Event) { addMeta(event, meta.Clone()) } - } - return newAnnotateProcessor("@metadata", fn) -} - -func addMeta(event *beat.Event, meta common.MapStr) { - if event.Meta == nil { - event.Meta = meta - } else { - event.Meta.Clone() - event.Meta.DeepUpdate(meta) - } -} - -func makeAddDynMetaProcessor( - name string, - meta *common.MapStrPointer, - checkCopy func(m common.MapStr) bool, -) *processorFn { - return newAnnotateProcessor(name, func(event *beat.Event) { - dynFields := meta.Get() - if checkCopy(dynFields) { - dynFields = dynFields.Clone() - } - - 
event.Fields.DeepUpdate(dynFields) - }) -} - -func createAgentFields(info beat.Info) common.MapStr { - metadata := common.MapStr{ - "type": info.Beat, - "ephemeral_id": info.EphemeralID.String(), - "hostname": info.Hostname, - "id": info.ID.String(), - "version": info.Version, - } - if info.Name != info.Hostname { - metadata.Put("name", info.Name) - } - - return common.MapStr{"agent": metadata} -} - -func debugPrintProcessor(info beat.Info, monitors Monitors) *processorFn { - // ensure only one go-routine is using the encoder (in case - // beat.Client is shared between multiple go-routines by accident) - var mux sync.Mutex - - encoder := json.New(info.Version, json.Config{ - Pretty: true, - EscapeHTML: false, - }) - log := monitors.Logger - return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { - mux.Lock() - defer mux.Unlock() - - b, err := encoder.Encode(info.Beat, event) - if err != nil { - return event, nil - } - - log.Debugf("Publish event: %s", b) - return event, nil - }) -} - -func makeClientProcessors( - monitors Monitors, - config beat.ClientConfig, -) processors.Processor { - procs := config.Processor - if procs == nil || len(procs.All()) == 0 { - return nil - } - - p := newProgram("client", monitors.Logger) - p.list = procs.All() - return p -} - -func hasKey(m common.MapStr, key string) bool { - _, exists := m[key] - return exists -} diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go deleted file mode 100644 index 07b643d0c81..00000000000 --- a/libbeat/publisher/pipeline/processor_test.go +++ /dev/null @@ -1,433 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. 
licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "sync" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -func TestProcessors(t *testing.T) { - defaultInfo := beat.Info{} - - type local struct { - config beat.ClientConfig - events []common.MapStr - expected []common.MapStr - includeAgentMetadata bool - } - - tests := []struct { - name string - global pipelineProcessors - local []local - info *beat.Info - }{ - { - name: "user global fields and tags", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "no normalization", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{SkipNormalization: true}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "user": nil, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "add agent metadata", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, 
"agent": common.MapStr{"foo": "bar"}}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "type": "test", - "version": "0.1", - "foo": "bar", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "add agent metadata with custom host.name", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "other.test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "name": "other.test.host.name", - "type": "test", - "version": "0.1", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "beat local fields", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - }, - }, - { - 
name: "beat local and user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user global fields overwrite beat local fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, "shared": "global"}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "beat local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "local": 2}}, - }, - }, - }, - - { - name: "beat local fields + user global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - { - name: "user local fields and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: 
common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields (under root) and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields overwrite user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0, "shared": "global"}, - tags: []string{"global"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - FieldsUnderRoot: true, - Tags: []string{"local"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - { - "value": "abc", - "global": 0, "local": 1, "shared": "local", - "tags": []string{"global", "local"}, - }, - }, - }, - }, - }, - { - name: "user local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"fields": common.MapStr{"global": 0}}, - }, - local: 
[]local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated (fields with root)", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - } - - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - monitors := Monitors{ - Logger: logp.NewLogger("test processors"), - } - - // create processor pipelines - programs := make([]beat.Processor, len(test.local)) - info := defaultInfo - if test.info != nil { - info = *test.info - } - for i, local := range test.local { - local.config.SkipAgentMetadata = !local.includeAgentMetadata - programs[i] = newProcessorPipeline(info, monitors, test.global, local.config) - } - - // run processor pipelines in parallel - var ( - wg sync.WaitGroup - mux sync.Mutex - results = make([][]common.MapStr, len(programs)) - ) - for id, local := range test.local { - wg.Add(1) - id, program, local := id, programs[id], 
local - go func() { - defer wg.Done() - - actual := make([]common.MapStr, len(local.events)) - for i, event := range local.events { - out, _ := program.Run(&beat.Event{ - Timestamp: time.Now(), - Fields: event, - }) - actual[i] = out.Fields - } - - mux.Lock() - defer mux.Unlock() - results[id] = actual - }() - } - wg.Wait() - - // validate - for i, local := range test.local { - assert.Equal(t, local.expected, results[i]) - } - }) - } -} diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 3ebce0351f1..902d303e94f 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" ) type config struct { @@ -58,13 +59,21 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } + log := logp.L() + + processing, err := processing.MakeDefaultSupport(false)(info, log, cfg) + if err != nil { + return err + } + pipeline, err := pipeline.Load(info, pipeline.Monitors{ Metrics: nil, Telemetry: nil, - Logger: logp.L(), + Logger: log, }, config.Pipeline, + processing, func(stat outputs.Observer) (string, outputs.Group, error) { cfg := config.Output out, err := outputs.Load(nil, info, stat, cfg.Name(), cfg.Config()) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go new file mode 100644 index 00000000000..547593be95d --- /dev/null +++ b/libbeat/publisher/processing/default.go @@ -0,0 +1,325 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"fmt"
+
+	"github.com/elastic/ecs/code/go/ecs"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/processors"
+	"github.com/elastic/beats/libbeat/processors/actions"
+)
+
+// builder is used to create the event processing pipeline in Beats. The
+// builder orders and merges global and local (per client) event annotation
+// settings with the configured event processors into one common event
+// processor for use with the publisher pipeline.
+// Also see: (*builder).Create
+type builder struct {
+	// info is handed to the modifiers' ClientFields and to the debug printer
+	// when a pipeline is created.
+	info beat.Info
+	// log is the logger passed to every processor group that is built.
+	log *logp.Logger
+
+	// skipNormalize disables the generalize/normalize step that otherwise
+	// runs first in every generated pipeline.
+	skipNormalize bool
+
+	// global pipeline fields and tags configurations
+	modifiers   []modifier
+	builtinMeta common.MapStr
+	fields      common.MapStr
+	tags        []string
+
+	// global pipeline processors
+	processors *group
+
+	// drop is set if outputs have been disabled via CLI.
+	// NOTE(review): nothing in this file assigns or reads the field; Create
+	// receives its own `drop` argument instead.
+	drop bool
+	// alwaysCopy forces Create to treat shared maps as needing a copy.
+	// NOTE(review): never assigned in this file, so it is always false here.
+	alwaysCopy bool
+}
+
+// modifier contributes fields to the processing pipeline, either globally
+// or per connected client.
+type modifier interface {
+	// BuiltinFields defines global fields to be added to every event.
+	BuiltinFields(beat.Info) common.MapStr
+
+	// ClientFields defines connection local fields to be added to each event
+	// of a pipeline client.
+	ClientFields(beat.Info, beat.ProcessingConfig) common.MapStr
+}
+
+// builtinModifier adapts a plain function that computes builtin fields to the
+// modifier interface. Its ClientFields method always returns nil.
+type builtinModifier func(beat.Info) common.MapStr
+
+// MakeDefaultBeatSupport creates a new SupportFactory based on MakeDefaultSupport.
+// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields
+// to each event.
+func MakeDefaultBeatSupport(normalize bool) SupportFactory {
+	return MakeDefaultSupport(normalize, WithECS, WithHost, WithBeatMeta("agent"))
+}
+
+// MakeDefaultObserverSupport creates a new SupportFactory based on MakeDefaultSupport.
+// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields
+// to each event.
+func MakeDefaultObserverSupport(normalize bool) SupportFactory {
+	return MakeDefaultSupport(normalize, WithECS, WithBeatMeta("observer"))
+}
+
+// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline.
+// If normalize is set, events will be normalized first before being presented
+// to the actual processors.
+// The Supporter will apply the global `fields`, `fields_under_root`, `tags`
+// and `processors` settings to the event processing pipeline to be generated.
+// Use WithFields, WithBeatMeta, and others to declare the builtin fields to be added
+// to each event. Builtin fields can be modified using global `processors`, and `fields` only.
+//
+// The returned factory fails if the beat configuration cannot be unpacked or
+// if the configured global processors cannot be initialized.
+func MakeDefaultSupport(
+	normalize bool,
+	modifiers ...modifier,
+) SupportFactory {
+	return func(info beat.Info, log *logp.Logger, beatCfg *common.Config) (Supporter, error) {
+		cfg := struct {
+			common.EventMetadata `config:",inline"`      // Fields and tags to add to each event.
+			Processors           processors.PluginConfig `config:"processors"`
+		}{}
+		if err := beatCfg.Unpack(&cfg); err != nil {
+			return nil, err
+		}
+
+		processors, err := processors.New(cfg.Processors)
+		if err != nil {
+			return nil, fmt.Errorf("error initializing processors: %v", err)
+		}
+
+		// The builder stores the inverse flag: skipNormalize == !normalize.
+		return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize), nil
+	}
+}
+
+// WithFields creates a modifier with the given default builtin fields.
+// The same map is returned on every call; newBuilder clones it before merging.
+func WithFields(fields common.MapStr) modifier {
+	return builtinModifier(func(_ beat.Info) common.MapStr {
+		return fields
+	})
+}
+
+// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline.
+var WithECS modifier = WithFields(common.MapStr{
+	"ecs": common.MapStr{
+		"version": ecs.Version,
+	},
+})
+
+// WithHost modifier adds `host.name` builtin fields to a processing pipeline.
+// The value is taken from the beat's configured name (info.Name).
+var WithHost modifier = builtinModifier(func(info beat.Info) common.MapStr {
+	return common.MapStr{
+		"host": common.MapStr{
+			"name": info.Name,
+		},
+	}
+})
+
+// WithBeatMeta adds beat meta information as builtin fields to a processing pipeline.
+// The `key` parameter defines the field to be used.
+func WithBeatMeta(key string) modifier {
+	return builtinModifier(func(info beat.Info) common.MapStr {
+		metadata := common.MapStr{
+			"type":         info.Beat,
+			"ephemeral_id": info.EphemeralID.String(),
+			"hostname":     info.Hostname,
+			"id":           info.ID.String(),
+			"version":      info.Version,
+		}
+		// Only report the name separately if it differs from the hostname.
+		if info.Name != info.Hostname {
+			metadata.Put("name", info.Name)
+		}
+		return common.MapStr{key: metadata}
+	})
+}
+
+// newBuilder prepares the global (pipeline wide) part of the event processing
+// configuration: the global processors, the builtin fields computed by the
+// modifiers, and the user configured global fields and tags.
+//
+// All maps handed in are cloned, so the builder never aliases caller data.
+func newBuilder(
+	info beat.Info,
+	log *logp.Logger,
+	processors *processors.Processors,
+	eventMeta common.EventMetadata,
+	modifiers []modifier,
+	skipNormalize bool,
+) *builder {
+	b := &builder{
+		// info must be stored: Create passes it to modifier.ClientFields and
+		// to the debug printer. Without it both would see a zero beat.Info.
+		info:          info,
+		skipNormalize: skipNormalize,
+		modifiers:     modifiers,
+		log:           log,
+	}
+
+	hasProcessors := processors != nil && len(processors.List) > 0
+	if hasProcessors {
+		tmp := newGroup("global", log)
+		for _, p := range processors.List {
+			tmp.add(p)
+		}
+		b.processors = tmp
+	}
+
+	builtin := common.MapStr{}
+	for _, mod := range modifiers {
+		m := mod.BuiltinFields(info)
+		if len(m) > 0 {
+			builtin.DeepUpdate(m.Clone())
+		}
+	}
+	if len(builtin) > 0 {
+		b.builtinMeta = builtin
+	}
+
+	if fields := eventMeta.Fields; len(fields) > 0 {
+		b.fields = common.MapStr{}
+		common.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot)
+	}
+
+	if t := eventMeta.Tags; len(t) > 0 {
+		b.tags = t
+	}
+
+	return b
+}
+
+// Create combines the builder configuration with the client settings
+// in order to build the event processing pipeline.
+//
+// Processing order (C=client, P=pipeline)
+//  1. (P) generalize/normalize event
+//  2. (C) add Meta from client Config to event.Meta
+//  3. (C) add Fields from client config to event.Fields
+//  4. (P) add pipeline fields + tags
+//  5. (C) add client fields + tags
+//  6. (C) client processors list
+//  7. (P) add builtins
+//  8. (P) pipeline processors list
+//  9. (P) (if publish/debug enabled) log event
+//  10. (P) (if output disabled) dropEvent
+//
+// If drop is set, the returned processor drops every event as its last step.
+// The returned error is always nil in this implementation.
+func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
+	var (
+		// pipeline processors
+		processors = newGroup("processPipeline", b.log)
+
+		// client fields and metadata
+		clientMeta      = cfg.Meta
+		localProcessors = makeClientProcessors(b.log, cfg)
+	)
+
+	// Shared maps must be copied whenever a user processor might modify
+	// the event after the maps have been merged in.
+	needsCopy := b.alwaysCopy || localProcessors != nil || b.processors != nil
+
+	builtin := b.builtinMeta
+	var clientFields common.MapStr
+	for _, mod := range b.modifiers {
+		m := mod.ClientFields(b.info, cfg)
+		if len(m) > 0 {
+			if clientFields == nil {
+				clientFields = common.MapStr{}
+			}
+			clientFields.DeepUpdate(m.Clone())
+		}
+	}
+	if len(clientFields) > 0 {
+		// Clone first, so the builder's global builtin fields stay untouched.
+		tmp := builtin.Clone()
+		tmp.DeepUpdate(clientFields)
+		builtin = tmp
+	}
+
+	if !b.skipNormalize {
+		// setup 1: generalize/normalize output (P)
+		processors.add(generalizeProcessor)
+	}
+
+	// setup 2: add Meta from client config (C)
+	if m := clientMeta; len(m) > 0 {
+		processors.add(clientEventMeta(m, needsCopy))
+	}
+
+	// setup 4, 5: pipeline tags + client tags
+	var tags []string
+	tags = append(tags, b.tags...)
+	tags = append(tags, cfg.EventMetadata.Tags...)
+	if len(tags) > 0 {
+		processors.add(actions.NewAddTags("tags", tags))
+	}
+
+	// setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata
+	fields := cfg.Fields.Clone()
+	fields.DeepUpdate(b.fields.Clone())
+	if em := cfg.EventMetadata; len(em.Fields) > 0 {
+		common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
+	}
+
+	if len(fields) > 0 {
+		// Enforce a copy of fields if dynamic fields are configured or agent
+		// metadata will be merged into the fields.
+		// With dynamic fields potentially changing at any time, we need to copy,
+		// so we do not change shared structures by accident.
+		fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin)
+		processors.add(actions.NewAddFields(fields, fieldsNeedsCopy))
+	}
+
+	if cfg.DynamicFields != nil {
+		checkCopy := func(m common.MapStr) bool {
+			return needsCopy || hasKeyAnyOf(m, builtin)
+		}
+		processors.add(makeAddDynMetaProcessor("dynamicFields", cfg.DynamicFields, checkCopy))
+	}
+
+	// setup 6: client processor list (C)
+	processors.add(localProcessors)
+
+	// setup 7: add beats and host metadata (P)
+	if meta := builtin; len(meta) > 0 {
+		processors.add(actions.NewAddFields(meta, needsCopy))
+	}
+
+	// setup 8: pipeline processors list (P)
+	// b.processors is a *group. Passing a nil *group to add would wrap it
+	// into a non-nil interface value and append a nil processor to the list,
+	// so guard on the concrete pointer here.
+	if b.processors != nil {
+		processors.add(b.processors)
+	}
+
+	// setup 9: debug print final event (P)
+	if b.log.IsDebug() {
+		processors.add(debugPrintProcessor(b.info, b.log))
+	}
+
+	// setup 10: drop all events if outputs are disabled (P)
+	if drop {
+		processors.add(dropDisabledProcessor)
+	}
+
+	return processors, nil
+}
+
+// makeClientProcessors wraps the client configured processors into a group
+// named "client". It returns an untyped nil if the client has no processors,
+// so callers can safely compare the result against nil.
+func makeClientProcessors(
+	log *logp.Logger,
+	cfg beat.ProcessingConfig,
+) processors.Processor {
+	procs := cfg.Processor
+	if procs == nil || len(procs.All()) == 0 {
+		return nil
+	}
+
+	p := newGroup("client", log)
+	p.list = procs.All()
+	return p
+}
+
+// BuiltinFields returns the global fields computed by the wrapped function.
+func (b builtinModifier) BuiltinFields(info beat.Info) common.MapStr {
+	return b(info)
+}
+
+// ClientFields returns nil: a builtinModifier contributes no per client fields.
+func (b builtinModifier) ClientFields(_ beat.Info, _ beat.ProcessingConfig) common.MapStr {
+	return nil
+}
diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go
new file mode 100644
index 00000000000..5f49b2b3829
--- /dev/null
+++ b/libbeat/publisher/processing/default_test.go
@@ -0,0 +1,364 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V.
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/gofrs/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/processors/actions"
+	"github.com/elastic/ecs/code/go/ecs"
+)
+
+// TestProcessorsConfigs builds a processing pipeline from a global YAML
+// configuration plus a client local beat.ProcessingConfig, runs one JSON
+// encoded event through it and compares the resulting fields and metadata.
+func TestProcessorsConfigs(t *testing.T) {
+	// Name equals Hostname by default, so no separate `name` entry is
+	// expected in the beat metadata unless a case overrides info.Name.
+	defaultInfo := beat.Info{
+		Beat:        "test",
+		EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")),
+		Hostname:    "test.host.name",
+		ID:          uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")),
+		Name:        "test.host.name",
+		Version:     "0.1",
+	}
+
+	ecsFields := common.MapStr{"version": ecs.Version}
+
+	// Each case is keyed by its sub-test name.
+	cases := map[string]struct {
+		factory  SupportFactory        // defaults to MakeDefaultSupport(true) if nil
+		global   string                // global beat configuration (YAML)
+		local    beat.ProcessingConfig // client local configuration
+		drop     bool                  // passed through to Supporter.Create
+		event    string                // input event fields (JSON)
+		want     common.MapStr         // expected event fields
+		wantMeta common.MapStr         // expected event metadata
+		infoMod  func(beat.Info) beat.Info
+	}{
+		"user global fields and tags": {
+			global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}",
+			event:  `{"value": "abc"}`,
+			want: common.MapStr{
+				"value":  "abc",
+				"global": uint64(1),
+				"tags":   []string{"tag"},
+			},
+		},
+		"beat local fields": {
+			global: "",
+			local: beat.ProcessingConfig{
+				Fields: common.MapStr{"local": 1},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value": "abc",
+				"local": 1,
+			},
+		},
+		"beat local and user global fields": {
+			global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}",
+			local: beat.ProcessingConfig{
+				Fields: common.MapStr{"local": 1},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value":  "abc",
+				"global": uint64(1),
+				"local":  1,
+				"tags":   []string{"tag"},
+			},
+		},
+		"user global fields overwrite beat local fields": {
+			global: "{fields: {global: a, shared: global}, fields_under_root: true}",
+			local: beat.ProcessingConfig{
+				Fields: common.MapStr{"local": "b", "shared": "local"},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value":  "abc",
+				"local":  "b",
+				"global": "a",
+				"shared": "global",
+			},
+		},
+		"user local fields and tags": {
+			local: beat.ProcessingConfig{
+				EventMetadata: common.EventMetadata{
+					Fields: common.MapStr{"local": "a"},
+					Tags:   []string{"tag"},
+				},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value": "abc",
+				"fields": common.MapStr{
+					"local": "a",
+				},
+				"tags": []string{"tag"},
+			},
+		},
+		"user local fields (under root) and tags": {
+			local: beat.ProcessingConfig{
+				EventMetadata: common.EventMetadata{
+					Fields:          common.MapStr{"local": "a"},
+					FieldsUnderRoot: true,
+					Tags:            []string{"tag"},
+				},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value": "abc",
+				"local": "a",
+				"tags":  []string{"tag"},
+			},
+		},
+		"user local fields overwrite user global fields": {
+			global: `{fields: {global: a, shared: global}, fields_under_root: true, tags: [global]}`,
+			local: beat.ProcessingConfig{
+				EventMetadata: common.EventMetadata{
+					Fields: common.MapStr{
+						"local":  "a",
+						"shared": "local",
+					},
+					FieldsUnderRoot: true,
+					Tags:            []string{"local"},
+				},
+			},
+			event: `{"value": "abc"}`,
+			want: common.MapStr{
+				"value":  "abc",
+				"global": "a",
+				"local":  "a",
+				"shared": "local",
+				"tags":   []string{"global", "local"},
+			},
+		},
+		"with client metadata": {
+			local: beat.ProcessingConfig{
+				Meta: common.MapStr{"index": "test"},
+			},
+			event:    `{"value": "abc"}`,
+			want:     common.MapStr{"value": "abc"},
+			wantMeta: common.MapStr{"index": "test"},
+		},
+		"with client processor": {
+			local: beat.ProcessingConfig{
+				Processor: func() beat.ProcessorList {
+					g := newGroup("test", logp.L())
+					g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true))
+					return g
+				}(),
+			},
+			event: `{"value": "abc"}`,
+			want:  common.MapStr{"value": "abc", "custom": "value"},
+		},
+		// The global `agent.foo` field is expected to end up next to the
+		// builtin agent metadata.
+		"with beat default fields": {
+			factory: MakeDefaultBeatSupport(true),
+			global:  `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`,
+			event:   `{"value": "abc"}`,
+			want: common.MapStr{
+				"ecs": ecsFields,
+				"host": common.MapStr{
+					"name": "test.host.name",
+				},
+				"agent": common.MapStr{
+					"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
+					"hostname":     "test.host.name",
+					"id":           "123e4567-e89b-12d3-a456-426655440001",
+					"type":         "test",
+					"version":      "0.1",
+					"foo":          "bar",
+				},
+				"value":  "abc",
+				"global": "a",
+				"tags":   []string{"tag"},
+			},
+		},
+		// A name differing from the hostname must show up as `agent.name`
+		// and as `host.name`.
+		"with beat default fields and custom name": {
+			factory: MakeDefaultBeatSupport(true),
+			global:  `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`,
+			event:   `{"value": "abc"}`,
+			infoMod: func(info beat.Info) beat.Info {
+				info.Name = "other.test.host.name"
+				return info
+			},
+			want: common.MapStr{
+				"ecs": ecsFields,
+				"host": common.MapStr{
+					"name": "other.test.host.name",
+				},
+				"agent": common.MapStr{
+					"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
+					"hostname":     "test.host.name",
+					"name":         "other.test.host.name",
+					"id":           "123e4567-e89b-12d3-a456-426655440001",
+					"type":         "test",
+					"version":      "0.1",
+					"foo":          "bar",
+				},
+				"value":  "abc",
+				"global": "a",
+				"tags":   []string{"tag"},
+			},
+		},
+		"with observer default fields": {
+			factory: MakeDefaultObserverSupport(false),
+			global:  `{fields: {global: a, observer.foo: bar}, fields_under_root: true, tags: [tag]}`,
+			event:   `{"value": "abc"}`,
+			want: common.MapStr{
+				"ecs": ecsFields,
+				"observer": common.MapStr{
+					"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
+					"hostname":     "test.host.name",
+					"id":           "123e4567-e89b-12d3-a456-426655440001",
+					"type":         "test",
+					"version":      "0.1",
+					"foo":          "bar",
+				},
+				"value":  "abc",
+				"global": "a",
+				"tags":   []string{"tag"},
+			},
+		},
+	}
+
+	for name, test := range cases {
+		// Capture the loop variable: the sub-tests run in parallel.
+		test := test
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			cfg, err := common.NewConfigWithYAML([]byte(test.global), "test")
+			require.NoError(t, err)
+
+			info := defaultInfo
+			if test.infoMod != nil {
+				info = test.infoMod(info)
+			}
+
+			factory := test.factory
+			if factory == nil {
+				factory = MakeDefaultSupport(true)
+			}
+
+			support, err := factory(info, logp.L(), cfg)
+			require.NoError(t, err)
+
+			prog, err := support.Create(test.local, test.drop)
+			require.NoError(t, err)
+
+			actual, err := prog.Run(&beat.Event{
+				Timestamp: time.Now(),
+				Fields:    fromJSON(test.event),
+			})
+			require.NoError(t, err)
+
+			// validate
+			assert.Equal(t, test.want, actual.Fields)
+			assert.Equal(t, test.wantMeta, actual.Meta)
+		})
+	}
+}
+
+// TestNormalization checks whether the fields of the processed event still
+// share storage with the input map: the input is modified after Run, and the
+// change is expected to be visible in the output only if normalization is off.
+func TestNormalization(t *testing.T) {
+	cases := map[string]struct {
+		normalize bool
+		in        common.MapStr
+		mod       common.MapStr
+		want      common.MapStr
+	}{
+		"no sharing if normalized": {
+			normalize: true,
+			in:        common.MapStr{"a": "b"},
+			mod:       common.MapStr{"change": "x"},
+			want:      common.MapStr{"a": "b"},
+		},
+		"data sharing if not normalized": {
+			normalize: false,
+			in:        common.MapStr{"a": "b"},
+			mod:       common.MapStr{"change": "x"},
+			want:      common.MapStr{"a": "b", "change": "x"},
+		},
+	}
+
+	for name, test := range cases {
+		test := test
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig())
+			require.NoError(t, err)
+
+			prog, err := s.Create(beat.ProcessingConfig{}, false)
+			require.NoError(t, err)
+
+			fields := test.in.Clone()
+			actual, err := prog.Run(&beat.Event{Fields: fields})
+			require.NoError(t, err)
+			require.NotNil(t, actual)
+
+			// Modify the input after processing; see which side observes it.
+			fields.DeepUpdate(test.mod)
+			assert.Equal(t, test.want, actual.Fields)
+		})
+	}
+}
+
+// TestAlwaysDrop checks that a pipeline created with drop=true returns no event.
+func TestAlwaysDrop(t *testing.T) {
+	s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig())
+	require.NoError(t, err)
+
+	prog, err := s.Create(beat.ProcessingConfig{}, true)
+	require.NoError(t, err)
+
+	actual, err := prog.Run(&beat.Event{})
+	require.NoError(t, err)
+	assert.Nil(t, actual)
+}
+
+// TestDynamicFields checks that a change to the DynamicFields pointer made
+// after the pipeline was created is applied to subsequently processed events.
+func TestDynamicFields(t *testing.T) {
+	// Note: despite its name, `factory` holds the Supporter returned by the factory.
+	factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig())
+	require.NoError(t, err)
+
+	dynFields := common.NewMapStrPointer(common.MapStr{})
+	prog, err := factory.Create(beat.ProcessingConfig{
+		DynamicFields: &dynFields,
+	}, false)
+	require.NoError(t, err)
+
+	actual, err := prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}})
+	require.NoError(t, err)
+	assert.Equal(t, common.MapStr{"hello": "world"}, actual.Fields)
+
+	dynFields.Set(common.MapStr{"dyn": "field"})
+	actual, err = prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}})
+	require.NoError(t, err)
+	assert.Equal(t, common.MapStr{"hello": "world", "dyn": "field"}, actual.Fields)
+}
+
+// fromJSON decodes a JSON object into a common.MapStr and panics on invalid
+// input. It is meant for test fixtures only.
+func fromJSON(in string) common.MapStr {
+	var tmp common.MapStr
+	err := json.Unmarshal([]byte(in), &tmp)
+	if err != nil {
+		panic(err)
+	}
+	return tmp
+}
diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go
new file mode 100644
index 00000000000..ff1be24fc45
--- /dev/null
+++ b/libbeat/publisher/processing/processing.go
@@ -0,0 +1,38 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V.
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+// SupportFactory creates a new processing Supporter that can be used with
+// the publisher pipeline. The factory gets the global configuration passed,
+// in order to configure some shared global event processing.
+// It returns an error if the Supporter cannot be built from that configuration.
+type SupportFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error)
+
+// Supporter is used to create an event processing pipeline. It is used by the
+// publisher pipeline when a client connects to the pipeline. The supporter
+// will merge the global and local configurations into a common event
+// processor.
+type Supporter interface {
+	// Create builds the event processor for one client from the client's
+	// processing configuration.
+	// If `drop` is set, then the processor generated must always drop all events.
+	Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error)
+}
diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go
new file mode 100644
index 00000000000..38a77bdd948
--- /dev/null
+++ b/libbeat/publisher/processing/processors.go
@@ -0,0 +1,200 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V.
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processing
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/outputs/codec/json"
+	"github.com/elastic/beats/libbeat/processors"
+)
+
+// group is an ordered list of processors that are run one after another.
+// A nil *group is a valid, empty group for Run and String.
+type group struct {
+	log   *logp.Logger
+	title string
+	list  []beat.Processor
+}
+
+// processorFn adapts a plain function to the processor interface.
+type processorFn struct {
+	name string
+	fn   func(event *beat.Event) (*beat.Event, error)
+}
+
+// generalizeProcessor drops events without fields and replaces the fields of
+// all other events with the result of common.ConvertToGenericEvent.
+var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
+	// Filter out empty events. Empty events are still reported by ACK callbacks.
+	if len(event.Fields) == 0 {
+		return nil, nil
+	}
+
+	fields := common.ConvertToGenericEvent(event.Fields)
+	if fields == nil {
+		logp.Err("fail to convert to generic event")
+		return nil, nil
+	}
+
+	event.Fields = fields
+	return event, nil
+})
+
+// dropDisabledProcessor unconditionally drops every event.
+var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) {
+	return nil, nil
+})
+
+// newGroup creates an empty processor group. The title is only used by String.
+func newGroup(title string, log *logp.Logger) *group {
+	return &group{
+		title: title,
+		log:   log,
+	}
+}
+
+// add appends processor to the group. Nil interface values are ignored.
+// Note that a nil pointer wrapped in the interface is not a nil interface
+// value and will be appended.
+func (p *group) add(processor processors.Processor) {
+	if processor != nil {
+		p.list = append(p.list, processor)
+	}
+}
+
+// String lists the processors of the group, separated by ", " and wrapped
+// into `title{...}` if the group has a title. A nil group yields "".
+func (p *group) String() string {
+	// Run accepts a nil receiver, so String must not dereference one either.
+	if p == nil {
+		return ""
+	}
+
+	names := make([]string, 0, len(p.list))
+	for _, sub := range p.list {
+		names = append(names, sub.String())
+	}
+
+	str := strings.Join(names, ", ")
+	if p.title == "" {
+		return str
+	}
+	return fmt.Sprintf("%v{%v}", p.title, str)
+}
+
+// All returns the processors of the group.
+func (p *group) All() []beat.Processor {
+	return p.list
+}
+
+// Run passes the event through all processors in order. Processing stops as
+// soon as a processor returns no event; in that case nil is returned together
+// with the error of that processor, if any. Errors of processors that still
+// return an event are logged at debug level only.
+func (p *group) Run(event *beat.Event) (*beat.Event, error) {
+	if p == nil || len(p.list) == 0 {
+		return event, nil
+	}
+
+	for _, sub := range p.list {
+		var err error
+
+		event, err = sub.Run(event)
+		if err != nil {
+			// XXX: We don't drop the event, but continue filtering here if the most
+			//      recent processor did return an event.
+			//      We want processors having this kind of implicit behavior
+			//      on errors?
+
+			p.log.Debugf("Fail to apply processor %s: %s", p, err)
+		}
+
+		if event == nil {
+			return nil, err
+		}
+	}
+
+	return event, nil
+}
+
+// newProcessor wraps fn into a named processor.
+func newProcessor(name string, fn func(*beat.Event) (*beat.Event, error)) *processorFn {
+	return &processorFn{name: name, fn: fn}
+}
+
+// newAnnotateProcessor wraps fn, which may only modify the event in place,
+// into a named processor that never drops the event and never fails.
+func newAnnotateProcessor(name string, fn func(*beat.Event)) *processorFn {
+	return newProcessor(name, func(event *beat.Event) (*beat.Event, error) {
+		fn(event)
+		return event, nil
+	})
+}
+
+func (p *processorFn) String() string                         { return p.name }
+func (p *processorFn) Run(e *beat.Event) (*beat.Event, error) { return p.fn(e) }
+
+// clientEventMeta creates the processor that adds the client configured meta
+// to each event. With needsCopy set, every event gets its own clone of meta.
+func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn {
+	fn := func(event *beat.Event) { addMeta(event, meta) }
+	if needsCopy {
+		fn = func(event *beat.Event) { addMeta(event, meta.Clone()) }
+	}
+	return newAnnotateProcessor("@metadata", fn)
+}
+
+// addMeta merges meta into the event's Meta. If the event has no Meta yet,
+// meta itself is installed. Otherwise the existing Meta is cloned before the
+// merge, so a map still referenced by the event's producer is not modified.
+func addMeta(event *beat.Event, meta common.MapStr) {
+	if event.Meta == nil {
+		event.Meta = meta
+		return
+	}
+
+	// Clone returns the copy; the result must be assigned to take effect.
+	event.Meta = event.Meta.Clone()
+	event.Meta.DeepUpdate(meta)
+}
+
+// makeAddDynMetaProcessor creates a processor that merges the current value
+// of the dynamic fields into every event. checkCopy decides per event whether
+// the dynamic fields have to be cloned before being merged.
+func makeAddDynMetaProcessor(
+	name string,
+	meta *common.MapStrPointer,
+	checkCopy func(m common.MapStr) bool,
+) *processorFn {
+	return newAnnotateProcessor(name, func(event *beat.Event) {
+		dynFields := meta.Get()
+		if checkCopy(dynFields) {
+			dynFields = dynFields.Clone()
+		}
+
+		event.Fields.DeepUpdate(dynFields)
+	})
+}
+
+// debugPrintProcessor creates a processor that logs every event as pretty
+// printed JSON at debug level and passes the event on unchanged.
+func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn {
+	// ensure only one go-routine is using the encoder (in case
+	// beat.Client is shared between multiple go-routines by accident)
+	var mux sync.Mutex
+
+	encoder := json.New(info.Version, json.Config{
+		Pretty:     true,
+		EscapeHTML: false,
+	})
+	return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) {
+		mux.Lock()
+		defer mux.Unlock()
+
+		b, err := encoder.Encode(info.Beat, event)
+		if err != nil {
+			// Best effort: a failure to print must not affect publishing.
+			return event, nil
+		}
+
+		log.Debugf("Publish event: %s", b)
+		return event, nil
+	})
+}
+
+// hasKey reports whether m has a top-level entry named key.
+func hasKey(m common.MapStr, key string) bool {
+	_, exists := m[key]
+	return exists
+}
+
+// hasKeyAnyOf reports whether m shares at least one top-level key with builtin.
+func hasKeyAnyOf(m, builtin common.MapStr) bool {
+	for k := range builtin {
+		if hasKey(m, k) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go
index 2da153d98ea..d00e524a555 100644
--- a/metricbeat/mb/module/connector.go
+++ b/metricbeat/mb/module/connector.go
@@ -58,8 +58,10 @@ func NewConnector(pipeline beat.Pipeline, c *common.Config, dynFields *common.Ma
 
 func (c *Connector) Connect() (beat.Client, error) {
 	return c.pipeline.ConnectWith(beat.ClientConfig{
-		EventMetadata: c.eventMeta,
-		Processor:     c.processors,
-		DynamicFields: c.dynamicFields,
+		Processing: beat.ProcessingConfig{
+			EventMetadata: c.eventMeta,
+			Processor:     c.processors,
+			DynamicFields: c.dynamicFields,
+		},
 	})
 }
diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go
index becc53cf65b..a4e812ad66e 100644
--- a/packetbeat/beater/packetbeat.go
+++ b/packetbeat/beater/packetbeat.go
@@ -178,8 +178,10 @@ func (pb *packetbeat) setupFlows() error {
 	}
 
 	client, err := pb.pipeline.ConnectWith(beat.ClientConfig{
-		EventMetadata: config.Flows.EventMetadata,
-		Processor:     processors,
+		Processing: beat.ProcessingConfig{
+			EventMetadata: config.Flows.EventMetadata,
+			Processor:     processors,
+		},
 	})
 	if err != nil {
 		return err
diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go
index 0b34715e1b9..db264852036 100644
--- a/packetbeat/publish/publish.go
+++ b/packetbeat/publish/publish.go
@@ -97,8 +97,10 @@ func (p *TransactionPublisher) CreateReporter(
 	}
 
 	clientConfig := beat.ClientConfig{
-		EventMetadata: meta.Event,
-		Processor:     processors,
+		Processing: beat.ProcessingConfig{
+			EventMetadata: meta.Event,
+			Processor:     processors,
+		},
 	}
 	if p.canDrop {
 		clientConfig.PublishMode = beat.DropIfFull
diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go
index 59c9ecb1f90..0eb9ec8c13b 100644
---
a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -64,10 +64,12 @@ func newEventLogger( func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) { api := e.source.Name() return pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: e.eventMeta, - Meta: nil, // TODO: configure modules/ES ingest pipeline? - Processor: e.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: e.eventMeta, + Meta: nil, // TODO: configure modules/ES ingest pipeline? + Processor: e.processors, + }, ACKCount: func(n int) { addPublished(api, n) logp.Info("EventLog[%s] successfully published %d events", api, n) diff --git a/x-pack/functionbeat/beater/functionbeat.go b/x-pack/functionbeat/beater/functionbeat.go index 61b61cf0344..b62e8cd704d 100644 --- a/x-pack/functionbeat/beater/functionbeat.go +++ b/x-pack/functionbeat/beater/functionbeat.go @@ -143,9 +143,11 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea } client, err := core.NewSyncClient(log, pipeline, beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - Processor: processors, - EventMetadata: c.EventMetadata, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + Processor: processors, + EventMetadata: c.EventMetadata, + }, }) if err != nil {