From 0e3788b0a6ec8610f2c731b762ef3572177ca90a Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Wed, 8 Sep 2021 23:52:07 -0700 Subject: [PATCH] LineReader: Reuse temporary buffer to reduce per-line allocation (#27782) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What does this PR do? Previously, the `LineReader` would allocate a []byte of size `config.BufferSize` before decoding each line. The underlying array's size allocation is fixed, so `outBuffer.Append` retains all of it even when the appended bytes are much shorter. With this change, we store a single `tempBuffer []byte` which is reused across lines anywhere we need temporary storage. Converting to `outBuffer.Write` forces the buffer to copy data out of tempBuffer, but is able to only allocate space for the written bytes. ## Why is it important? In our production environment, we run beats with k8s-enforced memory limits and are trying to resolve OOMs. The LineReader code path contributes a significant amount of memory allocation. 
The benchmarks added in bench_test.go show this reduces the memory profile with various line lengths: ``` goos: darwin goarch: amd64 pkg: github.com/elastic/beats/v7/libbeat/reader/readfile cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz name old time/op new time/op delta EncoderReader/buffer-sized_lines-16 125µs ± 3% 94µs ± 9% -24.55% (p=0.008 n=5+5) EncoderReader/short_lines-16 52.6µs ± 4% 36.3µs ±10% -30.88% (p=0.008 n=5+5) EncoderReader/long_lines-16 1.82ms ± 2% 1.70ms ±10% ~ (p=0.151 n=5+5) EncoderReader/skip_lines-16 133µs ± 3% 140µs ± 8% ~ (p=0.151 n=5+5) name old alloc/op new alloc/op delta EncoderReader/buffer-sized_lines-16 442kB ± 0% 239kB ± 0% -46.07% (p=0.000 n=4+5) EncoderReader/short_lines-16 118kB ± 0% 15kB ± 0% -87.27% (p=0.008 n=5+5) EncoderReader/long_lines-16 8.73MB ± 0% 7.63MB ± 0% -12.62% (p=0.000 n=4+5) EncoderReader/skip_lines-16 270kB ± 0% 220kB ± 0% -18.58% (p=0.008 n=5+5) name old allocs/op new allocs/op delta EncoderReader/buffer-sized_lines-16 718 ± 0% 519 ± 0% -27.72% (p=0.008 n=5+5) EncoderReader/short_lines-16 522 ± 0% 421 ± 0% -19.35% (p=0.008 n=5+5) EncoderReader/long_lines-16 2.65k ± 0% 1.58k ± 0% -40.54% (p=0.008 n=5+5) EncoderReader/skip_lines-16 420 ± 0% 419 ± 0% -0.24% (p=0.008 n=5+5) ``` --- CHANGELOG.next.asciidoc | 1 + libbeat/reader/readfile/bench_test.go | 83 +++++++++++++++++++++++++++ libbeat/reader/readfile/line.go | 32 +++++------ 3 files changed, 99 insertions(+), 17 deletions(-) create mode 100644 libbeat/reader/readfile/bench_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c327879d3a3..50fa23f0e23 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -747,6 +747,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added support for parsing syslog dates containing a leading 0 (e.g. `Sep 01`) rather than a space. {pull}27775[27775] - Add base64 Encode functionality to httpjson input. 
{pull}27681[27681] - Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735] +- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782] *Heartbeat* diff --git a/libbeat/reader/readfile/bench_test.go b/libbeat/reader/readfile/bench_test.go new file mode 100644 index 00000000000..b1f6e7667f6 --- /dev/null +++ b/libbeat/reader/readfile/bench_test.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package readfile + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "math/rand" + "testing" + + "golang.org/x/text/encoding" +) + +func BenchmarkEncoderReader(b *testing.B) { + const ( + bufferSize = 1024 + lineMaxLimit = 1000000 // never hit by the input data + ) + + runBench := func(name string, lineMaxLimit int, lines []byte) { + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + for bN := 0; bN < b.N; bN++ { + reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit}) + if err != nil { + b.Fatal("failed to initialize reader:", err) + } + // Read decoded lines and test + size := 0 + for i := 0; ; i++ { + msg, err := reader.Next() + if err != nil { + if err == io.EOF { + b.ReportMetric(float64(i), "processed_lines") + break + } else { + b.Fatal("unexpected error:", err) + } + } + size += msg.Bytes + } + b.ReportMetric(float64(size), "processed_bytes") + } + }) + } + + runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020)) + runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10)) + runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000)) + // short lineMaxLimit to exercise skipUntilNewLine + runBench("skip lines", 1024, createBenchmarkLines(100, 10_000)) +} + +func createBenchmarkLines(numLines int, lineLength int) []byte { + buf := bytes.NewBuffer(nil) + for i := 0; i < numLines; i++ { + line := make([]byte, hex.DecodedLen(lineLength)) + if _, err := rand.Read(line); err != nil { + panic(fmt.Sprintf("failed to generate random input: %v", err)) + } + buf.WriteString(hex.EncodeToString(line)) + buf.WriteRune('\n') + } + return buf.Bytes() +} diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index c36b524dde2..78331a7d246 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -30,12 +30,11 @@ import ( const unlimited = 0 -// lineReader reads lines from
underlying reader, decoding the input stream +// LineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. type LineReader struct { reader io.ReadCloser - bufferSize int maxBytes int // max bytes per line limit to avoid OOM with malformatted files nl []byte decodedNl []byte @@ -44,10 +43,11 @@ type LineReader struct { inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer + tempBuffer []byte logger *logp.Logger } -// New creates a new reader object +// NewLineReader creates a new reader object func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { encoder := config.Codec.NewEncoder() @@ -64,13 +64,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { return &LineReader{ reader: input, - bufferSize: config.BufferSize, maxBytes: config.MaxBytes, decoder: config.Codec.NewDecoder(), nl: nl, decodedNl: terminator, inBuffer: streambuf.New(nil), outBuffer: streambuf.New(nil), + tempBuffer: make([]byte, config.BufferSize), logger: logp.NewLogger("reader_line"), }, nil } @@ -133,18 +133,17 @@ func (r *LineReader) advance() error { r.inOffset = newOffset } - buf := make([]byte, r.bufferSize) - // Try to read more bytes into buffer - n, err := r.reader.Read(buf) + n, err := r.reader.Read(r.tempBuffer) if err == io.EOF && n > 0 { // Continue processing the returned bytes. The next call will yield EOF with 0 bytes. 
err = nil } - // Appends buffer also in case of err - r.inBuffer.Append(buf[:n]) + // Write to buffer also in case of err + r.inBuffer.Write(r.tempBuffer[:n]) + if err != nil { return err } @@ -170,7 +169,7 @@ func (r *LineReader) advance() error { // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine if idx == -1 && r.inBuffer.Len() > r.maxBytes { - skipped, err := r.skipUntilNewLine(buf) + skipped, err := r.skipUntilNewLine() if err != nil { r.logger.Error("Error skipping until new line, err:", err) return err @@ -204,7 +203,7 @@ func (r *LineReader) advance() error { return err } -func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { +func (r *LineReader) skipUntilNewLine() (int, error) { // The length of the line skipped skipped := r.inBuffer.Len() @@ -221,14 +220,14 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { // Read until the new line is found for idx := -1; idx == -1; { - n, err := r.reader.Read(buf) + n, err := r.reader.Read(r.tempBuffer) // Check bytes read for newLine if n > 0 { - idx = bytes.Index(buf[:n], r.nl) + idx = bytes.Index(r.tempBuffer[:n], r.nl) if idx != -1 { - r.inBuffer.Append(buf[idx+len(r.nl) : n]) + r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) skipped += idx } else { skipped += n @@ -249,14 +248,13 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { func (r *LineReader) decode(end int) (int, error) { var err error - buffer := make([]byte, 1024) inBytes := r.inBuffer.Bytes() start := 0 for start < end { var nDst, nSrc int - nDst, nSrc, err = r.decoder.Transform(buffer, inBytes[start:end], false) + nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false) if err != nil { // Check if error is different from destination buffer too short if err != transform.ErrShortDst { @@ -270,7 +268,7 @@ func (r *LineReader) decode(end int) (int, error) { } start += nSrc - r.outBuffer.Write(buffer[:nDst]) + 
r.outBuffer.Write(r.tempBuffer[:nDst]) } r.byteCount += start