Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/change the initialization of management layer #30694

Merged
merged 22 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Ensure that the Reloadable part of beats are initialized before the Manager is started. {issue}30533[30533]
- Ignore bugfix version when running version compatibility check against Elasticsearch. {pull}30746[30746]

*Auditbeat*
Expand Down
8 changes: 8 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
adiscover.Start()

// We start the manager when all the subsystem are initialized and ready to received events.
if err := b.Manager.Start(); err != nil {
return err
}

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()
Expand Down Expand Up @@ -409,6 +414,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
}

// Stop the manager and stop the connection to any dependent services.
b.Manager.Stop()

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return err
}
}
// Configure the beats Manager to start after all the reloadable hooks are initialized
// and shutdown when the function return.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
Expand Down
5 changes: 2 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {

logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.Manager.Start(beater.Stop)
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
defer b.Manager.Stop()
ph marked this conversation as resolved.
Show resolved Hide resolved
// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

return beater.Run(&b.Beat)
}
Expand Down
40 changes: 29 additions & 11 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,38 @@ type Manager interface {
// Enabled returns true if manager is enabled.
Enabled() bool

// Start the config manager giving it a stopFunc callback
// so the beat can be told when to stop.
Start(stopFunc func())

// Stop the config manager.
// Start needs to invoked when the system is ready to receive an external configuration and
ph marked this conversation as resolved.
Show resolved Hide resolved
// also ready to start ingesting new events. The manager expects that all the reloadable and
// reloadable list are fixed for the whole lifetime of the manager.
//
// Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the
// execution.
Start() error

ph marked this conversation as resolved.
Show resolved Hide resolved
// Stop when this method is called, the manager will stop receiving new actions, no more action
// will be propagated to the handlers and will not try to configure any reloadable parts.
// When the manager is stop the callback will be called to signal that the system can terminate.
ph marked this conversation as resolved.
Show resolved Hide resolved
//
// Calls to 'CheckRawConfig()' or 'SetPayload()' will be ignored after calling stop.
//
// Note: Stop will not call 'UnregisterAction()' automaticallty.
Stop()
ph marked this conversation as resolved.
Show resolved Hide resolved

// SetStopCallback accepts a function that need to be called when the manager want to shutdown the
// beats. This is needed when you want your beats to be gracefully shutdown remotely by the Elastic Agent
// when a policy doesn't need to run this beat.
SetStopCallback(f func())
ph marked this conversation as resolved.
Show resolved Hide resolved

// CheckRawConfig check settings are correct before launching the beat.
CheckRawConfig(cfg *common.Config) error

// RegisterAction registers action handler with the client
RegisterAction(action client.Action)

// UnregisterAction unregisters action handler with the client
UnregisterAction(action client.Action)

// SetPayload sets the client payload
// SetPayload Allows to add additional metadata to future requests made by the manager.
ph marked this conversation as resolved.
Show resolved Hide resolved
SetPayload(map[string]interface{})
}

Expand Down Expand Up @@ -136,10 +152,11 @@ func defaultModeConfig() *modeConfig {

// nilManager, fallback when no manager is present
type nilManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
msg string
logger *logp.Logger
lock sync.Mutex
status Status
msg string
stopFunc func()
}

func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
Expand All @@ -151,8 +168,9 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
}, nil
}

func (*nilManager) SetStopCallback(func()) {}
func (*nilManager) Enabled() bool { return false }
func (*nilManager) Start(_ func()) {}
func (*nilManager) Start() error { return nil }
func (*nilManager) Stop() {}
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (n *nilManager) UpdateStatus(status Status, msg string) {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
modules.Stop()
}()

// Start the manager after all the reload hooks are configured,
// the Manager is stopped at the end of the execution.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Dynamic file based modules (metricbeat.config.modules)
if bt.config.ConfigModules.Enabled() {
moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules)
Expand Down Expand Up @@ -256,6 +263,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}

wg.Wait()

return nil
}

Expand Down
13 changes: 11 additions & 2 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,19 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegisterList("inputs", runner)
defer runner.Stop()

logp.Debug("main", "Waiting for the runner to finish")

// Start the manager after all the hooks are registered and terminates when
// the function return.
if err := b.Manager.Start(); err != nil {
return err
}

defer func() {
runner.Stop()
b.Manager.Stop()
}()

for {
select {
case <-pb.done:
Expand Down
30 changes: 22 additions & 8 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,36 @@ func (cm *Manager) Enabled() bool {
return cm.config.Enabled
}

// Start the config manager
func (cm *Manager) Start(stopFunc func()) {
if !cm.Enabled() {
return
}
// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully.
func (cm *Manager) SetStopCallback(stopFunc func()) {
cm.lock.Lock()
defer cm.lock.Unlock()
cm.stopFunc = stopFunc
}

// Start the config manager.
func (cm *Manager) Start() error {
cm.lock.Lock()
defer cm.lock.Unlock()

if !cm.Enabled() {
return nil
}

cfgwarn.Beta("Fleet management is enabled")
cm.logger.Info("Starting fleet management service")

cm.stopFunc = stopFunc
cm.isRunning = true
err := cm.client.Start(context.Background())
if err != nil {
cm.logger.Errorf("failed to start elastic-agent-client: %s", err)
return err
}
cm.logger.Info("Ready to receive configuration")
return nil
}

// Stop the config manager
// Stop stops the current Manager and close the connection to Elastic Agent.
func (cm *Manager) Stop() {
cm.lock.Lock()
defer cm.lock.Unlock()
Expand All @@ -133,6 +142,8 @@ func (cm *Manager) Stop() {
// CheckRawConfig check settings are correct to start the beat. This method
// checks there are no collision between the existing configuration and what
// fleet management can configure.
//
// NOTE: This is currently not implemented for fleet.
func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
// TODO implement this method
return nil
Expand Down Expand Up @@ -217,6 +228,9 @@ func (cm *Manager) SetPayload(payload map[string]interface{}) {
}

func (cm *Manager) OnStop() {
cm.lock.Lock()
defer cm.lock.Unlock()

if cm.stopFunc != nil {
cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
cm.stopFunc()
Expand Down Expand Up @@ -320,7 +334,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
cm.logger.Warnf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
ph marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand Down
7 changes: 7 additions & 0 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
runner.Update(ctx, bt.config.Inputs)
}

// Ensure that all the hooks and actions are ready before starting the Manager
// to receive configuration.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack.
bt.setManagerPayload(b)

Expand Down