Skip to content

Commit

Permalink
Fix: protect the registry critical zone when stop/close are called. (e…
Browse files Browse the repository at this point in the history
…lastic#6959)

Changes how the lock is applied to the registry we need to protect the
hash until the go routine is starte.

Before we were calling `h.add()` inside the goroutine, meaning the
following scenario could happend:

- Start acquire the look.
- Stop request the lock
- Goroutine call add, request the lock
- Start release the lock
- Stop acquire the lock, close all harveter.
- Goroutine acquire the lock start harvester.

Moving out the `h.add()` of the goroutine make sure when we share the
same lock as stop and start.

Ref: elastic#6879
  • Loading branch information
ph authored and ruflin committed May 2, 2018
1 parent 3d3636d commit ec72bfc
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797]
- Add raw JSON to message field when JSON parsing fails. {issue}6516[6516]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Fix a data race between stopping and starting of the harverters. {issue}#6879[6879]

*Heartbeat*
- Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
12 changes: 5 additions & 7 deletions filebeat/harvester/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ func (r *Registry) remove(h Harvester) {
delete(r.harvesters, h.ID())
}

func (r *Registry) add(h Harvester) {
r.Lock()
defer r.Unlock()
r.harvesters[h.ID()] = h
}

// Stop stops all harvesters in the registry
func (r *Registry) Stop() {
r.Lock()
Expand Down Expand Up @@ -71,12 +65,16 @@ func (r *Registry) Start(h Harvester) error {
}

r.wg.Add(1)

// Add the harvester to the registry and share the lock with stop making sure Start() and Stop()
// have a consistent view of the harvesters.
r.harvesters[h.ID()] = h

go func() {
defer func() {
r.remove(h)
r.wg.Done()
}()
r.add(h)
// Starts harvester and picks the right type. In case type is not set, set it to default (log)
err := h.Run()
if err != nil {
Expand Down

0 comments on commit ec72bfc

Please sign in to comment.