[Agent] Support Node and Service autodiscovery in k8s provider #26801

Merged · 25 commits · Jul 22, 2021
Changes from 6 commits
x-pack/elastic-agent/CHANGELOG.next.asciidoc (1 change: 1 addition & 0 deletions)
@@ -117,3 +117,4 @@
- Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394]
- Enable configuring monitoring namespace {issue}26439[26439]
- Communicate with Fleet Server over HTTP2. {pull}26474[26474]
- Support Node and Service autodiscovery in kubernetes dynamic provider. {pull}26801[26801]
@@ -9,6 +9,8 @@ package kubernetes

import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

// Config for kubernetes provider
@@ -21,7 +23,8 @@ type Config struct {
Node string `config:"node"`

// Scope of the provider (cluster or node)
Scope string `config:"scope"`
Scope string `config:"scope"`
Resource string `config:"resource"`
}

// InitDefaults initializes the default values for the config.
@@ -30,3 +33,22 @@ func (c *Config) InitDefaults() {
c.CleanupTimeout = 60 * time.Second
c.Scope = "node"
}

// Validate ensures correctness of config
func (c *Config) Validate() error {
// Check if resource is either node or pod. If yes then default the scope to "node" if not provided.
// Default the scope to "cluster" for everything else.
switch c.Resource {
case "node", "pod":
if c.Scope == "" {
c.Scope = "node"
}
default:
if c.Scope == "node" {
Contributor: It's interesting that you can override almost all settings per resource, but not scope.

logp.L().Warnf("can not set scope to `node` when using resource %s. resetting scope to `cluster`", c.Resource)
}
c.Scope = "cluster"
}

return nil
}
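
For reference, a minimal standalone sketch of how the defaulting in Validate is expected to behave for different resource/scope combinations. The config struct and validate function below are a simplified local copy for illustration only, not the actual provider package (which also logs a warning when a node scope is reset):

package main

import "fmt"

// config mirrors only the two fields that matter for the defaulting rules above.
type config struct {
	Resource string
	Scope    string
}

// validate reproduces the defaulting rules from Config.Validate for illustration.
func (c *config) validate() {
	switch c.Resource {
	case "node", "pod":
		if c.Scope == "" {
			c.Scope = "node"
		}
	default:
		// e.g. "service": a node scope is not meaningful, so force cluster scope.
		c.Scope = "cluster"
	}
}

func main() {
	configs := []config{
		{Resource: "pod"},                    // scope defaults to "node"
		{Resource: "node", Scope: "cluster"}, // explicit scope is kept
		{Resource: "service", Scope: "node"}, // reset to "cluster" (with a warning in the real code)
	}
	for i := range configs {
		configs[i].validate()
		fmt.Printf("resource=%-8s scope=%s\n", configs[i].Resource, configs[i].Scope)
	}
}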
@@ -6,7 +6,8 @@ package kubernetes

import (
"fmt"
"time"

k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
@@ -20,6 +21,10 @@ const (
PodPriority = 0
// ContainerPriority is the priority that container mappings are added to the provider.
ContainerPriority = 1
// NodePriority is the priority that node mappings are added to the provider.
NodePriority = 0
// ServicePriority is the priority that service mappings are added to the provider.
ServicePriority = 0
Contributor: These should each be unique and they should be tiered. I think the proper order here should be:

NodePriority = 0
PodPriority = 1
ContainerPriority = 2
ServicePriority = 3

That is because a Node contains a pod, a pod contains containers, and services point at pods.

)
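
If the reviewer's suggestion above were adopted, the const block could look like the sketch below. This is a hypothetical reordering for illustration, not what this commit currently contains:

// Hypothetical tiered priorities following the review comment above:
// a Node contains Pods, a Pod contains containers, and Services point at Pods.
package kubernetes

const (
	// NodePriority is the priority that node mappings are added to the provider.
	NodePriority = 0
	// PodPriority is the priority that pod mappings are added to the provider.
	PodPriority = 1
	// ContainerPriority is the priority that container mappings are added to the provider.
	ContainerPriority = 2
	// ServicePriority is the priority that service mappings are added to the provider.
	ServicePriority = 3
)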

func init() {
@@ -31,12 +36,6 @@ type dynamicProvider struct {
config *Config
}

type eventWatcher struct {
logger *logger.Logger
cleanupTimeout time.Duration
comm composable.DynamicProviderComm
}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
var cfg Config
@@ -69,15 +68,10 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
p.config.Node = ""
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: p.config.SyncPeriod,
Node: p.config.Node,
//Namespace: p.config.Namespace,
}, nil)
watcher, err := p.newWatcher(comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
}
watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
if err != nil {
@@ -87,135 +81,28 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
return nil
}

func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}

// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)

// TODO deal with init containers stopping after initialization
p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
}

func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
cid, runtime := kubernetes.ContainerIDWithRuntime(c)
containerIDs[c.Name] = cid
runtimes[c.Name] = runtime
}

for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.Name]
if cid == "" {
continue
// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
func (p *dynamicProvider) newWatcher(comm composable.DynamicProviderComm, client k8s.Interface) (kubernetes.Watcher, error) {
switch p.config.Resource {
case "pod":
watcher, err := NewPodWatcher(comm, p.config, p.logger, client)
if err != nil {
return nil, err
}

// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
},
return watcher, nil
case "node":
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client)
if err != nil {
return nil, err
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
return watcher, nil
case "service":
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client)
if err != nil {
return nil, err
}

// Emit the container
p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
}
}

func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
p.comm.Remove(string(pod.GetUID()))

for _, c := range pod.Spec.Containers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}

for _, c := range pod.Spec.InitContainers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}
}

// OnAdd ensures processing of pod objects that are newly added
func (p *eventWatcher) OnAdd(obj interface{}) {
p.logger.Debugf("pod add: %+v", obj)
p.emitRunning(obj.(*kubernetes.Pod))
}

// OnUpdate emits events for a given pod depending on the state of the pod,
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources associated to the pod.
func (p *eventWatcher) OnUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
switch pod.Status.Phase {
case kubernetes.PodSucceeded, kubernetes.PodFailed:
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
return
case kubernetes.PodPending:
p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
return
return watcher, nil
default:
return nil, fmt.Errorf("unsupported autodiscover resource %s", p.config.Resource)
}

p.logger.Debugf("pod update: %+v", obj)
p.emitRunning(pod)
}

// OnDelete stops pod objects that are deleted
func (p *eventWatcher) OnDelete(obj interface{}) {
p.logger.Debugf("pod delete: %+v", obj)
pod := obj.(*kubernetes.Pod)
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
}
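
The NewPodWatcher, NewNodeWatcher, and NewServiceWatcher constructors that newWatcher dispatches to are defined in files not shown in this view. As a rough sketch of the shape such a constructor could take, the following mirrors the removed pod watcher setup for Node objects; the nodeEventer type, its empty handler methods, and the composable/logger import paths are assumptions for illustration, not the PR's actual code:

// A rough sketch only; the real implementation lives in files not included
// in this diff view.
package kubernetes

import (
	k8s "k8s.io/client-go/kubernetes"

	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// nodeEventer is a placeholder event handler for node objects.
type nodeEventer struct {
	logger *logger.Logger
	comm   composable.DynamicProviderComm
}

// NewNodeWatcher matches the call sites in newWatcher above: it builds a
// watcher for Node objects the same way the removed code built one for Pods.
func NewNodeWatcher(
	comm composable.DynamicProviderComm,
	cfg *Config,
	log *logger.Logger,
	client k8s.Interface,
) (kubernetes.Watcher, error) {
	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
		SyncTimeout: cfg.SyncPeriod,
		Node:        cfg.Node,
	}, nil)
	if err != nil {
		return nil, errors.New(err, "couldn't create kubernetes node watcher")
	}
	watcher.AddEventHandler(&nodeEventer{log, comm})
	return watcher, nil
}

// The real handler would emit node mappings with NodePriority on add/update
// and remove them after the cleanup timeout on delete, analogous to the pod
// handler being removed above; the bodies are omitted in this sketch.
func (n *nodeEventer) OnAdd(obj interface{})    {}
func (n *nodeEventer) OnUpdate(obj interface{}) {}
func (n *nodeEventer) OnDelete(obj interface{}) {}

Keeping one constructor per resource keeps newWatcher's switch trivial and lets each resource define its own mappings and priority independently.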