From 03b42273e6f23c15f53dc89569bbbd33d2eac5ca Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 13 Jul 2020 16:43:23 -0400 Subject: [PATCH] [Filebeat] Fix reference leak in TCP and Unix socket inputs (#19459) (#19501) The tcp and unix input sources were leaking references causing a memory leak. When an accepted connection ended inputsource/common.Closer was supposed to delete the pointer that it held to the connection, but due to a code error `delete` was being called on the wrong map. Instead of modifying the common.Closer I replaced it with a cancellable context.Context which is designed to propagate signals from parent to children and requires less code. (cherry picked from commit 61f484681d01dd3a01323c343f9c035a9cb8520d) --- CHANGELOG.next.asciidoc | 1 + filebeat/inputsource/common/handler.go | 12 ++++-------- filebeat/inputsource/common/listener.go | 8 ++++++++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6adee56aace..506b91151ba 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -231,6 +231,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix date and timestamp formats for fortigate module {pull}19316[19316] - Add missing `default_field: false` to aws filesets fields.yml. {pull}19568[19568] - Fix tls mapping in suricata module {issue}19492[19492] {pull}19494[19494] +- Fix memory leak in tcp and unix input sources. {pull}19459[19459] *Heartbeat* diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go index 84786086f4e..a55ee1755d5 100644 --- a/filebeat/inputsource/common/handler.go +++ b/filebeat/inputsource/common/handler.go @@ -19,6 +19,7 @@ package common import ( "bufio" + "context" "net" "github.com/pkg/errors" @@ -31,7 +32,7 @@ import ( type HandlerFactory func(config ListenerConfig) ConnectionHandler // ConnectionHandler interface provides mechanisms for handling of incoming connections -type ConnectionHandler func(CloseRef, net.Conn) error +type ConnectionHandler func(context.Context, net.Conn) error // MetadataFunc defines callback executed when a line is read from the split handler. type MetadataFunc func(net.Conn) inputsource.NetworkMetadata @@ -39,7 +40,7 @@ type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // SplitHandlerFactory allows creation of a handler that has splitting capabilities. func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { - return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + return ConnectionHandler(func(ctx context.Context, conn net.Conn) error { metadata := metadataCallback(conn) maxMessageSize := uint64(config.MaxMessageSize) @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me scanner.Buffer(buffer, int(maxMessageSize)) for { select { - case <-closer.Done(): + case <-ctx.Done(): break default: } - // Ensure that if the Conn is already closed then dont attempt to scan again - if closer.Err() == ErrClosed { - break - } - if !scanner.Scan() { break } diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index f4890ccc767..0798b6aaa11 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -78,6 +78,14 @@ func (l *Listener) Start() error { return err } + l.ctx, l.cancel = context.WithCancel(context.Background()) + go func() { + <-l.ctx.Done() + l.Listener.Close() + }() + + l.log.Info("Started listening for " + l.family.String() + " connection") + l.wg.Add(1) go func() { defer l.wg.Done()