Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor metadata generator to support adding metadata across resources #14875

Merged
merged 10 commits into from
Jan 14, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407]
- Libbeat: Cleanup the x-pack licenser code to use the new license endpoint and the new format. {pull}15091[15091]
- Users can now specify `monitoring.cloud.*` to override `monitoring.elasticsearch.*` settings. {issue}14399[14399] {pull}15254[15254]
- Refactor metadata generator to support adding metadata across resources {pull}14875[14875]
- Update to ECS 1.4.0. {pull}14844[14844]

*Auditbeat*
Expand Down
4 changes: 4 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand All @@ -48,6 +50,8 @@ type Config struct {
Builders []*common.Config `config:"builders"`
Appenders []*common.Config `config:"appenders"`
Templates template.MapperSettings `config:"templates"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docs for this new parameter?

}

func defaultConfig() *Config {
Expand Down
20 changes: 6 additions & 14 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,26 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type node struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
}

// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.node")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -70,7 +66,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
})
}, nil)

if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
Expand All @@ -80,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
metagen: metadata.NewNodeMetadataGenerator(cfg, watcher.Store()),
logger: logger,
watcher: watcher,
}
Expand Down Expand Up @@ -172,11 +168,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
}

eventID := fmt.Sprint(node.GetObjectMeta().GetUID())
meta := n.metagen.ResourceMetadata(node)

// TODO: Refactor metagen to make sure that this is seamless
meta.Put("node.name", node.Name)
meta.Put("node.uid", string(node.GetObjectMeta().GetUID()))
meta := n.metagen.Generate(node)

kubemeta := meta.Clone()
// Pass annotations to all events so that it can be used in templating and by annotation builders.
Expand Down
18 changes: 12 additions & 6 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -114,6 +116,11 @@ func TestEmitEvent_Node(t *testing.T) {
nodeIP := "192.168.0.1"
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
UUID, err := uuid.NewV4()

typeMeta := metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
}
if err != nil {
t.Fatal(err)
}
Expand All @@ -134,6 +141,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Expand Down Expand Up @@ -180,7 +188,8 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: v1.NodeStatus{},
TypeMeta: typeMeta,
Status: v1.NodeStatus{},
},
Expected: nil,
},
Expand All @@ -194,6 +203,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{},
Conditions: []v1.NodeCondition{
Expand Down Expand Up @@ -236,11 +246,7 @@ func TestEmitEvent_Node(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
95 changes: 74 additions & 21 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,31 @@ import (
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/libbeat/autodiscover/builder"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type pod struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
}

// NewPodEventer creates an eventer that can discover and process pod objects
func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.pod")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
})
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err)
}

var nodeMeta, namespaceMeta metadata.MetaGen
var nodeWatcher, namespaceWatcher kubernetes.Watcher
metaConf := config.AddResourceMetadata
if metaConf != nil {
if metaConf.Node != nil && metaConf.Node.Enabled() {
options := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
}
if config.Namespace != "" {
options.Namespace = config.Namespace
}
nodeWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}

nodeMeta = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store())
}

if metaConf.Namespace != nil && metaConf.Namespace.Enabled() {
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
odacremolbap marked this conversation as resolved.
Show resolved Hide resolved
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}

namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store())
}
}

p := &pod{
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
logger: logger,
watcher: watcher,
config: config,
uuid: uuid,
publish: publish,
metagen: metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nodeMeta, namespaceMeta),
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
}

watcher.AddEventHandler(p)
Expand Down Expand Up @@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {

// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}

if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}

return p.watcher.Start()
odacremolbap marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()

if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}

if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
}

func (p *pod) emit(pod *kubernetes.Pod, flag string) {
Expand Down Expand Up @@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
"image": c.Image,
"runtime": runtimes[c.Name],
}
meta := p.metagen.ContainerMetadata(pod, c.Name, c.Image)
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name),
metadata.WithFields("container.image", c.Image))

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
Expand Down
18 changes: 13 additions & 5 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -175,6 +177,11 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

typeMeta := metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
}

tests := []struct {
Message string
Flag string
Expand All @@ -192,6 +199,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -264,6 +272,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -295,6 +304,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -326,6 +336,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -393,6 +404,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -459,11 +471,7 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
Loading