Skip to content

Commit

Permalink
Improve container data emission
Browse files Browse the repository at this point in the history
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed Jul 13, 2021
1 parent 929c183 commit 43f5136
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 53 deletions.
130 changes: 78 additions & 52 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type pod struct {
comm composable.DynamicProviderComm
}

type podData struct {
pod *kubernetes.Pod
type providerData struct {
uid string
mapping map[string]interface{}
processors []map[string]interface{}
}
Expand All @@ -52,7 +52,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) {
// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(data.pod.GetUID()), PodPriority, data.mapping, data.processors)
p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
Expand All @@ -62,54 +62,19 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) {
}

func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
cid, runtime := kubernetes.ContainerIDWithRuntime(c)
containerIDs[c.Name] = cid
runtimes[c.Name] = runtime
}

for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.Name]
if cid == "" {
continue
}

// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
providerDataChan := make(chan providerData)
done := make(chan bool, 1)
go generateContainerData(pod, containers, containerstatuses, providerDataChan, done)

for {
select {
case data := <-providerDataChan:
// Emit the container
p.comm.AddOrUpdate(data.uid, ContainerPriority, data.mapping, data.processors)
case <-done:
return
}

// Emit the container
p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
}
}

Expand Down Expand Up @@ -162,7 +127,7 @@ func (p *pod) OnDelete(obj interface{}) {
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
}

func generatePodData(pod *kubernetes.Pod) podData {
func generatePodData(pod *kubernetes.Pod) providerData {
//TODO: add metadata here too ie -> meta := s.metagen.Generate(pod)

// Pass annotations to all events so that it can be used in templating and by annotation builders.
Expand All @@ -181,8 +146,8 @@ func generatePodData(pod *kubernetes.Pod) podData {
"ip": pod.Status.PodIP,
},
}
return podData{
pod: pod,
return providerData{
uid: string(pod.GetUID()),
mapping: mapping,
processors: []map[string]interface{}{
{
Expand All @@ -194,3 +159,64 @@ func generatePodData(pod *kubernetes.Pod) podData {
},
}
}

func generateContainerData(
pod *kubernetes.Pod,
containers []kubernetes.Container,
containerstatuses []kubernetes.PodContainerStatus,
dataChan chan providerData,
done chan bool) {
//TODO: add metadata here too ie -> meta := s.metagen.Generate()

containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
cid, runtime := kubernetes.ContainerIDWithRuntime(c)
containerIDs[c.Name] = cid
runtimes[c.Name] = runtime
}

for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.Name]
if cid == "" {
continue
}

// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}
dataChan <- providerData{
uid: eventID,
mapping: mapping,
processors: processors,
}
}
done <- true
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package kubernetes

import (
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/stretchr/testify/assert"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -64,7 +66,95 @@ func TestGeneratePodData(t *testing.T) {
},
}

assert.Equal(t, pod, data.pod)
assert.Equal(t, string(pod.GetUID()), data.uid)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
}

func TestGenerateContainerPodData(t *testing.T) {
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
pod := &kubernetes.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
UID: types.UID(uid),
Namespace: "testns",
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"app": "production",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
Spec: kubernetes.PodSpec{
NodeName: "testnode",
},
Status: kubernetes.PodStatus{PodIP: "127.0.0.5"},
}

providerDataChan := make(chan providerData)
done := make(chan bool, 1)

containers := []kubernetes.Container{
{
Name: "nginx",
Image: "nginx:1.120",
Ports: []kubernetes.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 80,
},
},
},
}
containerStatuses := []kubernetes.PodContainerStatus{
{
Name: "nginx",
Ready: true,
ContainerID: "crio://asdfghdeadbeef",
},
}
go generateContainerData(pod, containers, containerStatuses, providerDataChan, done)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": "asdfghdeadbeef",
"name": "nginx",
"image": "nginx:1.120",
"runtime": "crio",
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}

cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx")
for {
select {
case data := <-providerDataChan:
assert.Equal(t, cuid, data.uid)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
case <-done:
return
}
}

}

0 comments on commit 43f5136

Please sign in to comment.