Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Enable configuring monitoring namespace #26439

Merged
merged 8 commits into from
Jun 29, 2021
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,4 @@
- Use `filestream` input for internal log collection. {pull}25660[25660]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
- Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394]
- Enable configuring monitoring namespace {issue}26439[26439]
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr
transpiler.NewKey("logs", transpiler.NewBoolVal(true)),
transpiler.NewKey("metrics", transpiler.NewBoolVal(true)),
transpiler.NewKey("use_output", transpiler.NewStrVal("default")),
transpiler.NewKey("namespace", transpiler.NewStrVal("default")),
})

transpiler.Insert(rootAst, transpiler.NewKey("monitoring", monitoringNode), "settings")
Expand Down
33 changes: 17 additions & 16 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
var steps []configrequest.Step
watchLogs := o.monitor.WatchLogs()
watchMetrics := o.monitor.WatchMetrics()
monitoringNamespace := o.monitor.MonitoringNamespace()

// generate only when monitoring is running (for config refresh) or
// state changes (turning on/off)
if watchLogs != o.isMonitoringLogs() || watchLogs {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
fbConfig, any := o.getMonitoringFilebeatConfig(output, monitoringNamespace)
stepID := configrequest.StepRun
if !watchLogs || !any {
stepID = configrequest.StepRemove
Expand All @@ -149,7 +150,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
steps = append(steps, filebeatStep)
}
if watchMetrics != o.isMonitoringMetrics() || watchMetrics {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
mbConfig, any := o.getMonitoringMetricbeatConfig(output, monitoringNamespace)
stepID := configrequest.StepRun
if !watchMetrics || !any {
stepID = configrequest.StepRemove
Expand Down Expand Up @@ -182,12 +183,12 @@ func loadSpecFromSupported(processName string) program.Spec {
}
}

func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringFilebeatConfig(output interface{}, monitoringNamespace string) (map[string]interface{}, bool) {
inputs := []interface{}{
map[string]interface{}{
"type": "filestream",
"parsers": []map[string]interface{}{
map[string]interface{}{
{
"ndjson": map[string]interface{}{
"overwrite_keys": true,
"message_key": "message",
Expand All @@ -200,15 +201,15 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log"),
filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log*"),
},
"index": "logs-elastic_agent-default",
"index": fmt.Sprintf("logs-elastic_agent-%s", monitoringNamespace),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "logs",
"dataset": "elastic_agent",
"namespace": "default",
"namespace": monitoringNamespace,
},
},
},
Expand Down Expand Up @@ -247,23 +248,23 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
inputs = append(inputs, map[string]interface{}{
"type": "filestream",
"parsers": []map[string]interface{}{
map[string]interface{}{
{
"ndjson": map[string]interface{}{
"overwrite_keys": true,
"message_key": "message",
},
},
},
"paths": paths,
"index": fmt.Sprintf("logs-elastic_agent.%s-default", name),
"index": fmt.Sprintf("logs-elastic_agent.%s-%s", name, monitoringNamespace),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "logs",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
"namespace": "default",
"namespace": monitoringNamespace,
},
},
},
Expand Down Expand Up @@ -311,7 +312,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
return result, true
}

func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringMetricbeatConfig(output interface{}, monitoringNamespace string) (map[string]interface{}, bool) {
hosts := o.getMetricbeatEndpoints()
if len(hosts) == 0 {
return nil, false
Expand All @@ -325,15 +326,15 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"metricsets": []string{"stats", "state"},
"period": "10s",
"hosts": endpoints,
"index": fmt.Sprintf("metrics-elastic_agent.%s-default", name),
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
"namespace": "default",
"namespace": monitoringNamespace,
},
},
},
Expand Down Expand Up @@ -363,15 +364,15 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"period": "10s",
"path": "/stats",
"hosts": endpoints,
"index": fmt.Sprintf("metrics-elastic_agent.%s-default", fixedAgentName),
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
"namespace": "default",
"namespace": monitoringNamespace,
},
},
},
Expand Down Expand Up @@ -446,15 +447,15 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"period": "10s",
"path": "/stats",
"hosts": []string{beats.AgentPrefixedMonitoringEndpoint(o.config.DownloadConfig.OS(), o.config.MonitoringConfig.HTTP)},
"index": fmt.Sprintf("metrics-elastic_agent.%s-default", fixedAgentName),
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
"namespace": "default",
"namespace": monitoringNamespace,
},
},
},
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func (b *testMonitor) Reload(cfg *config.Config) error { return nil }
// IsMonitoringEnabled returns true if monitoring is configured.
func (b *testMonitor) IsMonitoringEnabled() bool { return b.monitorLogs || b.monitorMetrics }

// MonitoringNamespace returns monitoring namespace configured.
func (b *testMonitor) MonitoringNamespace() string { return "default" }

// WatchLogs return true if monitoring is configured and monitoring logs is enabled.
func (b *testMonitor) WatchLogs() bool { return b.monitorLogs }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

const httpPlusPrefix = "http+"
const defaultMonitoringNamespace = "default"

// Monitor is a monitoring interface providing information about the way
// how beat is monitored
Expand Down Expand Up @@ -69,6 +70,14 @@ func (b *Monitor) Close() {
// IsMonitoringEnabled returns true if monitoring is enabled.
func (b *Monitor) IsMonitoringEnabled() bool { return b.config.Enabled }

// MonitoringNamespace returns monitoring namespace configured.
func (b *Monitor) MonitoringNamespace() string {
if b.config.Namespace == "" {
return defaultMonitoringNamespace
}
return b.config.Namespace
}

// WatchLogs returns true if monitoring is enabled and monitor should watch logs.
func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.MonitorLogs }

Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package config

const defaultPort = 6791
const defaultNamespace = "default"

// MonitoringConfig describes a configuration of a monitoring
type MonitoringConfig struct {
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
}

// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
Expand All @@ -33,5 +35,6 @@ func DefaultConfig() *MonitoringConfig {
Enabled: false,
Port: defaultPort,
},
Namespace: defaultNamespace,
}
}
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Monitor interface {
Cleanup(spec program.Spec, pipelineID string) error
Reload(cfg *config.Config) error
IsMonitoringEnabled() bool
MonitoringNamespace() string
WatchLogs() bool
WatchMetrics() bool
Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ func (b *Monitor) WatchLogs() bool { return false }

// WatchMetrics return true if monitoring is configured and monitoring metrics is enabled.
func (b *Monitor) WatchMetrics() bool { return false }

// MonitoringNamespace returns monitoring namespace configured.
func (b *Monitor) MonitoringNamespace() string { return "default" }