Skip to content

Commit

Permalink
[Heartbeat] Fix excessive memory usage when parsing bodies (#15639)
Browse files Browse the repository at this point in the history
* [Heartbeat] Fix excessive memory usage when parsing bodies

In 7.5 we introduced a regression where we would allocate a 100MiB
buffer per HTTP request. This change fixes that to stream data instead.
  • Loading branch information
andrewvc committed Jan 22, 2020
1 parent 4f0d0d7 commit 080dedb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ TLS or Beats that accept connections over TLS and validate client certificates.

*Heartbeat*

- Fixed excessive memory usage introduced in 7.5 due to over-allocating memory for HTTP checks. {pull}15639[15639]

*Journalbeat*

Expand Down
51 changes: 15 additions & 36 deletions heartbeat/monitors/active/http/respbody.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"encoding/hex"
"io"
"net/http"
"unicode/utf8"
"strings"

"github.com/docker/go-units"

"github.com/elastic/beats/heartbeat/reason"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -31,7 +33,7 @@ import (
// maxBufferBodyBytes sets a hard limit on how much we're willing to buffer for any reason internally.
// since we must buffer the whole body for body validators this is effectively a cap on that.
// 100MiB out to be enough for everybody.
const maxBufferBodyBytes = 100 * 1024 * 1024
const maxBufferBodyBytes = 100 * units.MiB

func processBody(resp *http.Response, config responseConfig, validator multiValidator) (common.MapStr, reason.Reason) {
// Determine how much of the body to actually buffer in memory
Expand Down Expand Up @@ -94,43 +96,20 @@ func readBody(resp *http.Response, maxSampleBytes int) (bodySample string, bodyS

func readPrefixAndHash(body io.ReadCloser, maxPrefixSize int) (respSize int, prefix string, hashStr string, err error) {
hash := sha256.New()
// Function to lazily get the body of the response
rawBuf := make([]byte, 1024)

// Buffer to hold the prefix output along with tracking info
prefixBuf := make([]byte, maxPrefixSize)
prefixRemainingBytes := maxPrefixSize
prefixWriteOffset := 0
for {
readSize, readErr := body.Read(rawBuf)

respSize += readSize
hash.Write(rawBuf[:readSize])

if prefixRemainingBytes > 0 {
if readSize >= prefixRemainingBytes {
copy(prefixBuf[prefixWriteOffset:maxPrefixSize], rawBuf[:prefixRemainingBytes])
prefixWriteOffset += prefixRemainingBytes
prefixRemainingBytes = 0
} else {
copy(prefixBuf[prefixWriteOffset:prefixWriteOffset+readSize], rawBuf[:readSize])
prefixWriteOffset += readSize
prefixRemainingBytes -= readSize
}
}

if readErr == io.EOF {
break
}
var prefixBuf strings.Builder

if readErr != nil {
return 0, "", "", readErr
}
n, err := io.Copy(&prefixBuf, io.TeeReader(io.LimitReader(body, int64(maxPrefixSize)), hash))
if err == nil {
// finish streaming into hash if the body has not been fully consumed yet
var m int64
m, err = io.Copy(hash, body)
n += m
}

// We discard the body if it is not valid UTF-8
if utf8.Valid(prefixBuf[:prefixWriteOffset]) {
prefix = string(prefixBuf[:prefixWriteOffset])
if err != nil && err != io.EOF {
return 0, "", "", err
}
return respSize, prefix, hex.EncodeToString(hash.Sum(nil)), nil

return int(n), prefixBuf.String(), hex.EncodeToString(hash.Sum(nil)), nil
}

0 comments on commit 080dedb

Please sign in to comment.