Skip to content

Commit

Permalink
Fix deadlock when file integrity monitor is started
Browse files Browse the repository at this point in the history
A deadlock is possible in auditbeat's file_integrity module under
Windows: When enough events arrive while watches are being installed,
the event channel can fill causing the installation of a watch to
block.

This patch makes sure that events are received while watches are being
installed, and at the same time ensures that no event is lost.
  • Loading branch information
adriansr committed Aug 22, 2018
1 parent 698cf9c commit 51267b4
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]

- Fixed a crash in the file_integrity module under Linux. {issue}7753[7753]
- Fixed a data race in the file_integrity module. {issue}8009[8009]
- Fixed a deadlock in the file_integrity module. {pull}8027[8027]

*Filebeat*

Expand Down
51 changes: 46 additions & 5 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) {
return &reader{
watcher: watcher,
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName),
}, nil
}
Expand All @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)

queueDone := make(chan struct{})
queueC := make(chan []*Event)

// Launch a separate goroutine to fetch all events that happen while
// watches are being installed.
go func() {
defer close(queueC)
queueC <- r.enqueueEvents(queueDone)
}()

// Windows implementation of fsnotify needs to have the watched paths
// installed after the event consumer is started, to avoid a potential
Expand All @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

close(queueDone)
events := <-queueC

// Populate callee's event channel with the previously received events
r.eventC = make(chan Event, 1+len(events))
for _, ev := range events {
r.eventC <- *ev
}

go r.consumeEvents(done)

r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
return r.eventC, nil
}

func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) {
for {
ev := r.nextEvent(done)
if ev == nil {
return
}
events = append(events, ev)
}
}

func (r *reader) consumeEvents(done <-chan struct{}) {
defer close(r.eventC)
defer r.watcher.Close()

for {
select {
case <-done:
ev := r.nextEvent(done)
if ev == nil {
r.log.Debug("fsnotify reader terminated")
return
}
r.eventC <- *ev
}
}

func (r *reader) nextEvent(done <-chan struct{}) *Event {
for {
select {
case <-done:
return nil

case event := <-r.watcher.EventChannel():
if event.Name == "" || r.config.IsExcludedPath(event.Name) ||
!r.config.IsIncludedPath(event.Name) {
Expand All @@ -102,7 +142,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) {
r.config.MaxFileSizeBytes, r.config.HashTypes)
e.rtt = time.Since(start)

r.eventC <- e
return &e

case err := <-r.watcher.ErrorChannel():
// a bug in fsnotify can cause spurious nil errors to be sent
// on the error channel.
Expand Down
70 changes: 70 additions & 0 deletions auditbeat/module/file_integrity/eventreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package file_integrity

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) {
})
}

func TestRaces(t *testing.T) {
const (
fileMode os.FileMode = 0640
N = 100
)

var dirs []string

for i := 0; i < N; i++ {
dir, err := ioutil.TempDir("", "audit")
if err != nil {
t.Fatal(err)
}
if dir, err = filepath.EvalSymlinks(dir); err != nil {
t.Fatal(err)
}
dirs = append(dirs, dir)
}

defer func() {
for _, dir := range dirs {
os.RemoveAll(dir)
}
}()

// Create a new EventProducer.
config := defaultConfig
config.Paths = dirs
config.Recursive = true
r, err := NewEventReader(config)
if err != nil {
t.Fatal(err)
}

done := make(chan struct{})
defer close(done)

// Generate a lot of events in parallel to Start() so there is a chance of
// events arriving before all watched dirs are Add()-ed
go func() {
for i := 0; i < 10; i++ {
for _, dir := range dirs {
fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i))
ioutil.WriteFile(fname, []byte("hello"), fileMode)
}
}
}()
eventC, err := r.Start(done)
if err != nil {
t.Fatal(err)
}

const marker = "test_file"
for _, dir := range dirs {
fname := filepath.Join(dir, marker)
ioutil.WriteFile(fname, []byte("hello"), fileMode)
}

got := 0
for i := 0; got < N; i++ {
ev := readTimeout(t, eventC)
if strings.Contains(ev.Path, marker) {
got++
}
}
assert.Equal(t, N, got)
}

// readTimeout reads one event from the channel and returns it. If it does
// not receive an event after one second it will time-out and fail the test.
func readTimeout(t testing.TB, events <-chan Event) Event {
Expand Down

0 comments on commit 51267b4

Please sign in to comment.