diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 374e6bc899e..3842938d827 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -72,6 +72,7 @@ - Agent sends wrong log level to Endpoint {issue}25583[25583] - Fix startup with failing configuration {pull}26057[26057] - Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391] +- Fix add support for Logstash output. {pull}24305[24305] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator.go index 5c1d2d037fd..8c3eb1c7d43 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator.go @@ -55,19 +55,18 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr } // get monitoring output name to be used - monitoringOutputName := defaultOutputName - useOutputNode, found := transpiler.Lookup(rootAst, monitoringUseOutputKey) - if found { - monitoringOutputNameKey, ok := useOutputNode.Value().(*transpiler.StrVal) - if !ok { - return programsToRun, nil - } - - monitoringOutputName = monitoringOutputNameKey.String() + monitoringOutputName, found := transpiler.LookupString(rootAst, monitoringUseOutputKey) + if !found { + monitoringOutputName = defaultOutputName + } + + typeValue, found := transpiler.LookupString(rootAst, fmt.Sprintf("%s.%s.type", outputsKey, monitoringOutputName)) + if !found { + typeValue = elasticsearchKey } ast := rootAst.Clone() - if err := getMonitoringRule(monitoringOutputName).Apply(agentInfo, ast); err != nil { + if err := getMonitoringRule(monitoringOutputName, typeValue).Apply(agentInfo, ast); err != nil { return programsToRun, err } @@ -95,11 +94,11 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr return append(programsToRun, monitoringProgram), nil } -func getMonitoringRule(outputName string) *transpiler.RuleList { +func getMonitoringRule(outputName string, t string) *transpiler.RuleList { monitoringOutputSelector := fmt.Sprintf(monitoringOutputFormatKey, outputName) return transpiler.NewRuleList( transpiler.Copy(monitoringOutputSelector, outputKey), - transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey), + transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), t), transpiler.Filter(monitoringKey, programsKey, outputKey), ) } diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator_test.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator_test.go index afb15edac80..45b8ebab434 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers/monitoring_decorator_test.go @@ -171,6 +171,85 @@ GROUPLOOP: } } +func TestMonitoringToLogstashInjection(t *testing.T) { + agentInfo, err := info.NewAgentInfo(true) + if err != nil { + t.Fatal(err) + } + ast, err := transpiler.NewAST(inputConfigLS) + if err != nil { + t.Fatal(err) + } + + programsToRun, err := program.Programs(agentInfo, ast) + if err != nil { + t.Fatal(err) + } + + if len(programsToRun) != 1 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1)) + } + +GROUPLOOP: + for group, ptr := range programsToRun { + programsCount := len(ptr) + newPtr, err := InjectMonitoring(agentInfo, group, ast, ptr) + if err != nil { + t.Error(err) + continue GROUPLOOP + } + + if programsCount+1 != len(newPtr) { + t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr)) + continue GROUPLOOP + } + + for _, p := range newPtr { + if p.Spec.Name != MonitoringName { + continue + } + + cm, err := p.Config.Map() + if err != nil { + t.Error(err) + continue GROUPLOOP + } + + outputCfg, found := cm[outputKey] + if !found { + t.Errorf("output not found for '%s'", group) + continue GROUPLOOP + } + + outputMap, ok := outputCfg.(map[string]interface{}) + if !ok { + t.Errorf("output is not a map for '%s'", group) + continue GROUPLOOP + } + + esCfg, found := outputMap["logstash"] + if !found { + t.Errorf("logstash output not found for '%s' %v", group, outputMap) + continue GROUPLOOP + } + + esMap, ok := esCfg.(map[string]interface{}) + if !ok { + t.Errorf("output.logstash is not a map for '%s'", group) + continue GROUPLOOP + } + + if uname, found := esMap["hosts"]; !found { + t.Errorf("output.logstash.hosts output not found for '%s'", group) + continue GROUPLOOP + } else if uname != "192.168.1.2" { + t.Errorf("output.logstash.hosts has incorrect value expected '%s', got '%s for %s", "monitoring-uname", uname, group) + continue GROUPLOOP + } + } + } +} + func TestMonitoringInjectionDisabled(t *testing.T) { agentInfo, err := info.NewAgentInfo(true) if err != nil { @@ -613,42 +692,40 @@ var inputChange2 = map[string]interface{}{ }, } -// const inputConfig = `outputs: -// default: -// index_name: general -// pass: xxx -// type: es -// url: xxxxx -// username: xxx -// infosec1: -// pass: xxx -// spool: -// file: "${path.data}/spool.dat" -// type: es -// url: xxxxx -// username: xxx -// streams: -// - -// output: -// override: -// index_name: my_service_logs -// ingest_pipeline: process_logs -// path: /xxxx -// processors: -// - -// dissect: -// tokenizer: "---" -// type: log -// - -// output: -// index_name: mysql_access_logs -// path: /xxxx -// type: log -// - -// output: -// index_name: mysql_metrics -// use_output: infosec1 -// pass: yyy -// type: metrics/system -// username: xxxx -// ` +var inputConfigLS = map[string]interface{}{ + "agent.monitoring": map[string]interface{}{ + "enabled": true, + "logs": true, + "metrics": true, + "use_output": "monitoring", + }, + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "index_name": "general", + "pass": "xxx", + "type": "elasticsearch", + "url": "xxxxx", + "username": "xxx", + }, + "monitoring": map[string]interface{}{ + "type": "logstash", + "hosts": "192.168.1.2", + "ssl.certificate_authorities": []string{"/etc/pki.key"}, + }, + }, + "inputs": []map[string]interface{}{ + { + "type": "log", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + }, + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "---", + }, + }, + }, + }, + }, +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index d7c81d9a3a9..45b7263cf73 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -69,7 +69,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { } func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { - for _, step := range o.generateMonitoringSteps(s.Version, nil) { + for _, step := range o.generateMonitoringSteps(s.Version, "", nil) { p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, @@ -115,16 +115,49 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S return nil } - output, found := outputMap["elasticsearch"] - if !found { - o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.ProgramSpec.Cmd) + if len(outputMap) == 0 { + o.logger.Errorf("operator.getMonitoringSteps: monitoring is missing an output configuration for sidecar of type: %s", step.ProgramSpec.Cmd) + return nil + } + + // Guards against parser issues upstream, this should not be possible but + // since we are folding all the child options as a map we should make sure we have + //a unique output. + if len(outputMap) > 1 { + o.logger.Errorf("operator.getMonitoringSteps: monitoring has too many outputs configuration for sidecar of type: %s", step.ProgramSpec.Cmd) + return nil + } + + // Aggregate output configuration independently of the received output key. + output := make(map[string]interface{}) + + for _, v := range outputMap { + child, ok := v.(map[string]interface{}) + if !ok { + o.logger.Error("operator.getMonitoringSteps: monitoring config is not a map") + return nil + } + for c, j := range child { + output[c] = j + } + } + + t, ok := output["type"] + if !ok { + o.logger.Errorf("operator.getMonitoringSteps: unknown monitoring output for sidecar of type: %s", step.ProgramSpec.Cmd) return nil } - return o.generateMonitoringSteps(step.Version, output) + outputType, ok := t.(string) + if !ok { + o.logger.Errorf("operator.getMonitoringSteps: unexpected monitoring output type: %+v for sidecar of type: %s", t, step.ProgramSpec.Cmd) + return nil + } + + return o.generateMonitoringSteps(step.Version, outputType, output) } -func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step { +func (o *Operator) generateMonitoringSteps(version, outputType string, output interface{}) []configrequest.Step { var steps []configrequest.Step watchLogs := o.monitor.WatchLogs() watchMetrics := o.monitor.WatchMetrics() @@ -132,7 +165,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [ // generate only when monitoring is running (for config refresh) or // state changes (turning on/off) if watchLogs != o.isMonitoringLogs() || watchLogs { - fbConfig, any := o.getMonitoringFilebeatConfig(output) + fbConfig, any := o.getMonitoringFilebeatConfig(outputType, output) stepID := configrequest.StepRun if !watchLogs || !any { stepID = configrequest.StepRemove @@ -149,7 +182,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [ steps = append(steps, filebeatStep) } if watchMetrics != o.isMonitoringMetrics() || watchMetrics { - mbConfig, any := o.getMonitoringMetricbeatConfig(output) + mbConfig, any := o.getMonitoringMetricbeatConfig(outputType, output) stepID := configrequest.StepRun if !watchMetrics || !any { stepID = configrequest.StepRemove @@ -182,7 +215,7 @@ func loadSpecFromSupported(processName string) program.Spec { } } -func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) { +func (o *Operator) getMonitoringFilebeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) { inputs := []interface{}{ map[string]interface{}{ "type": "filestream", @@ -297,12 +330,13 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i }) } } + result := map[string]interface{}{ "filebeat": map[string]interface{}{ "inputs": inputs, }, "output": map[string]interface{}{ - "elasticsearch": output, + outputType: output, }, } @@ -311,7 +345,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i return result, true } -func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string]interface{}, bool) { +func (o *Operator) getMonitoringMetricbeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) { hosts := o.getMetricbeatEndpoints() if len(hosts) == 0 { return nil, false @@ -526,7 +560,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string "modules": modules, }, "output": map[string]interface{}{ - "elasticsearch": output, + outputType: output, }, } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index cbf9edf3266..136c9e485b1 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -31,6 +31,7 @@ import ( func TestGenerateSteps(t *testing.T) { const sampleOutput = "sample-output" + const outputType = "logstash" type testCase struct { Name string @@ -51,7 +52,7 @@ func TestGenerateSteps(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics} operator := getMonitorableTestOperator(t, "tests/scripts", m, tc.Config) - steps := operator.generateMonitoringSteps("8.0", sampleOutput) + steps := operator.generateMonitoringSteps("8.0", outputType, sampleOutput) if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) } @@ -61,13 +62,13 @@ func TestGenerateSteps(t *testing.T) { // Filebeat step check if s.ProgramSpec.Cmd == "filebeat" { fbFound = true - checkStep(t, "filebeat", sampleOutput, s) + checkStep(t, "filebeat", outputType, sampleOutput, s) } // Metricbeat step check if s.ProgramSpec.Cmd == "metricbeat" { mbFound = true - checkStep(t, "metricbeat", sampleOutput, s) + checkStep(t, "metricbeat", outputType, sampleOutput, s) } } @@ -82,7 +83,7 @@ func TestGenerateSteps(t *testing.T) { } } -func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s configrequest.Step) { +func checkStep(t *testing.T, stepName string, outputType string, expectedOutput interface{}, s configrequest.Step) { if meta := s.Meta[configrequest.MetaConfigKey]; meta != nil { mapstr, ok := meta.(map[string]interface{}) if !ok { @@ -94,7 +95,7 @@ func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s conf t.Fatalf("output not found for %s step", stepName) } - if actualOutput := esOut["elasticsearch"]; actualOutput != expectedOutput { + if actualOutput := esOut[outputType]; actualOutput != expectedOutput { t.Fatalf("output for %s step does not match. expected: %v, got %v", stepName, expectedOutput, actualOutput) } } diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/ast.go b/x-pack/elastic-agent/pkg/agent/transpiler/ast.go index cc61efd63ea..100d8a462a0 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/ast.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/ast.go @@ -1023,6 +1023,21 @@ func Lookup(a *AST, selector Selector) (Node, bool) { return current, true } +// LookupString accepts an AST and a selector and return the matching node at that position as a string. +func LookupString(a *AST, selector Selector) (string, bool) { + n, ok := Lookup(a, selector) + if !ok { + return "", false + } + + v, ok := n.Value().(*StrVal) + if !ok { + return "", false + } + + return v.String(), true +} + // Insert inserts a node into an existing AST, will return and error if the target position cannot // accept a new node. func Insert(a *AST, node Node, to Selector) error { diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go b/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go index b48d15a112a..e1b22c390ed 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go @@ -1793,6 +1793,50 @@ func TestHash(t *testing.T) { } } +func TestLookupString(t *testing.T) { + t.Run("when the selector exist with a string value", func(t *testing.T) { + a := &AST{ + root: &Dict{ + value: []Node{ + &Key{name: "inputs", value: &StrVal{value: "/var/log/log1"}}, + }, + }, + } + + s, ok := LookupString(a, "inputs") + assert.Equal(t, "/var/log/log1", s) + assert.True(t, ok) + }) + + t.Run("when the selector doesn't exist", func(t *testing.T) { + a := &AST{ + root: &Dict{ + value: []Node{ + &Key{name: "Weee!", value: &StrVal{value: "/var/log/log1"}}, + }, + }, + } + + s, ok := LookupString(a, "inputs") + assert.Equal(t, "", s) + assert.False(t, ok) + }) + + t.Run("when the node is not a StrVal will fail", func(t *testing.T) { + a := &AST{ + root: &Dict{ + value: []Node{ + &Key{name: "inputs", value: &FloatVal{value: 4.2}}, + }, + }, + } + + s, ok := LookupString(a, "inputs") + assert.Equal(t, "", s) + assert.False(t, ok) + }) +} + func mustMakeVars(mapping map[string]interface{}) *Vars { v, err := NewVars(mapping, nil) if err != nil {