Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Logstash as a valid Agent destination #24305

Merged
merged 8 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
- Agent sends wrong log level to Endpoint {issue}25583[25583]
- Fix startup with failing configuration {pull}26057[26057]
- Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]
- Fix add support for Logstash output. {pull}24305[24305]

==== New features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,18 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr
}

// get monitoring output name to be used
monitoringOutputName := defaultOutputName
useOutputNode, found := transpiler.Lookup(rootAst, monitoringUseOutputKey)
if found {
monitoringOutputNameKey, ok := useOutputNode.Value().(*transpiler.StrVal)
if !ok {
return programsToRun, nil
}

monitoringOutputName = monitoringOutputNameKey.String()
monitoringOutputName, found := transpiler.LookupString(rootAst, monitoringUseOutputKey)
if !found {
monitoringOutputName = defaultOutputName
}

typeValue, found := transpiler.LookupString(rootAst, fmt.Sprintf("%s.%s.type", outputsKey, monitoringOutputName))
if !found {
typeValue = elasticsearchKey
}

ast := rootAst.Clone()
if err := getMonitoringRule(monitoringOutputName).Apply(agentInfo, ast); err != nil {
if err := getMonitoringRule(monitoringOutputName, typeValue).Apply(agentInfo, ast); err != nil {
return programsToRun, err
}

Expand Down Expand Up @@ -95,11 +94,11 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr
return append(programsToRun, monitoringProgram), nil
}

func getMonitoringRule(outputName string) *transpiler.RuleList {
func getMonitoringRule(outputName string, t string) *transpiler.RuleList {
monitoringOutputSelector := fmt.Sprintf(monitoringOutputFormatKey, outputName)
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), t),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,85 @@ GROUPLOOP:
}
}

func TestMonitoringToLogstashInjection(t *testing.T) {
agentInfo, err := info.NewAgentInfo(true)
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigLS)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

if len(programsToRun) != 1 {
t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1))
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := InjectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
}

if programsCount+1 != len(newPtr) {
t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr))
continue GROUPLOOP
}

for _, p := range newPtr {
if p.Spec.Name != MonitoringName {
continue
}

cm, err := p.Config.Map()
if err != nil {
t.Error(err)
continue GROUPLOOP
}

outputCfg, found := cm[outputKey]
if !found {
t.Errorf("output not found for '%s'", group)
continue GROUPLOOP
}

outputMap, ok := outputCfg.(map[string]interface{})
if !ok {
t.Errorf("output is not a map for '%s'", group)
continue GROUPLOOP
}

esCfg, found := outputMap["logstash"]
if !found {
t.Errorf("logstash output not found for '%s' %v", group, outputMap)
continue GROUPLOOP
}

esMap, ok := esCfg.(map[string]interface{})
if !ok {
t.Errorf("output.logstash is not a map for '%s'", group)
continue GROUPLOOP
}

if uname, found := esMap["hosts"]; !found {
t.Errorf("output.logstash.hosts output not found for '%s'", group)
continue GROUPLOOP
} else if uname != "192.168.1.2" {
t.Errorf("output.logstash.hosts has incorrect value expected '%s', got '%s for %s", "monitoring-uname", uname, group)
continue GROUPLOOP
}
}
}
}

func TestMonitoringInjectionDisabled(t *testing.T) {
agentInfo, err := info.NewAgentInfo(true)
if err != nil {
Expand Down Expand Up @@ -613,42 +692,40 @@ var inputChange2 = map[string]interface{}{
},
}

// const inputConfig = `outputs:
// default:
// index_name: general
// pass: xxx
// type: es
// url: xxxxx
// username: xxx
// infosec1:
// pass: xxx
// spool:
// file: "${path.data}/spool.dat"
// type: es
// url: xxxxx
// username: xxx
// streams:
// -
// output:
// override:
// index_name: my_service_logs
// ingest_pipeline: process_logs
// path: /xxxx
// processors:
// -
// dissect:
// tokenizer: "---"
// type: log
// -
// output:
// index_name: mysql_access_logs
// path: /xxxx
// type: log
// -
// output:
// index_name: mysql_metrics
// use_output: infosec1
// pass: yyy
// type: metrics/system
// username: xxxx
// `
var inputConfigLS = map[string]interface{}{
"agent.monitoring": map[string]interface{}{
"enabled": true,
"logs": true,
"metrics": true,
"use_output": "monitoring",
},
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"index_name": "general",
"pass": "xxx",
"type": "elasticsearch",
"url": "xxxxx",
"username": "xxx",
},
"monitoring": map[string]interface{}{
"type": "logstash",
"hosts": "192.168.1.2",
"ssl.certificate_authorities": []string{"/etc/pki.key"},
},
},
"inputs": []map[string]interface{}{
{
"type": "log",
"streams": []map[string]interface{}{
{"paths": "/xxxx"},
},
"processors": []interface{}{
map[string]interface{}{
"dissect": map[string]interface{}{
"tokenizer": "---",
},
},
},
},
},
}
58 changes: 46 additions & 12 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.generateMonitoringSteps(s.Version, nil) {
for _, step := range o.generateMonitoringSteps(s.Version, "", nil) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand Down Expand Up @@ -115,24 +115,57 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S
return nil
}

output, found := outputMap["elasticsearch"]
if !found {
o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
if len(outputMap) == 0 {
o.logger.Errorf("operator.getMonitoringSteps: monitoring is missing an output configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

// Guards against parser issues upstream, this should not be possible but
// since we are folding all the child options as a map we should make sure we have
//a unique output.
if len(outputMap) > 1 {
o.logger.Errorf("operator.getMonitoringSteps: monitoring has too many outputs configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

// Aggregate output configuration independently of the received output key.
output := make(map[string]interface{})

for _, v := range outputMap {
child, ok := v.(map[string]interface{})
if !ok {
o.logger.Error("operator.getMonitoringSteps: monitoring config is not a map")
return nil
}
for c, j := range child {
output[c] = j
}
}

t, ok := output["type"]
if !ok {
o.logger.Errorf("operator.getMonitoringSteps: unknown monitoring output for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

return o.generateMonitoringSteps(step.Version, output)
outputType, ok := t.(string)
if !ok {
o.logger.Errorf("operator.getMonitoringSteps: unexpected monitoring output type: %+v for sidecar of type: %s", t, step.ProgramSpec.Cmd)
return nil
}

return o.generateMonitoringSteps(step.Version, outputType, output)
}

func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step {
func (o *Operator) generateMonitoringSteps(version, outputType string, output interface{}) []configrequest.Step {
var steps []configrequest.Step
watchLogs := o.monitor.WatchLogs()
watchMetrics := o.monitor.WatchMetrics()

// generate only when monitoring is running (for config refresh) or
// state changes (turning on/off)
if watchLogs != o.isMonitoringLogs() || watchLogs {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
fbConfig, any := o.getMonitoringFilebeatConfig(outputType, output)
stepID := configrequest.StepRun
if !watchLogs || !any {
stepID = configrequest.StepRemove
Expand All @@ -149,7 +182,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
steps = append(steps, filebeatStep)
}
if watchMetrics != o.isMonitoringMetrics() || watchMetrics {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
mbConfig, any := o.getMonitoringMetricbeatConfig(outputType, output)
stepID := configrequest.StepRun
if !watchMetrics || !any {
stepID = configrequest.StepRemove
Expand Down Expand Up @@ -182,7 +215,7 @@ func loadSpecFromSupported(processName string) program.Spec {
}
}

func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringFilebeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) {
inputs := []interface{}{
map[string]interface{}{
"type": "filestream",
Expand Down Expand Up @@ -297,12 +330,13 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
})
}
}

result := map[string]interface{}{
"filebeat": map[string]interface{}{
"inputs": inputs,
},
"output": map[string]interface{}{
"elasticsearch": output,
outputType: output,
},
}

Expand All @@ -311,7 +345,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
return result, true
}

func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringMetricbeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) {
hosts := o.getMetricbeatEndpoints()
if len(hosts) == 0 {
return nil, false
Expand Down Expand Up @@ -526,7 +560,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"modules": modules,
},
"output": map[string]interface{}{
"elasticsearch": output,
outputType: output,
},
}

Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

func TestGenerateSteps(t *testing.T) {
const sampleOutput = "sample-output"
const outputType = "logstash"

type testCase struct {
Name string
Expand All @@ -51,7 +52,7 @@ func TestGenerateSteps(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics}
operator := getMonitorableTestOperator(t, "tests/scripts", m, tc.Config)
steps := operator.generateMonitoringSteps("8.0", sampleOutput)
steps := operator.generateMonitoringSteps("8.0", outputType, sampleOutput)
if actualSteps := len(steps); actualSteps != tc.ExpectedSteps {
t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps)
}
Expand All @@ -61,13 +62,13 @@ func TestGenerateSteps(t *testing.T) {
// Filebeat step check
if s.ProgramSpec.Cmd == "filebeat" {
fbFound = true
checkStep(t, "filebeat", sampleOutput, s)
checkStep(t, "filebeat", outputType, sampleOutput, s)
}

// Metricbeat step check
if s.ProgramSpec.Cmd == "metricbeat" {
mbFound = true
checkStep(t, "metricbeat", sampleOutput, s)
checkStep(t, "metricbeat", outputType, sampleOutput, s)
}
}

Expand All @@ -82,7 +83,7 @@ func TestGenerateSteps(t *testing.T) {
}
}

func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s configrequest.Step) {
func checkStep(t *testing.T, stepName string, outputType string, expectedOutput interface{}, s configrequest.Step) {
if meta := s.Meta[configrequest.MetaConfigKey]; meta != nil {
mapstr, ok := meta.(map[string]interface{})
if !ok {
Expand All @@ -94,7 +95,7 @@ func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s conf
t.Fatalf("output not found for %s step", stepName)
}

if actualOutput := esOut["elasticsearch"]; actualOutput != expectedOutput {
if actualOutput := esOut[outputType]; actualOutput != expectedOutput {
t.Fatalf("output for %s step does not match. expected: %v, got %v", stepName, expectedOutput, actualOutput)
}
}
Expand Down
Loading