Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Endpoint collision between monitoring and regular beats #1034

Merged
merged 10 commits into from
Sep 22, 2022
25 changes: 15 additions & 10 deletions internal/pkg/agent/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/control/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoring "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/internal/pkg/core/socket"
Expand All @@ -37,6 +36,10 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
agentName = "elastic-agent"
)

// Server is the daemon side of the control protocol.
type Server struct {
logger *logger.Logger
Expand Down Expand Up @@ -225,7 +228,8 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR
// gather spec data for all rk/apps running
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

procMeta := client.procMeta(ctx)
Expand Down Expand Up @@ -258,9 +262,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr
ch := make(chan *proto.PprofResult, 1)

// retrieve elastic-agent pprof data if requested or application is unspecified.
if req.AppName == "" || req.AppName == "elastic-agent" {
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
if req.AppName == "" || req.AppName == agentName {
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
for _, opt := range req.PprofType {
wg.Add(1)
go func(opt proto.PprofOption) {
Expand All @@ -273,11 +277,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr

// get requested rk/appname spec or all specs
var specs []specInfo
if req.AppName != "elastic-agent" {
if req.AppName != agentName {
specs = s.getSpecInfo(req.RouteKey, req.AppName)
}
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, false)
c := newSocketRequester(si.app, si.rk, endpoint)
// Launch a concurrent goroutine to gather all pprof endpoints from a socket.
for _, opt := range req.PprofType {
Expand Down Expand Up @@ -315,8 +319,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
}

// gather metrics buffer data from the elastic-agent
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
metrics := c.procMetrics(ctx)

resp := &proto.ProcMetricsResponse{
Expand All @@ -326,7 +330,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
// gather metrics buffer data from all other processes
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

s.logger.Infof("gather metrics from %s", endpoint)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type testMonitor struct {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string) []string {
return args
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/process"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/service"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
Expand Down Expand Up @@ -387,7 +387,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
appName := p.BinaryName()
if app.IsSidecar(p) {
// make watchers unmonitorable
monitor = noop.NewMonitor()
monitor = beats.NewSidecarMonitor(o.config.DownloadConfig, o.config.MonitoringConfig)
appName += "_monitoring"
}

Expand Down
41 changes: 16 additions & 25 deletions internal/pkg/core/monitoring/beats/beats_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package beats

import (
"fmt"
"net/url"
"os"
"path/filepath"
Expand All @@ -20,8 +19,13 @@ import (
monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
)

const httpPlusPrefix = "http+"
const defaultMonitoringNamespace = "default"
const (
httpPlusPrefix = "http+"
defaultMonitoringNamespace = "default"
fileSchemePrefix = "file"
unixSchemePrefix = "unix"
windowsOS = "windows"
)

// Monitor implements the monitoring.Monitor interface providing information
// about beats.
Expand Down Expand Up @@ -99,15 +103,11 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor
func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics }

func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string {
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID)
}

func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string {
return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID)
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID, false)
}

func (b *Monitor) generateLoggingPath(spec program.Spec, pipelineID string) string {
return filepath.Dir(b.generateLoggingFile(spec, pipelineID))
return filepath.Dir(getLoggingFile(spec, b.operatingSystem, pipelineID))
}

func (b *Monitor) ownLoggingPath(spec program.Spec) bool {
Expand All @@ -118,15 +118,10 @@ func (b *Monitor) ownLoggingPath(spec program.Spec) bool {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string {
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string {
appendix := make([]string, 0, 7)

monitoringEndpoint := b.generateMonitoringEndpoint(spec, pipelineID)
if monitoringEndpoint != "" {
endpoint := monitoringEndpoint
if isSidecar {
endpoint += "_monitor"
}
if endpoint := b.generateMonitoringEndpoint(spec, pipelineID); endpoint != "" {
appendix = append(appendix,
"-E", "http.enabled=true",
"-E", "http.host="+endpoint,
Expand All @@ -146,10 +141,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string
loggingPath := b.generateLoggingPath(spec, pipelineID)
if loggingPath != "" {
logFile := spec.Cmd
if isSidecar {
logFile += "_monitor"
}
logFile = fmt.Sprintf("%s", logFile)
appendix = append(appendix,
"-E", "logging.files.path="+loggingPath,
"-E", "logging.files.name="+logFile,
Expand Down Expand Up @@ -224,7 +215,7 @@ func (b *Monitor) LogPath(spec program.Spec, pipelineID string) string {
return ""
}

return b.generateLoggingFile(spec, pipelineID)
return getLoggingFile(spec, b.operatingSystem, pipelineID)
}

// MetricsPath describes a location where application exposes metrics
Expand Down Expand Up @@ -272,15 +263,15 @@ func monitoringDrop(path string) (drop string) {
}

u, _ := url.Parse(path)
if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") {
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
return ""
}

if u.Scheme == "file" {
if u.Scheme == fileSchemePrefix {
return strings.TrimPrefix(path, "file://")
}

if u.Scheme == "unix" {
if u.Scheme == unixSchemePrefix {
return strings.TrimPrefix(path, "unix://")
}

Expand All @@ -299,7 +290,7 @@ func isWindowsPath(path string) bool {
}

func changeOwner(path string, uid, gid int) error {
if runtime.GOOS == "windows" {
if runtime.GOOS == windowsOS {
// on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError
return nil
}
Expand Down
20 changes: 14 additions & 6 deletions internal/pkg/core/monitoring/beats/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,39 @@ const (
agentMbEndpointFileFormatWin = `npipe:///elastic-agent`
// agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint
agentMbEndpointHTTP = "http://%s:%d"

monitorSuffix = "_monitor"
)

// MonitoringEndpoint is an endpoint where process is exposing its metrics.
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string {
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string, isSidecar bool) (endpointPath string) {
defer func() {
if isSidecar && endpointPath != "" {
endpointPath += monitorSuffix
}
}()

if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok {
return endpoint
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd)
}
// unix socket path must be less than 104 characters
path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd))
if len(path) < 104 {
if (isSidecar && len(path) < 104-len(monitorSuffix)) || (!isSidecar && len(path) < 104) {
return path
}
// place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long
// for it to be used, but needs to be unique per Agent (in the case that multiple are running)
return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path)))
}

func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string {
func getLoggingFile(spec program.Spec, operatingSystem, pipelineID string) string {
if path, ok := spec.LogPaths[operatingSystem]; ok {
return path
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(logFileFormatWin, paths.Home(), pipelineID, spec.Cmd)
}
return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd)
Expand All @@ -63,7 +71,7 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit
return fmt.Sprintf(agentMbEndpointHTTP, cfg.Host, cfg.Port)
}

if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return agentMbEndpointFileFormatWin
}
// unix socket path must be less than 104 characters
Expand Down
Loading