Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LineReader: Reuse temporary buffer to reduce per-line allocation #27782

Merged
merged 4 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- update ecs.version to ECS 1.11.0. {pull}27107[27107]
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]
- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782]


*Heartbeat*
Expand Down
83 changes: 83 additions & 0 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you all have preferences on where/how to include benchmarks. Happy to move/remove this as desired.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine where you put it.


import (
"bytes"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"

"golang.org/x/text/encoding"
)

func BenchmarkEncoderReader(b *testing.B) {
const (
bufferSize = 1024
lineMaxLimit = 1000000 // never hit by the input data
)

runBench := func(name string, lineMaxLimit int, lines []byte) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for bN := 0; bN < b.N; bN++ {
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
if err != nil {
b.Fatal("failed to initialize reader:", err)
}
// Read decodec lines and test
size := 0
for i := 0; ; i++ {
msg, err := reader.Next()
if err != nil {
if err == io.EOF {
b.ReportMetric(float64(i), "processed_lines")
break
} else {
b.Fatal("unexpected error:", err)
}
}
size += msg.Bytes
}
b.ReportMetric(float64(size), "processed_bytes")
}
})
}

runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020))
runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10))
runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000))
// short lineMaxLimit to exercise skipUntilNewLine
runBench("skip lines", 1024, createBenchmarkLines(100, 10_000))
}

func createBenchmarkLines(numLines int, lineLength int) []byte {
buf := bytes.NewBuffer(nil)
for i := 0; i < numLines; i++ {
line := make([]byte, hex.DecodedLen(lineLength))
if _, err := rand.Read(line); err != nil {
panic(fmt.Sprintf("failed to generate random input: %v", err))
}
buf.WriteString(hex.EncodeToString(line))
buf.WriteRune('\n')
}
return buf.Bytes()
}
32 changes: 15 additions & 17 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

const unlimited = 0

// lineReader reads lines from underlying reader, decoding the input stream
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
Expand All @@ -44,10 +43,11 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// New creates a new reader object
// NewLineReader creates a new reader object
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

Expand All @@ -64,13 +64,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {

return &LineReader{
reader: input,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}
Expand Down Expand Up @@ -133,18 +133,17 @@ func (r *LineReader) advance() error {
r.inOffset = newOffset
}

buf := make([]byte, r.bufferSize)

// Try to read more bytes into buffer
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Appends buffer also in case of err
r.inBuffer.Append(buf[:n])
// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
}
Expand All @@ -170,7 +169,7 @@ func (r *LineReader) advance() error {

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
skipped, err := r.skipUntilNewLine()
if err != nil {
r.logger.Error("Error skipping until new line, err:", err)
return err
Expand Down Expand Up @@ -204,7 +203,7 @@ func (r *LineReader) advance() error {
return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
func (r *LineReader) skipUntilNewLine() (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

Expand All @@ -221,14 +220,14 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
Expand All @@ -249,14 +248,13 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
start := 0

for start < end {
var nDst, nSrc int

nDst, nSrc, err = r.decoder.Transform(buffer, inBytes[start:end], false)
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
Expand All @@ -270,7 +268,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(buffer[:nDst])
r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down