Skip to content

Commit

Permalink
Refactor add_kubernetes_metadata to support autodiscovery
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Oct 25, 2017
1 parent d1dfc7e commit c902d6d
Show file tree
Hide file tree
Showing 13 changed files with 532 additions and 462 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add support for enabling TLS renegotiation. {issue}4386[4386]
- Add Azure VM support for add_cloud_metadata processor {pull}5355[5355]
- Add `output.file.permission` config option. {pull}4638[4638]
- Refactor add_kubernetes_metadata to support autodiscovery {pull}5434[5434]

*Auditbeat*

Expand Down
26 changes: 26 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package add_kubernetes_metadata

import (
"encoding/json"

corev1 "github.com/ericchiang/k8s/api/v1"

"github.com/elastic/beats/libbeat/logp"
)

func GetPodMeta(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

return po
}
250 changes: 250 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package add_kubernetes_metadata

import (
"fmt"
"strings"
"sync"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

const (
ContainerIndexerName = "container"
PodNameIndexerName = "pod_name"
)

// Indexer take known pods and generate all the metadata we need to enrich
// events in a efficient way. By preindexing the metadata in the way it will be
// checked when matching events
type Indexer interface {
// GetMetadata generates event metadata for the given pod, then returns the
// list of indexes to create, with the metadata to put on them
GetMetadata(pod *Pod) []MetadataIndex

// GetIndexes return the list of indexes the given pod belongs to. This function
// must return the same indexes than GetMetadata
GetIndexes(pod *Pod) []string
}

// MetadataIndex holds a pair of index -> metadata info
type MetadataIndex struct {
Index string
Data common.MapStr
}

type Indexers struct {
sync.RWMutex
indexers []Indexer
}

//GenMeta takes in pods to generate metadata for them
type GenMeta interface {
//GenerateMetaData generates metadata by taking in a pod as an input
GenerateMetaData(pod *Pod) common.MapStr
}

type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error)

func NewIndexers(configs PluginConfig, metaGen *GenDefaultMeta) *Indexers {
indexers := []Indexer{}
for _, pluginConfigs := range configs {
for name, pluginConfig := range pluginConfigs {
indexFunc := Indexing.GetIndexer(name)
if indexFunc == nil {
logp.Warn("Unable to find indexing plugin %s", name)
continue
}

indexer, err := indexFunc(pluginConfig, metaGen)
if err != nil {
logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err)
}

indexers = append(indexers, indexer)
}
}

return &Indexers{
indexers: indexers,
}
}

// GetMetadata returns the composed metadata list from all registered indexers
func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex {
var metadata []MetadataIndex
i.RLock()
defer i.RUnlock()
for _, indexer := range i.indexers {
for _, m := range indexer.GetMetadata(pod) {
metadata = append(metadata, m)
}
}
return metadata
}

// GetIndexes returns the composed index list from all registered indexers
func (i *Indexers) GetIndexes(pod *Pod) []string {
var indexes []string
i.RLock()
defer i.RUnlock()
for _, indexer := range i.indexers {
for _, i := range indexer.GetIndexes(pod) {
indexes = append(indexes, i)
}
}
return indexes
}

func (i *Indexers) Empty() bool {
if len(i.indexers) == 0 {
return true
}

return false
}

type GenDefaultMeta struct {
annotations []string
labels []string
labelsExclude []string
}

func NewGenDefaultMeta(annotations, labels, labelsExclude []string) *GenDefaultMeta {
return &GenDefaultMeta{
annotations: annotations,
labels: labels,
labelsExclude: labelsExclude,
}
}

// GenerateMetaData generates default metadata for the given pod taking to account certain filters
func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr {
labelMap := common.MapStr{}
annotationsMap := common.MapStr{}

if len(g.labels) == 0 {
for k, v := range pod.Metadata.Labels {
labelMap[k] = v
}
} else {
labelMap = generateMapSubset(pod.Metadata.Labels, g.labels)
}

// Exclude any labels that are present in the exclude_labels config
for _, label := range g.labelsExclude {
delete(labelMap, label)
}

annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations)

meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.Name,
},
"namespace": pod.Metadata.Namespace,
}

if len(labelMap) != 0 {
meta["labels"] = labelMap
}

if len(annotationsMap) != 0 {
meta["annotations"] = annotationsMap
}

return meta
}

func generateMapSubset(input map[string]string, keys []string) common.MapStr {
output := common.MapStr{}
if input == nil {
return output
}

for _, key := range keys {
value, ok := input[key]
if ok {
output[key] = value
}
}

return output
}

// PodNameIndexer implements default indexer based on pod name
type PodNameIndexer struct {
genMeta GenMeta
}

func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &PodNameIndexer{genMeta: genMeta}, nil
}

func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex {
data := p.genMeta.GenerateMetaData(pod)
return []MetadataIndex{
{
Index: fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name),
Data: data,
},
}
}

func (p *PodNameIndexer) GetIndexes(pod *Pod) []string {
return []string{fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)}
}

// ContainerIndexer indexes pods based on all their containers IDs
type ContainerIndexer struct {
genMeta GenMeta
}

func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &ContainerIndexer{genMeta: genMeta}, nil
}

func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := c.genMeta.GenerateMetaData(pod)
var metadata []MetadataIndex
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
if cID == "" {
continue
}

containerMeta := commonMeta.Clone()
containerMeta["container"] = common.MapStr{
"name": status.Name,
}
metadata = append(metadata, MetadataIndex{
Index: cID,
Data: containerMeta,
})
}

return metadata
}

func (c *ContainerIndexer) GetIndexes(pod *Pod) []string {
var containers []string
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
if cID == "" {
continue
}
containers = append(containers, cID)
}
return containers
}

func containerID(status PodContainerStatus) string {
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,38 +124,6 @@ func TestContainerIndexer(t *testing.T) {
assert.Equal(t, expected.String(), indexers[1].Data.String())
}

func TestFieldMatcher(t *testing.T) {
testCfg := map[string]interface{}{
"lookup_fields": []string{},
}
fieldCfg, err := common.NewConfigFrom(testCfg)

assert.Nil(t, err)
matcher, err := NewFieldMatcher(*fieldCfg)
assert.NotNil(t, err)

testCfg["lookup_fields"] = "foo"
fieldCfg, _ = common.NewConfigFrom(testCfg)

matcher, err = NewFieldMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

input := common.MapStr{
"foo": "bar",
}

out := matcher.MetadataIndex(input)
assert.Equal(t, out, "bar")

nonMatchInput := common.MapStr{
"not": "match",
}

out = matcher.MetadataIndex(nonMatchInput)
assert.Equal(t, out, "")
}

func TestFilteredGenMeta(t *testing.T) {
var testConfig = common.NewConfig()

Expand Down Expand Up @@ -269,49 +237,3 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
ok, _ = labelMap.HasKey("x")
assert.Equal(t, ok, false)
}

func TestFieldFormatMatcher(t *testing.T) {
testCfg := map[string]interface{}{}
fieldCfg, err := common.NewConfigFrom(testCfg)

assert.Nil(t, err)
matcher, err := NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, err)

testCfg["format"] = `%{[namespace]}/%{[pod]}`
fieldCfg, _ = common.NewConfigFrom(testCfg)

matcher, err = NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

event := common.MapStr{
"namespace": "foo",
"pod": "bar",
}

out := matcher.MetadataIndex(event)
assert.Equal(t, "foo/bar", out)

event = common.MapStr{
"foo": "bar",
}
out = matcher.MetadataIndex(event)
assert.Empty(t, out)

testCfg["format"] = `%{[dimensions.namespace]}/%{[dimensions.pod]}`
fieldCfg, _ = common.NewConfigFrom(testCfg)
matcher, err = NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

event = common.MapStr{
"dimensions": common.MapStr{
"pod": "bar",
"namespace": "foo",
},
}

out = matcher.MetadataIndex(event)
assert.Equal(t, "foo/bar", out)
}
Loading

0 comments on commit c902d6d

Please sign in to comment.