Skip to content

Commit

Permalink
Remove ECS options in filebeat pipelines when not available (#11362)
Browse files Browse the repository at this point in the history
Similar to #10875, but removing ecs flags when using pipelines on
Elasticsearch versions below 6.7.0, that don't have them.
  • Loading branch information
jsoriano committed Mar 21, 2019
1 parent 5ac8585 commit 79800ae
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff]
- Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001]
- Add grok pattern to support redis 5.0.3 log timestamp. {issue}9819[9819] {pull}10033[10033]
- Ingesting Elasticsearch audit logs is only supported with Elasticsearch 6.5.0 and above {pull}8852[8852]
- Remove `ecs` option from user_agent processors when loading pipelines with Filebeat 6.7.x into Elasticsearch < 6.7.0. {issue}10655[10655] {pull}11362[11362]

*Heartbeat*

Expand Down
35 changes: 35 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return nil
}
}

err := setECSProcessors(esClient.GetVersion(), pipelineID, content)
if err != nil {
return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err)
}

body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
Expand All @@ -129,6 +135,35 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return nil
}

// setECSProcessors removes ECS-specific versions from processors in versions not supporting them
func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error {
ecsFlagVersion := common.MustNewVersion("6.7.0")
if !esVersion.LessThan(ecsFlagVersion) {
return nil
}

p, ok := content["processors"]
if !ok {
return nil
}
processors, ok := p.([]interface{})
if !ok {
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p)
}

for _, p := range processors {
processor, ok := p.(map[string]interface{})
if !ok {
continue
}
if options, ok := processor["user_agent"].(map[string]interface{}); ok {
logp.Debug("modules", "Removing 'ecs' option in user_agent processor for field '%v' in pipeline '%s' as it is not supported in Elasticsearch %v", options["field"], pipelineID, esVersion)
delete(options, "ecs")
}
}
return nil
}

func deletePipeline(esClient PipelineLoader, pipelineID string) error {
path := makeIngestPipelinePath(pipelineID)
_, _, err := esClient.Request("DELETE", path, "", nil, nil)
Expand Down
122 changes: 122 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

Expand Down Expand Up @@ -103,3 +104,124 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
})
}
}

func TestSetECSProcessors(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 6.7.0",
esVersion: common.MustNewVersion("6.6.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
},
},
}},
isErrExpected: false,
},
{
name: "ES == 6.7.0",
esVersion: common.MustNewVersion("6.7.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": false,
},
},
},
},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": false,
},
},
},
},
isErrExpected: false,
},
{
name: "ES >= 7.0.0",
esVersion: common.MustNewVersion("7.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": false,
},
},
},
},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": false,
},
},
},
},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content)
if test.isErrExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.content)
}
})
}
}

0 comments on commit 79800ae

Please sign in to comment.