Skip to content

Commit

Permalink
[Elastic Agent] Enable log shipping of endpoint-security by Elastic A…
Browse files Browse the repository at this point in the history
…gent (elastic#22526)

* Refactor to pass program.Spec around so custom log paths can be defined in a program spec.

* Fix code.

* Fix formatting.

* Add changelog.

* Fixes from code review.
  • Loading branch information
blakerouse authored Nov 11, 2020
1 parent 952435b commit 4c2c647
Show file tree
Hide file tree
Showing 46 changed files with 290 additions and 230 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@
- Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804]
- Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694]
- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352]
- Ship `endpoint-security` logs to elasticsearch {pull}22526[22526]
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
return "", errors.New(err, "initiating fetcher")
}

path, err := fetcher.Download(ctx, agentName, agentArtifactName, version)
path, err := fetcher.Download(ctx, agentSpec, version)
if err != nil {
return "", errors.New(err, "failed upgrade of agent binary")
}

matches, err := verifier.Verify(agentName, version, agentArtifactName, true)
matches, err := verifier.Verify(agentSpec, version, true)
if err != nil {
return "", errors.New(err, "failed verification of agent binary")
}
Expand Down
16 changes: 12 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
Expand All @@ -26,10 +27,17 @@ import (
)

const (
agentName = "elastic-agent"
hashLen = 6
agentCommitFile = ".elastic-agent.active.commit"
agentArtifactName = "beats/" + agentName
agentName = "elastic-agent"
hashLen = 6
agentCommitFile = ".elastic-agent.active.commit"
)

var (
agentSpec = program.Spec{
Name: "Elastic Agent",
Cmd: agentName,
Artifact: "beats/" + agentName,
}
)

// Upgrader performs an upgrade
Expand Down
12 changes: 6 additions & 6 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a

// make the download path so the `operation_verify` can ensure the path exists
downloadConfig := operator.config.DownloadConfig
fullPath, err := artifact.GetArtifactPath(p.BinaryName(), p.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(p.Spec(), p.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -137,35 +137,35 @@ func waitFor(t *testing.T, check func() error) {

type DummyDownloader struct{}

func (*DummyDownloader) Download(_ context.Context, p, a, v string) (string, error) {
func (*DummyDownloader) Download(_ context.Context, _ program.Spec, _ string) (string, error) {
return "", nil
}

var _ download.Downloader = &DummyDownloader{}

type DummyVerifier struct{}

func (*DummyVerifier) Verify(p, v, _ string, _ bool) (bool, error) {
func (*DummyVerifier) Verify(_ program.Spec, _ string, _ bool) (bool, error) {
return true, nil
}

var _ download.Verifier = &DummyVerifier{}

type DummyInstallerChecker struct{}

func (*DummyInstallerChecker) Check(_ context.Context, p, v, _ string) error {
func (*DummyInstallerChecker) Check(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

func (*DummyInstallerChecker) Install(_ context.Context, p, v, _ string) error {
func (*DummyInstallerChecker) Install(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

var _ install.InstallerChecker = &DummyInstallerChecker{}

type DummyUninstaller struct{}

func (*DummyUninstaller) Uninstall(_ context.Context, p, v, _ string) error {
func (*DummyUninstaller) Uninstall(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (o *Operator) getLogFilePaths() map[string][]string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
logPath := a.Monitor().LogPath(a.Spec(), o.pipelineID)
if logPath != "" {
paths[a.Name()] = append(paths[a.Name()], logPath)
}
Expand All @@ -360,7 +360,7 @@ func (o *Operator) getMetricbeatEndpoints() map[string][]string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Spec(), o.pipelineID)
if metricEndpoint != "" {
endpoints[a.Name()] = append(endpoints[a.Name()], metricEndpoint)
}
Expand Down
17 changes: 11 additions & 6 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
Expand Down Expand Up @@ -154,6 +156,7 @@ func (*testMonitorableApp) Shutdown() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) Spec() program.Spec { return program.Spec{} }
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string, _ map[string]interface{}) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
Expand All @@ -167,20 +170,22 @@ type testMonitor struct {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []string { return args }
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
return args
}

// Cleanup cleans up all drops.
func (b *testMonitor) Cleanup(string, string) error { return nil }
func (b *testMonitor) Cleanup(program.Spec, string) error { return nil }

// Close closes the monitor.
func (b *testMonitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *testMonitor) Prepare(string, string, int, int) error { return nil }
func (b *testMonitor) Prepare(program.Spec, string, int, int) error { return nil }

// LogPath describes a path where application stores logs. Empty if
// application is not monitorable
func (b *testMonitor) LogPath(string, string) string {
func (b *testMonitor) LogPath(program.Spec, string) string {
if !b.monitorLogs {
return ""
}
Expand All @@ -189,15 +194,15 @@ func (b *testMonitor) LogPath(string, string) string {

// MetricsPath describes a location where application exposes metrics
// collectable by metricbeat.
func (b *testMonitor) MetricsPath(string, string) string {
func (b *testMonitor) MetricsPath(program.Spec, string) string {
if !b.monitorMetrics {
return ""
}
return "path"
}

// MetricsPathPrefixed return metrics path prefixed with http+ prefix.
func (b *testMonitor) MetricsPathPrefixed(string, string) string {
func (b *testMonitor) MetricsPathPrefixed(program.Spec, string) string {
return "http+path"
}

Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
Expand Down Expand Up @@ -42,13 +43,15 @@ type Application interface {
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
Spec() program.Spec
SetState(status state.Status, msg string, payload map[string]interface{})
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{})
}

// Descriptor defines a program which needs to be run.
// Is passed around operator operations.
type Descriptor interface {
Spec() program.Spec
ServicePort() int
BinaryName() string
ArtifactName() string
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (o *operationFetch) Name() string {
// If the artifacts already exists then fetch will not be ran.
func (o *operationFetch) Check(_ context.Context, _ Application) (bool, error) {
downloadConfig := o.operatorConfig.DownloadConfig
fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(o.program.Spec(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
return false, err
}
Expand All @@ -70,7 +70,7 @@ func (o *operationFetch) Run(ctx context.Context, application Application) (err
}
}()

fullPath, err := o.downloader.Download(ctx, o.program.BinaryName(), o.program.ArtifactName(), o.program.Version())
fullPath, err := o.downloader.Download(ctx, o.program.Spec(), o.program.Version())
if err == nil {
o.logger.Infof("operation '%s' downloaded %s.%s into %s", o.Name(), o.program.BinaryName(), o.program.Version(), fullPath)
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operation_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (o *operationInstall) Name() string {
//
// If the installation directory already exists then it will not be ran.
func (o *operationInstall) Check(ctx context.Context, _ Application) (bool, error) {
err := o.installer.Check(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
err := o.installer.Check(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
if err != nil {
// don't return err, just state if Run should be called
return true, nil
Expand All @@ -61,5 +61,5 @@ func (o *operationInstall) Run(ctx context.Context, application Application) (er
}
}()

return o.installer.Install(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
return o.installer.Install(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ func (o *operationUninstall) Run(ctx context.Context, application Application) (
}
}()

return o.uninstaller.Uninstall(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
return o.uninstaller.Uninstall(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
}
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operation_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (o *operationVerify) Name() string {
// Only if the artifacts exists does it need to be verified.
func (o *operationVerify) Check(_ context.Context, _ Application) (bool, error) {
downloadConfig := o.operatorConfig.DownloadConfig
fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(o.program.Spec(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
return false, err
}
Expand All @@ -66,7 +66,7 @@ func (o *operationVerify) Run(_ context.Context, application Application) (err e
}
}()

isVerified, err := o.verifier.Verify(o.program.BinaryName(), o.program.Version(), o.program.ArtifactName(), true)
isVerified, err := o.verifier.Verify(o.program.Spec(), o.program.Version(), true)
if err != nil {
return errors.New(err,
fmt.Sprintf("operation '%s' failed to verify %s.%s", o.Name(), o.program.BinaryName(), o.program.Version()),
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,9 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
return a, nil
}

specifier, ok := p.(app.Specifier)
desc, ok := p.(*app.Descriptor)
if !ok {
return nil, fmt.Errorf("descriptor is not an app.Specifier")
return nil, fmt.Errorf("descriptor is not an app.Descriptor")
}

// TODO: (michal) join args into more compact options version
Expand All @@ -272,7 +272,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
p.BinaryName(),
o.pipelineID,
o.config.LoggingConfig.Level.String(),
specifier,
desc,
o.srv,
o.config,
o.logger,
Expand All @@ -288,7 +288,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
o.pipelineID,
o.config.LoggingConfig.Level.String(),
p.ServicePort(),
specifier,
desc,
o.srv,
o.config,
o.logger,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func TestConfigurableService(t *testing.T) {
defer operator.stop(p) // failure catch, to ensure no sub-process stays running

// emulating a service, so we need to start the binary here in the test
spec := p.Spec()
spec := p.ProcessSpec()
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort()))
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Dir = filepath.Dir(spec.BinaryPath)
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestConfigurableService(t *testing.T) {

func isAvailable(name, version string) error {
p := getProgram(name, version)
spec := p.Spec()
spec := p.ProcessSpec()
path := spec.BinaryPath
if runtime.GOOS == "windows" {
path += ".exe"
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/program/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Spec struct {
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Artifact string `yaml:"artifact"`
LogPaths map[string]string `yaml:"log_paths,omitempty"`
MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"`
Rules *transpiler.RuleList `yaml:"rules"`
CheckInstallSteps *transpiler.StepList `yaml:"check_install"`
PostInstallSteps *transpiler.StepList `yaml:"post_install"`
Expand Down
Loading

0 comments on commit 4c2c647

Please sign in to comment.