Skip to content

Commit

Permalink
Fix stopping of modules started by kubernetes autodiscover (#10476)
Browse files Browse the repository at this point in the history
Kubernetes autodiscover only emits events for containers that have
an ID, in pods that have an IP. When a pod is being terminated,
however, its containers can lack an ID and the pod itself can lack
an IP. This leads to modules that are never stopped, because the
delete event that should stop them lacks the needed
information.

This change does two things to avoid this problem:
    * Don't require the pod to have an IP on stop events.
    * Identify container events with IDs that don't depend on the container's state.
  • Loading branch information
jsoriano committed Feb 7, 2019
1 parent 97920c9 commit 15f2f26
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix TLS certificate DoS vulnerability. {pull}10302[10302]
- Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289]
- Fix encoding of timestamps when using disk spool. {issue}10099[10099]
- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476]
- Fix an issue where remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]

*Auditbeat*
Expand Down
28 changes: 19 additions & 9 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"fmt"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
containerstatuses []*kubernetes.PodContainerStatus) {
host := pod.Status.GetPodIP()

// Do not emit events without host (container is still being configured)
if host == "" {
// If the container doesn't exist in the runtime or its network
// is not configured, it won't have an IP. Skip it as we cannot
// generate configs without host, and an update will arrive when
// the container is ready.
// If stopping, emit the event in any case to ensure cleanup.
if host == "" && flag != "stop" {
return
}

// Collect all container IDs and runtimes from status information.
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
Expand All @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku

// Emit container and port information
for _, c := range containers {
// If the container doesn't have an ID, it doesn't exist in the
// runtime. In that case only emit an event if we are stopping,
// to make sure its configurations are cleaned up.
cid := containerIDs[c.GetName()]

// If there is a container ID that is empty then ignore it. It either means that the container is still starting
// up or the container is shutting down.
if cid == "" {
if cid == "" && flag != "stop" {
continue
}

// This must be an id that doesn't depend on the state of the container
// so it works also on `stop` if containers have been already deleted.
eventID := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName())

cmeta := common.MapStr{
"id": cid,
"name": c.GetName(),
Expand All @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
if len(c.Ports) == 0 {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventID,
flag: true,
"host": host,
"kubernetes": kubemeta,
Expand All @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
for _, port := range c.Ports {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventID,
flag: true,
"host": host,
"port": port.GetContainerPort(),
Expand Down
167 changes: 166 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) {
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
containerImage := "elastic/filebeat:6.3.0"
node := "node"
cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat"
UUID, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"id": "foobar",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
Expand Down Expand Up @@ -270,6 +271,170 @@ func TestEmitEvent(t *testing.T) {
},
Expected: nil,
},
{
Message: "Test pod without container id",
Flag: "start",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
PodIP: &podIP,
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: nil,
},
{
Message: "Test stop pod without host",
Flag: "stop",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "",
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
},
"config": []*common.Config{},
},
},
{
Message: "Test stop pod without container id",
Flag: "stop",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
PodIP: &podIP,
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "127.0.0.1",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "",
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
},
"config": []*common.Config{},
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit 15f2f26

Please sign in to comment.