Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix excessive memory usage when parsing bodies #15639

Merged
merged 11 commits into from
Jan 22, 2020
51 changes: 15 additions & 36 deletions heartbeat/monitors/active/http/respbody.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"encoding/hex"
"io"
"net/http"
"unicode/utf8"
"strings"

"github.com/docker/go-units"

"github.com/elastic/beats/heartbeat/reason"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -31,7 +33,7 @@ import (
// maxBufferBodyBytes sets a hard limit on how much we're willing to buffer for any reason internally.
// since we must buffer the whole body for body validators this is effectively a cap on that.
// 100MiB out to be enough for everybody.
const maxBufferBodyBytes = 100 * 1024 * 1024
const maxBufferBodyBytes = 100 * units.MiB

func processBody(resp *http.Response, config responseConfig, validator multiValidator) (common.MapStr, reason.Reason) {
// Determine how much of the body to actually buffer in memory
Expand Down Expand Up @@ -94,43 +96,20 @@ func readBody(resp *http.Response, maxSampleBytes int) (bodySample string, bodyS

func readPrefixAndHash(body io.ReadCloser, maxPrefixSize int) (respSize int, prefix string, hashStr string, err error) {
hash := sha256.New()
// Function to lazily get the body of the response
rawBuf := make([]byte, 1024)

// Buffer to hold the prefix output along with tracking info
prefixBuf := make([]byte, maxPrefixSize)
prefixRemainingBytes := maxPrefixSize
prefixWriteOffset := 0
for {
readSize, readErr := body.Read(rawBuf)

respSize += readSize
hash.Write(rawBuf[:readSize])

if prefixRemainingBytes > 0 {
if readSize >= prefixRemainingBytes {
copy(prefixBuf[prefixWriteOffset:maxPrefixSize], rawBuf[:prefixRemainingBytes])
prefixWriteOffset += prefixRemainingBytes
prefixRemainingBytes = 0
} else {
copy(prefixBuf[prefixWriteOffset:prefixWriteOffset+readSize], rawBuf[:readSize])
prefixWriteOffset += readSize
prefixRemainingBytes -= readSize
}
}

if readErr == io.EOF {
break
}
var prefixBuf strings.Builder

if readErr != nil {
return 0, "", "", readErr
}
n, err := io.Copy(&prefixBuf, io.TeeReader(io.LimitReader(body, int64(maxPrefixSize)), hash))
if err == nil {
// finish streaming into hash if the body has not been fully consumed yet
var m int64
m, err = io.Copy(hash, body)
n += m
}

// We discard the body if it is not valid UTF-8
if utf8.Valid(prefixBuf[:prefixWriteOffset]) {
prefix = string(prefixBuf[:prefixWriteOffset])
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change in semantics. The original code used to break if err == io.EOF.

This should fix it:
if err != nil && err != io.EOF {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed and pushed. I think there are no other outstanding issues.

return 0, "", "", err
}
Copy link

@urso urso Jan 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the whole loop can be replaced using the io package only. e.g.

hash := sha256.New()
var prefixBuf strings.Builder
n, err := io.Copy(&prefixBuf, io.TeeReader(io.LimitReader(body, maxPrefixSize), hash))

This reads up to maxPrefixSize bytes from the body and writes every single byte to hash and prefixBuf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooooooh, that's nice, will give it a shot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually wondering now, the only reason we do the prefix reading in the first place is to give something for the response checkers to read. The problem before was that if you used a JSON checker and a grep checker, they'd share the same IO, so one would use up the stream before the other could get to work.

Maybe we should just use the tee reader in the first place to split the stream between all the users, then we can always read the full body and not worry about reading some fixed quantity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think that's a bit out of scope here. I'd like to keep the code as-is.

I tried to get things cleaner with the TeeReader, but that really requires us to have more of a limitWriter than a limitReader. The code sample you posted unfortunately only hashes the prefix, whereas the current code hashes the full body.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the hash it should be enough to add another io.Copy(body, hash) if the first io.Copy did not reach EOF. It's still the same reader, and readers are stateful. The second copy should continue appending to the hash from the last known position. Like:

hash := sha256.New()
var prefixBuf strings.Builder
n, err := io.Copy(&prefixBuf, io.TeeReader(io.LimitReader(body, maxPrefixSize), hash))
if err == nil {
  // finish streaming into hash if the body has not been fully consumed yet
  var m int
  m, err = io.Copy(hash, body)
  n += m
}

Alternatively we could introduce a LimitedWriter and a TeeWriter (as you've already noticed). They don't exist in our code base, so we would need to implement these ourselves :(. Then the operation would become:

io.Copy(TeeWriter(LimitedWriter(prefixBuf, ...), hash), body)

The idea is similar to my first sample code. Let's try to reduce code handling buffers, indices, offsets and size limits in loops by replacing them using more declarative (reusable?) reader/writer constructors.

Unfortunately parsers like JSON or regexp support are pull based. Meaning they require an io.Reader in order to "pull" new bytes. Constructing pipelines using a mix of pull and push (reader and writer) can still be somewhat tricky.

At least for JSON I have an alternative. In Beats we ues go-structform for spooling to disk and in our outputs. Interestingly the JSON parser has a push based interface: https://github.com/elastic/go-structform/blob/master/json/parse.go#L154

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed the code to use io.Copy, much cleaner, thanks!

Thinking about it, I think I'm fine with the buffer approach for now. Chances are that users dealing with large files don't want to use either JSON or regexp matching. I think I'm fine waiting for that feature request to come in.

return respSize, prefix, hex.EncodeToString(hash.Sum(nil)), nil

return int(n), prefixBuf.String(), hex.EncodeToString(hash.Sum(nil)), nil
}