Skip to content

Commit

Permalink
Cherry-pick elastic#19764 to 7.7: [Auditbeat] Fix up socket dataset r…
Browse files Browse the repository at this point in the history
…unaway CPU usage (elastic#19783)

* [Auditbeat] Fix up socket dataset runaway CPU usage (elastic#19764)

* Fix up socket dataset
* Add Changelog entry

(cherry picked from commit f1ef970)

* fix up changelog

* Fix changelog
  • Loading branch information
Andrew Stucki committed Jul 9, 2020
1 parent 3d2e03b commit d1750e8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188]
- system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569]
- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033]
- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] {pull}19764[19764]
- system/socket: Fixed tracking of long-running connections. {pull}19033[19033]

*Filebeat*
Expand Down
15 changes: 9 additions & 6 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (s *state) ExpireOlder() {
deadline = s.clock().Add(-s.socketTimeout)
for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockDestroyed(sock.sock, 0)
s.onSockDestroyed(sock.sock, sock, 0)
} else {
s.socketLRU.get()
}
Expand Down Expand Up @@ -704,13 +704,16 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
s.Lock()
defer s.Unlock()

return s.onSockDestroyed(ptr, pid)
return s.onSockDestroyed(ptr, nil, pid)
}

func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
sock, found := s.socks[ptr]
if !found {
return nil
func (s *state) onSockDestroyed(ptr uintptr, sock *socket, pid uint32) error {
var found bool
if sock == nil {
sock, found = s.socks[ptr]
if !found {
return nil
}
}
// Enrich with pid
if sock.pid == 0 && pid != 0 {
Expand Down
27 changes: 26 additions & 1 deletion x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{

callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
Expand Down Expand Up @@ -293,6 +292,32 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
}
}

func TestSocketExpirationWithOverwrittenSockets(t *testing.T) {
const (
sock uintptr = 0xff1234
flowTimeout = time.Hour
socketTimeout = time.Minute * 3
closeTimeout = time.Minute
)
st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second)
now := time.Now()
st.clock = func() time.Time {
return now
}
if err := feedEvents([]event{
&inetCreate{Meta: meta(1234, 1236, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1236, 5), Sock: sock},
&inetCreate{Meta: meta(1234, 1237, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1237, 5), Sock: sock},
}, st, t); err != nil {
t.Fatal(err)
}
now = now.Add(closeTimeout + 1)
st.ExpireOlder()
now = now.Add(socketTimeout + 1)
st.ExpireOlder()
}

func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
Expand Down

0 comments on commit d1750e8

Please sign in to comment.