Skip to content

Commit

Permalink
Do not propagate gateway errors. (#2203) (#2214)
Browse files Browse the repository at this point in the history
Do not propagate gateway errors. (#2203)

(cherry picked from commit 4107f0f)

Co-authored-by: Michal Pristas <michal.pristas@gmail.com>
  • Loading branch information
mergify[bot] and michalpristas authored Feb 1, 2023
1 parent 82c01e1 commit 08a62e3
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 38 deletions.
9 changes: 0 additions & 9 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type fleetGateway struct {
checkinFailCounter int
stateFetcher coordinator.StateFetcher
stateStore stateStore
errCh chan error
actionCh chan []fleetapi.Action
}

Expand Down Expand Up @@ -119,7 +118,6 @@ func newFleetGatewayWithScheduler(
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
actionCh: make(chan []fleetapi.Action, 1),
}, nil
}
Expand Down Expand Up @@ -169,11 +167,6 @@ func (f *fleetGateway) Run(ctx context.Context) error {
}
}

// Errors returns the channel to watch for reported errors.
func (f *fleetGateway) Errors() <-chan error {
return f.errCh
}

func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
bo.Reset()

Expand Down Expand Up @@ -205,10 +198,8 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
)

f.log.Error(err)
f.errCh <- err
return nil, err
}
f.errCh <- err
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package fleet
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,7 +19,6 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
Expand Down Expand Up @@ -404,27 +404,15 @@ func (e *emptyStateFetcher) State(_ bool) coordinator.State {
}

func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error {
done := make(chan bool)
errCh := make(chan error, 1)
go func() {
err := g.Run(ctx)
close(done)
if err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
} else {
errCh <- nil
}
}()
go func() {
for {
select {
case <-done:
return
case <-g.Errors():
// ignore errors here
}
}
}()
return errCh
}

Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/agent/application/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ type FleetGateway interface {
// Run runs the gateway.
Run(ctx context.Context) error

// Errors returns the channel to watch for reported errors.
Errors() <-chan error

// Actions returns the channel to watch for new actions from the fleet-server.
Actions() <-chan []fleetapi.Action

Expand Down
13 changes: 0 additions & 13 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,8 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
policyChanger.AddSetter(ack)
}

// Proxy errors from the gateway to our own channel.
gatewayErrorsRunner := runner.Start(context.Background(), func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-gateway.Errors():
m.errCh <- err
}
}
})

// Run the gateway.
gatewayRunner := runner.Start(gatewayCtx, func(ctx context.Context) error {
defer gatewayErrorsRunner.Stop()
return gateway.Run(ctx)
})

Expand Down

0 comments on commit 08a62e3

Please sign in to comment.