From 51267b4e4a02ef27ab37c3dabad8682f5228aece Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 20 Aug 2018 23:07:05 +0200 Subject: [PATCH] Fix deadlock when file integrity monitor is started A deadlock is possible in auditbeat's file_integrity module under Windows: When enough events arrive while watches are being installed, the event channel can fill up, causing the installation of a watch to block. This patch makes sure that events are received while watches are being installed, and at the same time ensures that no event is lost. --- CHANGELOG.asciidoc | 1 + .../file_integrity/eventreader_fsnotify.go | 51 ++++++++++++-- .../module/file_integrity/eventreader_test.go | 70 +++++++++++++++++++ 3 files changed, 117 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 21df332cc37..45578316834 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Fixed a crash in the file_integrity module under Linux. {issue}7753[7753] - Fixed a data race in the file_integrity module. {issue}8009[8009] +- Fixed a deadlock in the file_integrity module. 
{pull}8027[8027] *Filebeat* diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index c08e8186a6e..b228ebc59a3 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) { return &reader{ watcher: watcher, config: c, - eventC: make(chan Event, 1), log: logp.NewLogger(moduleName), }, nil } @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { if err := r.watcher.Start(); err != nil { return nil, errors.Wrap(err, "unable to start watcher") } - go r.consumeEvents(done) + + queueDone := make(chan struct{}) + queueC := make(chan []*Event) + + // Launch a separate goroutine to fetch all events that happen while + // watches are being installed. + go func() { + defer close(queueC) + queueC <- r.enqueueEvents(queueDone) + }() // Windows implementation of fsnotify needs to have the watched paths // installed after the event consumer is started, to avoid a potential @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { } } + close(queueDone) + events := <-queueC + + // Populate caller's event channel with the previously received events + r.eventC = make(chan Event, 1+len(events)) + for _, ev := range events { + r.eventC <- *ev + } + + go r.consumeEvents(done) + r.log.Infow("Started fsnotify watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) return r.eventC, nil } +func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) { + for { + ev := r.nextEvent(done) + if ev == nil { + return + } + events = append(events, ev) + } +} + func (r *reader) consumeEvents(done <-chan struct{}) { defer close(r.eventC) defer r.watcher.Close() for { - select { - case <-done: + ev := r.nextEvent(done) + if ev == nil { r.log.Debug("fsnotify reader terminated") return + } + r.eventC 
<- *ev + } +} + +func (r *reader) nextEvent(done <-chan struct{}) *Event { + for { + select { + case <-done: + return nil + case event := <-r.watcher.EventChannel(): if event.Name == "" || r.config.IsExcludedPath(event.Name) || !r.config.IsIncludedPath(event.Name) { @@ -102,7 +142,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) { r.config.MaxFileSizeBytes, r.config.HashTypes) e.rtt = time.Since(start) - r.eventC <- e + return &e + case err := <-r.watcher.ErrorChannel(): // a bug in fsnotify can cause spurious nil errors to be sent // on the error channel. diff --git a/auditbeat/module/file_integrity/eventreader_test.go b/auditbeat/module/file_integrity/eventreader_test.go index f0bf074902c..f444aa6ea88 100644 --- a/auditbeat/module/file_integrity/eventreader_test.go +++ b/auditbeat/module/file_integrity/eventreader_test.go @@ -18,10 +18,12 @@ package file_integrity import ( + "fmt" "io/ioutil" "os" "path/filepath" "runtime" + "strings" "syscall" "testing" "time" @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) { }) } +func TestRaces(t *testing.T) { + const ( + fileMode os.FileMode = 0640 + N = 100 + ) + + var dirs []string + + for i := 0; i < N; i++ { + dir, err := ioutil.TempDir("", "audit") + if err != nil { + t.Fatal(err) + } + if dir, err = filepath.EvalSymlinks(dir); err != nil { + t.Fatal(err) + } + dirs = append(dirs, dir) + } + + defer func() { + for _, dir := range dirs { + os.RemoveAll(dir) + } + }() + + // Create a new EventProducer. 
+ config := defaultConfig + config.Paths = dirs + config.Recursive = true + r, err := NewEventReader(config) + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + defer close(done) + + // Generate a lot of events in parallel to Start() so there is a chance of + // events arriving before all watched dirs are Add()-ed + go func() { + for i := 0; i < 10; i++ { + for _, dir := range dirs { + fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i)) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + } + }() + eventC, err := r.Start(done) + if err != nil { + t.Fatal(err) + } + + const marker = "test_file" + for _, dir := range dirs { + fname := filepath.Join(dir, marker) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + + got := 0 + for i := 0; got < N; i++ { + ev := readTimeout(t, eventC) + if strings.Contains(ev.Path, marker) { + got++ + } + } + assert.Equal(t, N, got) +} + // readTimeout reads one event from the channel and returns it. If it does // not receive an event after one second it will time-out and fail the test. func readTimeout(t testing.TB, events <-chan Event) Event {