Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Packetbeat] Refactor packetbeat for use with Elastic Agent #22134

Merged
merged 9 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 75 additions & 200 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,25 @@
package beater

import (
"errors"
"flag"
"fmt"
"sync"
"time"

"github.com/tsg/gopacket/layers"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/service"

"github.com/elastic/beats/v7/packetbeat/config"
"github.com/elastic/beats/v7/packetbeat/decoder"
"github.com/elastic/beats/v7/packetbeat/flows"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/beats/v7/packetbeat/protos"
"github.com/elastic/beats/v7/packetbeat/protos/icmp"
"github.com/elastic/beats/v7/packetbeat/protos/tcp"
"github.com/elastic/beats/v7/packetbeat/protos/udp"
"github.com/elastic/beats/v7/packetbeat/publish"
"github.com/elastic/beats/v7/packetbeat/sniffer"

// Add packetbeat default processors
_ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata"
)

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config config.Config
cmdLineArgs flags
sniff *sniffer.Sniffer

// publisher/pipeline
pipeline beat.Pipeline
transPub *publish.TransactionPublisher
flows *flows.Flows
}

type flags struct {
file *string
loop *int
Expand All @@ -79,8 +57,8 @@ func init() {
}
}

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := config.Config{
func initialConfig() config.Config {
andrewstucki marked this conversation as resolved.
Show resolved Hide resolved
return config.Config{
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.file,
Loop: *cmdLineArgs.loop,
Expand All @@ -89,111 +67,59 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
Dumpfile: *cmdLineArgs.dumpfile,
},
}
}

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config *common.Config
factory *processorFactory
publisher *publish.TransactionPublisher
shutdownTimeout time.Duration
done chan struct{}
}

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := initialConfig()
err := rawConfig.Unpack(&config)
if err != nil {
logp.Err("fails to read the beat config: %v, %v", err, config)
return nil, err
}

pb := &packetbeat{
config: config,
cmdLineArgs: cmdLineArgs,
}
err = pb.init(b)
if err != nil {
return nil, err
}

return pb, nil
}

// init packetbeat components
func (pb *packetbeat) init(b *beat.Beat) error {
var err error
cfg := &pb.config
watcher := procs.ProcessesWatcher{}
// Enable the process watcher only if capturing live traffic
if cfg.Interfaces.File == "" {
err = procs.ProcWatcher.Init(cfg.Procs)
if config.Interfaces.File == "" {
err = watcher.Init(config.Procs)
if err != nil {
logp.Critical(err.Error())
return err
return nil, err
}
} else {
logp.Info("Process watcher disabled when file input is used")
}

pb.pipeline = b.Publisher
pb.transPub, err = publish.NewTransactionPublisher(
publisher, err := publish.NewTransactionPublisher(
b.Info.Name,
b.Publisher,
pb.config.IgnoreOutgoing,
pb.config.Interfaces.File == "",
config.IgnoreOutgoing,
config.Interfaces.File == "",
)
if err != nil {
return err
}

logp.Debug("main", "Initializing protocol plugins")
err = protos.Protos.Init(false, pb.transPub, cfg.Protocols, cfg.ProtocolsList)
if err != nil {
return fmt.Errorf("Initializing protocol analyzers failed: %v", err)
}

if err := pb.setupFlows(); err != nil {
return err
}

return pb.setupSniffer()
}

func (pb *packetbeat) setupSniffer() error {
config := &pb.config

icmp, err := pb.icmpConfig()
if err != nil {
return err
}

withVlans := config.Interfaces.WithVlans
withICMP := icmp.Enabled()

filter := config.Interfaces.BpfFilter
if filter == "" && !config.Flows.IsEnabled() {
filter = protos.Protos.BpfFilter(withVlans, withICMP)
}

pb.sniff, err = sniffer.New(false, filter, pb.createWorker, config.Interfaces)
return err
}

func (pb *packetbeat) setupFlows() error {
config := &pb.config
if !config.Flows.IsEnabled() {
return nil
}

processors, err := processors.New(config.Flows.Processors)
if err != nil {
return err
}

client, err := pb.pipeline.ConnectWith(beat.ClientConfig{
Processing: beat.ProcessingConfig{
EventMetadata: config.Flows.EventMetadata,
Processor: processors,
KeepNull: config.Flows.KeepNull,
},
})
if err != nil {
return err
return nil, err
}

pb.flows, err = flows.NewFlows(client.PublishAll, config.Flows)
if err != nil {
return err
factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher)
if err := factory.CheckConfig(rawConfig); err != nil {
return nil, err
}

return nil
return &packetbeat{
config: rawConfig,
shutdownTimeout: config.ShutdownTimeout,
factory: factory,
publisher: publisher,
done: make(chan struct{}),
}, nil
}

func (pb *packetbeat) Run(b *beat.Beat) error {
Expand All @@ -205,114 +131,63 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
}
}()

defer pb.transPub.Stop()
defer pb.publisher.Stop()

timeout := pb.config.ShutdownTimeout
timeout := pb.shutdownTimeout
if timeout > 0 {
defer time.Sleep(timeout)
}

if pb.flows != nil {
pb.flows.Start()
defer pb.flows.Stop()
if !b.Manager.Enabled() {
return pb.runStatic(b, pb.factory)
}
return pb.runManaged(b, pb.factory)
}

var wg sync.WaitGroup
errC := make(chan error, 1)

// Run the sniffer in background
wg.Add(1)
go func() {
defer wg.Done()
func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
runner, err := factory.Create(b.Publisher, pb.config)
if err != nil {
return err
}
runner.Start()
defer runner.Stop()

err := pb.sniff.Run()
if err != nil {
errC <- fmt.Errorf("Sniffer main loop failed: %v", err)
}
}()
logp.Debug("main", "Waiting for the runner to finish")

logp.Debug("main", "Waiting for the sniffer to finish")
wg.Wait()
select {
default:
case err := <-errC:
case <-pb.done:
case err := <-factory.err:
close(pb.done)
return err
}

return nil
}

// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
pb.sniff.Stop()
}

func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) {
var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
cfg, err := pb.icmpConfig()
if err != nil {
return nil, err
}
if cfg.Enabled() {
reporter, err := pb.transPub.CreateReporter(cfg)
if err != nil {
return nil, err
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegister(b.Info.Beat, runner)
defer runner.Stop()

logp.Debug("main", "Waiting for the runner to finish")

for {
select {
case <-pb.done:
return nil
case err := <-factory.err:
// when we're managed we don't want
// to stop if the sniffer exited without an error
// this would happen during a configuration reload
if err != nil {
close(pb.done)
return err
}
}

icmp, err := icmp.New(false, reporter, cfg)
if err != nil {
return nil, err
}

icmp4 = icmp
icmp6 = icmp
}

tcp, err := tcp.NewTCP(&protos.Protos)
if err != nil {
return nil, err
}

udp, err := udp.NewUDP(&protos.Protos)
if err != nil {
return nil, err
}

worker, err := decoder.New(pb.flows, dl, icmp4, icmp6, tcp, udp)
if err != nil {
return nil, err
}

return worker, nil
}

func (pb *packetbeat) icmpConfig() (*common.Config, error) {
var icmp *common.Config
if pb.config.Protocols["icmp"].Enabled() {
icmp = pb.config.Protocols["icmp"]
}

for _, cfg := range pb.config.ProtocolsList {
info := struct {
Type string `config:"type" validate:"required"`
}{}

if err := cfg.Unpack(&info); err != nil {
return nil, err
}

if info.Type != "icmp" {
continue
}

if icmp != nil {
return nil, errors.New("More then one icmp configurations found")
}

icmp = cfg
}

return icmp, nil
// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
close(pb.done)
}
Loading