Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/master' into ecszap-mvp
Browse files Browse the repository at this point in the history
  • Loading branch information
simitt committed May 5, 2020
2 parents ac3706b + d7fbc77 commit 0569c82
Show file tree
Hide file tree
Showing 19 changed files with 493 additions and 52 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `setup.dashboards.index` setting not working. {pull}17749[17749]
- Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030]
- Fix panic when assigning a key to a `nil` value in an event. {pull}18143[18143]
- Gives monitoring reporter hosts, if configured, total precedence over corresponding output hosts. {issue}17937[17937] {pull}17991[17991]
- Arbitrary fields and metadata maps are now deep merged into event. {pull}17958[17958]
- Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]

*Auditbeat*

Expand Down Expand Up @@ -215,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Set `agent.name` to the hostname by default. {issue}16377[16377] {pull}18000[18000]
- Add support for basic ECS logging. {pull}17974[17974]
- Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]

*Auditbeat*

Expand Down Expand Up @@ -294,6 +298,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added an input option `publisher_pipeline.disable_host` to disable `host.name`
from being added to events by default. {pull}18159[18159]
- Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065]
- When using the `json.*` setting available on some inputs, decoded fields are now deep-merged into existing event. {pull}17958[17958]
- Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]

*Heartbeat*

Expand Down Expand Up @@ -373,6 +379,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add aggregation aligner as a config parameter for googlecloud stackdriver metricset. {issue}17141[[17141] {pull}17719[17719]
- Move the perfmon metricset to GA. {issue}16608[16608] {pull}17879[17879]
- Add static mapping for metricsets under aws module. {pull}17614[17614] {pull}17650[17650]
- Collect new `bulk` indexing metrics from Elasticsearch when `xpack.enabled:true` is set. {issue} {pull}17992[17992]

*Packetbeat*

Expand Down
27 changes: 18 additions & 9 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (
// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
if !overwriteKeys {
for k, v := range keys {
if _, exists := event.Fields[k]; !exists && k != "@timestamp" && k != "@metadata" {
event.Fields[k] = v
}
}
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
removeKeys(keys, "@timestamp", "@metadata")

// Then, perform deep update without overwriting
event.Fields.DeepUpdateNoOverwrite(keys)
return
}

Expand Down Expand Up @@ -64,7 +65,7 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys
}

case map[string]interface{}:
event.Meta.Update(common.MapStr(m))
event.Meta.DeepUpdate(common.MapStr(m))

default:
event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey)
Expand All @@ -83,13 +84,21 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys
continue
}
event.Fields[k] = vstr

default:
event.Fields[k] = v
}
}

// We have accounted for @timestamp, @metadata, type above. So let's remove these keys and
// deep update the event with the rest of the keys.
removeKeys(keys, "@timestamp", "@metadata", "type")
event.Fields.DeepUpdate(keys)
}

func createJSONError(message string) common.MapStr {
return common.MapStr{"message": message, "type": "json"}
}

func removeKeys(keys map[string]interface{}, names ...string) {
for _, name := range names {
delete(keys, name)
}
}
136 changes: 136 additions & 0 deletions libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestWriteJSONKeys(t *testing.T) {
now := time.Now()
now = now.Round(time.Second)

eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC)
eventMetadata := common.MapStr{
"foo": "bar",
"baz": common.MapStr{
"qux": 17,
},
}
eventFields := common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
},
}

tests := map[string]struct {
keys map[string]interface{}
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
expectedFields common.MapStr
}{
"overwrite_true": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedMetadata: common.MapStr{
"foo": "NEW_bar",
"baz": common.MapStr{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
expectedTimestamp: now,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_false": {
overwriteKeys: false,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
event := &beat.Event{
Timestamp: eventTimestamp,
Meta: eventMetadata.Clone(),
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
})
}
}
58 changes: 55 additions & 3 deletions libbeat/monitoring/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"

errw "github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)
Expand Down Expand Up @@ -59,6 +61,10 @@ type Reporter interface {

type ReporterFactory func(beat.Info, Settings, *common.Config) (Reporter, error)

type hostsCfg struct {
Hosts []string `config:"hosts"`
}

var (
defaultConfig = config{}

Expand Down Expand Up @@ -111,9 +117,7 @@ func getReporterConfig(
// merge reporter config with output config if both are present
if outCfg := outputs.Config(); outputs.Name() == name && outCfg != nil {
// require monitoring to not configure any hosts if output is configured:
hosts := struct {
Hosts []string `config:"hosts"`
}{}
hosts := hostsCfg{}
rc.Unpack(&hosts)

if settings.Format == FormatXPackMonitoringBulk && len(hosts.Hosts) > 0 {
Expand All @@ -127,6 +131,13 @@ func getReporterConfig(
if err != nil {
return "", nil, err
}

// Make sure hosts from reporter configuration get precedence over hosts
// from output configuration
if err := mergeHosts(merged, outCfg, rc); err != nil {
return "", nil, err
}

rc = merged
}

Expand Down Expand Up @@ -155,3 +166,44 @@ func collectSubObject(cfg *common.Config) *common.Config {
}
return out
}

func mergeHosts(merged, outCfg, reporterCfg *common.Config) error {
if merged == nil {
merged = common.NewConfig()
}

outputHosts := hostsCfg{}
if outCfg != nil {
if err := outCfg.Unpack(&outputHosts); err != nil {
return errw.Wrap(err, "unable to parse hosts from output config")
}
}

reporterHosts := hostsCfg{}
if reporterCfg != nil {
if err := reporterCfg.Unpack(&reporterHosts); err != nil {
return errw.Wrap(err, "unable to parse hosts from reporter config")
}
}

if len(outputHosts.Hosts) == 0 && len(reporterHosts.Hosts) == 0 {
return nil
}

// Give precedence to reporter hosts over output hosts
var newHostsCfg *common.Config
var err error
if len(reporterHosts.Hosts) > 0 {
newHostsCfg, err = common.NewConfigFrom(reporterHosts.Hosts)
} else {
newHostsCfg, err = common.NewConfigFrom(outputHosts.Hosts)
}
if err != nil {
return errw.Wrap(err, "unable to make config from new hosts")
}

if err := merged.SetChild("hosts", -1, newHostsCfg); err != nil {
return errw.Wrap(err, "unable to set new hosts into merged config")
}
return nil
}
78 changes: 78 additions & 0 deletions libbeat/monitoring/report/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package report

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestMergeHosts(t *testing.T) {
tests := map[string]struct {
outCfg *common.Config
reporterCfg *common.Config
expectedCfg *common.Config
}{
"no_hosts": {
expectedCfg: newConfigWithHosts(),
},
"only_reporter_hosts": {
reporterCfg: newConfigWithHosts("r1", "r2"),
expectedCfg: newConfigWithHosts("r1", "r2"),
},
"only_output_hosts": {
outCfg: newConfigWithHosts("o1", "o2"),
expectedCfg: newConfigWithHosts("o1", "o2"),
},
"equal_hosts": {
outCfg: newConfigWithHosts("o1", "o2"),
reporterCfg: newConfigWithHosts("r1", "r2"),
expectedCfg: newConfigWithHosts("r1", "r2"),
},
"more_output_hosts": {
outCfg: newConfigWithHosts("o1", "o2"),
reporterCfg: newConfigWithHosts("r1"),
expectedCfg: newConfigWithHosts("r1"),
},
"more_reporter_hosts": {
outCfg: newConfigWithHosts("o1"),
reporterCfg: newConfigWithHosts("r1", "r2"),
expectedCfg: newConfigWithHosts("r1", "r2"),
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
mergedCfg := common.MustNewConfigFrom(map[string]interface{}{})
err := mergeHosts(mergedCfg, test.outCfg, test.reporterCfg)
require.NoError(t, err)

require.Equal(t, test.expectedCfg, mergedCfg)
})
}
}

func newConfigWithHosts(hosts ...string) *common.Config {
if len(hosts) == 0 {
return common.MustNewConfigFrom(map[string][]string{})
}
return common.MustNewConfigFrom(map[string][]string{"hosts": hosts})
}
Loading

0 comments on commit 0569c82

Please sign in to comment.