Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore context cancell in status reporting #27668

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ type ApplicationStatusHandler struct{}
//
// It updates the status of the application and handles restarting the application is needed.
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
if state.IsStateFiltered(msg, payload) {
return
}
app, ok := s.App().(Application)

if !ok {
panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected))
}
Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) (err error) {

_, stateID, steps, ack, err := o.stateResolver.Resolve(cfg)
if err != nil {
o.statusReporter.Update(state.Failed, err.Error(), nil)
return errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err))
if err == filterContextCancelled(err) {
// error is not filtered and should be reported
o.statusReporter.Update(state.Failed, err.Error(), nil)
err = errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err))
}

return err
}
o.statusController.UpdateStateID(stateID)

Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {

func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
if state.IsStateFiltered(msg, payload) {
return
}

a.state.Status = s
a.state.Message = msg
a.state.Payload = payload
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St

func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
if state.IsStateFiltered(msg, payload) {
return
}

a.state.Status = s
a.state.Message = msg
a.state.Payload = payload
Expand Down
18 changes: 18 additions & 0 deletions x-pack/elastic-agent/pkg/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package state

import (
"context"
"strings"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
Expand Down Expand Up @@ -37,6 +40,10 @@ const (
Stopping = Status(proto.StateObserved_STOPPING)
)

var filteredErrors = []string{
context.Canceled.Error(),
}

// IsInternal returns true if the status is an internal status and not something that should be reported
// over the protocol as an actual status.
func (s Status) IsInternal() bool {
Expand Down Expand Up @@ -79,3 +86,14 @@ type Reporter interface {
// OnStateChange is called when state changes.
OnStateChange(id string, name string, state State)
}

// IsStateFiltered returns true if state message contains error out of predefined
// collection of ignored errors.
func IsStateFiltered(msg string, payload map[string]interface{}) bool {
for _, e := range filteredErrors {
if strings.Contains(msg, e) {
return true
}
}
return false
}
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/core/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ func (r *reporter) Update(s state.Status, message string, payload map[string]int
if !r.isRegistered {
return
}
if state.IsStateFiltered(message, payload) {
return
}

if r.status != s || r.message != message || !reflect.DeepEqual(r.payload, payload) {
r.status = s
r.message = message
Expand Down
31 changes: 24 additions & 7 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
)

var notReportedErrors = []error{
context.Canceled,
}

// Manager handles internal config updates. By retrieving
// new configs from Kibana and applying them to the Beat.
type Manager struct {
Expand Down Expand Up @@ -147,37 +151,50 @@ func (cm *Manager) UpdateStatus(status lbmanagement.Status, msg string) {
}
}

// updateStatusWithError updates the manager with the current status for the beat with error.
func (cm *Manager) updateStatusWithError(err error) {
if err == nil {
return
}

for _, e := range notReportedErrors {
if errors.Is(err, e) {
return
}
}

cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
}

func (cm *Manager) OnConfig(s string) {
cm.UpdateStatus(lbmanagement.Configuring, "Updating configuration")

var configMap common.MapStr
uconfig, err := common.NewConfigFrom(s)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

err = uconfig.Unpack(&configMap)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

blocks, err := cm.toConfigBlocks(configMap)
if err != nil {
err = errors.Wrap(err, "failed to parse configuration")
cm.logger.Error(err)
cm.UpdateStatus(lbmanagement.Failed, err.Error())
cm.updateStatusWithError(err)
return
}

if errs := cm.apply(blocks); errs != nil {
// `cm.apply` already logs the errors; currently allow beat to run degraded
cm.UpdateStatus(lbmanagement.Failed, errs.Error())
cm.updateStatusWithError(err)
return
}

Expand Down