Skip to content

Commit

Permalink
Refactor packetbeat to support agent-based configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Stucki committed Oct 26, 2020
1 parent 155dfda commit c18fed9
Show file tree
Hide file tree
Showing 53 changed files with 805 additions and 289 deletions.
275 changes: 75 additions & 200 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,25 @@
package beater

import (
"errors"
"flag"
"fmt"
"sync"
"time"

"github.com/tsg/gopacket/layers"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/service"

"github.com/elastic/beats/v7/packetbeat/config"
"github.com/elastic/beats/v7/packetbeat/decoder"
"github.com/elastic/beats/v7/packetbeat/flows"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/beats/v7/packetbeat/protos"
"github.com/elastic/beats/v7/packetbeat/protos/icmp"
"github.com/elastic/beats/v7/packetbeat/protos/tcp"
"github.com/elastic/beats/v7/packetbeat/protos/udp"
"github.com/elastic/beats/v7/packetbeat/publish"
"github.com/elastic/beats/v7/packetbeat/sniffer"

// Add packetbeat default processors
_ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata"
)

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config config.Config
cmdLineArgs flags
sniff *sniffer.Sniffer

// publisher/pipeline
pipeline beat.Pipeline
transPub *publish.TransactionPublisher
flows *flows.Flows
}

type flags struct {
file *string
loop *int
Expand All @@ -79,8 +57,8 @@ func init() {
}
}

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := config.Config{
func initialConfig() config.Config {
return config.Config{
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.file,
Loop: *cmdLineArgs.loop,
Expand All @@ -89,111 +67,59 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
Dumpfile: *cmdLineArgs.dumpfile,
},
}
}

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config *common.Config
factory *processorFactory
publisher *publish.TransactionPublisher
shutdownTimeout time.Duration
done chan struct{}
}

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := initialConfig()
err := rawConfig.Unpack(&config)
if err != nil {
logp.Err("fails to read the beat config: %v, %v", err, config)
return nil, err
}

pb := &packetbeat{
config: config,
cmdLineArgs: cmdLineArgs,
}
err = pb.init(b)
if err != nil {
return nil, err
}

return pb, nil
}

// init packetbeat components
func (pb *packetbeat) init(b *beat.Beat) error {
var err error
cfg := &pb.config
watcher := procs.ProcessesWatcher{}
// Enable the process watcher only if capturing live traffic
if cfg.Interfaces.File == "" {
err = procs.ProcWatcher.Init(cfg.Procs)
if config.Interfaces.File == "" {
err = watcher.Init(config.Procs)
if err != nil {
logp.Critical(err.Error())
return err
return nil, err
}
} else {
logp.Info("Process watcher disabled when file input is used")
}

pb.pipeline = b.Publisher
pb.transPub, err = publish.NewTransactionPublisher(
publisher, err := publish.NewTransactionPublisher(
b.Info.Name,
b.Publisher,
pb.config.IgnoreOutgoing,
pb.config.Interfaces.File == "",
config.IgnoreOutgoing,
config.Interfaces.File == "",
)
if err != nil {
return err
}

logp.Debug("main", "Initializing protocol plugins")
err = protos.Protos.Init(false, pb.transPub, cfg.Protocols, cfg.ProtocolsList)
if err != nil {
return fmt.Errorf("Initializing protocol analyzers failed: %v", err)
}

if err := pb.setupFlows(); err != nil {
return err
}

return pb.setupSniffer()
}

func (pb *packetbeat) setupSniffer() error {
config := &pb.config

icmp, err := pb.icmpConfig()
if err != nil {
return err
}

withVlans := config.Interfaces.WithVlans
withICMP := icmp.Enabled()

filter := config.Interfaces.BpfFilter
if filter == "" && !config.Flows.IsEnabled() {
filter = protos.Protos.BpfFilter(withVlans, withICMP)
}

pb.sniff, err = sniffer.New(false, filter, pb.createWorker, config.Interfaces)
return err
}

func (pb *packetbeat) setupFlows() error {
config := &pb.config
if !config.Flows.IsEnabled() {
return nil
}

processors, err := processors.New(config.Flows.Processors)
if err != nil {
return err
}

client, err := pb.pipeline.ConnectWith(beat.ClientConfig{
Processing: beat.ProcessingConfig{
EventMetadata: config.Flows.EventMetadata,
Processor: processors,
KeepNull: config.Flows.KeepNull,
},
})
if err != nil {
return err
return nil, err
}

pb.flows, err = flows.NewFlows(client.PublishAll, config.Flows)
if err != nil {
return err
factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher)
if err := factory.CheckConfig(rawConfig); err != nil {
return nil, err
}

return nil
return &packetbeat{
config: rawConfig,
shutdownTimeout: config.ShutdownTimeout,
factory: factory,
publisher: publisher,
done: make(chan struct{}),
}, nil
}

func (pb *packetbeat) Run(b *beat.Beat) error {
Expand All @@ -205,114 +131,63 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
}
}()

defer pb.transPub.Stop()
defer pb.publisher.Stop()

timeout := pb.config.ShutdownTimeout
timeout := pb.shutdownTimeout
if timeout > 0 {
defer time.Sleep(timeout)
}

if pb.flows != nil {
pb.flows.Start()
defer pb.flows.Stop()
if !b.Manager.Enabled() {
return pb.runStatic(b, pb.factory)
}
return pb.runManaged(b, pb.factory)
}

var wg sync.WaitGroup
errC := make(chan error, 1)

// Run the sniffer in background
wg.Add(1)
go func() {
defer wg.Done()
func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
runner, err := factory.Create(b.Publisher, pb.config)
if err != nil {
return err
}
runner.Start()
defer runner.Stop()

err := pb.sniff.Run()
if err != nil {
errC <- fmt.Errorf("Sniffer main loop failed: %v", err)
}
}()
logp.Debug("main", "Waiting for the runner to finish")

logp.Debug("main", "Waiting for the sniffer to finish")
wg.Wait()
select {
default:
case err := <-errC:
case <-pb.done:
case err := <-factory.err:
close(pb.done)
return err
}

return nil
}

// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
pb.sniff.Stop()
}

func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) {
var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
cfg, err := pb.icmpConfig()
if err != nil {
return nil, err
}
if cfg.Enabled() {
reporter, err := pb.transPub.CreateReporter(cfg)
if err != nil {
return nil, err
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegister(b.Info.Beat, runner)
defer runner.Stop()

logp.Debug("main", "Waiting for the runner to finish")

for {
select {
case <-pb.done:
return nil
case err := <-factory.err:
// when we're managed we don't want
// to stop if the sniffer exited without an error
// this would happen during a configuration reload
if err != nil {
close(pb.done)
return err
}
}

icmp, err := icmp.New(false, reporter, cfg)
if err != nil {
return nil, err
}

icmp4 = icmp
icmp6 = icmp
}

tcp, err := tcp.NewTCP(&protos.Protos)
if err != nil {
return nil, err
}

udp, err := udp.NewUDP(&protos.Protos)
if err != nil {
return nil, err
}

worker, err := decoder.New(pb.flows, dl, icmp4, icmp6, tcp, udp)
if err != nil {
return nil, err
}

return worker, nil
}

func (pb *packetbeat) icmpConfig() (*common.Config, error) {
var icmp *common.Config
if pb.config.Protocols["icmp"].Enabled() {
icmp = pb.config.Protocols["icmp"]
}

for _, cfg := range pb.config.ProtocolsList {
info := struct {
Type string `config:"type" validate:"required"`
}{}

if err := cfg.Unpack(&info); err != nil {
return nil, err
}

if info.Type != "icmp" {
continue
}

if icmp != nil {
return nil, errors.New("More then one icmp configurations found")
}

icmp = cfg
}

return icmp, nil
// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
close(pb.done)
}
Loading

0 comments on commit c18fed9

Please sign in to comment.