Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tag "truncated" to "log.flags" if incoming line is longer than configured limit #7991

1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add custom unpack to log hints config to avoid env resolution {pull}7710[7710]
- Keep raw user agent information after parsing as user_agent_raw in Filebeat modules. {pull}7823[7832]
- Make docker input check if container strings are empty {pull}7960[7960]
- Add tag "truncated" to "log.status" if incoming line is longer than configured limit. {pull}7991[7991]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
description: >
Logging level.

- name: log.flags
description: >
This field contains the flags of the event.

- name: event.created
type: date
description: >
Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,14 @@ type: keyword
Logging level.


--

*`log.flags`*::
+
--
This field contains the flags of the event.


--

*`event.created`*::
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions filebeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool {
return false
}

func (msg *Message) AddFields(fields common.MapStr) {
// AddFields adds fields to the message.
func (m *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
m.Fields.Update(fields)
}

// AddTagsWithKey adds tags to the message with an arbitrary key.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Message.AddFlagsWithKey should be of the form "AddFlagsWithKey ..."

// If the field does not exist, it is created.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make AddTagsWithKey takes a variadic arguments for tags?
Common use case its probably to add a single tag.

AddTagsWithKey(key string, tags ...[]string) error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should name it in the code the same as it's in the event again. If we stay with status this should be AddStatusWithKey. Alternative as proposed above we could call it flags so it would be AddFlagsWithKey. Or if we stick with tags here it should probably be log.tags?

Would AddFlagWithKey(key string, flag string) error be enough here? It seems all the usage we have below is only adding 1 flag.

func (m *Message) AddFlagsWithKey(key string, flags ...string) error {
if len(flags) == 0 {
return nil
}

if m.Fields == nil {
m.Fields = common.MapStr{}
}

return common.AddTagsWithKey(m.Fields, key, flags)
}
17 changes: 17 additions & 0 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Reader struct {
separator []byte
last []byte
numLines int
truncated int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't think about multiline being able to capture truncated bytes as well. Good catch 👍

err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
Expand Down Expand Up @@ -262,13 +263,19 @@ func (mlr *Reader) clear() {
mlr.message = reader.Message{}
mlr.last = nil
mlr.numLines = 0
mlr.truncated = 0
mlr.err = nil
}

// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Reader) finalize() reader.Message {
if mlr.truncated > 0 {
mlr.message.AddFlagsWithKey("log.flags", "truncated")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mlr.truncate > 0 branch do in the worst case 4 allocations on every run.
I would define the common.MapStr in a package variable that I would reuse.

// Copy message from existing content
msg := mlr.message

mlr.clear()
return msg
}
Expand Down Expand Up @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) {
}
mlr.message.Content = append(tmp, m.Content[:space]...)
mlr.numLines++

// add number of truncated bytes to fields
diff := len(m.Content) - space
if diff > 0 {
mlr.truncated += diff
}
} else {
// increase the number of skipped bytes, if cannot add
mlr.truncated += len(m.Content)

}

mlr.last = m.Content
Expand Down
83 changes: 83 additions & 0 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) {
)
}

func TestMultilineAfterTruncated(t *testing.T) {
pattern := match.MustCompile(`^[ ]`) // next line is indented a space
maxLines := 2
testMultilineTruncated(t,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test where the truncated flag should be misssing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
true,
[]string{
"line1\n line1.1\n line1.2\n",
"line2\n line2.1\n line2.2\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
false,
[]string{
"line1\n line1.1\n",
"line2\n line2.1\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
Expand Down Expand Up @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
}
}

func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) {
_, buf := createLineBuffer(input...)
r := createMultilineTestReader(t, buf, cfg)

var messages []reader.Message
for {
message, err := r.Next()
if err != nil {
break
}

messages = append(messages, message)
}

if len(messages) != events {
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages))
}

for _, message := range messages {
found := false
statusFlags, err := message.Fields.GetValue("log.flags")
if err != nil {
if !truncated {
assert.False(t, found)
return
}
t.Fatalf("error while getting log.status field: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}

func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader {
encFactory, ok := encoding.FindEncoding("plain")
if !ok {
Expand Down
1 change: 1 addition & 0 deletions filebeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
}
88 changes: 88 additions & 0 deletions filebeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package readfile

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
)

type mockReader struct {
line []byte
}

func (m *mockReader) Next() (reader.Message, error) {
return reader.Message{
Content: m.line,
}, nil
}

var limitTests = []struct {
line string
maxBytes int
truncated bool
}{
{"long-long-line", 5, true},
{"long-long-line", 3, true},
{"long-long-line", len("long-long-line"), false},
}

func TestLimitReader(t *testing.T) {
for _, test := range limitTests {
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes)

msg, err := r.Next()
if err != nil {
t.Fatalf("Error reading from mock reader: %v", err)
}

assert.Equal(t, test.maxBytes, len(msg.Content))

found := false
statusFlags, err := msg.Fields.GetValue("log.flags")
if err != nil {
if !test.truncated {
assert.False(t, found)
return
}
t.Fatalf("Error getting truncated value: %v", err)
}

switch flags := statusFlags.(type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can it be both types? Same question for the other test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to mimic the testing of the old AddTags. But as the input is never interface, I am removing it.

case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if test.truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}
27 changes: 20 additions & 7 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error {
// exist then it will be created. If the tags field exists and is not a []string
// then an error will be returned. It does not deduplicate the list of tags.
func AddTags(ms MapStr, tags []string) error {
return AddTagsWithKey(ms, TagsKey, tags)
}

// AddTagsWithKey appends a tag to the key field of ms. If the field does not
// exist then it will be created. If the field exists and is not a []string
// then an error will be returned. It does not deduplicate the list.
func AddTagsWithKey(ms MapStr, key string, tags []string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for the AddTagsWithKey method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I would keep the naming here as Tags even though I would rename everything in filebeat code above to Flags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Done.

if ms == nil || len(tags) == 0 {
return nil
}
eventTags, exists := ms[TagsKey]
if !exists {
ms[TagsKey] = tags

k, subMap, oldTags, present, err := mapFind(key, ms, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wodner if this has a perfomance impact on the old AddTags method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Keys without dots can be processed in the first loop on a fast path in mapFind. If the key exists as is, it's simply returned at the beginning.

// Fast path, key is present as is.
if v, exists := data[key]; exists {
    return key, data, v, true, nil
}

If not, the only additional cost is one IndexRune which looks for a dot in the key. But currently old tags don't have dots in their keys. Thus, the function returns at the second possible point without doing anything "expensive" e.g. creating additional submaps.

idx := strings.IndexRune(key, '.')
if idx < 0 {
    return key, data, nil, false, nil
}

if err != nil {
return err
}

if !present {
subMap[k] = tags
return nil
}

switch arr := eventTags.(type) {
switch arr := oldTags.(type) {
case []string:
ms[TagsKey] = append(arr, tags...)
subMap[k] = append(arr, tags...)
case []interface{}:
for _, tag := range tags {
arr = append(arr, tag)
}
ms[TagsKey] = arr
subMap[k] = arr
default:
return errors.Errorf("expected string array by type is %T", eventTags)
return errors.Errorf("expected string array by type is %T", oldTags)

}
return nil
}
Expand Down
Loading