Skip to content

Commit

Permalink
Ignore context cancell in status reporting (elastic#27668)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas authored and wiwen committed Nov 1, 2021
1 parent a836b93 commit aaaefab
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 9 deletions.
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ type ApplicationStatusHandler struct{}
//
// It updates the status of the application and handles restarting the application is needed.
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
if state.IsStateFiltered(msg, payload) {
return
}
app, ok := s.App().(Application)

if !ok {
panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected))
}
Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) (err error) {

_, stateID, steps, ack, err := o.stateResolver.Resolve(cfg)
if err != nil {
o.statusReporter.Update(state.Failed, err.Error(), nil)
return errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err))
if err == filterContextCancelled(err) {
// error is not filtered and should be reported
o.statusReporter.Update(state.Failed, err.Error(), nil)
err = errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err))
}

return err
}
o.statusController.UpdateStateID(stateID)

Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {

func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
if state.IsStateFiltered(msg, payload) {
return
}

a.state.Status = s
a.state.Message = msg
a.state.Payload = payload
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St

func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
if state.IsStateFiltered(msg, payload) {
return
}

a.state.Status = s
a.state.Message = msg
a.state.Payload = payload
Expand Down
18 changes: 18 additions & 0 deletions x-pack/elastic-agent/pkg/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package state

import (
"context"
"strings"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
Expand Down Expand Up @@ -37,6 +40,10 @@ const (
Stopping = Status(proto.StateObserved_STOPPING)
)

var filteredErrors = []string{
context.Canceled.Error(),
}

// IsInternal returns true if the status is an internal status and not something that should be reported
// over the protocol as an actual status.
func (s Status) IsInternal() bool {
Expand Down Expand Up @@ -79,3 +86,14 @@ type Reporter interface {
// OnStateChange is called when state changes.
OnStateChange(id string, name string, state State)
}

// IsStateFiltered returns true if state message contains error out of predefined
// collection of ignored errors.
func IsStateFiltered(msg string, payload map[string]interface{}) bool {
for _, e := range filteredErrors {
if strings.Contains(msg, e) {
return true
}
}
return false
}
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ func (r *reporter) Update(s state.Status, message string, payload map[string]int
if !r.isRegistered {
return
}
if state.IsStateFiltered(message, payload) {
return
}

if r.status != s || r.message != message || !reflect.DeepEqual(r.payload, payload) {
r.status = s
r.message = message
Expand Down
31 changes: 24 additions & 7 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
)

var notReportedErrors = []error{
context.Canceled,
}

// Manager handles internal config updates. By retrieving
// new configs from Kibana and applying them to the Beat.
type Manager struct {
Expand Down Expand Up @@ -147,37 +151,50 @@ func (cm *Manager) UpdateStatus(status lbmanagement.Status, msg string) {
}
}

// updateStatusWithError updates the manager with the current status for the beat with error.
func (cm *Manager) updateStatusWithError(err error) {
if err == nil {
return
}

for _, e := range notReportedErrors {
if errors.Is(err, e) {
return
}
}

cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
}

func (cm *Manager) OnConfig(s string) {
cm.UpdateStatus(lbmanagement.Configuring, "Updating configuration")

var configMap common.MapStr
uconfig, err := common.NewConfigFrom(s)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

err = uconfig.Unpack(&configMap)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

blocks, err := cm.toConfigBlocks(configMap)
if err != nil {
err = errors.Wrap(err, "failed to parse configuration")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

if errs := cm.apply(blocks); errs != nil {
// `cm.apply` already logs the errors; currently allow beat to run degraded
cm.UpdateStatus(lbmanagement.Failed, errs.Error())
cm.updateStatusWithError(err)
return
}

Expand Down

0 comments on commit aaaefab

Please sign in to comment.