Skip to content

Commit

Permalink
[8.5](backport #1034) Fix: Endpoint collision between monitoring and …
Browse files Browse the repository at this point in the history
…regular beats (#1270)

* Fix: Endpoint collision between monitoring and regular beats  (#1034)

Fix: Endpoint collision between monitoring and regular beats  (#1034)
(cherry picked from commit 815382d)

Co-authored-by: Michal Pristas <michal.pristas@gmail.com>
  • Loading branch information
mergify[bot] and michalpristas authored Sep 22, 2022
1 parent 17a5fa6 commit 100ddf7
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 71 deletions.
2 changes: 1 addition & 1 deletion deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ spec:
# - -c
# - >-
# mkdir -p /etc/elastic-agent/inputs.d &&
# wget -O - https://github.com/elastic/elastic-agent/archive/8.3.0.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-main/deploy/kubernetes/elastic-agent-standalone/templates.d"
# wget -O - https://github.com/elastic/elastic-agent/archive/8.5.0.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-main/deploy/kubernetes/elastic-agent-standalone/templates.d"
# volumeMounts:
# - name: external-inputs
# mountPath: /etc/elastic-agent/inputs.d
Expand Down
25 changes: 15 additions & 10 deletions internal/pkg/agent/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/control/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoring "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/internal/pkg/core/socket"
Expand All @@ -37,6 +36,10 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
agentName = "elastic-agent"
)

// Server is the daemon side of the control protocol.
type Server struct {
logger *logger.Logger
Expand Down Expand Up @@ -225,7 +228,8 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR
// gather spec data for all rk/apps running
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

procMeta := client.procMeta(ctx)
Expand Down Expand Up @@ -258,9 +262,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr
ch := make(chan *proto.PprofResult, 1)

// retrieve elastic-agent pprof data if requested or application is unspecified.
if req.AppName == "" || req.AppName == "elastic-agent" {
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
if req.AppName == "" || req.AppName == agentName {
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
for _, opt := range req.PprofType {
wg.Add(1)
go func(opt proto.PprofOption) {
Expand All @@ -273,11 +277,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr

// get requested rk/appname spec or all specs
var specs []specInfo
if req.AppName != "elastic-agent" {
if req.AppName != agentName {
specs = s.getSpecInfo(req.RouteKey, req.AppName)
}
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, false)
c := newSocketRequester(si.app, si.rk, endpoint)
// Launch a concurrent goroutine to gather all pprof endpoints from a socket.
for _, opt := range req.PprofType {
Expand Down Expand Up @@ -315,8 +319,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
}

// gather metrics buffer data from the elastic-agent
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
metrics := c.procMetrics(ctx)

resp := &proto.ProcMetricsResponse{
Expand All @@ -326,7 +330,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
// gather metrics buffer data from all other processes
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

s.logger.Infof("gather metrics from %s", endpoint)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type testMonitor struct {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string) []string {
return args
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/process"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/service"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
Expand Down Expand Up @@ -387,7 +387,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
appName := p.BinaryName()
if app.IsSidecar(p) {
// make watchers unmonitorable
monitor = noop.NewMonitor()
monitor = beats.NewSidecarMonitor(o.config.DownloadConfig, o.config.MonitoringConfig)
appName += "_monitoring"
}

Expand Down
41 changes: 16 additions & 25 deletions internal/pkg/core/monitoring/beats/beats_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package beats

import (
"fmt"
"net/url"
"os"
"path/filepath"
Expand All @@ -20,8 +19,13 @@ import (
monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
)

const httpPlusPrefix = "http+"
const defaultMonitoringNamespace = "default"
const (
httpPlusPrefix = "http+"
defaultMonitoringNamespace = "default"
fileSchemePrefix = "file"
unixSchemePrefix = "unix"
windowsOS = "windows"
)

// Monitor implements the monitoring.Monitor interface providing information
// about beats.
Expand Down Expand Up @@ -99,15 +103,11 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor
func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics }

func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string {
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID)
}

func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string {
return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID)
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID, false)
}

func (b *Monitor) generateLoggingPath(spec program.Spec, pipelineID string) string {
return filepath.Dir(b.generateLoggingFile(spec, pipelineID))
return filepath.Dir(getLoggingFile(spec, b.operatingSystem, pipelineID))
}

func (b *Monitor) ownLoggingPath(spec program.Spec) bool {
Expand All @@ -118,15 +118,10 @@ func (b *Monitor) ownLoggingPath(spec program.Spec) bool {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string {
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string {
appendix := make([]string, 0, 7)

monitoringEndpoint := b.generateMonitoringEndpoint(spec, pipelineID)
if monitoringEndpoint != "" {
endpoint := monitoringEndpoint
if isSidecar {
endpoint += "_monitor"
}
if endpoint := b.generateMonitoringEndpoint(spec, pipelineID); endpoint != "" {
appendix = append(appendix,
"-E", "http.enabled=true",
"-E", "http.host="+endpoint,
Expand All @@ -146,10 +141,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string
loggingPath := b.generateLoggingPath(spec, pipelineID)
if loggingPath != "" {
logFile := spec.Cmd
if isSidecar {
logFile += "_monitor"
}
logFile = fmt.Sprintf("%s", logFile)
appendix = append(appendix,
"-E", "logging.files.path="+loggingPath,
"-E", "logging.files.name="+logFile,
Expand Down Expand Up @@ -224,7 +215,7 @@ func (b *Monitor) LogPath(spec program.Spec, pipelineID string) string {
return ""
}

return b.generateLoggingFile(spec, pipelineID)
return getLoggingFile(spec, b.operatingSystem, pipelineID)
}

// MetricsPath describes a location where application exposes metrics
Expand Down Expand Up @@ -272,15 +263,15 @@ func monitoringDrop(path string) (drop string) {
}

u, _ := url.Parse(path)
if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") {
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
return ""
}

if u.Scheme == "file" {
if u.Scheme == fileSchemePrefix {
return strings.TrimPrefix(path, "file://")
}

if u.Scheme == "unix" {
if u.Scheme == unixSchemePrefix {
return strings.TrimPrefix(path, "unix://")
}

Expand All @@ -299,7 +290,7 @@ func isWindowsPath(path string) bool {
}

func changeOwner(path string, uid, gid int) error {
if runtime.GOOS == "windows" {
if runtime.GOOS == windowsOS {
// on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError
return nil
}
Expand Down
20 changes: 14 additions & 6 deletions internal/pkg/core/monitoring/beats/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,39 @@ const (
agentMbEndpointFileFormatWin = `npipe:///elastic-agent`
// agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint
agentMbEndpointHTTP = "http://%s:%d"

monitorSuffix = "_monitor"
)

// MonitoringEndpoint is an endpoint where process is exposing its metrics.
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string {
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string, isSidecar bool) (endpointPath string) {
defer func() {
if isSidecar && endpointPath != "" {
endpointPath += monitorSuffix
}
}()

if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok {
return endpoint
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd)
}
// unix socket path must be less than 104 characters
path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd))
if len(path) < 104 {
if (isSidecar && len(path) < 104-len(monitorSuffix)) || (!isSidecar && len(path) < 104) {
return path
}
// place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long
// for it to be used, but needs to be unique per Agent (in the case that multiple are running)
return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path)))
}

func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string {
func getLoggingFile(spec program.Spec, operatingSystem, pipelineID string) string {
if path, ok := spec.LogPaths[operatingSystem]; ok {
return path
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(logFileFormatWin, paths.Home(), pipelineID, spec.Cmd)
}
return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd)
Expand All @@ -63,7 +71,7 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit
return fmt.Sprintf(agentMbEndpointHTTP, cfg.Host, cfg.Port)
}

if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return agentMbEndpointFileFormatWin
}
// unix socket path must be less than 104 characters
Expand Down
Loading

0 comments on commit 100ddf7

Please sign in to comment.