Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] keep track of bytes read when max_bytes exceeded #28352

Merged
merged 2 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]
- Add support for username in cisco asa security negotiation logs {pull}26975[26975]
- Relax time parsing and capture group and session type in Cisco ASA module {issue}24710[24710] {pull}28325[28325]
- Correctly track bytes read when max_bytes is exceeded. {issue}28317[28317] {pull}28352[28352]

*Heartbeat*

Expand Down
11 changes: 9 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}, nil
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
// Next reads the next line until the new line character. The return
// value b is the byte slice that contains the next line. The return
// value n is the number of bytes that were consumed from the
// underlying reader to read the next line. If the LineReader is
// configured with maxBytes n may be larger than the length of b due
// to skipped lines.
func (r *LineReader) Next() (b []byte, n int, err error) {
// This loop is need in case advance detects an line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
Expand Down Expand Up @@ -162,6 +167,7 @@ func (r *LineReader) advance() error {
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
Expand All @@ -175,6 +181,7 @@ func (r *LineReader) advance() error {
return err
}
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
r.byteCount += skipped
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
Expand Down
7 changes: 1 addition & 6 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestMaxBytesLimit(t *testing.T) {
// Read decodec lines and test
var idx int
for i := 0; ; i++ {
b, n, err := reader.Next()
b, _, err := reader.Next()
if err != nil {
if err == io.EOF {
break
Expand All @@ -387,12 +387,7 @@ func TestMaxBytesLimit(t *testing.T) {
break
}

gotLen := n - len(nl)
s := string(b[:len(b)-len(nl)])
if len(line) != gotLen {
t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen)
}

if line != s {
t.Fatalf("lines do not match, expected: %s got: %s", line, s)
}
Expand Down