From a0378ecd8a708bee54d7aff3a2eb112b701e2ab8 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 2 Sep 2021 12:15:22 +0200 Subject: [PATCH] Ignore context cancell in status reporting (#27668) (#27701) (cherry picked from commit d601e3f0cf6ca17ab5d4691119231f5400ab8574) Co-authored-by: Michal Pristas --- .../pkg/agent/operation/operation.go | 4 +++ .../pkg/agent/operation/operator.go | 9 ++++-- .../pkg/core/plugin/process/app.go | 4 +++ .../pkg/core/plugin/service/app.go | 4 +++ x-pack/elastic-agent/pkg/core/state/state.go | 18 +++++++++++ .../elastic-agent/pkg/core/status/reporter.go | 4 +++ x-pack/libbeat/management/manager.go | 31 ++++++++++++++----- 7 files changed, 65 insertions(+), 9 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation.go b/x-pack/elastic-agent/pkg/agent/operation/operation.go index eb47d053772..8dbedfd831d 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation.go @@ -72,7 +72,11 @@ type ApplicationStatusHandler struct{} // // It updates the status of the application and handles restarting the application is needed. func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) { + if state.IsStateFiltered(msg, payload) { + return + } app, ok := s.App().(Application) + if !ok { panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected)) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index df2f2b7cd99..922246050dd 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -151,8 +151,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) (err error) { _, stateID, steps, ack, err := o.stateResolver.Resolve(cfg) if err != nil { - o.statusReporter.Update(state.Failed, err.Error(), nil) - return errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err)) + if err == filterContextCancelled(err) { + // error is not filtered and should be reported + o.statusReporter.Update(state.Failed, err.Error(), nil) + err = errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err)) + } + + return err } o.statusController.UpdateStateID(stateID) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 8ae64358b89..37568828cac 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -256,6 +256,10 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) { if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) { + if state.IsStateFiltered(msg, payload) { + return + } + a.state.Status = s a.state.Message = msg a.state.Payload = payload diff --git a/x-pack/elastic-agent/pkg/core/plugin/service/app.go b/x-pack/elastic-agent/pkg/core/plugin/service/app.go index e92dd3b66a5..19ef5aca60c 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/service/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/service/app.go @@ -308,6 +308,10 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) { if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) { + if state.IsStateFiltered(msg, payload) { + return + } + a.state.Status = s a.state.Message = msg a.state.Payload = payload diff --git a/x-pack/elastic-agent/pkg/core/state/state.go b/x-pack/elastic-agent/pkg/core/state/state.go index 98319ad3315..97e6ed07d7c 100644 --- a/x-pack/elastic-agent/pkg/core/state/state.go +++ b/x-pack/elastic-agent/pkg/core/state/state.go @@ -5,6 +5,9 @@ package state import ( + "context" + "strings" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" @@ -37,6 +40,10 @@ const ( Stopping = Status(proto.StateObserved_STOPPING) ) +var filteredErrors = []string{ + context.Canceled.Error(), +} + // IsInternal returns true if the status is an internal status and not something that should be reported // over the protocol as an actual status. func (s Status) IsInternal() bool { @@ -79,3 +86,14 @@ type Reporter interface { // OnStateChange is called when state changes. OnStateChange(id string, name string, state State) } + +// IsStateFiltered returns true if state message contains error out of predefined +// collection of ignored errors. +func IsStateFiltered(msg string, payload map[string]interface{}) bool { + for _, e := range filteredErrors { + if strings.Contains(msg, e) { + return true + } + } + return false +} diff --git a/x-pack/elastic-agent/pkg/core/status/reporter.go b/x-pack/elastic-agent/pkg/core/status/reporter.go index e986a0a43cf..3add6b188c8 100644 --- a/x-pack/elastic-agent/pkg/core/status/reporter.go +++ b/x-pack/elastic-agent/pkg/core/status/reporter.go @@ -267,6 +267,10 @@ func (r *reporter) Update(s state.Status, message string, payload map[string]int if !r.isRegistered { return } + if state.IsStateFiltered(message, payload) { + return + } + if r.status != s || r.message != message || !reflect.DeepEqual(r.payload, payload) { r.status = s r.message = message diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index ed476cffaf8..5ae65a69523 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -25,6 +25,10 @@ import ( lbmanagement "github.com/elastic/beats/v7/libbeat/management" ) +var notReportedErrors = []error{ + context.Canceled, +} + // Manager handles internal config updates. By retrieving // new configs from Kibana and applying them to the Beat. type Manager struct { @@ -147,6 +151,22 @@ func (cm *Manager) UpdateStatus(status lbmanagement.Status, msg string) { } } +// updateStatusWithError updates the manager with the current status for the beat with error. +func (cm *Manager) updateStatusWithError(err error) { + if err == nil { + return + } + + for _, e := range notReportedErrors { + if errors.Is(err, e) { + return + } + } + + cm.logger.Error(err) + cm.UpdateStatus(lbmanagement.Failed, err.Error()) +} + func (cm *Manager) OnConfig(s string) { cm.UpdateStatus(lbmanagement.Configuring, "Updating configuration") @@ -154,30 +174,27 @@ func (cm *Manager) OnConfig(s string) { uconfig, err := common.NewConfigFrom(s) if err != nil { err = errors.Wrap(err, "config blocks unsuccessfully generated") - cm.logger.Error(err) - cm.UpdateStatus(lbmanagement.Failed, err.Error()) + cm.updateStatusWithError(err) return } err = uconfig.Unpack(&configMap) if err != nil { err = errors.Wrap(err, "config blocks unsuccessfully generated") - cm.logger.Error(err) - cm.UpdateStatus(lbmanagement.Failed, err.Error()) + cm.updateStatusWithError(err) return } blocks, err := cm.toConfigBlocks(configMap) if err != nil { err = errors.Wrap(err, "failed to parse configuration") - cm.logger.Error(err) - cm.UpdateStatus(lbmanagement.Failed, err.Error()) + cm.updateStatusWithError(err) return } if errs := cm.apply(blocks); errs != nil { // `cm.apply` already logs the errors; currently allow beat to run degraded - cm.UpdateStatus(lbmanagement.Failed, errs.Error()) + cm.updateStatusWithError(err) return }