Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][Idea] Create field mappings on the fly instead of using template #7972

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions libbeat/common/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func (f Fields) HasKey(key string) bool {
return f.hasKey(keys)
}

// HasKey checks if inside fields the given key exists

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Fields.GetKey should be of the form "GetKey ..."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Fields.GetKey should be of the form "GetKey ..."

// The key can be in the form of a.b.c and it will check if the nested field exist
// In case the key is `a` and there is a value `a.b` false is return as it only
// returns true if it's a leave node
func (f Fields) GetKey(key string) *Field {
keys := strings.Split(key, ".")
return f.getKey(keys)
}

// HasNode checks if inside fields the given node exists
// In contrast to HasKey it not only compares the leaf nodes but
// every single key it traverses.
Expand Down Expand Up @@ -193,6 +202,32 @@ func (f Fields) hasKey(keys []string) bool {
return false
}

func (f Fields) getKey(keys []string) *Field {
// Nothing to compare anymore
if len(keys) == 0 {
return nil
}

key := keys[0]
keys = keys[1:]

for _, field := range f {
if field.Name == key {

if len(field.Fields) > 0 {
return field.Fields.getKey(keys)
}
// Last entry in the tree but still more keys
if len(keys) > 0 {
return nil
}

return &field
}
}
return nil
}

// GetKeys returns a flat list of keys this Fields contains
func (f Fields) GetKeys() []string {
return f.getKeys("")
Expand All @@ -216,3 +251,44 @@ func (f Fields) getKeys(namespace string) []string {

return keys
}

// GetKeys returns a flat list of keys this Fields contains

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Fields.GetFlatFields should be of the form "GetFlatFields ..."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Fields.GetFlatFields should be of the form "GetFlatFields ..."

func (f Fields) GetFlatFields() map[string]Field {
return f.getFlatFields("")
}

func (f Fields) getFlatFields(namespace string) map[string]Field {

var fields = map[string]Field{}

for _, field := range f {
fieldName := namespace + "." + field.Name
if namespace == "" {
fieldName = field.Name
}
if len(field.Fields) == 0 {
fields[fieldName] = field
} else {
f := field.Fields.getFlatFields(fieldName)
// Iterate through all of them and add them
for k, ff := range f {
fields[k] = ff
}
}
}

return fields
}

func (f Fields) Flatten() map[string]Field {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Fields.Flatten should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Fields.Flatten should have comment or be unexported

keys := f.getKeys("")

fields := map[string]Field{}
for _, k := range keys {
field := f.GetKey(k)
field.Name = k
fields[k] = *field
}

return fields
}
77 changes: 76 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ import (
"net/url"
"time"

"strings"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/testing"
)

Expand Down Expand Up @@ -497,6 +501,10 @@ func bulkCollectPublishFails(
count := len(data)
failed := data[:0]
stats := bulkResultStats{}

// Collecting all keys in events which failed because of strict mapping. Makes sure every key exists only once.
var fieldKeys = map[string]struct{}{}

for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
if err != nil {
Expand All @@ -515,7 +523,17 @@ func bulkCollectPublishFails(
continue // ok
}

if status < 500 && status != 429 {
if status == 400 && strings.Contains(string(msg), "strict_dynamic_mapping_exception") {
// Unfortunately the error contains only the first missing field, means we must add all fields for the mapping again.
logp.Err("dynamic mapping excpetion: %+v, %s", data[i].Content.Fields.Flatten(), string(msg))

keys := data[i].Content.Fields.Flatten()
// Collect all keys of the event. Existing fields are just overwritten.
for k := range keys {
fieldKeys[k] = struct{}{}
}

} else if status < 500 && status != 429 {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
Expand All @@ -527,6 +545,63 @@ func bulkCollectPublishFails(
failed = append(failed, data[i])
}

// Load all missing fields to the index
if len(fieldKeys) > 0 {
var fields common.Fields

fieldsMap := template.Fields.GetFlatFields()
for k := range fieldKeys {

if field, ok := fieldsMap[k]; ok {
logp.Err("%s, %v", k, field.Type)
field.Name = k
fields = append(fields, field)
} else {
// Is it dynamic mapping keys that we don't find here?
logp.Err("nil field: %s", k)
}
}

// @timestamp field needs to be added manually because it's not in the fields list
fields = append(fields, fieldsMap["@timestamp"])

// TODO: how do we deal with dynamic mappings in indices? Can they be overlaoded too?

conf := map[string]interface{}{
"hosts": []string{"localhost:9200"},
}

cfg, _ := common.NewConfigFrom(conf)

esClient, err := NewConnectedClient(cfg)
if err != nil {
logp.Err("connected client: %s", err)
}
logp.Err("Version: %s", esClient.GetVersion())

// TODO: fetch elasticsearch version to define how fields are processed
version, _ := common.NewVersion(esClient.GetVersion())
properties := common.MapStr{}
processor := template.Processor{EsVersion: *version}
if err := processor.Process(fields, "", properties); err != nil {
logp.Err("Procesing issue: %s", err)
}

logp.Err("Properties: %s", properties)

// TODO: create mapping based on elasticsearch version

// Send missing mapping
body := common.MapStr{"properties": properties}
_, _, err = esClient.Request("PUT", "/metricbeat-7.0.0-alpha1-2018.08.15/_mapping/doc", "", nil, body)
if err != nil {
// TODO: we should check error and if index not exist error, we should create the index and resend mapping
logp.Err("Error loading index mapping: %s", err)
}

// Data is now resent but mapping should exist.
}

return failed, stats
}

Expand Down
3 changes: 3 additions & 0 deletions libbeat/template/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (l *Loader) Load() error {
if l.config.JSON.Enabled {
templateName = l.config.JSON.Name
}

tmpl.LoadBytes(l.fields)

// Check if template already exist or should be overwritten
exists := l.CheckTemplate(templateName)
if !exists || l.config.Overwrite {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
dynamicTemplates []common.MapStr

defaultFields []string

Fields common.Fields

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var Fields should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var Fields should have comment or be unexported

)

type Template struct {
Expand Down Expand Up @@ -120,6 +122,8 @@ func (t *Template) load(fields common.Fields) (common.MapStr, error) {
t.Lock()
defer t.Unlock()

Fields = fields

dynamicTemplates = nil
defaultFields = nil

Expand Down