Skip to content

Commit

Permalink
LineReader: Reuse temporary buffer to reduce per-line allocation (#27782
Browse files Browse the repository at this point in the history
)

## What does this PR do?

Previously, the `LineReader` would allocate a []byte of size `config.BufferSize` before decoding each line. The underlying array's size allocation is fixed, so `outBuffer.Append` retains all of it even when the appended bytes are much shorter.

With this change, we store a single `tempBuffer []byte` which is reused across lines anywhere we need temporary storage. Converting to `outBuffer.Write` forces the buffer to copy data out of tempBuffer, but allocates only the space needed for the written bytes.

## Why is it important?

In our production environment, we run beats with k8s-enforced memory limits and are trying to resolve OOMs. The LineReader code path contributes a significant amount of memory allocation. The benchmarks added in bench_test.go show this reduces the memory profile with various line lengths:

```
goos: darwin
goarch: amd64
pkg: github.com/elastic/beats/v7/libbeat/reader/readfile
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz

name                                 old time/op          new time/op          delta
EncoderReader/buffer-sized_lines-16           125µs ± 3%            94µs ± 9%  -24.55%  (p=0.008 n=5+5)
EncoderReader/short_lines-16                 52.6µs ± 4%          36.3µs ±10%  -30.88%  (p=0.008 n=5+5)
EncoderReader/long_lines-16                  1.82ms ± 2%          1.70ms ±10%     ~     (p=0.151 n=5+5)
EncoderReader/skip_lines-16                   133µs ± 3%           140µs ± 8%     ~     (p=0.151 n=5+5)

name                                 old alloc/op         new alloc/op         delta
EncoderReader/buffer-sized_lines-16           442kB ± 0%           239kB ± 0%  -46.07%  (p=0.000 n=4+5)
EncoderReader/short_lines-16                  118kB ± 0%            15kB ± 0%  -87.27%  (p=0.008 n=5+5)
EncoderReader/long_lines-16                  8.73MB ± 0%          7.63MB ± 0%  -12.62%  (p=0.000 n=4+5)
EncoderReader/skip_lines-16                   270kB ± 0%           220kB ± 0%  -18.58%  (p=0.008 n=5+5)

name                                 old allocs/op        new allocs/op        delta
EncoderReader/buffer-sized_lines-16             718 ± 0%             519 ± 0%  -27.72%  (p=0.008 n=5+5)
EncoderReader/short_lines-16                    522 ± 0%             421 ± 0%  -19.35%  (p=0.008 n=5+5)
EncoderReader/long_lines-16                   2.65k ± 0%           1.58k ± 0%  -40.54%  (p=0.008 n=5+5)
EncoderReader/skip_lines-16                     420 ± 0%             419 ± 0%   -0.24%  (p=0.008 n=5+5)
```
  • Loading branch information
bmoylan committed Sep 9, 2021
1 parent b04ac92 commit 0e3788b
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added support for parsing syslog dates containing a leading 0 (e.g. `Sep 01`) rather than a space. {pull}27775[27775]
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]
- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782]


*Heartbeat*
Expand Down
83 changes: 83 additions & 0 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"

"golang.org/x/text/encoding"
)

// BenchmarkEncoderReader measures time and allocations of LineReader across
// several line-length profiles, reporting processed line/byte counts as
// custom metrics.
func BenchmarkEncoderReader(b *testing.B) {
	const (
		bufferSize   = 1024
		lineMaxLimit = 1000000 // never hit by the input data
	)

	runBench := func(name string, lineMaxLimit int, lines []byte) {
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			for iter := 0; iter < b.N; iter++ {
				r, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
				if err != nil {
					b.Fatal("failed to initialize reader:", err)
				}
				// Drain every decoded line, tallying lines and bytes consumed.
				totalBytes := 0
				lineCount := 0
				for {
					msg, err := r.Next()
					if err == io.EOF {
						b.ReportMetric(float64(lineCount), "processed_lines")
						break
					}
					if err != nil {
						b.Fatal("unexpected error:", err)
					}
					totalBytes += msg.Bytes
					lineCount++
				}
				b.ReportMetric(float64(totalBytes), "processed_bytes")
			}
		})
	}

	runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020))
	runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10))
	runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000))
	// short lineMaxLimit to exercise skipUntilNewLine
	runBench("skip lines", 1024, createBenchmarkLines(100, 10_000))
}

// createBenchmarkLines produces numLines newline-terminated lines of random
// hex-encoded text, each lineLength characters long (rounded down to an even
// length, since two hex characters encode one raw byte).
func createBenchmarkLines(numLines int, lineLength int) []byte {
	var out bytes.Buffer
	// Scratch buffer for the raw random bytes; reused for every line.
	raw := make([]byte, hex.DecodedLen(lineLength))
	for n := 0; n < numLines; n++ {
		if _, err := rand.Read(raw); err != nil {
			panic(fmt.Sprintf("failed to generate random input: %v", err))
		}
		out.WriteString(hex.EncodeToString(raw))
		out.WriteByte('\n')
	}
	return out.Bytes()
}
32 changes: 15 additions & 17 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

const unlimited = 0

// lineReader reads lines from underlying reader, decoding the input stream
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
Expand All @@ -44,10 +43,11 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// New creates a new reader object
// NewLineReader creates a new reader object
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

Expand All @@ -64,13 +64,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {

return &LineReader{
reader: input,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}
Expand Down Expand Up @@ -133,18 +133,17 @@ func (r *LineReader) advance() error {
r.inOffset = newOffset
}

buf := make([]byte, r.bufferSize)

// Try to read more bytes into buffer
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Appends buffer also in case of err
r.inBuffer.Append(buf[:n])
// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
}
Expand All @@ -170,7 +169,7 @@ func (r *LineReader) advance() error {

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
skipped, err := r.skipUntilNewLine()
if err != nil {
r.logger.Error("Error skipping until new line, err:", err)
return err
Expand Down Expand Up @@ -204,7 +203,7 @@ func (r *LineReader) advance() error {
return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
func (r *LineReader) skipUntilNewLine() (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

Expand All @@ -221,14 +220,14 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
Expand All @@ -249,14 +248,13 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
start := 0

for start < end {
var nDst, nSrc int

nDst, nSrc, err = r.decoder.Transform(buffer, inBytes[start:end], false)
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
Expand All @@ -270,7 +268,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(buffer[:nDst])
r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down

0 comments on commit 0e3788b

Please sign in to comment.