Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes annotator #3888

Merged
merged 1 commit into from
Apr 18, 2017
Merged

Kubernetes annotator #3888

merged 1 commit into from
Apr 18, 2017

Conversation

vjsamuel
Copy link
Contributor

@vjsamuel vjsamuel commented Apr 3, 2017

This PR adds support for Kubernetes as a processor for Beats. The Kubernetes processor allows Beats to enrich events with metadata coming from the Kubernetes Pod from which the event originated. Metadata might include:

  • pod name
  • pod namespace
  • container name

The Kubernetes processor relies on two constructs:

  • Indexers - used to generate metadata from Pods and store them with unique keys. Example of keys might be container IDs, IP:Port combinations, PodName.
  • Matchers - used to generate a lookup key from an event. Example of a matcher might be a log_path matcher that looks at the source field of an event, extract the container ID from the log path and use it as a lookup key to retrieve metadata about the Pod from which the log message originated.

@elasticmachine
Copy link
Collaborator

Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run.

1 similar comment
@elasticmachine
Copy link
Collaborator

Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run.

@exekias exekias self-requested a review April 3, 2017 08:40
Copy link
Contributor

@exekias exekias left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome contribution @vjsamuel, thank you!! I have left some questions and comments, I'll try to test this in a while

}

func init() {
processors.RegisterPlugin("annotate.kubernetes",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to use just kubernetes instead of annotate.kubernetes, until now we have left the processor type as something implicit

commonMeta := common.MapStr{
"pod": name,
"namespace": pod.Namespace,
"labels": pod.Labels,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.


//Pod is in terminating state, remove the metadata
if pod.DeletionTimestamp != nil {
p.removePodMapEntry(name, endpoints, containers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering, what happens when you delete the pod here but some logs are still being processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typically, log flow/metric scraping stops after the pod is marked for terminated. we could also move this to another method so that we delete it after the container is completely destroyed.

default:
pod, ok := podObj.(*corev1.Pod)
if ok {
logp.Info("Processing %s:%s", pod.Namespace, pod.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be Debug?


p.h.Reset()
fmt.Fprint(p.h, commonMeta)
commonMeta["_hash"] = p.h.Sum64()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are you using this hash for?

return event, nil
}

} else if value, ok := event["pod"]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this comes from a custom beat or module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we leave this check to make sure that any beat that writes an event with a key pod, we can still annotate it with the metadata.

)

const (
timeout = time.Second * 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be configurable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. agreed.


delete(event, "pod")
} else {
return event, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log an error here? Because that means someone configured the kubernetes processor but no field was found to match and no enhancement happened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could have a filebeat configuration, that tails /var/log/* which is not managed by kubernetes and /var/lib/docker/containers/*/*log which is managed by kubernetes. the processor can be global and it would not pass through this check when a message from /var/log come in.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That speaks for having local processors.

If you don't want to see the error with the global config, the processor checks could be used to exclude some files from processing.

func (k kubernetesAnnotator) Run(event common.MapStr) (common.MapStr, error) {

metadata := common.MapStr{}
if value, ok := event["source"]; ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the right place for this to happen. I would suggest in a first version to make it configurable in the processor config with format strings on which fields should be picked / combined for the lookup.

In a second version we should definitively have some auto detection for example based on the beat type or where it could be passed as a option to Run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like the idea of format strings. didnt think about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my use case is that I have a source file that has a container id and i need to extract it out. This is not supported by format strings today.

"github.com/elastic/beats/libbeat/logp"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious how many of these imports we will actually need or are empty interfaces we could also have locally. The reason I bring this up because on the docker side we have currently too many dependencies because of this and in the end all we do is reading stats which is only a very small subset. If this also applies to kubernetes, perhaps we can remove most of the dependencies here with our own client or having some empty interfaces in our code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a custom client is not an option here, as this one is more advanced than a stateless REST consumer. Check SharedIndexInformer and Watch mechanism, the client code creates a copy of the state by watching k8s API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin , im in agreement with @exekias, the kube client is a quite advanced client implementation. its not worth the effort to write our own client. we can have these vendored local to the kubernetes plugin.

@vjsamuel
Copy link
Contributor Author

vjsamuel commented Apr 3, 2017

One issue that I am facing is that logp and glog both use v as the verbosity flag. logp uses it at -v and glog uses it as --v=<verbosity_level (1-10)>. this causes:

panic: ./filebeat flag redefined: v

goroutine 1 [running]:
panic(0x5295c20, 0xc4202ddad0)
	/usr/local/Cellar/go/1.7.4_2/libexec/src/runtime/panic.go:500 +0x1a1
flag.(*FlagSet).Var(0xc42009e000, 0x5f10ea0, 0x5f7f3e8, 0x552b627, 0x1, 0x55465c0, 0x14)
	/usr/local/Cellar/go/1.7.4_2/libexec/src/flag/flag.go:791 +0x43e
flag.Var(0x5f10ea0, 0x5f7f3e8, 0x552b627, 0x1, 0x55465c0, 0x14)
	/usr/local/Cellar/go/1.7.4_2/libexec/src/flag/flag.go:806 +0x69
github.com/golang/glog.init.1()
	/Users/vija/workspace/go/src/github.com/golang/glog/glog.go:401 +0xe5
github.com/golang/glog.init()
	/Users/vija/workspace/go/src/github.com/golang/glog/glog_file.go:125 +0x1c1
k8s.io/apimachinery/pkg/labels.init()
	/Users/vija/workspace/go/src/k8s.io/apimachinery/pkg/labels/selector.go:837 +0x5d
k8s.io/apimachinery/pkg/apis/meta/v1.init()

cc: @urso @ruflin

@ruflin
Copy link
Member

ruflin commented Apr 5, 2017

I remember we had in the past our own FlagSet to prevent overlapping but I think we removed it as it was no issue anymore and made things simpler for having flags in different packages. Perhaps time to reintroduce it? I assume there isn't a trick in golang to disable some of the flags from library on compile time?

@vjsamuel vjsamuel force-pushed the kubernetes_annotator branch 2 times, most recently from 7bfc4f7 to 84210aa Compare April 7, 2017 15:57
@exekias exekias force-pushed the kubernetes_annotator branch 2 times, most recently from f0d45b9 to 020c563 Compare April 10, 2017 13:53
@exekias
Copy link
Contributor

exekias commented Apr 10, 2017

for the record, after changing to a different k8s client the flags issue is gone. We changed to a more lightweight client to avoid increasing filebeat binary size

@exekias exekias force-pushed the kubernetes_annotator branch 2 times, most recently from 70d7aef to 1c96434 Compare April 10, 2017 14:03
NOTICE Outdated
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
golang.org/x/text
Copy link
Member

@ruflin ruflin Apr 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this already in the global vendor directory. Not sure if we can tell govendor somehow about this that it only adds the packages which are not there. Or add the missing packages there? @vjsamuel any idea?

BTW: Same is true for other packages and we have the same issue in the docker module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin, if there is a version mismatch between $BEATS_ROOT/vendor and $MODULE/vendor then there would be two copies. if we can upgrade the root vendor to the more recent version this would ideally go away

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's cool, I personally miss some more verbosity from govendor

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vjsamuel @exekias Could we "sync" the two to have the same commit? You can use @hash in govendor to pick the exact same package. For now I would just pick the same as in the top directory as we just updated to master very recently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have moved the dependency to root vendor so now they share the same commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vjsamuel That means now all dependencies are on the top level? Not sure I like that the kubernetes dependencies are on the top level as they are module specific. Any chance to have a mix? Means the common ones on the top level and all the other dependencies in here?

"comment": "",
"ignore": "test github.com/elastic/beats",
"package": [
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appengine can be added to ignore

@exekias exekias added in progress Pull request is currently in progress. Metricbeat Metricbeat v6.0.0-alpha1 review labels Apr 11, 2017
NOTICE Outdated
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
golang.org/x/text
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vjsamuel @exekias Could we "sync" the two to have the same commit? You can use @hash in govendor to pick the exact same package. For now I would just pick the same as in the top directory as we just updated to master very recently.

for k, v := range pod.Metadata.Labels {
labelMap[k] = v
}
return common.MapStr{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under which namespace will this end up?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still have the namespacing discussion open, I think we should put everything under the same kubernetes namespace but I was waiting to see the rest of modules to know if it makes sense


func newLogsPathMatcher(cfg common.Config) Matcher {
config := struct {
LogsPath string `config:"logs_path"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming this is in filebeat, that would mean the filebeat processors have an additional config option?

}

func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) {
config := kubeAnnotatorConfig{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we normally have the defaults as a var external to the method. Something called defaultKubeAnnotatorConfig in a config.go file. Check some other config.go files for an example. This makes it easy to find the config options.

)

const (
podCheckPeriod = 20 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't find where this is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its unused. i will remove this one.

}
}

func (p *PodWatcher) deletePod(pod *corev1.Pod) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call this remove as the Pod will still exist (hopefully) and it's only removed from the list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the naming convention that kube controllers have for various events that are processed which are:

onAdd
onUpdate
onDelete

we could rename it to onPodAdd, onPodUpdate and onPodDelete. does that work @ruflin ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the onPodDelete but as it is a private method naming is not too important. I just stumbled over it when looking at the code.

Copy link
Member

@ruflin ruflin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we have no tests for this processor. Would it be possible to test at least parts of the processor without predefined data structures to not have to start a kubernetes environment.

There are a few new config options. Could we add to the PR docs and example of how the config will look like and also the end output event? I would also be happy to have this in doc.go in the package itself.

@vjsamuel
Copy link
Contributor Author

@ruflin, i have moved the config to a dedicated config.go file so that the global configs are easier to keep track of.

@vjsamuel
Copy link
Contributor Author

@ruflin, basic test cases and processor documentation have been added as well.

in_cluster: true
-------------------------------------------------------------------------------

The configuration below enables the processor on a beat running as a process on the kubernetes node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also work outside the kubernetes node, I would just say "outside the cluster"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outside the node is useless for logs as the filebeat needs to sit inside the same node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking also in the metricbeat case, but works for me, it's a minor thing anyway

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, capitalize Beat and Kubernetes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return kubeAnnotatorConfig{
InCluster: true,
SyncPeriod: 1 * time.Second,
Namespace: "kube-system",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should suggest default? I guess entry level users are not aware of kube-system and may miss beats when listing pods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wouldnt recommend that. default namespace is only for control plane components. kube-system or monitoring are recommended namespaces.


// AddIndexer to the register
func (r *Register) AddIndexer(name string, indexer IndexConstructor) {
r.indexers[name] = indexer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing all RWMutex locking logic in getters & setters, could you please add it? Like in https://github.com/elastic/beats/pull/3888/files#diff-2afe85a18185c27752efda431319dac3R120

//Add a log path matcher which can extract container ID from the "source" field.
logsPathMatcher, err := newLogsPathMatcher(*cfg)
if err == nil {
kubernetes.Indexing.AddDefaultMatcher(logsPathMatcher)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just adding the registered matcher/indexer name here, instead of registering an instance? like AddDefaultIndexer("logs_path")

This would force us to register it before marking it as default

Copy link
Member

@ruflin ruflin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some more high level comments. Will have a detailed look again tomorrow morning. In general LGTM. Lets make sure to get it in rather soonish and then iterate on top of it.

@exekias Could you already start a follow up github meta issue to track open points?

@@ -17,6 +17,9 @@ import (
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"

//Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processors/annotate/kubernetes"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that approach

@@ -0,0 +1,91 @@
package kubernetes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a little bit a mess with package names singular/plural. We started all new packages to be singular (like module) when it contains multiple packages with each package one module. processors initially contained all processors in one package I think, that is why it was called processors and not processor. It should be also changed in libbeat later but in filebeat we should directly start with processor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

`logs_path` matcher that takes the `source` field, extracts the container ID and uses it to retrieve
metadata.

The configuration below enables the processor when the beat as a pod in kubernetes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"when the beat as a pod" ? Is a word missing here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor changes to capitalization, word choice, and commas: "For example, Filebeat defines the container indexer, which indexes pod metadata based on all container IDs, and a logs_path matcher, which takes the source field, extracts the container ID, and uses it to retrieve metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

in_cluster: false
host: <hostname>
kube_config: ~/.kube/config
enable_default_indexers: false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would turn this around to default_indexers.enabled to be consistend with enabled in other places. With ucfg the enabled comes for "free" as it is already part of the config option.

Same below for default_matches.enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

matchers:
- fields:
lookup_fields: ["metricset.host"]
-------------------------------------------------------------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I miss a note on how where the enriched data for the event will be put. How will the end event look like?

@@ -12,6 +12,9 @@ import (

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/pkg/errors"

//Add metricbeat specific processors
_ "github.com/elastic/beats/metricbeat/processors/annotate/kubernetes"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should auto generate this in the future for all beats based on the convention for the namespace and make it part of the include package.

@@ -0,0 +1,126 @@
package kubernetes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the package name see comment in filebeat.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@exekias exekias mentioned this pull request Apr 17, 2017
10 tasks
Copy link
Contributor

@dedemorton dedemorton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor edits and questions about areas of the doc that will be potentially confusing to users.

=== kubernetes

The `kubernetes` processor annotates each event with relevant metadata based on
which kubernetes pod the event originated from. Each event is annotated with:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize Kubernetes whenever it's used as a product name.

the `ip_port` indexer can take a kubernetes pod and index the pod metadata based on all
`pod_ip:container_port` combinations.

Matchers are used to build lookup keys with which indices can be queried. For example, the `fields`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little hard to follow because of the passive voice. Maybe say, "...build lookup keys for querying indices."

`pod_ip:container_port` combinations.

Matchers are used to build lookup keys with which indices can be queried. For example, the `fields`
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't follow your meaning here, so you'll need to reword. Might also help to have a specific example. In fact, the concepts here are a bit hard to follow (not sure if it's a problem with the doc or my lack of knowledge regarding Kubernetes). Specific examples somewhere showing how the events are enriched with Kubernetes data would be very helpful.

Matchers are used to build lookup keys with which indices can be queried. For example, the `fields`
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata.

Each beat can define its own default indexers and matchers which are enabled by default. For example,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize Beat and Filebeat.

`logs_path` matcher that takes the `source` field, extracts the container ID and uses it to retrieve
metadata.

The configuration below enables the processor when the beat as a pod in kubernetes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor changes to capitalization, word choice, and commas: "For example, Filebeat defines the container indexer, which indexes pod metadata based on all container IDs, and a logs_path matcher, which takes the source field, extracts the container ID, and uses it to retrieve metadata.

[source,yaml]
-------------------------------------------------------------------------------
processors:
- kuberenes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this say kubernetes? This question applies to other places where you use kuberenes instead of kubernetes.

* Namespace
* Labels

The `kuberentes` process has two basic building blocks which are:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this say kubernetes processor instead of process? Note that there is also a typo: kuberentes

Matchers are used to build lookup keys with which indices can be queried. For example, the `fields`
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata.

Each beat can define its own default indexers and matchers which are enabled by default. For example,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do users know which indexers and matchers are enabled for each Beat?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we would need to document individual beats on what indexers/matchers each of them enable by default. having them in libbeat documentation wouldnt be ideal.

in_cluster: true
-------------------------------------------------------------------------------

The configuration below enables the processor on a beat running as a process on the kubernetes node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, capitalize Beat and Kubernetes.

kube_config: ~/.kube/config
-------------------------------------------------------------------------------

The configuration below allows custom indexers and matchers to be enabled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this configuration enable the indexers and matchers, or does it allow you to enable them somewhere else? The passive voice used here makes it sound like the latter.

Copy link
Member

@ruflin ruflin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few minor comments but I'm ok with moving forward with the PR as is and address the comments later. Please also check the comment from @dedemorton

Before we merge it, all the commits should be squashed into 1 and the commit message and PR message should be updated.

A few additional things we should do after merging:

  • Going through the config options again and check the naming.
  • Discuss again where which part of the indexer / matcher logic should be. I could see that for example the ip:port matcher logic is in libbeat close to the processor, but configuration and activation as default matcher happens in metricbeat. Same for filebeat.
  • Discuss on how to handle dependencies best, as having it on the global level is not optimal, but also having duplicates is not good.

return nil, fmt.Errorf("fail to unpack the `logs_path` configuration: %s", err)
}

logPath := config.LogsPath
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be logsPath?


indexer := kubernetes.Indexing.GetIndexer(kubernetes.ContainerIndexerName)
//Add a container indexer by default.
if indexer != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we split the code into 2 func init() functions which I think Golang allows, we could make the code quite a bit nicer. Also it allows to check for err != nil and then return which is more the "standard" golang way.

SyncPeriod time.Duration `config:"sync_period"`
Indexers PluginConfig `config:"indexers"`
Matchers PluginConfig `config:"matchers"`
DefaultMatchers Enabled `config:"default_matchers"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use here config:"default_matchers.enabled" and then just use a bool. No additional type needed.

return kubeAnnotatorConfig{
InCluster: true,
SyncPeriod: 1 * time.Second,
Namespace: "kube-system",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand how this Namespace works. Is that something set in the kubernetes environment? Is there a default when I just set up Kubernetes?

return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err)
}

err = validate(config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ucfg checks for a Validate() error interface and calls the method automatically on unpack. See https://github.com/elastic/beats/blob/master/heartbeat/monitors/active/http/config.go#L82 This could be used here (can be done later)

return nil, err
}

indexers := Indexers{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the double nesting here? Naming gets kind of interesting like this.

return kubernetesAnnotator{podWatcher: watcher, matchers: &matchers}, nil
}

return nil, fatalError
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line could be removed because there is the same return in the end.

logp.Err("Unable to get indexer plugin %s", IpPortIndexerName)
}

matcher := kubernetes.Indexing.GetMatcher(kubernetes.FieldMatcherName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two init functions would simplify the logic here (see comment in filebeat)

@vjsamuel vjsamuel force-pushed the kubernetes_annotator branch 2 times, most recently from 21ec33c to f1e4147 Compare April 18, 2017 07:31
This PR adds support for Kubernetes as a processor for Beats. The Kubernetes processor allows Beats to enrich events with metadata coming from the Kubernetes Pod from which the event originated. Metadata might include:

* pod name
* pod namespace
* container name

The Kubernetes processor relies on two constructs:

* Indexers - used to generate metadata from Pods and store them with unique keys. Example of keys might be container IDs, IP:Port combinations, PodName.
* Matchers - used to generate a lookup key from an event. Example of a matcher might be a log_path matcher that looks at the source field of an event, extract the container ID from the log path and use it as a lookup key to retrieve metadata about the Pod from which the log message originated.
@exekias exekias removed the in progress Pull request is currently in progress. label Apr 18, 2017
@exekias exekias merged commit 54d557e into elastic:master Apr 18, 2017
@vjsamuel vjsamuel deleted the kubernetes_annotator branch May 2, 2017 15:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants