Skip to content

Commit

Permalink
Beats event processing and default fields (#10801)
Browse files Browse the repository at this point in the history
This changes moves the generation of the event processing into it's
distinct package, such that the actual publisher pipeline will not
define any processors anymore. A new instance of a publisher pipeline
must not add fields on it's own.

This change converts the event processing pipline into the 'Supporter'
pattern, which is already used for Index Management.
As different beats ask for slightly different behavior in the event
processing (e.g. normalize, default builtins and so on), the
`processing.Support` can be used for customizations.
  • Loading branch information
Steffen Siering authored Mar 8, 2019
1 parent 19c7699 commit 83dfb2f
Show file tree
Hide file tree
Showing 21 changed files with 1,049 additions and 903 deletions.
16 changes: 9 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
Events: f.eventer,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (t *configuredJob) Start() {
}

t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
},
})
if err != nil {
logp.Err("could not start monitor: %v", err)
Expand Down
10 changes: 6 additions & 4 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func New(
func (i *Input) Run() {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
},
Expand Down
50 changes: 26 additions & 24 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,7 @@ type Client interface {
type ClientConfig struct {
PublishMode PublishMode

// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList
Processing ProcessingConfig

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
Expand All @@ -72,14 +57,6 @@ type ClientConfig struct {
// Events configures callbacks for common client callbacks
Events ClientEventer

// By default events are normalized within processor pipeline,
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for to long, to not block the internal buffers for
Expand All @@ -101,6 +78,31 @@ type ClientConfig struct {
ACKLastEvent func(interface{})
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
type ProcessingConfig struct {
// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
}

// ClientEventer provides access to internal client events.
type ClientEventer interface {
Closing() // Closing indicates the client is being shutdown next
Expand Down
14 changes: 14 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
Expand All @@ -78,6 +79,8 @@ type Beat struct {

keystore keystore.Keystore
index idxmgmt.Supporter

processing processing.Supporter
}

type beatConfig struct {
Expand Down Expand Up @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

Expand Down Expand Up @@ -593,6 +597,16 @@ func (b *Beat) configure(settings Settings) error {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig)
if err != nil {
return err
}

processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

return err
}

Expand Down
3 changes: 3 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/monitoring/report"
"github.com/elastic/beats/libbeat/publisher/processing"
)

// Settings contains basic settings for any beat to pass into GenRootCmd
Expand All @@ -40,4 +41,6 @@ type Settings struct {
// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory

Processing processing.SupportFactory
}
9 changes: 8 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)
Expand Down Expand Up @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig())
if err != nil {
return nil, err
}

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{
Metrics: monitoring,
Logger: logp.NewLogger(selector),
Logger: log,
},
queueFactory,
outputs.Group{
Expand All @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: processing,
})
if err != nil {
return nil, err
Expand Down
20 changes: 2 additions & 18 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -60,6 +60,7 @@ func Load(
beatInfo beat.Info,
monitors Monitors,
config Config,
processors processing.Supporter,
makeOutput func(outputs.Observer) (string, outputs.Group, error),
) (*Pipeline, error) {
log := monitors.Logger
Expand All @@ -71,28 +72,11 @@ func Load(
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

name := beatInfo.Name
settings := Settings{
WaitClose: 0,
WaitCloseMode: NoWaitOnClose,
Disabled: publishDisabled,
Processors: processors,
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"host": common.MapStr{
"name": name,
},
"ecs": common.MapStr{
"version": "1.0.0-beta2",
},
},
},
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
Expand Down
Loading

0 comments on commit 83dfb2f

Please sign in to comment.