From 918f17a7c772167c536e5f7c97e7ebc8b8a68712 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 10 Sep 2020 16:14:33 +0200 Subject: [PATCH] Filebeat: Fix random error on harvester close (#21048) This fixes a race condition when a harvester is closed at the same time that its source file size is being calculated. Before this fix, `Error updating file size` errors are randomly printed, because the file handle becomes closed inside the os.Stat call. --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/harvester.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 77ffbd9b172..dc6a588dc1b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -260,6 +260,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update documentation in the azure module filebeat. {pull}20815[20815] - Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] - Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983] +- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index c9014b61de9..6b16861f8ec 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -83,6 +83,7 @@ type Harvester struct { // shutdown handling done chan struct{} + doneWg *sync.WaitGroup stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex @@ -138,6 +139,7 @@ func NewHarvester( publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, + doneWg: &sync.WaitGroup{}, id: id, outletFactory: outletFactory, } @@ -299,7 +301,11 @@ func (h *Harvester) Run() error { logp.Info("Harvester started for file: %s", h.state.Source) - go h.monitorFileSize() + h.doneWg.Add(1) + go func() { + h.monitorFileSize() + h.doneWg.Done() + }() for { select { @@ -378,7 +384,8 @@ func (h *Harvester) monitorFileSize() { func (h *Harvester) stop() { h.stopOnce.Do(func() { close(h.done) - + // Wait for goroutines monitoring h.done to terminate before closing source. + h.doneWg.Wait() filesMetrics.Remove(h.id.String()) }) }