Skip to content

Commit

Permalink
Fix race on shared maps in global fields
Browse files Browse the repository at this point in the history
On publish fields are added to an event in this order:
- local/global configured fields
- dynamic fields
- "beat" metadata

When merging the fields, shared structures must not be overwritten or
updated concurrently. This is enforced by cloning the original fields
structure before applying updates.

This adds missing Clone operations if configured fields add new
fields to the `beat` namespace or if dynamic fields are enabled.
  • Loading branch information
urso committed May 3, 2018
1 parent a503644 commit 2bf300c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove double slashes in Windows service script. {pull}6491[6491]
- Ensure Kubernetes labels/annotations don't break mapping {pull}6490[6490]
- Ensure that the dashboard zip files can't contain files outside of the kibana directory. {pull}6921[6921]
- Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947]

*Auditbeat*

Expand Down
38 changes: 29 additions & 9 deletions libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func newProcessorPipeline(
localProcessors = makeClientProcessors(config)
)

needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil
// needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil
needsCopy := true

if !config.SkipNormalization {
// setup 1: generalize/normalize output (P)
Expand Down Expand Up @@ -81,11 +82,19 @@ func newProcessorPipeline(
}

if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
// Enforce a copy of fields if dynamic fields are configured or beats
// metadata will be merged into the fields.
// With dynamic fields potentially changing at any time, we need to copy,
// so we do not change shared structures be accident.
fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["beat"] != nil
processors.add(makeAddFieldsProcessor("fields", fields, fieldsNeedsCopy))
}

if config.DynamicFields != nil {
processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, needsCopy))
checkCopy := func(m common.MapStr) bool {
return needsCopy || hasKey(m, "beat")
}
processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy))
}

// setup 5: client processor list
Expand Down Expand Up @@ -250,13 +259,19 @@ func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *proce
return newAnnotateProcessor(name, fn)
}

func makeAddDynMetaProcessor(name string, meta *common.MapStrPointer, copy bool) *processorFn {
fn := func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get()) }
if copy {
fn = func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get().Clone()) }
}
func makeAddDynMetaProcessor(
name string,
meta *common.MapStrPointer,
checkCopy func(m common.MapStr) bool,
) *processorFn {
return newAnnotateProcessor(name, func(event *beat.Event) {
dynFields := meta.Get()
if checkCopy(dynFields) {
dynFields = dynFields.Clone()
}

return newAnnotateProcessor(name, fn)
event.Fields.DeepUpdate(dynFields)
})
}

func debugPrintProcessor(info beat.Info) *processorFn {
Expand Down Expand Up @@ -290,3 +305,8 @@ func makeClientProcessors(config beat.ClientConfig) processors.Processor {
list: procs.All(),
}
}

func hasKey(m common.MapStr, key string) bool {
_, exists := m[key]
return exists
}

0 comments on commit 2bf300c

Please sign in to comment.