diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 13c115ae98c..516a7fa46a5 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add support for enabling TLS renegotiation. {issue}4386[4386] - Add Azure VM support for add_cloud_metadata processor {pull}5355[5355] - Add `output.file.permission` config option. {pull}4638[4638] +- Refactor add_kubernetes_metadata to support autodiscovery {pull}5434[5434] *Auditbeat* diff --git a/filebeat/processor/add_kubernetes_metadata/indexing.go b/filebeat/processor/add_kubernetes_metadata/matchers.go similarity index 100% rename from filebeat/processor/add_kubernetes_metadata/indexing.go rename to filebeat/processor/add_kubernetes_metadata/matchers.go diff --git a/filebeat/processor/add_kubernetes_metadata/indexing_test.go b/filebeat/processor/add_kubernetes_metadata/matchers_test.go similarity index 100% rename from filebeat/processor/add_kubernetes_metadata/indexing_test.go rename to filebeat/processor/add_kubernetes_metadata/matchers_test.go diff --git a/libbeat/processors/add_kubernetes_metadata/common.go b/libbeat/processors/add_kubernetes_metadata/common.go new file mode 100644 index 00000000000..61bee1c6583 --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/common.go @@ -0,0 +1,26 @@ +package add_kubernetes_metadata + +import ( + "encoding/json" + + corev1 "github.com/ericchiang/k8s/api/v1" + + "github.com/elastic/beats/libbeat/logp" +) + +func GetPodMeta(pod *corev1.Pod) *Pod { + bytes, err := json.Marshal(pod) + if err != nil { + logp.Warn("Unable to marshal %v", pod.String()) + return nil + } + + po := &Pod{} + err = json.Unmarshal(bytes, po) + if err != nil { + logp.Warn("Unable to marshal %v", pod.String()) + return nil + } + + return po +} diff --git a/libbeat/processors/add_kubernetes_metadata/indexers.go b/libbeat/processors/add_kubernetes_metadata/indexers.go new file mode 100644 index 00000000000..8976e637350 --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/indexers.go @@ -0,0 +1,250 @@ +package add_kubernetes_metadata + +import ( + "fmt" + "strings" + "sync" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +const ( + ContainerIndexerName = "container" + PodNameIndexerName = "pod_name" +) + +// Indexer take known pods and generate all the metadata we need to enrich +// events in a efficient way. By preindexing the metadata in the way it will be +// checked when matching events +type Indexer interface { + // GetMetadata generates event metadata for the given pod, then returns the + // list of indexes to create, with the metadata to put on them + GetMetadata(pod *Pod) []MetadataIndex + + // GetIndexes return the list of indexes the given pod belongs to. This function + // must return the same indexes than GetMetadata + GetIndexes(pod *Pod) []string +} + +// MetadataIndex holds a pair of index -> metadata info +type MetadataIndex struct { + Index string + Data common.MapStr +} + +type Indexers struct { + sync.RWMutex + indexers []Indexer +} + +//GenMeta takes in pods to generate metadata for them +type GenMeta interface { + //GenerateMetaData generates metadata by taking in a pod as an input + GenerateMetaData(pod *Pod) common.MapStr +} + +type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error) + +func NewIndexers(configs PluginConfig, metaGen *GenDefaultMeta) *Indexers { + indexers := []Indexer{} + for _, pluginConfigs := range configs { + for name, pluginConfig := range pluginConfigs { + indexFunc := Indexing.GetIndexer(name) + if indexFunc == nil { + logp.Warn("Unable to find indexing plugin %s", name) + continue + } + + indexer, err := indexFunc(pluginConfig, metaGen) + if err != nil { + logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err) + } + + indexers = append(indexers, indexer) + } + } + + return &Indexers{ + indexers: indexers, + } +} + +// GetMetadata returns the composed metadata list from all registered indexers +func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex { + var metadata []MetadataIndex + i.RLock() + defer i.RUnlock() + for _, indexer := range i.indexers { + for _, m := range indexer.GetMetadata(pod) { + metadata = append(metadata, m) + } + } + return metadata +} + +// GetIndexes returns the composed index list from all registered indexers +func (i *Indexers) GetIndexes(pod *Pod) []string { + var indexes []string + i.RLock() + defer i.RUnlock() + for _, indexer := range i.indexers { + for _, i := range indexer.GetIndexes(pod) { + indexes = append(indexes, i) + } + } + return indexes +} + +func (i *Indexers) Empty() bool { + if len(i.indexers) == 0 { + return true + } + + return false +} + +type GenDefaultMeta struct { + annotations []string + labels []string + labelsExclude []string +} + +func NewGenDefaultMeta(annotations, labels, labelsExclude []string) *GenDefaultMeta { + return &GenDefaultMeta{ + annotations: annotations, + labels: labels, + labelsExclude: labelsExclude, + } +} + +// GenerateMetaData generates default metadata for the given pod taking to account certain filters +func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr { + labelMap := common.MapStr{} + annotationsMap := common.MapStr{} + + if len(g.labels) == 0 { + for k, v := range pod.Metadata.Labels { + labelMap[k] = v + } + } else { + labelMap = generateMapSubset(pod.Metadata.Labels, g.labels) + } + + // Exclude any labels that are present in the exclude_labels config + for _, label := range g.labelsExclude { + delete(labelMap, label) + } + + annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations) + + meta := common.MapStr{ + "pod": common.MapStr{ + "name": pod.Metadata.Name, + }, + "namespace": pod.Metadata.Namespace, + } + + if len(labelMap) != 0 { + meta["labels"] = labelMap + } + + if len(annotationsMap) != 0 { + meta["annotations"] = annotationsMap + } + + return meta +} + +func generateMapSubset(input map[string]string, keys []string) common.MapStr { + output := common.MapStr{} + if input == nil { + return output + } + + for _, key := range keys { + value, ok := input[key] + if ok { + output[key] = value + } + } + + return output +} + +// PodNameIndexer implements default indexer based on pod name +type PodNameIndexer struct { + genMeta GenMeta +} + +func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { + return &PodNameIndexer{genMeta: genMeta}, nil +} + +func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex { + data := p.genMeta.GenerateMetaData(pod) + return []MetadataIndex{ + { + Index: fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name), + Data: data, + }, + } +} + +func (p *PodNameIndexer) GetIndexes(pod *Pod) []string { + return []string{fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)} +} + +// ContainerIndexer indexes pods based on all their containers IDs +type ContainerIndexer struct { + genMeta GenMeta +} + +func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { + return &ContainerIndexer{genMeta: genMeta}, nil +} + +func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex { + commonMeta := c.genMeta.GenerateMetaData(pod) + var metadata []MetadataIndex + for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + cID := containerID(status) + if cID == "" { + continue + } + + containerMeta := commonMeta.Clone() + containerMeta["container"] = common.MapStr{ + "name": status.Name, + } + metadata = append(metadata, MetadataIndex{ + Index: cID, + Data: containerMeta, + }) + } + + return metadata +} + +func (c *ContainerIndexer) GetIndexes(pod *Pod) []string { + var containers []string + for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + cID := containerID(status) + if cID == "" { + continue + } + containers = append(containers, cID) + } + return containers +} + +func containerID(status PodContainerStatus) string { + cID := status.ContainerID + if cID != "" { + parts := strings.Split(cID, "//") + if len(parts) == 2 { + return parts[1] + } + } + return "" +} diff --git a/libbeat/processors/add_kubernetes_metadata/indexing_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go similarity index 75% rename from libbeat/processors/add_kubernetes_metadata/indexing_test.go rename to libbeat/processors/add_kubernetes_metadata/indexers_test.go index 657d8c31e2a..16edcedf64a 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexing_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -124,38 +124,6 @@ func TestContainerIndexer(t *testing.T) { assert.Equal(t, expected.String(), indexers[1].Data.String()) } -func TestFieldMatcher(t *testing.T) { - testCfg := map[string]interface{}{ - "lookup_fields": []string{}, - } - fieldCfg, err := common.NewConfigFrom(testCfg) - - assert.Nil(t, err) - matcher, err := NewFieldMatcher(*fieldCfg) - assert.NotNil(t, err) - - testCfg["lookup_fields"] = "foo" - fieldCfg, _ = common.NewConfigFrom(testCfg) - - matcher, err = NewFieldMatcher(*fieldCfg) - assert.NotNil(t, matcher) - assert.Nil(t, err) - - input := common.MapStr{ - "foo": "bar", - } - - out := matcher.MetadataIndex(input) - assert.Equal(t, out, "bar") - - nonMatchInput := common.MapStr{ - "not": "match", - } - - out = matcher.MetadataIndex(nonMatchInput) - assert.Equal(t, out, "") -} - func TestFilteredGenMeta(t *testing.T) { var testConfig = common.NewConfig() @@ -269,49 +237,3 @@ func TestFilteredGenMetaExclusion(t *testing.T) { ok, _ = labelMap.HasKey("x") assert.Equal(t, ok, false) } - -func TestFieldFormatMatcher(t *testing.T) { - testCfg := map[string]interface{}{} - fieldCfg, err := common.NewConfigFrom(testCfg) - - assert.Nil(t, err) - matcher, err := NewFieldFormatMatcher(*fieldCfg) - assert.NotNil(t, err) - - testCfg["format"] = `%{[namespace]}/%{[pod]}` - fieldCfg, _ = common.NewConfigFrom(testCfg) - - matcher, err = NewFieldFormatMatcher(*fieldCfg) - assert.NotNil(t, matcher) - assert.Nil(t, err) - - event := common.MapStr{ - "namespace": "foo", - "pod": "bar", - } - - out := matcher.MetadataIndex(event) - assert.Equal(t, "foo/bar", out) - - event = common.MapStr{ - "foo": "bar", - } - out = matcher.MetadataIndex(event) - assert.Empty(t, out) - - testCfg["format"] = `%{[dimensions.namespace]}/%{[dimensions.pod]}` - fieldCfg, _ = common.NewConfigFrom(testCfg) - matcher, err = NewFieldFormatMatcher(*fieldCfg) - assert.NotNil(t, matcher) - assert.Nil(t, err) - - event = common.MapStr{ - "dimensions": common.MapStr{ - "pod": "bar", - "namespace": "foo", - }, - } - - out = matcher.MetadataIndex(event) - assert.Equal(t, "foo/bar", out) -} diff --git a/libbeat/processors/add_kubernetes_metadata/indexing.go b/libbeat/processors/add_kubernetes_metadata/indexing.go index ccb9c35d098..21015b87ba4 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexing.go +++ b/libbeat/processors/add_kubernetes_metadata/indexing.go @@ -1,73 +1,15 @@ package add_kubernetes_metadata import ( - "fmt" - "strings" "sync" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/fmtstr" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs/codec" - "github.com/elastic/beats/libbeat/outputs/codec/format" -) - -//Names of indexers and matchers that have been defined. -const ( - ContainerIndexerName = "container" - PodNameIndexerName = "pod_name" - FieldMatcherName = "fields" - FieldFormatMatcherName = "field_format" ) // Indexing is the singleton Register instance where all Indexers and Matchers // are stored var Indexing = NewRegister() -// Indexer take known pods and generate all the metadata we need to enrich -// events in a efficient way. By preindexing the metadata in the way it will be -// checked when matching events -type Indexer interface { - // GetMetadata generates event metadata for the given pod, then returns the - // list of indexes to create, with the metadata to put on them - GetMetadata(pod *Pod) []MetadataIndex - - // GetIndexes return the list of indexes the given pod belongs to. This function - // must return the same indexes than GetMetadata - GetIndexes(pod *Pod) []string -} - -// MetadataIndex holds a pair of index -> metadata info -type MetadataIndex struct { - Index string - Data common.MapStr -} - -// Matcher takes a new event and returns the index -type Matcher interface { - // MetadataIndex returns the index string to use in annotation lookups for the given - // event. A previous indexer should have generated that index for this to work - // This function can return "" if the event doesn't match - MetadataIndex(event common.MapStr) string -} - -//GenMeta takes in pods to generate metadata for them -type GenMeta interface { - //GenerateMetaData generates metadata by taking in a pod as an input - GenerateMetaData(pod *Pod) common.MapStr -} - -type Indexers struct { - sync.RWMutex - indexers []Indexer -} - -type Matchers struct { - sync.RWMutex - matchers []Matcher -} - // Register contains Indexer and Matchers to use on pod indexing and event matching type Register struct { sync.RWMutex @@ -78,9 +20,6 @@ type Register struct { defaultMatcherConfigs map[string]common.Config } -type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error) -type MatcherConstructor func(config common.Config) (Matcher, error) - // NewRegister creates and returns a new Register. func NewRegister() *Register { return &Register{ @@ -136,255 +75,10 @@ func (r *Register) GetMatcher(name string) MatcherConstructor { } } -// GetMetadata returns the composed metadata list from all registered indexers -func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex { - var metadata []MetadataIndex - i.RLock() - defer i.RUnlock() - for _, indexer := range i.indexers { - for _, m := range indexer.GetMetadata(pod) { - metadata = append(metadata, m) - } - } - return metadata -} - -// GetIndexes returns the composed index list from all registered indexers -func (i *Indexers) GetIndexes(pod *Pod) []string { - var indexes []string - i.RLock() - defer i.RUnlock() - for _, indexer := range i.indexers { - for _, i := range indexer.GetIndexes(pod) { - indexes = append(indexes, i) - } - } - return indexes -} - -// MetadataIndex returns the index string for the first matcher from the Registry returning one -func (m *Matchers) MetadataIndex(event common.MapStr) string { - m.RLock() - defer m.RUnlock() - for _, matcher := range m.matchers { - index := matcher.MetadataIndex(event) - if index != "" { - return index - } - } - - // No index returned - return "" -} - -type GenDefaultMeta struct { - annotations []string - labels []string - labelsExclude []string -} - -// GenerateMetaData generates default metadata for the given pod taking to account certain filters -func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr { - labelMap := common.MapStr{} - annotationsMap := common.MapStr{} - - if len(g.labels) == 0 { - for k, v := range pod.Metadata.Labels { - labelMap[k] = v - } - } else { - labelMap = generateMapSubset(pod.Metadata.Labels, g.labels) - } - - // Exclude any labels that are present in the exclude_labels config - for _, label := range g.labelsExclude { - delete(labelMap, label) - } - - annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations) - - meta := common.MapStr{ - "pod": common.MapStr{ - "name": pod.Metadata.Name, - }, - "namespace": pod.Metadata.Namespace, - } - - if len(labelMap) != 0 { - meta["labels"] = labelMap - } - - if len(annotationsMap) != 0 { - meta["annotations"] = annotationsMap - } - - return meta -} - -func generateMapSubset(input map[string]string, keys []string) common.MapStr { - output := common.MapStr{} - if input == nil { - return output - } - - for _, key := range keys { - value, ok := input[key] - if ok { - output[key] = value - } - } - - return output -} - -// PodNameIndexer implements default indexer based on pod name -type PodNameIndexer struct { - genMeta GenMeta +func (r *Register) GetDefaultIndexerConfigs() map[string]common.Config { + return r.defaultIndexerConfigs } -func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { - return &PodNameIndexer{genMeta: genMeta}, nil -} - -func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex { - data := p.genMeta.GenerateMetaData(pod) - return []MetadataIndex{ - { - Index: fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name), - Data: data, - }, - } -} - -func (p *PodNameIndexer) GetIndexes(pod *Pod) []string { - return []string{fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)} -} - -// ContainerIndexer indexes pods based on all their containers IDs -type ContainerIndexer struct { - genMeta GenMeta -} - -func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { - return &ContainerIndexer{genMeta: genMeta}, nil -} - -func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex { - commonMeta := c.genMeta.GenerateMetaData(pod) - var metadata []MetadataIndex - for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - cID := containerID(status) - if cID == "" { - continue - } - - containerMeta := commonMeta.Clone() - containerMeta["container"] = common.MapStr{ - "name": status.Name, - } - metadata = append(metadata, MetadataIndex{ - Index: cID, - Data: containerMeta, - }) - } - - return metadata -} - -func (c *ContainerIndexer) GetIndexes(pod *Pod) []string { - var containers []string - for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - cID := containerID(status) - if cID == "" { - continue - } - containers = append(containers, cID) - } - return containers -} - -func containerID(status PodContainerStatus) string { - cID := status.ContainerID - if cID != "" { - parts := strings.Split(cID, "//") - if len(parts) == 2 { - return parts[1] - } - } - return "" -} - -type FieldMatcher struct { - MatchFields []string -} - -func NewFieldMatcher(cfg common.Config) (Matcher, error) { - config := struct { - LookupFields []string `config:"lookup_fields"` - }{} - - err := cfg.Unpack(&config) - if err != nil { - return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err) - } - - if len(config.LookupFields) == 0 { - return nil, fmt.Errorf("lookup_fields can not be empty") - } - - return &FieldMatcher{MatchFields: config.LookupFields}, nil -} - -func (f *FieldMatcher) MetadataIndex(event common.MapStr) string { - for _, field := range f.MatchFields { - keyIface, err := event.GetValue(field) - if err == nil { - key, ok := keyIface.(string) - if ok { - return key - } - } - } - - return "" -} - -type FieldFormatMatcher struct { - Codec codec.Codec -} - -func NewFieldFormatMatcher(cfg common.Config) (Matcher, error) { - config := struct { - Format string `config:"format"` - }{} - - err := cfg.Unpack(&config) - if err != nil { - return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err) - } - - if config.Format == "" { - return nil, fmt.Errorf("`format` of `field_format` matcher can't be empty") - } - - return &FieldFormatMatcher{ - Codec: format.New(fmtstr.MustCompileEvent(config.Format)), - }, nil - -} - -func (f *FieldFormatMatcher) MetadataIndex(event common.MapStr) string { - bytes, err := f.Codec.Encode("", &beat.Event{ - Fields: event, - }) - - if err != nil { - logp.Debug("kubernetes", "Unable to apply field format pattern on event") - } - - if len(bytes) == 0 { - return "" - } - - return string(bytes) +func (r *Register) GetDefaultMatcherConfigs() map[string]common.Config { + return r.defaultMatcherConfigs } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 3be48a46596..a98b51a07ce 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -59,7 +59,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { //Load default indexer configs if config.DefaultIndexers.Enabled == true { Indexing.RLock() - for key, cfg := range Indexing.defaultIndexerConfigs { + for key, cfg := range Indexing.GetDefaultIndexerConfigs() { config.Indexers = append(config.Indexers, map[string]common.Config{key: cfg}) } Indexing.RUnlock() @@ -68,65 +68,18 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { //Load default matcher configs if config.DefaultMatchers.Enabled == true { Indexing.RLock() - for key, cfg := range Indexing.defaultMatcherConfigs { + for key, cfg := range Indexing.GetDefaultMatcherConfigs() { config.Matchers = append(config.Matchers, map[string]common.Config{key: cfg}) } Indexing.RUnlock() } - metaGen := &GenDefaultMeta{ - labels: config.IncludeLabels, - annotations: config.IncludeAnnotations, - labelsExclude: config.ExcludeLabels, - } - - indexers := Indexers{ - indexers: []Indexer{}, - } - - //Create all configured indexers - for _, pluginConfigs := range config.Indexers { - for name, pluginConfig := range pluginConfigs { - indexFunc := Indexing.GetIndexer(name) - if indexFunc == nil { - logp.Warn("Unable to find indexing plugin %s", name) - continue - } - - indexer, err := indexFunc(pluginConfig, metaGen) - if err != nil { - logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err) - } - - indexers.indexers = append(indexers.indexers, indexer) - - } - } - - matchers := Matchers{ - matchers: []Matcher{}, - } - - //Create all configured matchers - for _, pluginConfigs := range config.Matchers { - for name, pluginConfig := range pluginConfigs { - matchFunc := Indexing.GetMatcher(name) - if matchFunc == nil { - logp.Warn("Unable to find matcher plugin %s", name) - continue - } + metaGen := NewGenDefaultMeta(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels) + indexers := NewIndexers(config.Indexers, metaGen) - matcher, err := matchFunc(pluginConfig) - if err != nil { - logp.Warn("Unable to initialize matcher plugin %s due to error %v", name, err) - } - - matchers.matchers = append(matchers.matchers, matcher) - - } - } + matchers := NewMatchers(config.Matchers) - if len(matchers.matchers) == 0 { + if matchers.Empty() { return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins") } @@ -175,10 +128,10 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher") if client != nil { - watcher := NewPodWatcher(client, &indexers, config.SyncPeriod, config.CleanupTimeout, config.Host) + watcher := NewPodWatcher(client, indexers, config.SyncPeriod, config.CleanupTimeout, config.Host) if watcher.Run() { - return &kubernetesAnnotator{podWatcher: watcher, matchers: &matchers}, nil + return &kubernetesAnnotator{podWatcher: watcher, matchers: matchers}, nil } return nil, fatalError diff --git a/libbeat/processors/add_kubernetes_metadata/matchers.go b/libbeat/processors/add_kubernetes_metadata/matchers.go new file mode 100644 index 00000000000..04dc963912e --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/matchers.go @@ -0,0 +1,155 @@ +package add_kubernetes_metadata + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/codec" + "github.com/elastic/beats/libbeat/outputs/codec/format" +) + +const ( + FieldMatcherName = "fields" + FieldFormatMatcherName = "field_format" +) + +// Matcher takes a new event and returns the index +type Matcher interface { + // MetadataIndex returns the index string to use in annotation lookups for the given + // event. A previous indexer should have generated that index for this to work + // This function can return "" if the event doesn't match + MetadataIndex(event common.MapStr) string +} + +type Matchers struct { + sync.RWMutex + matchers []Matcher +} + +type MatcherConstructor func(config common.Config) (Matcher, error) + +func NewMatchers(configs PluginConfig) *Matchers { + matchers := []Matcher{} + for _, pluginConfigs := range configs { + for name, pluginConfig := range pluginConfigs { + matchFunc := Indexing.GetMatcher(name) + if matchFunc == nil { + logp.Warn("Unable to find matcher plugin %s", name) + continue + } + + matcher, err := matchFunc(pluginConfig) + if err != nil { + logp.Warn("Unable to initialize matcher plugin %s due to error %v", name, err) + } + + matchers = append(matchers, matcher) + + } + } + return &Matchers{ + matchers: matchers, + } +} + +// MetadataIndex returns the index string for the first matcher from the Registry returning one +func (m *Matchers) MetadataIndex(event common.MapStr) string { + m.RLock() + defer m.RUnlock() + for _, matcher := range m.matchers { + index := matcher.MetadataIndex(event) + if index != "" { + return index + } + } + + // No index returned + return "" +} + +func (m *Matchers) Empty() bool { + if len(m.matchers) == 0 { + return true + } + + return false +} + +type FieldMatcher struct { + MatchFields []string +} + +func NewFieldMatcher(cfg common.Config) (Matcher, error) { + config := struct { + LookupFields []string `config:"lookup_fields"` + }{} + + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err) + } + + if len(config.LookupFields) == 0 { + return nil, fmt.Errorf("lookup_fields can not be empty") + } + + return &FieldMatcher{MatchFields: config.LookupFields}, nil +} + +func (f *FieldMatcher) MetadataIndex(event common.MapStr) string { + for _, field := range f.MatchFields { + keyIface, err := event.GetValue(field) + if err == nil { + key, ok := keyIface.(string) + if ok { + return key + } + } + } + + return "" +} + +type FieldFormatMatcher struct { + Codec codec.Codec +} + +func NewFieldFormatMatcher(cfg common.Config) (Matcher, error) { + config := struct { + Format string `config:"format"` + }{} + + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err) + } + + if config.Format == "" { + return nil, fmt.Errorf("`format` of `field_format` matcher can't be empty") + } + + return &FieldFormatMatcher{ + Codec: format.New(fmtstr.MustCompileEvent(config.Format)), + }, nil + +} + +func (f *FieldFormatMatcher) MetadataIndex(event common.MapStr) string { + bytes, err := f.Codec.Encode("", &beat.Event{ + Fields: event, + }) + + if err != nil { + logp.Debug("kubernetes", "Unable to apply field format pattern on event") + } + + if len(bytes) == 0 { + return "" + } + + return string(bytes) +} diff --git a/libbeat/processors/add_kubernetes_metadata/matchers_test.go b/libbeat/processors/add_kubernetes_metadata/matchers_test.go new file mode 100644 index 00000000000..0b7dd1f5d8e --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/matchers_test.go @@ -0,0 +1,87 @@ +package add_kubernetes_metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestFieldMatcher(t *testing.T) { + testCfg := map[string]interface{}{ + "lookup_fields": []string{}, + } + fieldCfg, err := common.NewConfigFrom(testCfg) + + assert.Nil(t, err) + matcher, err := NewFieldMatcher(*fieldCfg) + assert.NotNil(t, err) + + testCfg["lookup_fields"] = "foo" + fieldCfg, _ = common.NewConfigFrom(testCfg) + + matcher, err = NewFieldMatcher(*fieldCfg) + assert.NotNil(t, matcher) + assert.Nil(t, err) + + input := common.MapStr{ + "foo": "bar", + } + + out := matcher.MetadataIndex(input) + assert.Equal(t, out, "bar") + + nonMatchInput := common.MapStr{ + "not": "match", + } + + out = matcher.MetadataIndex(nonMatchInput) + assert.Equal(t, out, "") +} + +func TestFieldFormatMatcher(t *testing.T) { + testCfg := map[string]interface{}{} + fieldCfg, err := common.NewConfigFrom(testCfg) + + assert.Nil(t, err) + matcher, err := NewFieldFormatMatcher(*fieldCfg) + assert.NotNil(t, err) + + testCfg["format"] = `%{[namespace]}/%{[pod]}` + fieldCfg, _ = common.NewConfigFrom(testCfg) + + matcher, err = NewFieldFormatMatcher(*fieldCfg) + assert.NotNil(t, matcher) + assert.Nil(t, err) + + event := common.MapStr{ + "namespace": "foo", + "pod": "bar", + } + + out := matcher.MetadataIndex(event) + assert.Equal(t, "foo/bar", out) + + event = common.MapStr{ + "foo": "bar", + } + out = matcher.MetadataIndex(event) + assert.Empty(t, out) + + testCfg["format"] = `%{[dimensions.namespace]}/%{[dimensions.pod]}` + fieldCfg, _ = common.NewConfigFrom(testCfg) + matcher, err = NewFieldFormatMatcher(*fieldCfg) + assert.NotNil(t, matcher) + assert.Nil(t, err) + + event = common.MapStr{ + "dimensions": common.MapStr{ + "pod": "bar", + "namespace": "foo", + }, + } + + out = matcher.MetadataIndex(event) + assert.Equal(t, "foo/bar", out) +} diff --git a/libbeat/processors/add_kubernetes_metadata/podwatcher.go b/libbeat/processors/add_kubernetes_metadata/podwatcher.go index db539a52b65..6347c96cdbb 100644 --- a/libbeat/processors/add_kubernetes_metadata/podwatcher.go +++ b/libbeat/processors/add_kubernetes_metadata/podwatcher.go @@ -2,7 +2,6 @@ package add_kubernetes_metadata import ( "context" - "encoding/json" "sync" "time" @@ -164,26 +163,9 @@ func (p *PodWatcher) onPodDelete(pod *Pod) { } } -func (p *PodWatcher) getPodMeta(pod *corev1.Pod) *Pod { - bytes, err := json.Marshal(pod) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } - - po := &Pod{} - err = json.Unmarshal(bytes, po) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } - - return po -} - func (p *PodWatcher) worker() { for po := range p.podQueue { - pod := p.getPodMeta(po) + pod := GetPodMeta(po) if pod.Metadata.DeletionTimestamp != "" { p.onPodDelete(pod) } else { diff --git a/metricbeat/processor/add_kubernetes_metadata/indexing.go b/metricbeat/processor/add_kubernetes_metadata/indexers.go similarity index 100% rename from metricbeat/processor/add_kubernetes_metadata/indexing.go rename to metricbeat/processor/add_kubernetes_metadata/indexers.go diff --git a/metricbeat/processor/add_kubernetes_metadata/indexing_test.go b/metricbeat/processor/add_kubernetes_metadata/indexers_test.go similarity index 100% rename from metricbeat/processor/add_kubernetes_metadata/indexing_test.go rename to metricbeat/processor/add_kubernetes_metadata/indexers_test.go