Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify add_docker_metadata processor to lookup ContainerId from source path in case of filebeat #4496

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/kubernetes"
// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/add_docker_metadata"
)

var (
Expand Down
55 changes: 55 additions & 0 deletions filebeat/processor/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package add_docker_metadata

import (
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors/add_docker_metadata"
)
const LogPathMatcherName = "logs_path"


func init() {
add_docker_metadata.Matcher=&FileMatch{}
}

type FileMatch struct {
LogsPath string
}

func (match *FileMatch) MetadataIndex(event common.MapStr) string{
if value, ok := event["source"]; ok {
source := value.(string)
cid := ""
if strings.Contains(source, match.LogsPath) {
//Docker container is 64 chars in length
cid = source[len(match.LogsPath) : len(match.LogsPath)+64]
}
if cid != "" {
return cid
}
}

return ""
}

func (match *FileMatch) InitMatcher(cfg common.Config) error{
config := struct {
LogsPath string `config:"logs_path"`
}{
LogsPath: "/var/lib/docker/containers/",
}

err := cfg.Unpack(&config)
if err != nil || config.LogsPath == "" {
return err
}

match.LogsPath = config.LogsPath
return nil
}




80 changes: 80 additions & 0 deletions filebeat/processor/add_docker_metadata/add_docker_metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package add_docker_metadata

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
"github.com/elastic/beats/libbeat/processors/add_docker_metadata"
)

func TestLogPath(t *testing.T) {
testConfig, err := common.NewConfigFrom(map[string]interface{}{
"logs_path": "/path1/path2/",
})
assert.NoError(t, err)

p, err := add_docker_metadata.BuildDockerMetadataProcessor(*testConfig, MockWatcherFactory(
map[string]*add_docker_metadata.Container{
"9b11fc6df837c05fe81a174b80fb3731c32a5dba442af6146944cb0f85e30e56": &add_docker_metadata.Container{
ID: "9b11fc6df837c05fe81a174b80fb3731c32a5dba442af6146944cb0f85e30e56",
Image: "image",
Name: "name",
Labels: map[string]string{
"a": "1",
"b": "2",
},
},
}))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
"source": "/path1/path2/9b11fc6df837c05fe81a174b80fb3731c32a5dba442af6146944cb0f85e30e56/container.log",
}
result, err := p.Run(input)
assert.NoError(t, err, "processing an event")

assert.EqualValues(t, common.MapStr{
"source":"/path1/path2/9b11fc6df837c05fe81a174b80fb3731c32a5dba442af6146944cb0f85e30e56/container.log",
"docker": common.MapStr{
"container": common.MapStr{
"id": "9b11fc6df837c05fe81a174b80fb3731c32a5dba442af6146944cb0f85e30e56",
"image": "image",
"labels": common.MapStr{
"a": "1",
"b": "2",
},
"name": "name",
},
},
}, result)
}

func MockWatcherFactory(containers map[string]*add_docker_metadata.Container) add_docker_metadata.WatcherConstructor {
if containers == nil {
containers = make(map[string]*add_docker_metadata.Container)
}
return func(host string, tls *add_docker_metadata.TLSConfig) (add_docker_metadata.Watcher, error) {
return &mockWatcher{containers: containers}, nil
}
}

type mockWatcher struct {
containers map[string]*add_docker_metadata.Container
}

func (m *mockWatcher) Start() error {
return nil
}

func (m *mockWatcher) Container(ID string) *add_docker_metadata.Container {
return m.containers[ID]
}

func (m *mockWatcher) Containers() map[string]*add_docker_metadata.Container {
return m.containers
}




4 changes: 3 additions & 1 deletion libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,15 @@ from Docker containers:

Currently it supports enriching events by matching existing fields with the
container ID in them, for example, cgroup IDs retrieved by metricbeat system
module:
module. In case of filebeat, it supports parsing Docker Container ID from the prospector
path:

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_docker_metadata:
match_fields: ["system.process.cgroup.id"]
logs_path: "/var/lib/docker/containers"
host: "unix:///var/run/docker.sock"

# To connect to Docker over TLS you must specify a client and CA certificate.
Expand Down
72 changes: 60 additions & 12 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,68 @@ func init() {
processors.RegisterPlugin("add_docker_metadata", newDockerMetadataProcessor)
}

var Matcher Match = &FieldMatch{};

type Match interface {
InitMatcher(cfg common.Config) error
MetadataIndex(event common.MapStr) string
}

type NoopMatch struct {
MatchFields []string
}

type FieldMatch struct {
MatchFields []string
}

func (match *FieldMatch) MetadataIndex(event common.MapStr) string{
for _, field := range match.MatchFields {
keyIface, err := event.GetValue(field)
if err == nil {
key, ok := keyIface.(string)
if ok {
return key
}
}
}

return ""
}

func (match *NoopMatch) InitMatcher(cfg common.Config) error {
return nil
}

func (match *NoopMatch) MetadataIndex(event common.MapStr) string{
return ""
}

func (match *FieldMatch) InitMatcher(cfg common.Config) error {
config := struct {
Fields []string `config:"match_fields"`
}{}
err := cfg.Unpack(&config)
if err != nil || len(config.Fields) ==0 {
return err
}
match.MatchFields = config.Fields
return nil
}



type addDockerMetadata struct {
watcher Watcher
fields []string
matcher Match
}

func newDockerMetadataProcessor(cfg common.Config) (processors.Processor, error) {
return buildDockerMetadataProcessor(cfg, NewWatcher)
return BuildDockerMetadataProcessor(cfg, NewWatcher)
}

func buildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherConstructor) (processors.Processor, error) {
func BuildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherConstructor) (processors.Processor, error) {
logp.Beta("The add_docker_metadata processor is beta")

config := defaultConfig()
Expand All @@ -41,24 +93,20 @@ func buildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherC
return nil, err
}

if err = Matcher.InitMatcher(cfg); err != nil {
Matcher = &NoopMatch{}
}

return &addDockerMetadata{
watcher: watcher,
fields: config.Fields,
matcher: Matcher,
}, nil
}

func (d *addDockerMetadata) Run(event common.MapStr) (common.MapStr, error) {
var cid string
for _, field := range d.fields {
value, err := event.GetValue(field)
if err != nil {
continue
}

if strValue, ok := value.(string); ok {
cid = strValue
}
}
cid := d.matcher.MetadataIndex(event)

if cid == "" {
return event, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestInitialization(t *testing.T) {
var testConfig = common.NewConfig()

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
p, err := BuildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{}
Expand All @@ -26,7 +26,7 @@ func TestNoMatch(t *testing.T) {
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
p, err := BuildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
Expand All @@ -44,7 +44,7 @@ func TestMatchNoContainer(t *testing.T) {
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
p, err := BuildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
Expand All @@ -62,7 +62,7 @@ func TestMatchContainer(t *testing.T) {
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(
p, err := BuildDockerMetadataProcessor(*testConfig, MockWatcherFactory(
map[string]*Container{
"container_id": &Container{
ID: "container_id",
Expand Down