Skip to content

Commit

Permalink
Allow Beats to increase publisher internal queue size (#22650) (#22896)
Browse files Browse the repository at this point in the history
This is a mitigation to fix a problem with Beats that use
PublishMode=DropIfFull (only Packetbeat).

This parameter is meant to drop events when the configured queue is
full. However, due to the way the mem and spool queues are implemented,
this is actually dropping events when the internal "publisher" queue is
full. Due to the small hardcoded size in this queue (20 events), it can
get full easily when the publisher queue is not full, just because a
burst of events is published and the consumer goroutine doesn't react
fast enough to consume them.

Changes in this PR allows Beats to set a parameter in instance.Settings
to request a larger internal queue and Packetbeat is updated to request
a queue for up to 400 events. All other Beats remain unchanged.

(cherry picked from commit 1657b5a)
  • Loading branch information
adriansr authored Dec 3, 2020
1 parent 0c2bc7a commit 4309c8b
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ same journal. {pull}18467[18467]
port. {pull}19209[19209]
- Add support for overriding the published index on a per-protocol/flow basis. {pull}22134[22134]
- Change build process for x-pack distribution {pull}21979[21979]
- Tuned the internal queue size to reduce the chances of events being dropped. {pull}22650[22650]


*Functionbeat*
Expand All @@ -640,6 +641,7 @@ port. {pull}19209[19209]
- Add file.pe and process.pe fields to ProcessCreate & LoadImage events in Sysmon module. {issue}17335[17335] {pull}22217[22217]

*Elastic Log Driver*

- Add support for `docker logs` command {pull}19531[19531]
- Add new winlogbeat security dashboard {pull}18775[18775]

Expand Down
38 changes: 25 additions & 13 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Beat struct {

keystore keystore.Keystore
processing processing.Supporter

InputQueueSize int // Size of the producer queue used by most queues.
}

type beatConfig struct {
Expand Down Expand Up @@ -335,29 +337,37 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, errors.New(msg)
}
}
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

var publisher *pipeline.Pipeline
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: b.processing,
InputQueueSize: b.InputQueueSize,
}
if settings.InputQueueSize > 0 {
publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
} else {
publisher, err = pipeline.Load(b.Info, monitors, b.Config.Pipeline, b.processing, outputFactory)
}
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}

reload.Register.MustRegister("output", b.makeOutputReloader(pipeline.OutputReloader()))
reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = pipeline
b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
return nil, err
Expand Down Expand Up @@ -591,6 +601,8 @@ func (b *Beat) handleFlags() error {
func (b *Beat) configure(settings Settings) error {
var err error

b.InputQueueSize = settings.InputQueueSize

cfg, err := cfgfile.Load("", settings.ConfigOverrides)
if err != nil {
return fmt.Errorf("error loading config file: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ type Settings struct {
Processing processing.SupportFactory

Umask *int

// InputQueueSize is the size for the internal publisher queue in the
// publisher pipeline. This is only useful when the Beat plans to use
// beat.DropIfFull PublishMode. Leave as zero for default.
InputQueueSize int
}
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func LoadWithSettings(

name := beatInfo.Name

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
queueBuilder, err := createQueueBuilder(config.Queue, monitors, settings.InputQueueSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,6 +169,7 @@ func loadOutput(
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
inQueueSize int,
) (func(queue.ACKListener) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
Expand All @@ -191,6 +192,6 @@ func createQueueBuilder(
}

return func(ackListener queue.ACKListener) (queue.Queue, error) {
return queueFactory(ackListener, monitors.Logger, queueConfig)
return queueFactory(ackListener, monitors.Logger, queueConfig, inQueueSize)
}, nil
}
2 changes: 2 additions & 0 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Settings struct {
WaitCloseMode WaitCloseMode

Processors processing.Supporter

InputQueueSize int
}

// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func init() {
// queueFactory matches the queue.Factory interface, and is used to add the
// disk queue to the registry.
func queueFactory(
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, _ int, // input queue size param is unused.
) (queue.Queue, error) {
settings, err := SettingsForUserConfig(cfg)
if err != nil {
Expand Down
27 changes: 22 additions & 5 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

const (
minInputQueueSize = 20
maxInputQueueSizeRatio = 0.1
)

type broker struct {
done chan struct{}

Expand Down Expand Up @@ -56,6 +61,7 @@ type Settings struct {
FlushMinEvents int
FlushTimeout time.Duration
WaitOnClose bool
InputQueueSize int
}

type ackChan struct {
Expand All @@ -82,7 +88,7 @@ func init() {
}

func create(
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, inQueueSize int,
) (queue.Queue, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -98,6 +104,7 @@ func create(
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
FlushTimeout: config.FlushTimeout,
InputQueueSize: inQueueSize,
}), nil
}

Expand All @@ -108,16 +115,14 @@ func NewQueue(
logger logger,
settings Settings,
) queue.Queue {
// define internal channel size for producer/client requests
// to the broker
chanSize := 20

var (
sz = settings.Events
minEvents = settings.FlushMinEvents
flushTimeout = settings.FlushTimeout
)

chanSize := AdjustInputQueueSize(settings.InputQueueSize, sz)

if minEvents < 1 {
minEvents = 1
}
Expand Down Expand Up @@ -298,3 +303,15 @@ func (l *chanList) reverse() {
l.prepend(tmp.pop())
}
}

// AdjustInputQueueSize decides the size for the input queue.
func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) {
actual = requested
if max := int(float64(mainQueueSize) * maxInputQueueSizeRatio); mainQueueSize > 0 && actual > max {
actual = max
}
if actual < minInputQueueSize {
actual = minInputQueueSize
}
return actual
}
29 changes: 29 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package memqueue

import (
"flag"
"math"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
)
Expand Down Expand Up @@ -82,3 +85,29 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
})
}
}

func TestAdjustInputQueueSize(t *testing.T) {
t.Run("zero yields default value (main queue size=0)", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 0))
})
t.Run("zero yields default value (main queue size=10)", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 10))
})
t.Run("can't go below min", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(1, 0))
})
t.Run("can set any value within bounds", func(t *testing.T) {
for q, mainQueue := minInputQueueSize+1, 4096; q < int(float64(mainQueue)*maxInputQueueSizeRatio); q += 10 {
assert.Equal(t, q, AdjustInputQueueSize(q, mainQueue))
}
})
t.Run("can set any value if no upper bound", func(t *testing.T) {
for q := minInputQueueSize + 1; q < math.MaxInt32; q *= 2 {
assert.Equal(t, q, AdjustInputQueueSize(q, 0))
}
})
t.Run("can't go above upper bound", func(t *testing.T) {
mainQueue := 4096
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
})
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// Factory for creating a queue used by a pipeline instance.
type Factory func(ACKListener, *logp.Logger, *common.Config) (Queue, error)
type Factory func(ACKListener, *logp.Logger, *common.Config, int) (Queue, error)

// ACKListener listens to special events to be send by queue implementations.
type ACKListener interface {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
}

func create(
ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config, inQueueSize int,
) (queue.Queue, error) {
cfgwarn.Beta("Spooling to disk is beta")

Expand Down
9 changes: 5 additions & 4 deletions packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ func PacketbeatSettings() instance.Settings {
runFlags.AddGoFlag(flag.CommandLine.Lookup("dump"))

return instance.Settings{
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
InputQueueSize: 400,
}
}

Expand Down

0 comments on commit 4309c8b

Please sign in to comment.