Skip to content

Commit

Permalink
Fix: Windows Agent Left Unhealthy After Removing Endpoint Integration (
Browse files Browse the repository at this point in the history
…#1286)

* Add stop timeout to the endpoint spec
* Service watcher
* Wire in the service watcher
* Remove waiting on the service stop since Endpoint should not be stopped

(cherry picked from commit a8ad2da)

# Conflicts:
#	internal/pkg/agent/program/supported.go
  • Loading branch information
aleksmaus authored and mergify[bot] committed Oct 24, 2022
1 parent 9da6ba5 commit 0532806
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 17 deletions.
4 changes: 4 additions & 0 deletions changelog/fragments/1666611696-fix_service_stop_timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
kind: bug-fix
summary: "Fix: Windows Agent Left Unhealthy After Removing Endpoint Integration"
pr: 1286
issue: 1262
3 changes: 2 additions & 1 deletion internal/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(outputType string, output inter
if len(hosts) == 0 {
return nil, false
}
//nolint:prealloc // false positive
var modules []interface{}
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")

Expand Down Expand Up @@ -668,7 +669,7 @@ func normalizeHTTPCopyRules(name string) []map[string]interface{} {
return fromToMap
}

for _, exportedMetric := range spec.ExprtedMetrics {
for _, exportedMetric := range spec.ExportedMetrics {
fromToMap = append(fromToMap, map[string]interface{}{
"from": fmt.Sprintf("http.agent.%s", exportedMetric),
"to": exportedMetric,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestExportedMetrics(t *testing.T) {
programName := "testing"
expectedMetricsName := "metric_name"
program.SupportedMap[programName] = program.Spec{ExprtedMetrics: []string{expectedMetricsName}}
program.SupportedMap[programName] = program.Spec{ExportedMetrics: []string{expectedMetricsName}}

exportedMetrics := normalizeHTTPCopyRules(programName)

Expand Down
16 changes: 15 additions & 1 deletion internal/pkg/agent/program/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/ioutil"
"path/filepath"
"time"

"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -40,7 +41,20 @@ type Spec struct {
When string `yaml:"when"`
Constraints string `yaml:"constraints"`
RestartOnOutputChange bool `yaml:"restart_on_output_change,omitempty"`
ExprtedMetrics []string `yaml:"exported_metrics,omitempty"`
ExportedMetrics []string `yaml:"exported_metrics,omitempty"`
Process *ProcessSettings `yaml:"process,omitempty"`
}

// ProcessSettings process specific settings
type ProcessSettings struct {
// Allows to override the agent stop timeout settings and specify a different stop timeout for Endpoint service
StopTimeout time.Duration `yaml:"stop_timeout"`
}

// Service info
type ServiceInfo struct {
Name string `yaml:"name"`
Label string `yaml:"label"`
}

// ReadSpecs reads all the specs that match the provided globbing path.
Expand Down
44 changes: 44 additions & 0 deletions internal/pkg/agent/program/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
package program

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -147,3 +150,44 @@ func TestExport(t *testing.T) {
require.NoError(t, err)
}
}

func TestSerializationProcessSettings(t *testing.T) {
ymlTmpl := `name: "Foobar"
process:
stop_timeout: %v`

tests := []struct {
name string
tonum int
to time.Duration
}{
{"zero", 0, 0},
{"180ns", 180, 0},
{"180s", 0, 120 * time.Second},
{"3m", 0, 3 * time.Minute},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
yml string
wantTimeout time.Duration
)
if tc.to == 0 {
yml = fmt.Sprintf(ymlTmpl, tc.tonum)
wantTimeout = time.Duration(tc.tonum)
} else {
yml = fmt.Sprintf(ymlTmpl, tc.to)
wantTimeout = tc.to
}
var spec Spec
err := yaml.Unmarshal([]byte(yml), &spec)
if err != nil {
t.Fatal(err)
}
diff := cmp.Diff(wantTimeout, spec.Process.StopTimeout)
if diff != "" {
t.Fatal(diff)
}
})
}
}
4 changes: 4 additions & 0 deletions internal/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 36 additions & 14 deletions internal/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (a *Application) Start(ctx context.Context, _ app.Taggable, cfg map[string]
// already started
if a.srvState != nil {
a.setState(state.Starting, "Starting", nil)
a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
a.srvState.UpdateConfig(a.srvState.Config())
_ = a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
_ = a.srvState.UpdateConfig(a.srvState.Config())
} else {
a.setState(state.Starting, "Starting", nil)
a.srvState, err = a.srv.Register(a, string(cfgStr))
Expand Down Expand Up @@ -247,6 +247,13 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface
return err
}

func (a *Application) getStopTimeout() time.Duration {
if a.desc.Spec().Process != nil && a.desc.Spec().Process.StopTimeout > 0 {
return a.desc.Spec().Process.StopTimeout
}
return a.processConfig.StopTimeout
}

// Stop stops the current application.
func (a *Application) Stop() {
a.appLock.Lock()
Expand All @@ -257,21 +264,36 @@ func (a *Application) Stop() {
return
}

if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
a.appLock.Lock()
a.setState(
state.Failed,
fmt.Errorf("failed to stop after %s: %w", a.processConfig.StopTimeout, err).Error(),
nil)
} else {
a.appLock.Lock()
a.setState(state.Stopped, "Stopped", nil)
name := a.desc.Spec().Name
to := a.getStopTimeout()

a.logger.Infof("Stop %v service, with %v timeout", name, to)
start := time.Now()

// Try to stop the service with timeout
// If timed out and the service is still not stopped the runtime is set to STOPPED state anyways.
// This avoids leaving the runtime indefinitely in the failed state.
//
// The Agent is not managing the Endpoint service state by design.
// The service runtime should send STOPPING state to the Endpoint service only before the Endpoint is expected to be uninstalled.
// So if the Agent never receives the STOPPING check-in from the Endpoint after this, it's ok to set the state
// to STOPPED following with the Endpoint service uninstall.
if err := srvState.Stop(to); err != nil {
// Log the error
a.logger.Errorf("Failed to stop %v service after %v timeout", name, to)
}
a.srvState = nil

// Cleanup
a.appLock.Lock()
defer a.appLock.Unlock()

a.srvState = nil
a.cleanUp()
a.stopCredsListener()
a.appLock.Unlock()

// Set the service state to "stopped", otherwise the agent is stuck in the failed stop state until restarted
a.logger.Infof("setting %s service status to Stopped, took: %v", name, time.Since(start))
a.setState(state.Stopped, "Stopped", nil)
}

// Shutdown disconnects the service, but doesn't signal it to stop.
Expand Down Expand Up @@ -327,7 +349,7 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
}

func (a *Application) cleanUp() {
a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
_ = a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
}

func (a *Application) startCredsListener() error {
Expand Down
Loading

0 comments on commit 0532806

Please sign in to comment.