From c18fed97461c415ecefce5d72fb71653436710b4 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Mon, 26 Oct 2020 09:17:35 -0400 Subject: [PATCH] Refactor packetbeat to support agent-based configuration --- packetbeat/beater/packetbeat.go | 275 +++++------------- packetbeat/beater/processor.go | 142 +++++++++ packetbeat/beater/reloader.go | 100 +++++++ packetbeat/beater/setup.go | 72 +++++ packetbeat/beater/worker.go | 75 +++++ packetbeat/config/agent.go | 80 +++++ packetbeat/config/agent_test.go | 49 ++++ packetbeat/config/config.go | 35 +++ packetbeat/flows/flows.go | 5 +- packetbeat/flows/flows_test.go | 3 +- packetbeat/flows/worker.go | 8 +- packetbeat/flows/worker_test.go | 3 +- packetbeat/procs/procs.go | 2 - packetbeat/protos/amqp/amqp.go | 8 +- packetbeat/protos/amqp/amqp_parser.go | 3 +- packetbeat/protos/amqp/amqp_test.go | 3 +- packetbeat/protos/cassandra/cassandra.go | 10 +- packetbeat/protos/cassandra/trans.go | 7 +- packetbeat/protos/dhcpv4/dhcpv4.go | 12 +- packetbeat/protos/dhcpv4/dhcpv4_test.go | 5 +- packetbeat/protos/dns/dns.go | 8 +- packetbeat/protos/dns/dns_tcp.go | 3 +- packetbeat/protos/dns/dns_test.go | 3 +- packetbeat/protos/dns/dns_udp.go | 3 +- packetbeat/protos/http/http.go | 9 +- packetbeat/protos/http/http_test.go | 3 +- packetbeat/protos/icmp/icmp.go | 9 +- packetbeat/protos/icmp/icmp_test.go | 3 +- packetbeat/protos/memcache/memcache.go | 8 +- packetbeat/protos/memcache/memcache_test.go | 3 +- packetbeat/protos/memcache/plugin_tcp.go | 3 +- packetbeat/protos/memcache/plugin_udp.go | 3 +- packetbeat/protos/mongodb/mongodb.go | 9 +- packetbeat/protos/mongodb/mongodb_test.go | 3 +- packetbeat/protos/mysql/mysql.go | 9 +- packetbeat/protos/mysql/mysql_test.go | 3 +- packetbeat/protos/nfs/rpc.go | 2 + packetbeat/protos/pgsql/pgsql.go | 9 +- packetbeat/protos/pgsql/pgsql_test.go | 3 +- packetbeat/protos/protos.go | 20 +- packetbeat/protos/redis/redis.go | 9 +- packetbeat/protos/registry.go | 3 + packetbeat/protos/sip/parser.go | 9 +- packetbeat/protos/sip/plugin.go | 10 +- packetbeat/protos/sip/plugin_test.go | 3 +- packetbeat/protos/tcp/tcp_test.go | 3 +- packetbeat/protos/thrift/thrift.go | 8 +- packetbeat/protos/thrift/thrift_test.go | 3 +- packetbeat/protos/tls/tls.go | 9 +- packetbeat/protos/tls/tls_test.go | 3 +- packetbeat/publish/publish.go | 4 + .../tcp-protocol/{protocol}/trans.go.tmpl | 7 +- .../{protocol}/{protocol}.go.tmpl | 10 +- 53 files changed, 805 insertions(+), 289 deletions(-) create mode 100644 packetbeat/beater/processor.go create mode 100644 packetbeat/beater/reloader.go create mode 100644 packetbeat/beater/setup.go create mode 100644 packetbeat/beater/worker.go create mode 100644 packetbeat/config/agent.go create mode 100644 packetbeat/config/agent_test.go diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index b862bd3ee11..0526e2090d7 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -18,47 +18,25 @@ package beater import ( - "errors" "flag" - "fmt" - "sync" "time" - "github.com/tsg/gopacket/layers" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/packetbeat/config" - "github.com/elastic/beats/v7/packetbeat/decoder" - "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" - "github.com/elastic/beats/v7/packetbeat/protos/icmp" - "github.com/elastic/beats/v7/packetbeat/protos/tcp" - "github.com/elastic/beats/v7/packetbeat/protos/udp" "github.com/elastic/beats/v7/packetbeat/publish" - "github.com/elastic/beats/v7/packetbeat/sniffer" // Add packetbeat default processors _ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata" ) -// Beater object. Contains all objects needed to run the beat -type packetbeat struct { - config config.Config - cmdLineArgs flags - sniff *sniffer.Sniffer - - // publisher/pipeline - pipeline beat.Pipeline - transPub *publish.TransactionPublisher - flows *flows.Flows -} - type flags struct { file *string loop *int @@ -79,8 +57,8 @@ func init() { } } -func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { - config := config.Config{ +func initialConfig() config.Config { + return config.Config{ Interfaces: config.InterfacesConfig{ File: *cmdLineArgs.file, Loop: *cmdLineArgs.loop, @@ -89,111 +67,59 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { Dumpfile: *cmdLineArgs.dumpfile, }, } +} + +// Beater object. Contains all objects needed to run the beat +type packetbeat struct { + config *common.Config + factory *processorFactory + publisher *publish.TransactionPublisher + shutdownTimeout time.Duration + done chan struct{} +} + +func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { + config := initialConfig() err := rawConfig.Unpack(&config) if err != nil { logp.Err("fails to read the beat config: %v, %v", err, config) return nil, err } - pb := &packetbeat{ - config: config, - cmdLineArgs: cmdLineArgs, - } - err = pb.init(b) - if err != nil { - return nil, err - } - - return pb, nil -} - -// init packetbeat components -func (pb *packetbeat) init(b *beat.Beat) error { - var err error - cfg := &pb.config + watcher := procs.ProcessesWatcher{} // Enable the process watcher only if capturing live traffic - if cfg.Interfaces.File == "" { - err = procs.ProcWatcher.Init(cfg.Procs) + if config.Interfaces.File == "" { + err = watcher.Init(config.Procs) if err != nil { logp.Critical(err.Error()) - return err + return nil, err } } else { logp.Info("Process watcher disabled when file input is used") } - pb.pipeline = b.Publisher - pb.transPub, err = publish.NewTransactionPublisher( + publisher, err := publish.NewTransactionPublisher( b.Info.Name, b.Publisher, - pb.config.IgnoreOutgoing, - pb.config.Interfaces.File == "", + config.IgnoreOutgoing, + config.Interfaces.File == "", ) if err != nil { - return err - } - - logp.Debug("main", "Initializing protocol plugins") - err = protos.Protos.Init(false, pb.transPub, cfg.Protocols, cfg.ProtocolsList) - if err != nil { - return fmt.Errorf("Initializing protocol analyzers failed: %v", err) - } - - if err := pb.setupFlows(); err != nil { - return err - } - - return pb.setupSniffer() -} - -func (pb *packetbeat) setupSniffer() error { - config := &pb.config - - icmp, err := pb.icmpConfig() - if err != nil { - return err - } - - withVlans := config.Interfaces.WithVlans - withICMP := icmp.Enabled() - - filter := config.Interfaces.BpfFilter - if filter == "" && !config.Flows.IsEnabled() { - filter = protos.Protos.BpfFilter(withVlans, withICMP) - } - - pb.sniff, err = sniffer.New(false, filter, pb.createWorker, config.Interfaces) - return err -} - -func (pb *packetbeat) setupFlows() error { - config := &pb.config - if !config.Flows.IsEnabled() { - return nil - } - - processors, err := processors.New(config.Flows.Processors) - if err != nil { - return err - } - - client, err := pb.pipeline.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - EventMetadata: config.Flows.EventMetadata, - Processor: processors, - KeepNull: config.Flows.KeepNull, - }, - }) - if err != nil { - return err + return nil, err } - pb.flows, err = flows.NewFlows(client.PublishAll, config.Flows) - if err != nil { - return err + factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher) + if err := factory.CheckConfig(rawConfig); err != nil { + return nil, err } - return nil + return &packetbeat{ + config: rawConfig, + shutdownTimeout: config.ShutdownTimeout, + factory: factory, + publisher: publisher, + done: make(chan struct{}), + }, nil } func (pb *packetbeat) Run(b *beat.Beat) error { @@ -205,114 +131,63 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } }() - defer pb.transPub.Stop() + defer pb.publisher.Stop() - timeout := pb.config.ShutdownTimeout + timeout := pb.shutdownTimeout if timeout > 0 { defer time.Sleep(timeout) } - if pb.flows != nil { - pb.flows.Start() - defer pb.flows.Stop() + if !b.Manager.Enabled() { + return pb.runStatic(b, pb.factory) } + return pb.runManaged(b, pb.factory) +} - var wg sync.WaitGroup - errC := make(chan error, 1) - - // Run the sniffer in background - wg.Add(1) - go func() { - defer wg.Done() +func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { + runner, err := factory.Create(b.Publisher, pb.config) + if err != nil { + return err + } + runner.Start() + defer runner.Stop() - err := pb.sniff.Run() - if err != nil { - errC <- fmt.Errorf("Sniffer main loop failed: %v", err) - } - }() + logp.Debug("main", "Waiting for the runner to finish") - logp.Debug("main", "Waiting for the sniffer to finish") - wg.Wait() select { - default: - case err := <-errC: + case <-pb.done: + case err := <-factory.err: + close(pb.done) return err } - return nil } -// Called by the Beat stop function -func (pb *packetbeat) Stop() { - logp.Info("Packetbeat send stop signal") - pb.sniff.Stop() -} - -func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) { - var icmp4 icmp.ICMPv4Processor - var icmp6 icmp.ICMPv6Processor - cfg, err := pb.icmpConfig() - if err != nil { - return nil, err - } - if cfg.Enabled() { - reporter, err := pb.transPub.CreateReporter(cfg) - if err != nil { - return nil, err +func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { + runner := newReloader(management.DebugK, factory, b.Publisher) + reload.Register.MustRegister(b.Info.Beat, runner) + defer runner.Stop() + + logp.Debug("main", "Waiting for the runner to finish") + + for { + select { + case <-pb.done: + return nil + case err := <-factory.err: + // when we're managed we don't want + // to stop if the sniffer exited without an error + // this would happen during a configuration reload + if err != nil { + close(pb.done) + return err + } } - - icmp, err := icmp.New(false, reporter, cfg) - if err != nil { - return nil, err - } - - icmp4 = icmp - icmp6 = icmp - } - - tcp, err := tcp.NewTCP(&protos.Protos) - if err != nil { - return nil, err - } - - udp, err := udp.NewUDP(&protos.Protos) - if err != nil { - return nil, err } - - worker, err := decoder.New(pb.flows, dl, icmp4, icmp6, tcp, udp) - if err != nil { - return nil, err - } - - return worker, nil } -func (pb *packetbeat) icmpConfig() (*common.Config, error) { - var icmp *common.Config - if pb.config.Protocols["icmp"].Enabled() { - icmp = pb.config.Protocols["icmp"] - } - - for _, cfg := range pb.config.ProtocolsList { - info := struct { - Type string `config:"type" validate:"required"` - }{} - - if err := cfg.Unpack(&info); err != nil { - return nil, err - } - - if info.Type != "icmp" { - continue - } - - if icmp != nil { - return nil, errors.New("More then one icmp configurations found") - } - - icmp = cfg - } - - return icmp, nil +// Called by the Beat stop function +func (pb *packetbeat) Stop() { + logp.Info("Packetbeat send stop signal") + close(pb.done) } diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go new file mode 100644 index 00000000000..dc9dde1cb5f --- /dev/null +++ b/packetbeat/beater/processor.go @@ -0,0 +1,142 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +type processor struct { + wg sync.WaitGroup + flows *flows.Flows + sniffer *sniffer.Sniffer + err chan error +} + +func newProcessor(flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { + return &processor{ + flows: flows, + sniffer: sniffer, + err: err, + } +} + +func (p *processor) String() string { + return "packetbeat.processor" +} + +func (p *processor) Start() { + if p.flows != nil { + p.flows.Start() + } + p.wg.Add(1) + go func() { + defer p.wg.Done() + + err := p.sniffer.Run() + if err != nil { + p.err <- fmt.Errorf("Sniffer loop failed: %v", err) + } + p.err <- nil + }() +} + +func (p *processor) Stop() { + p.sniffer.Stop() + if p.flows != nil { + p.flows.Stop() + } + p.wg.Wait() +} + +type processorFactory struct { + name string + err chan error + publisher *publish.TransactionPublisher +} + +func newProcessorFactory(name string, err chan error, publisher *publish.TransactionPublisher) *processorFactory { + return &processorFactory{ + name: name, + err: err, + publisher: publisher, + } +} + +func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) { + config := initialConfig() + err := cfg.Unpack(&config) + if err != nil { + logp.Err("fails to read the beat config: %v, %v", err, config) + return nil, err + } + + // normalize agent-based configuration + config, err = config.Normalize() + if err != nil { + logp.Err("failed to normalize the beat config: %v, %v", err, config) + return nil, err + } + + watcher := procs.ProcessesWatcher{} + // Enable the process watcher only if capturing live traffic + if config.Interfaces.File == "" { + err = watcher.Init(config.Procs) + if err != nil { + logp.Critical(err.Error()) + return nil, err + } + } else { + logp.Info("Process watcher disabled when file input is used") + } + + logp.Debug("main", "Initializing protocol plugins") + protocols := protos.NewProtocols() + err = protocols.Init(false, p.publisher, watcher, config.Protocols, config.ProtocolsList) + if err != nil { + return nil, fmt.Errorf("Initializing protocol analyzers failed: %v", err) + } + flows, err := setupFlows(pipeline, watcher, config) + if err != nil { + return nil, err + } + sniffer, err := setupSniffer(config, protocols, workerFactory(p.publisher, protocols, watcher, flows, config)) + if err != nil { + return nil, err + } + + return newProcessor(flows, sniffer, p.err), nil +} + +func (p *processorFactory) CheckConfig(config *common.Config) error { + _, err := p.Create(pipeline.NewNilPipeline(), config) + return err +} diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go new file mode 100644 index 00000000000..6cbec3c6d1d --- /dev/null +++ b/packetbeat/beater/reloader.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "sync" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" +) + +type reloader struct { + mutex sync.Mutex + factory cfgfile.RunnerFactory + runner cfgfile.Runner + configHash uint64 + pipeline beat.PipelineConnector + logger *logp.Logger +} + +func newReloader(name string, factory cfgfile.RunnerFactory, pipeline beat.PipelineConnector) *reloader { + return &reloader{ + factory: factory, + logger: logp.NewLogger(name), + } +} + +func (r *reloader) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.runner != nil { + r.runner.Stop() + } +} + +func (r *reloader) Reload(config *reload.ConfigWithMeta) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.logger.Debug("Starting reload procedure") + + hash, err := cfgfile.HashConfig(config.Config) + if err != nil { + r.logger.Errorf("Unable to hash given config: %s", err) + return errors.Wrap(err, "Unable to hash given config") + } + + if hash == r.configHash { + // we have the same config reloaded + return nil + } + // reinitialize config hash + r.configHash = 0 + + if r.runner != nil { + go r.runner.Stop() + } + // reinitialize runner + r.runner = nil + + c, err := common.NewConfigFrom(config.Config) + if err != nil { + r.logger.Errorf("Unable to create new configuration for factory: %s", err) + return errors.Wrap(err, "Unable to create new configuration for factory") + } + runner, err := r.factory.Create(pipetool.WithDynamicFields(r.pipeline, config.Meta), c) + if err != nil { + r.logger.Errorf("Unable to create new runner: %s", err) + return errors.Wrap(err, "Unable to create new runner") + } + + r.logger.Debugf("Starting runner: %s", runner) + r.configHash = hash + r.runner = runner + runner.Start() + + return nil +} diff --git a/packetbeat/beater/setup.go b/packetbeat/beater/setup.go new file mode 100644 index 00000000000..f8ed8b0aea6 --- /dev/null +++ b/packetbeat/beater/setup.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFactory sniffer.WorkerFactory) (*sniffer.Sniffer, error) { + icmp, err := cfg.ICMP() + if err != nil { + return nil, err + } + + filter := cfg.Interfaces.BpfFilter + if filter == "" && !cfg.Flows.IsEnabled() { + filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled()) + } + + return sniffer.New(false, filter, workerFactory, cfg.Interfaces) +} + +func setupFlows(pipeline beat.Pipeline, watcher procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) { + if !cfg.Flows.IsEnabled() { + return nil, nil + } + + processors, err := processors.New(cfg.Flows.Processors) + if err != nil { + return nil, err + } + + clientConfig := beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + EventMetadata: cfg.Flows.EventMetadata, + Processor: processors, + KeepNull: cfg.Flows.KeepNull, + }, + } + if cfg.Flows.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": cfg.Flows.Index} + } + + client, err := pipeline.ConnectWith(clientConfig) + if err != nil { + return nil, err + } + + return flows.NewFlows(client.PublishAll, watcher, cfg.Flows) +} diff --git a/packetbeat/beater/worker.go b/packetbeat/beater/worker.go new file mode 100644 index 00000000000..5dd6a514454 --- /dev/null +++ b/packetbeat/beater/worker.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/tsg/gopacket/layers" + + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/decoder" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/protos/icmp" + "github.com/elastic/beats/v7/packetbeat/protos/tcp" + "github.com/elastic/beats/v7/packetbeat/protos/udp" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func workerFactory(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) func(dl layers.LinkType) (sniffer.Worker, error) { + return func(dl layers.LinkType) (sniffer.Worker, error) { + var icmp4 icmp.ICMPv4Processor + var icmp6 icmp.ICMPv6Processor + config, err := cfg.ICMP() + if err != nil { + return nil, err + } + if config.Enabled() { + reporter, err := publisher.CreateReporter(config) + if err != nil { + return nil, err + } + + icmp, err := icmp.New(false, reporter, watcher, config) + if err != nil { + return nil, err + } + + icmp4 = icmp + icmp6 = icmp + } + + tcp, err := tcp.NewTCP(protocols) + if err != nil { + return nil, err + } + + udp, err := udp.NewUDP(protocols) + if err != nil { + return nil, err + } + + worker, err := decoder.New(flows, dl, icmp4, icmp6, tcp, udp) + if err != nil { + return nil, err + } + + return worker, nil + } +} diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go new file mode 100644 index 00000000000..3c5c6b70c6f --- /dev/null +++ b/packetbeat/config/agent.go @@ -0,0 +1,80 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "fmt" + "runtime" + "strings" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var osDefaultDevices = map[string]string{ + "darwin": "en0", + "linux": "any", +} + +func defaultDevice() string { + if device, found := osDefaultDevices[runtime.GOOS]; found { + return device + } + return "0" +} + +// Normalize allows the packetbeat configuration to understand +// agent semantics +func (c Config) Normalize() (Config, error) { + logp.Debug("agent", "Normalizing agent configuration") + if len(c.Inputs) > 0 { + // override everything, we're managed by agent + c.Flows = nil + c.Protocols = nil + c.ProtocolsList = []*common.Config{} + // TODO: make this configurable rather than just using the default device in + // managed mode + c.Interfaces.Device = defaultDevice() + } + + for _, input := range c.Inputs { + if rawInputType, ok := input["type"]; ok { + inputType, ok := rawInputType.(string) + if ok && strings.HasPrefix(inputType, "network/") { + config, err := common.NewConfigFrom(input) + if err != nil { + return c, err + } + protocol := strings.TrimPrefix(inputType, "network/") + logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", protocol)) + switch protocol { + case "flows": + if err := config.Unpack(&c.Flows); err != nil { + return c, err + } + default: + if err = config.SetString("type", -1, protocol); err != nil { + return c, err + } + c.ProtocolsList = append(c.ProtocolsList, config) + } + } + } + } + return c, nil +} diff --git a/packetbeat/config/agent_test.go b/packetbeat/config/agent_test.go new file mode 100644 index 00000000000..eb82bb90f21 --- /dev/null +++ b/packetbeat/config/agent_test.go @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestAgentInputNormalization(t *testing.T) { + cfg, err := common.NewConfigFrom(` +inputs: +- type: network/flows + timeout: 10s + period: 10s + keep_null: false + data_stream.namespace: default +- type: network/amqp + ports: [5672] + data_stream.namespace: default +`) + require.NoError(t, err) + config := Config{} + require.NoError(t, cfg.Unpack(&config)) + + config, err = config.Normalize() + require.NoError(t, err) + + require.Equal(t, config.Flows.Timeout, "10s") + require.Len(t, config.ProtocolsList, 1) +} diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 893e4828ab8..1d344787055 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -18,6 +18,7 @@ package config import ( + "errors" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -33,6 +34,38 @@ type Config struct { Procs procs.ProcsConfig `config:"procs"` IgnoreOutgoing bool `config:"ignore_outgoing"` ShutdownTimeout time.Duration `config:"shutdown_timeout"` + + // agent configuration + Inputs []map[string]interface{} `config:"inputs"` +} + +// ICMP returns the ICMP configuration +func (c Config) ICMP() (*common.Config, error) { + var icmp *common.Config + if c.Protocols["icmp"].Enabled() { + icmp = c.Protocols["icmp"] + } + + for _, cfg := range c.ProtocolsList { + info := struct { + Type string `config:"type" validate:"required"` + }{} + + if err := cfg.Unpack(&info); err != nil { + return nil, err + } + + if info.Type != "icmp" { + continue + } + + if icmp != nil { + return nil, errors.New("More then one icmp configurations found") + } + + icmp = cfg + } + return icmp, nil } type InterfacesConfig struct { @@ -57,6 +90,8 @@ type Flows struct { EventMetadata common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` + // Index is used to overwrite the index where flows are published + Index string `config:"index"` } type ProtocolCommon struct { diff --git a/packetbeat/flows/flows.go b/packetbeat/flows/flows.go index fb292b92bf2..d58a2c45987 100644 --- a/packetbeat/flows/flows.go +++ b/packetbeat/flows/flows.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type Flows struct { @@ -41,7 +42,7 @@ const ( defaultPeriod = 10 * time.Second ) -func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { +func NewFlows(pub Reporter, watcher procs.ProcessesWatcher, config *config.Flows) (*Flows, error) { duration := func(s string, d time.Duration) (time.Duration, error) { if s == "" { return d, nil @@ -67,7 +68,7 @@ func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { counter := &counterReg{} - worker, err := newFlowsWorker(pub, table, counter, timeout, period) + worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period) if err != nil { logp.Err("failed to configure flows processing intervals: %v", err) return nil, err diff --git a/packetbeat/flows/flows_test.go b/packetbeat/flows/flows_test.go index b1c3a5b83b9..e56b3777374 100644 --- a/packetbeat/flows/flows_test.go +++ b/packetbeat/flows/flows_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type flowsChan struct { @@ -50,7 +51,7 @@ func TestFlowsCounting(t *testing.T) { port1 := []byte{0, 1} port2 := []byte{0, 2} - module, err := NewFlows(nil, &config.Flows{}) + module, err := NewFlows(nil, procs.ProcessesWatcher{}, &config.Flows{}) assert.NoError(t, err) uint1, err := module.NewUint("uint1") diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index 56445801781..2a9ca482ed3 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -32,6 +32,7 @@ import ( type flowsProcessor struct { spool spool + watcher procs.ProcessesWatcher table *flowMetaTable counters *counterReg timeout time.Duration @@ -44,6 +45,7 @@ var ( func newFlowsWorker( pub Reporter, + watcher procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, @@ -84,6 +86,7 @@ func newFlowsWorker( defaultBatchSize := 1024 processor := &flowsProcessor{ table: table, + watcher: watcher, counters: counters, timeout: timeout, } @@ -194,13 +197,14 @@ func (fw *flowsProcessor) report( isOver bool, intNames, uintNames, floatNames []string, ) { - event := createEvent(ts, flow, isOver, intNames, uintNames, floatNames) + event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) debugf("add event: %v", event) fw.spool.publish(event) } func createEvent( + watcher procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, @@ -386,7 +390,7 @@ func createEvent( // Set process information if it's available if tuple.IPLength != 0 && tuple.SrcPort != 0 { - if proc := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); proc != nil { + if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil { if proc.Src.PID > 0 { p := common.MapStr{ "pid": proc.Src.PID, diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 15cef57cc25..3bec75f2fe3 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) var ( @@ -66,7 +67,7 @@ func TestCreateEvent(t *testing.T) { } bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}} bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}} - event := createEvent(time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) + event := createEvent(procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) // Validate the contents of the event. validate := lookslike.MustCompile(map[string]interface{}{ diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index dfead47d93c..bf3daab9ff2 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -83,8 +83,6 @@ type ProcessesWatcher struct { impl processWatcherImpl } -var ProcWatcher ProcessesWatcher - func (proc *ProcessesWatcher) Init(config ProcsConfig) error { return proc.initWithImpl(config, proc) } diff --git a/packetbeat/protos/amqp/amqp.go b/packetbeat/protos/amqp/amqp.go index c361c3e7fe6..1113d4ee6df 100644 --- a/packetbeat/protos/amqp/amqp.go +++ b/packetbeat/protos/amqp/amqp.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -47,6 +48,7 @@ type amqpPlugin struct { transactions *common.Cache transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher //map containing functions associated with different method numbers methodMap map[codeClass]map[codeMethod]amqpMethod @@ -64,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &amqpPlugin{} @@ -74,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error { +func (amqp *amqpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *amqpConfig) error { amqp.initMethodMap() amqp.setFromConfig(config) @@ -92,6 +95,7 @@ func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error protos.DefaultTransactionHashSize) amqp.transactions.StartJanitor(amqp.transactionTimeout) amqp.results = results + amqp.watcher = watcher return nil } diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index eeaaf2a9464..6ab15ec8159 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" ) func (amqp *amqpPlugin) amqpMessageParser(s *amqpStream) (ok bool, complete bool) { @@ -336,7 +335,7 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di debugf("A message is ready to be handled") m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = amqp.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.method == "basic.publish" { amqp.handlePublishing(m) diff --git a/packetbeat/protos/amqp/amqp_test.go b/packetbeat/protos/amqp/amqp_test.go index 37be71c571d..19002b60941 100644 --- a/packetbeat/protos/amqp/amqp_test.go +++ b/packetbeat/protos/amqp/amqp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -45,7 +46,7 @@ func amqpModForTests() (*eventStore, *amqpPlugin) { var amqp amqpPlugin results := &eventStore{} config := defaultConfig - amqp.init(results.publish, &config) + amqp.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &amqp } diff --git a/packetbeat/protos/cassandra/cassandra.go b/packetbeat/protos/cassandra/cassandra.go index ed0f48e91a4..7d3001a5159 100644 --- a/packetbeat/protos/cassandra/cassandra.go +++ b/packetbeat/protos/cassandra/cassandra.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -34,6 +35,7 @@ type cassandra struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &cassandra{} @@ -70,17 +73,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (cassandra *cassandra) init(results protos.Reporter, config *cassandraConfig) error { +func (cassandra *cassandra) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *cassandraConfig) error { if err := cassandra.setFromConfig(config); err != nil { return err } cassandra.pub.results = results + cassandra.watcher = watcher return nil } @@ -193,7 +197,7 @@ func (cassandra *cassandra) ensureConnection(private protos.ProtocolData) *conne conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&cassandra.transConfig, cassandra.pub.onTransaction) + conn.trans.init(&cassandra.transConfig, cassandra.watcher, cassandra.pub.onTransaction) } return conn } diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index 62d36ee3695..9b055d22c88 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -33,6 +33,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -46,8 +48,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -59,7 +62,7 @@ func (trans *transactions) onMessage( var err error msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTupleTCP(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/protos/dhcpv4/dhcpv4.go b/packetbeat/protos/dhcpv4/dhcpv4.go index 10d299aea76..323f46f2a54 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4.go +++ b/packetbeat/protos/dhcpv4/dhcpv4.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/ecs/code/go/ecs" ) @@ -45,12 +46,13 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { - return newPlugin(testMode, results, cfg) + return newPlugin(testMode, results, watcher, cfg) } -func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhcpv4Plugin, error) { +func newPlugin(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*dhcpv4Plugin, error) { config := defaultConfig if !testMode { @@ -62,14 +64,16 @@ func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhc return &dhcpv4Plugin{ dhcpv4Config: config, report: results, + watcher: watcher, log: logp.NewLogger("dhcpv4"), }, nil } type dhcpv4Plugin struct { dhcpv4Config - report protos.Reporter - log *logp.Logger + report protos.Reporter + watcher procs.ProcessesWatcher + log *logp.Logger } func (p *dhcpv4Plugin) GetPorts() []int { diff --git a/packetbeat/protos/dhcpv4/dhcpv4_test.go b/packetbeat/protos/dhcpv4/dhcpv4_test.go index 1f7d416248a..e695ecf00cf 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4_test.go +++ b/packetbeat/protos/dhcpv4/dhcpv4_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -81,7 +82,7 @@ var ( func TestParseDHCPRequest(t *testing.T) { logp.TestingSetup() - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } @@ -165,7 +166,7 @@ func TestParseDHCPRequest(t *testing.T) { } func TestParseDHCPACK(t *testing.T) { - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 8fbf402b6b4..15aa154276f 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -38,6 +38,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -55,6 +56,7 @@ type dnsPlugin struct { transactionTimeout time.Duration results protos.Reporter // Channel where results are pushed. + watcher procs.ProcessesWatcher } var ( @@ -220,6 +222,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &dnsPlugin{} @@ -230,13 +233,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { +func (dns *dnsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *dnsConfig) error { dns.setFromConfig(config) dns.transactions = common.NewCacheWithRemovalListener( dns.transactionTimeout, @@ -252,6 +255,7 @@ func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { dns.transactions.StartJanitor(dns.transactionTimeout) dns.results = results + dns.watcher = watcher return nil } diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index 310cf43553e..bbf7e736926 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -150,7 +149,7 @@ func (dns *dnsPlugin) handleDNS(conn *dnsConnectionData, tcpTuple *common.TCPTup message := conn.data[dir].message dnsTuple := dnsTupleFromIPPort(&message.tuple, transportTCP, decodedData.Id) - message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcpTuple.IPPort()) + message.cmdlineTuple = dns.watcher.FindProcessesTupleTCP(tcpTuple.IPPort()) message.data = decodedData message.length += decodeOffset diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index c5ee52eb5eb..17303783fca 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -111,7 +112,7 @@ func newDNS(store *eventStore, verbose bool) *dnsPlugin { "send_request": true, "send_response": true, }) - dns, err := New(false, callback, cfg) + dns, err := New(false, callback, procs.ProcessesWatcher{}, cfg) if err != nil { panic(err) } diff --git a/packetbeat/protos/dns/dns_udp.go b/packetbeat/protos/dns/dns_udp.go index 652e03bb717..c1a22c7536f 100644 --- a/packetbeat/protos/dns/dns_udp.go +++ b/packetbeat/protos/dns/dns_udp.go @@ -20,7 +20,6 @@ package dns import ( "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -47,7 +46,7 @@ func (dns *dnsPlugin) ParseUDP(pkt *protos.Packet) { dnsMsg := &dnsMessage{ ts: pkt.Ts, tuple: pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleUDP(&pkt.Tuple), + cmdlineTuple: dns.watcher.FindProcessesTupleUDP(&pkt.Tuple), data: dnsPkt, length: packetSize, } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index 4b2367c0239..3dd7484822e 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -97,6 +97,7 @@ type httpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -111,6 +112,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &httpPlugin{} @@ -121,19 +123,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (http *httpPlugin) init(results protos.Reporter, config *httpConfig) error { +func (http *httpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *httpConfig) error { http.setFromConfig(config) isDebug = logp.IsDebug("http") isDetailed = logp.IsDebug("httpdetailed") http.results = results + http.watcher = watcher return nil } @@ -435,7 +438,7 @@ func (http *httpPlugin) handleHTTP( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = http.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) http.hideHeaders(m) if m.isRequest { diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 2e2995ff463..53f55214851 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -88,7 +89,7 @@ func httpModForTests(store *eventStore) *httpPlugin { callback = store.publish } - http, err := New(false, callback, common.NewConfig()) + http, err := New(false, callback, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { panic(err) } diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 6fb210fd871..f86dd291886 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket/layers" @@ -45,6 +46,7 @@ type icmpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type ICMPv4Processor interface { @@ -74,7 +76,7 @@ var ( duplicateRequests = monitoring.NewInt(nil, "icmp.duplicate_requests") ) -func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugin, error) { +func New(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*icmpPlugin, error) { p := &icmpPlugin{} config := defaultConfig if !testMode { @@ -83,13 +85,13 @@ func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugi } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error { +func (icmp *icmpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *icmpConfig) error { icmp.setFromConfig(config) var err error @@ -112,6 +114,7 @@ func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error icmp.transactions.StartJanitor(icmp.transactionTimeout) icmp.results = results + icmp.watcher = watcher return nil } diff --git a/packetbeat/protos/icmp/icmp_test.go b/packetbeat/protos/icmp/icmp_test.go index 3ad537fa7d4..fc9508fbcdc 100644 --- a/packetbeat/protos/icmp/icmp_test.go +++ b/packetbeat/protos/icmp/icmp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket" @@ -60,7 +61,7 @@ func TestIcmpDirection(t *testing.T) { func BenchmarkIcmpProcessICMPv4(b *testing.B) { logp.TestingSetup(logp.WithSelectors("icmp", "icmpdetailed")) - icmp, err := New(true, func(beat.Event) {}, common.NewConfig()) + icmp, err := New(true, func(beat.Event) {}, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { b.Error("Failed to create ICMP processor") return diff --git a/packetbeat/protos/memcache/memcache.go b/packetbeat/protos/memcache/memcache.go index e59550287a5..39bfccd255a 100644 --- a/packetbeat/protos/memcache/memcache.go +++ b/packetbeat/protos/memcache/memcache.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -38,6 +39,7 @@ import ( type memcache struct { ports protos.PortsConfig results protos.Reporter + watcher procs.ProcessesWatcher config parserConfig udpMemcache @@ -131,6 +133,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &memcache{} @@ -141,14 +144,14 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Called to initialize the Plugin -func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error { +func (mc *memcache) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *memcacheConfig) error { debug("init memcache plugin") mc.handler = mc @@ -158,6 +161,7 @@ func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error mc.udpConnections = make(map[common.HashableIPPortTuple]*udpConnection) mc.results = results + mc.watcher = watcher return nil } diff --git a/packetbeat/protos/memcache/memcache_test.go b/packetbeat/protos/memcache/memcache_test.go index b36483770c1..641dec070e1 100644 --- a/packetbeat/protos/memcache/memcache_test.go +++ b/packetbeat/protos/memcache/memcache_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" ) type memcacheTest struct { @@ -36,7 +37,7 @@ type memcacheTest struct { func newMemcacheTest(config memcacheConfig) *memcacheTest { mct := &memcacheTest{} mc := &memcache{} - mc.init(nil, &config) + mc.init(nil, procs.ProcessesWatcher{}, &config) mc.handler = mct mct.mc = mc return mct diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index e9dded17dd6..830a0cd64a5 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -191,7 +190,7 @@ func (mc *memcache) onTCPMessage( ) error { msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleTCP(tuple) if msg.IsRequest { return mc.onTCPRequest(conn, tuple, dir, msg) diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 850c6e421fb..441b286a49e 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/streambuf" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -184,7 +183,7 @@ func (mc *memcache) onUDPMessage( } msg.Tuple = *tuple msg.Transport = applayer.TransportUDP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleUDP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleUDP(tuple) done := false var err error diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index ac3e66dca5e..28a9350840e 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -47,6 +47,7 @@ type mongodbPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type transactionKey struct { @@ -65,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mongodbPlugin{} @@ -75,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfig) error { +func (mongodb *mongodbPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mongodbConfig) error { debugf("Init a MongoDB protocol parser") mongodb.setFromConfig(config) @@ -94,6 +96,7 @@ func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfi protos.DefaultTransactionHashSize) mongodb.responses.StartJanitor(mongodb.transactionTimeout) mongodb.results = results + mongodb.watcher = watcher return nil } @@ -218,7 +221,7 @@ func (mongodb *mongodbPlugin) handleMongodb( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mongodb.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isResponse { debugf("MongoDB response message") diff --git a/packetbeat/protos/mongodb/mongodb_test.go b/packetbeat/protos/mongodb/mongodb_test.go index 639a2ee7e78..2debae92dff 100644 --- a/packetbeat/protos/mongodb/mongodb_test.go +++ b/packetbeat/protos/mongodb/mongodb_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,7 +47,7 @@ func mongodbModForTests() (*eventStore, *mongodbPlugin) { var mongodb mongodbPlugin results := &eventStore{} config := defaultConfig - mongodb.init(results.publish, &config) + mongodb.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &mongodb } diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 4d08debf976..506b6c30ca8 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -158,6 +158,7 @@ type mysqlPlugin struct { prepareStatementTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handleMysql func(mysql *mysqlPlugin, m *mysqlMessage, tcp *common.TCPTuple, @@ -171,6 +172,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mysqlPlugin{} @@ -181,13 +183,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) error { +func (mysql *mysqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mysqlConfig) error { mysql.setFromConfig(config) mysql.transactions = common.NewCache( @@ -203,6 +205,7 @@ func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) err mysql.handleMysql = handleMysql mysql.results = results + mysql.watcher = watcher return nil } @@ -651,7 +654,7 @@ func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mysql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) m.raw = rawMsg if m.isRequest { diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index d55917114ee..4e9123c5617 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" "github.com/elastic/beats/v7/packetbeat/publish" @@ -60,7 +61,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin { var mysql mysqlPlugin config := defaultConfig config.Ports = []int{serverPort} - mysql.init(callback, &config) + mysql.init(callback, procs.ProcessesWatcher{}, &config) return &mysql } diff --git a/packetbeat/protos/nfs/rpc.go b/packetbeat/protos/nfs/rpc.go index f115cf19fba..9cde7ab5aac 100644 --- a/packetbeat/protos/nfs/rpc.go +++ b/packetbeat/protos/nfs/rpc.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -70,6 +71,7 @@ func init() { func New( testMode bool, results protos.Reporter, + _ procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &rpc{} diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index 69bfb468887..5ad6f6e305a 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -49,6 +49,7 @@ type pgsqlPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handlePgsql func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcp *common.TCPTuple, @@ -140,6 +141,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &pgsqlPlugin{} @@ -150,13 +152,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error { +func (pgsql *pgsqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *pgsqlConfig) error { pgsql.setFromConfig(config) pgsql.log = logp.NewLogger("pgsql") @@ -170,6 +172,7 @@ func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) err pgsql.transactions.StartJanitor(pgsql.transactionTimeout) pgsql.handlePgsql = handlePgsql pgsql.results = results + pgsql.watcher = watcher return nil } @@ -379,7 +382,7 @@ var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCP m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = pgsql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { pgsql.receivedPgsqlRequest(m) diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index db735c64a5d..737f2905a91 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -56,7 +57,7 @@ func pgsqlModForTests(store *eventStore) *pgsqlPlugin { var pgsql pgsqlPlugin config := defaultConfig - pgsql.init(callback, &config) + pgsql.init(callback, procs.ProcessesWatcher{}, &config) return &pgsql } diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index 9991458eb2b..e0343a0ee87 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) const ( @@ -92,11 +93,12 @@ type ProtocolsStruct struct { udp map[Protocol]UDPPlugin } -// Singleton of Protocols type. -var Protos = ProtocolsStruct{ - all: map[Protocol]protocolInstance{}, - tcp: map[Protocol]TCPPlugin{}, - udp: map[Protocol]UDPPlugin{}, +func NewProtocols() *ProtocolsStruct { + return &ProtocolsStruct{ + all: map[Protocol]protocolInstance{}, + tcp: map[Protocol]TCPPlugin{}, + udp: map[Protocol]UDPPlugin{}, + } } type protocolInstance struct { @@ -111,6 +113,7 @@ type reporterFactory interface { func (s ProtocolsStruct) Init( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, configs map[string]*common.Config, listConfigs []*common.Config, ) error { @@ -123,7 +126,7 @@ func (s ProtocolsStruct) Init( } for name, config := range configs { - if err := s.configureProtocol(testMode, pub, name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil { return err } } @@ -136,7 +139,7 @@ func (s ProtocolsStruct) Init( return err } - if err := s.configureProtocol(testMode, pub, module.Name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil { return err } } @@ -147,6 +150,7 @@ func (s ProtocolsStruct) Init( func (s ProtocolsStruct) configureProtocol( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, name string, config *common.Config, ) error { @@ -182,7 +186,7 @@ func (s ProtocolsStruct) configureProtocol( } } - inst, err := plugin(testMode, results, config) + inst, err := plugin(testMode, results, watcher, config) if err != nil { logp.Err("Failed to register protocol plugin: %v", err) return err diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index bf23e94836f..23dd1ad8696 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -55,6 +55,7 @@ type redisPlugin struct { transactionTimeout time.Duration queueConfig MessageQueueConfig + watcher procs.ProcessesWatcher results protos.Reporter } @@ -75,6 +76,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &redisPlugin{} @@ -85,16 +87,17 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (redis *redisPlugin) init(results protos.Reporter, config *redisConfig) error { +func (redis *redisPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *redisConfig) error { redis.setFromConfig(config) redis.results = results + redis.watcher = watcher isDebug = logp.IsDebug("redis") return nil @@ -247,7 +250,7 @@ func (redis *redisPlugin) handleRedis( ) { m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = redis.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { // wait for response diff --git a/packetbeat/protos/registry.go b/packetbeat/protos/registry.go index f1fc17b7074..1d1bd2c7b88 100644 --- a/packetbeat/protos/registry.go +++ b/packetbeat/protos/registry.go @@ -22,11 +22,14 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + + "github.com/elastic/beats/v7/packetbeat/procs" ) type ProtocolPlugin func( testMode bool, results Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (Plugin, error) diff --git a/packetbeat/protos/sip/parser.go b/packetbeat/protos/sip/parser.go index 55e66045e95..7ee5a10bb5b 100644 --- a/packetbeat/protos/sip/parser.go +++ b/packetbeat/protos/sip/parser.go @@ -87,6 +87,7 @@ const ( ) type parser struct { + watcher procs.ProcessesWatcher } type parsingInfo struct { @@ -117,15 +118,17 @@ var ( nameVia = []byte("via") ) -func newParser() *parser { - return &parser{} +func newParser(watcher procs.ProcessesWatcher) *parser { + return &parser{ + watcher: watcher, + } } func (parser *parser) parse(pi *parsingInfo) (*message, error) { m := &message{ ts: pi.pkt.Ts, ipPortTuple: pi.pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleTCP(&pi.pkt.Tuple), + cmdlineTuple: parser.watcher.FindProcessesTupleTCP(&pi.pkt.Tuple), rawData: pi.data, } for pi.parseOffset < len(pi.data) { diff --git a/packetbeat/protos/sip/plugin.go b/packetbeat/protos/sip/plugin.go index 14b56aeda45..0c28b5eb3a4 100644 --- a/packetbeat/protos/sip/plugin.go +++ b/packetbeat/protos/sip/plugin.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,6 +47,7 @@ type plugin struct { keepOriginal bool results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { cfgwarn.Beta("packetbeat SIP protocol is used") @@ -72,19 +75,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (p *plugin) init(results protos.Reporter, config *config) error { +func (p *plugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *config) error { p.setFromConfig(config) isDebug = logp.IsDebug("sip") isDetailed = logp.IsDebug("sipdetailed") p.results = results + p.watcher = watcher return nil } @@ -112,7 +116,7 @@ func (p *plugin) doParse(pkt *protos.Packet) error { detailedf("Payload received: [%s]", pkt.Payload) } - parser := newParser() + parser := newParser(p.watcher) pi := newParsingInfo(pkt) m, err := parser.parse(pi) diff --git a/packetbeat/protos/sip/plugin_test.go b/packetbeat/protos/sip/plugin_test.go index d8c09f5b307..5b09f522aff 100644 --- a/packetbeat/protos/sip/plugin_test.go +++ b/packetbeat/protos/sip/plugin_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -114,7 +115,7 @@ func TestParseUDP(t *testing.T) { gotEvent = &evt } const data = "INVITE sip:test@10.0.2.15:5060 SIP/2.0\r\nVia: SIP/2.0/UDP 10.0.2.20:5060;branch=z9hG4bK-2187-1-0\r\nFrom: \"DVI4/8000\" ;tag=1\r\nTo: test \r\nCall-ID: 1-2187@10.0.2.20\r\nCSeq: 1 INVITE\r\nContact: sip:sipp@10.0.2.20:5060\r\nMax-Forwards: 70\r\nContent-Type: application/sdp\r\nContent-Length: 123\r\n\r\nv=0\r\no=- 42 42 IN IP4 10.0.2.20\r\ns=-\r\nc=IN IP4 10.0.2.20\r\nt=0 0\r\nm=audio 6000 RTP/AVP 5\r\na=rtpmap:5 DVI4/8000\r\na=recvonly\r\n" - p, _ := New(true, reporter, nil) + p, _ := New(true, reporter, procs.ProcessesWatcher{}, nil) plugin := p.(*plugin) plugin.ParseUDP(&protos.Packet{ Ts: time.Now(), diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 092d1f6310c..c014e870c6b 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/stretchr/testify/assert" @@ -44,7 +45,7 @@ var ( ) func init() { - new := func(_ bool, _ protos.Reporter, _ *common.Config) (protos.Plugin, error) { + new := func(_ bool, _ protos.Reporter, _ procs.ProcessesWatcher, _ *common.Config) (protos.Plugin, error) { return &TestProtocol{}, nil } diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index 8c15b9bdf9c..d9778031d76 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -57,6 +57,7 @@ type thriftPlugin struct { publishQueue chan *thriftTransaction results protos.Reporter + watcher procs.ProcessesWatcher idl *thriftIdl } @@ -182,6 +183,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &thriftPlugin{} @@ -192,7 +194,7 @@ func New( } } - if err := p.init(testMode, results, &config); err != nil { + if err := p.init(testMode, results, watcher, &config); err != nil { return nil, err } return p, nil @@ -201,6 +203,7 @@ func New( func (thrift *thriftPlugin) init( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, config *thriftConfig, ) error { thrift.InitDefaults() @@ -218,6 +221,7 @@ func (thrift *thriftPlugin) init( if !testMode { thrift.publishQueue = make(chan *thriftTransaction, 1000) thrift.results = results + thrift.watcher = watcher go thrift.publishTransactions() } @@ -894,7 +898,7 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // all ok, go to next level stream.message.tcpTuple = *tcptuple stream.message.direction = dir - stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + stream.message.cmdlineTuple = thrift.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if stream.message.frameSize == 0 { stream.message.frameSize = uint32(stream.parseOffset - stream.message.start) } diff --git a/packetbeat/protos/thrift/thrift_test.go b/packetbeat/protos/thrift/thrift_test.go index 2c6618bab77..e1eca793e42 100644 --- a/packetbeat/protos/thrift/thrift_test.go +++ b/packetbeat/protos/thrift/thrift_test.go @@ -26,13 +26,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) func thriftForTests() *thriftPlugin { t := &thriftPlugin{} config := defaultConfig - t.init(true, nil, &config) + t.init(true, nil, procs.ProcessesWatcher{}, &config) return t } diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 74034c4afaf..e91c78d69b8 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -60,6 +60,7 @@ type tlsPlugin struct { fingerprints []*FingerprintAlgorithm transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -78,6 +79,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &tlsPlugin{} @@ -88,18 +90,19 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (plugin *tlsPlugin) init(results protos.Reporter, config *tlsConfig) error { +func (plugin *tlsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *tlsConfig) error { if err := plugin.setFromConfig(config); err != nil { return err } plugin.results = results + plugin.watcher = watcher isDebug = logp.IsDebug("tls") return nil @@ -178,7 +181,7 @@ func (plugin *tlsPlugin) doParse( st := conn.streams[dir] if st == nil { st = newStream(tcptuple) - st.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + st.cmdlineTuple = plugin.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) conn.streams[dir] = st } diff --git a/packetbeat/protos/tls/tls_test.go b/packetbeat/protos/tls/tls_test.go index 512294f2d4f..90aadb07a95 100644 --- a/packetbeat/protos/tls/tls_test.go +++ b/packetbeat/protos/tls/tls_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -66,7 +67,7 @@ func testInit() (*eventStore, *tlsPlugin) { logp.TestingSetup(logp.WithSelectors("tls", "tlsdetailed")) results := &eventStore{} - tls, err := New(true, results.publish, nil) + tls, err := New(true, results.publish, procs.ProcessesWatcher{}, nil) if err != nil { return nil, nil } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 7890a1c173d..bd904030b59 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -84,6 +84,7 @@ func (p *TransactionPublisher) CreateReporter( // load and register the module it's fields, tags and processors settings meta := struct { + Index string `config:"index"` Event common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` @@ -107,6 +108,9 @@ func (p *TransactionPublisher) CreateReporter( if p.canDrop { clientConfig.PublishMode = beat.DropIfFull } + if meta.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": meta.Index} + } client, err := p.pipeline.ConnectWith(clientConfig) if err != nil { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl index 4f7ad362bfe..c25d06f0d65 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl @@ -16,6 +16,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -29,8 +31,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -43,7 +46,7 @@ func (trans *transactions) onMessage( msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTuple(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl index f783e840301..8af08842fd4 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -15,6 +16,7 @@ type {plugin_type} struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -45,6 +47,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &{plugin_type}{} @@ -55,17 +58,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func ({plugin_var} *{plugin_type}) init(results protos.Reporter, config *{protocol}Config) error { +func ({plugin_var} *{plugin_type}) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *{protocol}Config) error { if err := {plugin_var}.setFromConfig(config); err != nil { return err } {plugin_var}.pub.results = results + {plugin_var}.watcher = watcher isDebug = logp.IsDebug("http") return nil @@ -162,7 +166,7 @@ func ({plugin_var} *{plugin_type}) ensureConnection(private protos.ProtocolData) conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.pub.onTransaction) + conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.watcher, {plugin_var}.pub.onTransaction) } return conn }