Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Enable log shipping of endpoint-security by Elastic Agent #22526

Merged
merged 5 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@
- Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804]
- Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694]
- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352]
- Ship `endpoint-security` logs to elasticsearch {pull}22526[22526]
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
return "", errors.New(err, "initiating fetcher")
}

path, err := fetcher.Download(ctx, agentName, agentArtifactName, version)
path, err := fetcher.Download(ctx, agentSpec, version)
if err != nil {
return "", errors.New(err, "failed upgrade of agent binary")
}

matches, err := verifier.Verify(agentName, version, agentArtifactName, true)
matches, err := verifier.Verify(agentSpec, version, true)
if err != nil {
return "", errors.New(err, "failed verification of agent binary")
}
Expand Down
17 changes: 13 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"path/filepath"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
blakerouse marked this conversation as resolved.
Show resolved Hide resolved

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
Expand All @@ -26,10 +28,17 @@ import (
)

const (
agentName = "elastic-agent"
hashLen = 6
agentCommitFile = ".elastic-agent.active.commit"
agentArtifactName = "beats/" + agentName
agentName = "elastic-agent"
hashLen = 6
agentCommitFile = ".elastic-agent.active.commit"
)

var (
agentSpec = program.Spec{
Name: "Elastic Agent",
Cmd: agentName,
Artifact: "beats/" + agentName,
}
)

// Upgrader performs an upgrade
Expand Down
12 changes: 6 additions & 6 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a

// make the download path so the `operation_verify` can ensure the path exists
downloadConfig := operator.config.DownloadConfig
fullPath, err := artifact.GetArtifactPath(p.BinaryName(), p.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(p.Spec(), p.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -137,35 +137,35 @@ func waitFor(t *testing.T, check func() error) {

type DummyDownloader struct{}

func (*DummyDownloader) Download(_ context.Context, p, a, v string) (string, error) {
func (*DummyDownloader) Download(_ context.Context, _ program.Spec, _ string) (string, error) {
return "", nil
}

var _ download.Downloader = &DummyDownloader{}

type DummyVerifier struct{}

func (*DummyVerifier) Verify(p, v, _ string, _ bool) (bool, error) {
func (*DummyVerifier) Verify(_ program.Spec, _ string, _ bool) (bool, error) {
return true, nil
}

var _ download.Verifier = &DummyVerifier{}

type DummyInstallerChecker struct{}

func (*DummyInstallerChecker) Check(_ context.Context, p, v, _ string) error {
func (*DummyInstallerChecker) Check(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

func (*DummyInstallerChecker) Install(_ context.Context, p, v, _ string) error {
func (*DummyInstallerChecker) Install(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

var _ install.InstallerChecker = &DummyInstallerChecker{}

type DummyUninstaller struct{}

func (*DummyUninstaller) Uninstall(_ context.Context, p, v, _ string) error {
func (*DummyUninstaller) Uninstall(_ context.Context, _ program.Spec, _, _ string) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (o *Operator) getLogFilePaths() map[string][]string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
logPath := a.Monitor().LogPath(a.Spec(), o.pipelineID)
if logPath != "" {
paths[a.Name()] = append(paths[a.Name()], logPath)
}
Expand All @@ -360,7 +360,7 @@ func (o *Operator) getMetricbeatEndpoints() map[string][]string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Spec(), o.pipelineID)
if metricEndpoint != "" {
endpoints[a.Name()] = append(endpoints[a.Name()], metricEndpoint)
}
Expand Down
17 changes: 11 additions & 6 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
Expand Down Expand Up @@ -154,6 +156,7 @@ func (*testMonitorableApp) Shutdown() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) Spec() program.Spec { return program.Spec{} }
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string, _ map[string]interface{}) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
Expand All @@ -167,20 +170,22 @@ type testMonitor struct {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []string { return args }
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
return args
}

// Cleanup cleans up all drops.
func (b *testMonitor) Cleanup(string, string) error { return nil }
func (b *testMonitor) Cleanup(program.Spec, string) error { return nil }

// Close closes the monitor.
func (b *testMonitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *testMonitor) Prepare(string, string, int, int) error { return nil }
func (b *testMonitor) Prepare(program.Spec, string, int, int) error { return nil }

// LogPath describes a path where application stores logs. Empty if
// application is not monitorable
func (b *testMonitor) LogPath(string, string) string {
func (b *testMonitor) LogPath(program.Spec, string) string {
if !b.monitorLogs {
return ""
}
Expand All @@ -189,15 +194,15 @@ func (b *testMonitor) LogPath(string, string) string {

// MetricsPath describes a location where application exposes metrics
// collectable by metricbeat.
func (b *testMonitor) MetricsPath(string, string) string {
func (b *testMonitor) MetricsPath(program.Spec, string) string {
if !b.monitorMetrics {
return ""
}
return "path"
}

// MetricsPathPrefixed return metrics path prefixed with http+ prefix.
func (b *testMonitor) MetricsPathPrefixed(string, string) string {
func (b *testMonitor) MetricsPathPrefixed(program.Spec, string) string {
return "http+path"
}

Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
Expand Down Expand Up @@ -42,13 +43,15 @@ type Application interface {
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
Spec() program.Spec
SetState(status state.Status, msg string, payload map[string]interface{})
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{})
}

// Descriptor defines a program which needs to be run.
// Is passed around operator operations.
type Descriptor interface {
Spec() program.Spec
ServicePort() int
BinaryName() string
ArtifactName() string
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (o *operationFetch) Name() string {
// If the artifacts already exists then fetch will not be ran.
func (o *operationFetch) Check(_ context.Context, _ Application) (bool, error) {
downloadConfig := o.operatorConfig.DownloadConfig
fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(o.program.Spec(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
return false, err
}
Expand All @@ -70,7 +70,7 @@ func (o *operationFetch) Run(ctx context.Context, application Application) (err
}
}()

fullPath, err := o.downloader.Download(ctx, o.program.BinaryName(), o.program.ArtifactName(), o.program.Version())
fullPath, err := o.downloader.Download(ctx, o.program.Spec(), o.program.Version())
if err == nil {
o.logger.Infof("operation '%s' downloaded %s.%s into %s", o.Name(), o.program.BinaryName(), o.program.Version(), fullPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (o *operationInstall) Name() string {
//
// If the installation directory already exists then it will not be ran.
func (o *operationInstall) Check(ctx context.Context, _ Application) (bool, error) {
err := o.installer.Check(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
err := o.installer.Check(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
if err != nil {
// don't return err, just state if Run should be called
return true, nil
Expand All @@ -61,5 +61,5 @@ func (o *operationInstall) Run(ctx context.Context, application Application) (er
}
}()

return o.installer.Install(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
return o.installer.Install(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ func (o *operationUninstall) Run(ctx context.Context, application Application) (
}
}()

return o.uninstaller.Uninstall(ctx, o.program.BinaryName(), o.program.Version(), o.program.Directory())
return o.uninstaller.Uninstall(ctx, o.program.Spec(), o.program.Version(), o.program.Directory())
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (o *operationVerify) Name() string {
// Only if the artifacts exists does it need to be verified.
func (o *operationVerify) Check(_ context.Context, _ Application) (bool, error) {
downloadConfig := o.operatorConfig.DownloadConfig
fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
fullPath, err := artifact.GetArtifactPath(o.program.Spec(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory)
if err != nil {
return false, err
}
Expand All @@ -66,7 +66,7 @@ func (o *operationVerify) Run(_ context.Context, application Application) (err e
}
}()

isVerified, err := o.verifier.Verify(o.program.BinaryName(), o.program.Version(), o.program.ArtifactName(), true)
isVerified, err := o.verifier.Verify(o.program.Spec(), o.program.Version(), true)
if err != nil {
return errors.New(err,
fmt.Sprintf("operation '%s' failed to verify %s.%s", o.Name(), o.program.BinaryName(), o.program.Version()),
Expand Down
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
return a, nil
}

specifier, ok := p.(app.Specifier)
desc, ok := p.(*app.Descriptor)
if !ok {
return nil, fmt.Errorf("descriptor is not an app.Specifier")
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -272,7 +272,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
p.BinaryName(),
o.pipelineID,
o.config.LoggingConfig.Level.String(),
specifier,
desc,
o.srv,
o.config,
o.logger,
Expand All @@ -288,7 +288,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
o.pipelineID,
o.config.LoggingConfig.Level.String(),
p.ServicePort(),
specifier,
desc,
o.srv,
o.config,
o.logger,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func TestConfigurableService(t *testing.T) {
defer operator.stop(p) // failure catch, to ensure no sub-process stays running

// emulating a service, so we need to start the binary here in the test
spec := p.Spec()
spec := p.ProcessSpec()
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort()))
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Dir = filepath.Dir(spec.BinaryPath)
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestConfigurableService(t *testing.T) {

func isAvailable(name, version string) error {
p := getProgram(name, version)
spec := p.Spec()
spec := p.ProcessSpec()
path := spec.BinaryPath
if runtime.GOOS == "windows" {
path += ".exe"
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/program/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Spec struct {
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Artifact string `yaml:"artifact"`
LogPaths map[string]string `yaml:"log_paths,omitempty"`
MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"`
Rules *transpiler.RuleList `yaml:"rules"`
CheckInstallSteps *transpiler.StepList `yaml:"check_install"`
PostInstallSteps *transpiler.StepList `yaml:"post_install"`
Expand Down
Loading