diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 03c116d896d..f1c0f06b248 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -82,6 +82,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `setup.dashboards.index` setting not working. {pull}17749[17749] - Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030] - Fix panic when assigning a key to a `nil` value in an event. {pull}18143[18143] +- Gives monitoring reporter hosts, if configured, total precedence over corresponding output hosts. {issue}17937[17937] {pull}17991[17991] +- Arbitrary fields and metadata maps are now deep merged into event. {pull}17958[17958] +- Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] *Auditbeat* @@ -215,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Set `agent.name` to the hostname by default. {issue}16377[16377] {pull}18000[18000] - Add support for basic ECS logging. {pull}17974[17974] - Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153] +- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958] *Auditbeat* @@ -294,6 +298,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added an input option `publisher_pipeline.disable_host` to disable `host.name` from being added to events by default. {pull}18159[18159] - Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065] +- When using the `json.*` setting available on some inputs, decoded fields are now deep-merged into existing event. {pull}17958[17958] +- Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] *Heartbeat* @@ -373,6 +379,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add aggregation aligner as a config parameter for googlecloud stackdriver metricset. {issue}17141[[17141] {pull}17719[17719] - Move the perfmon metricset to GA. {issue}16608[16608] {pull}17879[17879] - Add static mapping for metricsets under aws module. {pull}17614[17614] {pull}17650[17650] +- Collect new `bulk` indexing metrics from Elasticsearch when `xpack.enabled:true` is set. {issue} {pull}17992[17992] *Packetbeat* diff --git a/libbeat/common/jsontransform/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go index 1490bcff170..164e1e9e1f4 100644 --- a/libbeat/common/jsontransform/jsonhelper.go +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -29,11 +29,12 @@ import ( // WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) { if !overwriteKeys { - for k, v := range keys { - if _, exists := event.Fields[k]; !exists && k != "@timestamp" && k != "@metadata" { - event.Fields[k] = v - } - } + // @timestamp and @metadata fields are root-level fields. We remove them so they + // don't become part of event.Fields. + removeKeys(keys, "@timestamp", "@metadata") + + // Then, perform deep update without overwriting + event.Fields.DeepUpdateNoOverwrite(keys) return } @@ -64,7 +65,7 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys } case map[string]interface{}: - event.Meta.Update(common.MapStr(m)) + event.Meta.DeepUpdate(common.MapStr(m)) default: event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey) @@ -83,13 +84,21 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys continue } event.Fields[k] = vstr - - default: - event.Fields[k] = v } } + + // We have accounted for @timestamp, @metadata, type above. So let's remove these keys and + // deep update the event with the rest of the keys. + removeKeys(keys, "@timestamp", "@metadata", "type") + event.Fields.DeepUpdate(keys) } func createJSONError(message string) common.MapStr { return common.MapStr{"message": message, "type": "json"} } + +func removeKeys(keys map[string]interface{}, names ...string) { + for _, name := range names { + delete(keys, name) + } +} diff --git a/libbeat/common/jsontransform/jsonhelper_test.go b/libbeat/common/jsontransform/jsonhelper_test.go new file mode 100644 index 00000000000..d7679579be1 --- /dev/null +++ b/libbeat/common/jsontransform/jsonhelper_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jsontransform + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestWriteJSONKeys(t *testing.T) { + now := time.Now() + now = now.Round(time.Second) + + eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC) + eventMetadata := common.MapStr{ + "foo": "bar", + "baz": common.MapStr{ + "qux": 17, + }, + } + eventFields := common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "dee", + }, + } + + tests := map[string]struct { + keys map[string]interface{} + overwriteKeys bool + expectedMetadata common.MapStr + expectedTimestamp time.Time + expectedFields common.MapStr + }{ + "overwrite_true": { + overwriteKeys: true, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedMetadata: common.MapStr{ + "foo": "NEW_bar", + "baz": common.MapStr{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + expectedTimestamp: now, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + "overwrite_false": { + overwriteKeys: false, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedMetadata: eventMetadata.Clone(), + expectedTimestamp: eventTimestamp, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + event := &beat.Event{ + Timestamp: eventTimestamp, + Meta: eventMetadata.Clone(), + Fields: eventFields.Clone(), + } + + WriteJSONKeys(event, test.keys, test.overwriteKeys, false) + require.Equal(t, test.expectedMetadata, event.Meta) + require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano()) + require.Equal(t, test.expectedFields, event.Fields) + }) + } +} diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index e6812515af9..0f79af4e874 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + errw "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) @@ -59,6 +61,10 @@ type Reporter interface { type ReporterFactory func(beat.Info, Settings, *common.Config) (Reporter, error) +type hostsCfg struct { + Hosts []string `config:"hosts"` +} + var ( defaultConfig = config{} @@ -111,9 +117,7 @@ func getReporterConfig( // merge reporter config with output config if both are present if outCfg := outputs.Config(); outputs.Name() == name && outCfg != nil { // require monitoring to not configure any hosts if output is configured: - hosts := struct { - Hosts []string `config:"hosts"` - }{} + hosts := hostsCfg{} rc.Unpack(&hosts) if settings.Format == FormatXPackMonitoringBulk && len(hosts.Hosts) > 0 { @@ -127,6 +131,13 @@ func getReporterConfig( if err != nil { return "", nil, err } + + // Make sure hosts from reporter configuration get precedence over hosts + // from output configuration + if err := mergeHosts(merged, outCfg, rc); err != nil { + return "", nil, err + } + rc = merged } @@ -155,3 +166,44 @@ func collectSubObject(cfg *common.Config) *common.Config { } return out } + +func mergeHosts(merged, outCfg, reporterCfg *common.Config) error { + if merged == nil { + merged = common.NewConfig() + } + + outputHosts := hostsCfg{} + if outCfg != nil { + if err := outCfg.Unpack(&outputHosts); err != nil { + return errw.Wrap(err, "unable to parse hosts from output config") + } + } + + reporterHosts := hostsCfg{} + if reporterCfg != nil { + if err := reporterCfg.Unpack(&reporterHosts); err != nil { + return errw.Wrap(err, "unable to parse hosts from reporter config") + } + } + + if len(outputHosts.Hosts) == 0 && len(reporterHosts.Hosts) == 0 { + return nil + } + + // Give precedence to reporter hosts over output hosts + var newHostsCfg *common.Config + var err error + if len(reporterHosts.Hosts) > 0 { + newHostsCfg, err = common.NewConfigFrom(reporterHosts.Hosts) + } else { + newHostsCfg, err = common.NewConfigFrom(outputHosts.Hosts) + } + if err != nil { + return errw.Wrap(err, "unable to make config from new hosts") + } + + if err := merged.SetChild("hosts", -1, newHostsCfg); err != nil { + return errw.Wrap(err, "unable to set new hosts into merged config") + } + return nil +} diff --git a/libbeat/monitoring/report/report_test.go b/libbeat/monitoring/report/report_test.go new file mode 100644 index 00000000000..45b0dadc83f --- /dev/null +++ b/libbeat/monitoring/report/report_test.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package report + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestMergeHosts(t *testing.T) { + tests := map[string]struct { + outCfg *common.Config + reporterCfg *common.Config + expectedCfg *common.Config + }{ + "no_hosts": { + expectedCfg: newConfigWithHosts(), + }, + "only_reporter_hosts": { + reporterCfg: newConfigWithHosts("r1", "r2"), + expectedCfg: newConfigWithHosts("r1", "r2"), + }, + "only_output_hosts": { + outCfg: newConfigWithHosts("o1", "o2"), + expectedCfg: newConfigWithHosts("o1", "o2"), + }, + "equal_hosts": { + outCfg: newConfigWithHosts("o1", "o2"), + reporterCfg: newConfigWithHosts("r1", "r2"), + expectedCfg: newConfigWithHosts("r1", "r2"), + }, + "more_output_hosts": { + outCfg: newConfigWithHosts("o1", "o2"), + reporterCfg: newConfigWithHosts("r1"), + expectedCfg: newConfigWithHosts("r1"), + }, + "more_reporter_hosts": { + outCfg: newConfigWithHosts("o1"), + reporterCfg: newConfigWithHosts("r1", "r2"), + expectedCfg: newConfigWithHosts("r1", "r2"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mergedCfg := common.MustNewConfigFrom(map[string]interface{}{}) + err := mergeHosts(mergedCfg, test.outCfg, test.reporterCfg) + require.NoError(t, err) + + require.Equal(t, test.expectedCfg, mergedCfg) + }) + } +} + +func newConfigWithHosts(hosts ...string) *common.Config { + if len(hosts) == 0 { + return common.MustNewConfigFrom(map[string][]string{}) + } + return common.MustNewConfigFrom(map[string][]string{"hosts": hosts}) +} diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index c2264f9d6a8..46825ee0084 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -28,6 +28,8 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" + s "github.com/elastic/beats/v7/libbeat/common/schema" + c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/elastic" @@ -63,6 +65,9 @@ var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0") // EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available. var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0") +// BulkStatsAvailableVersion is the version since when bulk indexing stats are available +var BulkStatsAvailableVersion = common.MustNewVersion("7.8.0") + // Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id. var clusterIDCache = map[string]string{} @@ -107,6 +112,14 @@ type licenseWrapper struct { License License `json:"license"` } +var BulkStatsDict = c.Dict("bulk", s.Schema{ + "total_operations": c.Int("total_operations"), + "total_time_in_millis": c.Int("total_time_in_millis"), + "total_size_in_bytes": c.Int("total_size_in_bytes"), + "avg_time_in_millis": c.Int("avg_time_in_millis"), + "avg_size_in_bytes": c.Int("avg_size_in_bytes"), +}, c.DictOptional) + // GetClusterID fetches cluster id for given nodeID. func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) { // Check if cluster id already cached. If yes, return it. diff --git a/metricbeat/module/elasticsearch/index/data_xpack.go b/metricbeat/module/elasticsearch/index/data_xpack.go index 35e9119fdf7..6c73b4ee2e1 100644 --- a/metricbeat/module/elasticsearch/index/data_xpack.go +++ b/metricbeat/module/elasticsearch/index/data_xpack.go @@ -65,6 +65,7 @@ type indexStats struct { IndexTimeInMillis int `json:"index_time_in_millis"` ThrottleTimeInMillis int `json:"throttle_time_in_millis"` } `json:"indexing"` + Bulk bulkStats `json:"bulk"` Merges struct { TotalSizeInBytes int `json:"total_size_in_bytes"` } `json:"merges"` @@ -120,6 +121,14 @@ type shardStats struct { Relocating int `json:"relocating"` } +type bulkStats struct { + TotalOperations int `json:"total_operations"` + TotalTimeInMillis int `json:"total_time_in_millis"` + TotalSizeInBytes int `json:"total_size_in_bytes"` + AvgTimeInMillis int `json:"throttle_time_in_millis"` + AvgSizeInBytes int `json:"avg_size_in_bytes"` +} + func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { clusterStateMetrics := []string{"metadata", "routing_table"} clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics) diff --git a/metricbeat/module/elasticsearch/index/index.go b/metricbeat/module/elasticsearch/index/index.go index cd2dc3ffca0..3454b0d6554 100644 --- a/metricbeat/module/elasticsearch/index/index.go +++ b/metricbeat/module/elasticsearch/index/index.go @@ -18,8 +18,12 @@ package index import ( + "net/url" + "strings" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -67,14 +71,22 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } - content, err := m.HTTP.FetchContent() + info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) if err != nil { + return errors.Wrap(err, "failed to get info from Elasticsearch") + } + + if err := m.updateServicePath(*info.Version.Number); err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } return err } - info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) + content, err := m.HTTP.FetchContent() if err != nil { - return errors.Wrap(err, "failed to get info from Elasticsearch") + return err } if m.XPack { @@ -92,3 +104,35 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) updateServicePath(esVersion common.Version) error { + p, err := getServicePath(esVersion) + if err != nil { + return err + } + + m.SetServiceURI(p) + return nil + +} + +func getServicePath(esVersion common.Version) (string, error) { + currPath := statsPath + if esVersion.LessThan(elasticsearch.BulkStatsAvailableVersion) { + // Can't request bulk stats so don't change service URI + return currPath, nil + } + + u, err := url.Parse(currPath) + if err != nil { + return "", err + } + + if strings.HasSuffix(u.Path, ",bulk") { + // Bulk stats already being requested so don't change service URI + return currPath, nil + } + + u.Path += ",bulk" + return u.String(), nil +} diff --git a/metricbeat/module/elasticsearch/index/index_test.go b/metricbeat/module/elasticsearch/index/index_test.go new file mode 100644 index 00000000000..8c4106e9944 --- /dev/null +++ b/metricbeat/module/elasticsearch/index/index_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package index + +import ( + "strings" + "testing" + "testing/quick" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/require" +) + +func TestGetServiceURI(t *testing.T) { + tests := map[string]struct { + esVersion *common.Version + expectedPath string + }{ + "bulk_stats_unavailable": { + esVersion: common.MustNewVersion("7.7.0"), + expectedPath: statsPath, + }, + "bulk_stats_available": { + esVersion: common.MustNewVersion("7.8.0"), + expectedPath: strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + newURI, err := getServicePath(*test.esVersion) + require.NoError(t, err) + require.Equal(t, test.expectedPath, newURI) + }) + } +} + +func TestGetServiceURIMultipleCalls(t *testing.T) { + err := quick.Check(func(r uint) bool { + numCalls := 2 + (r % 10) // between 2 and 11 + + var uri string + var err error + for i := uint(0); i < numCalls; i++ { + uri, err = getServicePath(*common.MustNewVersion("7.8.0")) + if err != nil { + return false + } + } + + return err == nil && uri == strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1) + }, nil) + require.NoError(t, err) +} diff --git a/metricbeat/module/elasticsearch/index_summary/data_xpack.go b/metricbeat/module/elasticsearch/index_summary/data_xpack.go index d1e00ea64b8..4e35744133d 100644 --- a/metricbeat/module/elasticsearch/index_summary/data_xpack.go +++ b/metricbeat/module/elasticsearch/index_summary/data_xpack.go @@ -51,6 +51,7 @@ var ( "is_throttled": c.Bool("is_throttled"), "throttle_time_in_millis": c.Int("throttle_time_in_millis"), }), + "bulk": elasticsearch.BulkStatsDict, "search": c.Dict("search", s.Schema{ "query_total": c.Int("query_total"), "query_time_in_millis": c.Int("query_time_in_millis"), diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go index f7f612b11ee..53340103176 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_xpack.go +++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go @@ -49,6 +49,7 @@ var ( "index_time_in_millis": c.Int("index_time_in_millis"), "throttle_time_in_millis": c.Int("throttle_time_in_millis"), }), + "bulk": elasticsearch.BulkStatsDict, "search": c.Dict("search", s.Schema{ "query_total": c.Int("query_total"), "query_time_in_millis": c.Int("query_time_in_millis"), diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index d7877bfc81f..0fb1c64d273 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -30,6 +30,7 @@ - Use default output by default {pull}18091[18091] - Use /tmp for default monitoring endpoint location for libbeat {pull}18131[18131] - Fix panic and flaky tests for the Agent. {pull}18135[18135] +- Fix default configuration after enroll {pull}18232[18232] ==== New features diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index 15582908fe7..0e7c950cdd5 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -23,6 +23,14 @@ datasources: - metricset: filesystem dataset: system.filesystem +settings.monitoring: + # enabled turns on monitoring of running processes + enabled: true + # enables log monitoring + logs: true + # enables metrics monitoring + metrics: true + # management: # # Mode of management, the Elastic Agent support two modes of operation: # # @@ -112,14 +120,6 @@ datasources: # # Default is false # exponential: false -# settings.monitoring: -# # enabled turns on monitoring of running processes -# enabled: false -# # enables log monitoring -# logs: false -# # enables metrics monitoring -# metrics: false - # Sets log level. The default log level is info. # Available log levels are: error, warning, info, debug #logging.level: trace diff --git a/x-pack/elastic-agent/_meta/elastic-agent.fleet.yml b/x-pack/elastic-agent/_meta/elastic-agent.fleet.yml index b8b3eb2ac99..30c0a68431b 100644 --- a/x-pack/elastic-agent/_meta/elastic-agent.fleet.yml +++ b/x-pack/elastic-agent/_meta/elastic-agent.fleet.yml @@ -1,27 +1,27 @@ -#================================ General ===================================== +# ================================ General ===================================== # Beats is configured under Fleet, you can define most settings # from the Kibana UI. You can update this file to configure the settings that # are not supported by Fleet. -# management: -# mode: "fleet" +management: + mode: "fleet" -# # Check in frequency configure the time between calls to fleet to retrieve the new configuration. -# # -# # Default is 30s -# #checkin_frequency: 30s + # Check in frequency configure the time between calls to fleet to retrieve the new configuration. + # + # Default is 30s + #checkin_frequency: 30s -# # Add variance between API calls to better distribute the calls. -# #jitter: 5s + # Add variance between API calls to better distribute the calls. + #jitter: 5s -# # The Elastic Agent does Exponential backoff when an error happen. -# # -# #backoff: -# # -# # Initial time to wait before retrying the call. -# # init: 1s -# # -# # Maximum time to wait before retrying the call. -# # max: 10s + # The Elastic Agent does Exponential backoff when an error happen. + # + #backoff: + # + # Initial time to wait before retrying the call. + # init: 1s + # + # Maximum time to wait before retrying the call. + # max: 10s # download: # # source of the artifacts, requires elastic like structure and naming of the binaries diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index ae06f02c816..d28cce65ab5 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -29,6 +29,14 @@ datasources: - metricset: filesystem dataset: system.filesystem +settings.monitoring: + # enabled turns on monitoring of running processes + enabled: true + # enables log monitoring + logs: true + # enables metrics monitoring + metrics: true + # management: # # Mode of management, the Elastic Agent support two modes of operation: # # @@ -118,14 +126,6 @@ datasources: # # Default is false # exponential: false -# settings.monitoring: -# # enabled turns on monitoring of running processes -# enabled: false -# # enables log monitoring -# logs: false -# # enables metrics monitoring -# metrics: false - # Sets log level. The default log level is info. # Available log levels are: error, warning, info, debug #logging.level: trace diff --git a/x-pack/elastic-agent/pkg/agent/application/configuration_embed.go b/x-pack/elastic-agent/pkg/agent/application/configuration_embed.go index 7ccc52a40a8..b6bc827cc82 100644 --- a/x-pack/elastic-agent/pkg/agent/application/configuration_embed.go +++ b/x-pack/elastic-agent/pkg/agent/application/configuration_embed.go @@ -15,7 +15,7 @@ var DefaultAgentFleetConfig []byte func init() { // Packed File // _meta/elastic-agent.fleet.yml - unpacked := packer.MustUnpack("eJycVlGPo7oVfu/P2PfeggmrS6U+xJnBmDthGnKDsV8qbGeBxE5QEyCm6n+vDAnJblfVVR9GGgX7nPN95zvf8b++/EPvr8Vf9qq4XGvx56Lcn66/fFP7/fUXo9WXv37BgxP8/fc/9offpeI6M5T05R+9M/9tzgf8HiuapwNGasAo6bhmDfMyw8imlMQ/shyXqb4plm8uOFKd3MIDJYuSoUwXxFcYuZ3wNqUAmSORapm3/g0bqIUOrjhKz2wLPwoSV1y/l9nu2OLwGUPm8EjzVNlzRb4umVYXtoUON/DAga8LIl2hd6VElcJRomSUNlzLwZ6n+cbmqex3rgMHR4krItiJU6rYCtb7LUQcZUquFvbclRK/YiC7MuI7n+X5gFfLkpPgyH4/l1izCyOZg8vzb9gsS7xalxtQKQouZWGxnmIl8kxxnfjfcbSFToFsferKtrBmeeoxkrUYJRVHN1tnh9E9tv0/ipWM4oblTN25aVkuSgqClml1knlcSaQ6Xi8edcz1pCjTNM8ucgUbYZbDOnp8Sw4Fyg4FUG0OmGF56DLiH/a/n8v1AIfVHSs2EDK0KaUOTUHCloJdSXXmSJApXkO4ez+WFIQXHq1LiX6dvqHMYJQ2wktNQWJXonsv7vg+HrFB08goVeJwLpN8/duqXpd4tTzgcMQYchQOEqkDfg9Pln+M0o7l6zLNq54DX/FT2lj9Uh0eChBoVkOvQJbHsMUoM0IHBqOqEhFU/81NUlOS/JMDph99xf2MOeZaOQUJL9hqh+xGbBKEjVzBmhHWWf0InTnipBquxQu+vp1xINUW+eYrfguHx2/zt3e32iN1lcR56MDq16NEORjFiunA6sSwPDV7olpmJs2MXK9mLCUn4eKzXt5sX21shgKP61tHyebrnCtKOpnHB7aFHavvcfLYKQirqJcOHys49r/IYyUMVDPv6NYUYFfaPkqSODKPleWWrWBLiTvi5oDesccNHzWSDU/8u5aZZSeBam1dwriLzzfa7rXqpzNw4F5mKMiy3fH4Fa/is4zSXgzn7gOERiKlKUkcYfy5pg+ddB8g7STwLxyER2GCmpHQEebX+okXVhKVU8/G2tJG6OwgUWD229EjHEpUWxD/ZDX1iGW9SyCrpfDE8rtOorQSWioZBkfLD/XSTpxsrfi498Y8LUOhQ3One3KfDvg5P05BXMW9zMGIdaKGR+7JlqOgYmjsa291SIlUd07m85/167zOPtfTPD1PHvFeiiir+dirq9pvoStAdrznASyPG3bX50tt1ucqcUobpsODfIlfEDp5ImINR7vSequIYiVA1srVT+Yxkg1HfSmj2J/qGXX3Y02Tp6PsQvPEKci6pcDOUlYLoBy2vWs5gicx5f05t+RWCS9tqPF75i3r1xkTXmp95ZtAoVOs4JHlyUFoVVuvmPp/6ygInYIELUaBfrljuZj5FybohQ5OQodXYRblNt+MHFByG7iZuH3mHTk2jITHUUcRNBxIQ4lTCg8qCpQuSDL58ml99wTY8FPiUHK75BO+n2ItiD9IZOccjzMtorijIBvEcH6Ze6tft+Ir2HMvdjDyXU5iJWqouYdtDZUE/ohP6ODA8mSYNY3chqtgvDd6x9uyXy9nvV5pXjU8z6543Iubkp+yK9XW00fMg0Chx3V2nDAnSniJEv1jN4aL3BvvfcVv697Gfs6l9dKsk/mmZKM3J5Y/Z5pDX0nzXa0tjuSZkcUTv7E7DLbcvhu87DD+BpjL0c1/xLIzTE9H23d7dp6lFy90rPdhFPRs3ptjXOsLQ0FSJSw29F4yHTY8ygzbTNjunH4b/ZoErnx72ZXTjvY/n3OvuA5rjmysVNmdaHs2ahJUyupy1I/19+2Yv+deMhQkvrDt4oe9DbXV4DwrKGupPUc2X3FkvXk362LcSVop4QYH7o1zOwi75wlrbO5nLwP9fFesW/yeKqZDl0eT5tfmuWN2456zez5wudUDoOV6C6+MhK3tiVxBryC3i91bc0xiz/nqsT8pYY59E46/IVYV5GbfaI+5GHmwuyMHSSftW6tevuyR1Pq/j6OkZyRpmMVn4NF6guWO1/AqzMs76n/hsv2ack73H++dOZdb7cPgEft77ki4KIjr8u3/m3vZfzcnYML1WS9dvpl1E+4j2HGdtRIpO99HlltN2n2bDeNM5OnZcslGTz2OsWU0vscnbzdQfR/j2csMKKdYjXtlvk+Jf8RvL9oZLCfLC36j/cdqeVsfXvj5ARPT4UWA3UOXC4GC1u4G+3b6rO+63Zz/9uXff/pPAAAA//+74FmX") + unpacked := packer.MustUnpack("eJycVlGPo7oVfu/P2PfeEhNWS6U+xMxgzE6Yhmxs7JcK21kgsRN0E0ig6n+vbDJJZnVVXfVhpBGxz/nOd77vHP/7y7/M9lz+bavL07mRfy2r7eH820+93Z5/G4z+8vcveFiE//zh/bm/NUw4DbQ08Qm//Mk7j7/veIARp7EnB9jKAe4ECExJ1UwasscJ6TgiA37lJ06J9xbBQPibioG4wyjX3OiOr+FZ+JmHk0yrJG+FUWPULCt+SHuxhl6JNtUa6IbRoMYxSd8ayO8xEnLhKPa4O6dHjHgr0KZS6FvFQNhxow+qSDVO8iNfw5EXuVfS4CAHez72bB5mf0dBryI4qgJehJ96nK4qdthXublqXqy6yMxqYeIDpzMtDquvUbOoBA33/MexwmaqDVfH7zha7PBrduQ0+x0j3WHEB17EM06D3Xb9zI3FWFtcraCbihniKUC0aOCO0evJ4hMDNAIRrSL3/8CLfCgpAfzGiTDExyjrheEt98nAirwVYN5ZbPhyrHC0rHJEDCvISUW2N4txmSwdRgZqzcC5FTo00pCroqRj/v4rflle5OWjjnjPIwhYkbaMBju+hg0vcp9T0uHXeLVef8Jqf/N4gSuO9KiStGUH4n1gZSA+iWQ5YQNtq5Jcy92xyorld/dtgBt7LqfXWvp5ywYIOSCdiuBeADLiVzKXKOw4DbySxieM0pqBc88NqxSoLW+1aKCWh7SXDTyyAl44feYiaxjNfheAm/fKcXOrUXdlkbeMXiqF9NniFQP0GdUeRqnmJhz4euJ+S3XHh6d6ovkUA9kYq6/4JR6fuf9B43lJZzOnYTqzve5xouqyWNl+G+FbXaZaJWlgNXnTqOP0bcJYCRrP35vF1fbNeQKFvjDXnlGrwWXl8idZr4rU9qfnzQ1fkXol5TXz8/Etgk6DZZFqOUAtUDwqpHcYXdsSbCrp54Oimed8guKOR7BjdNYKIysB2KRTlLbCxFZ/4z0v2nR8WPQK6M7iksNs/v7Cuq3Rl+kMHIXVJSBks99/xVF6VEl+keOxfwPxoJA2jGaeHII7pjeT9W8g7xUITgLEezmEzTRbvjWPemGtUOU8PmHLW2nITqFwuHnMY1R3jlP0iGU9LVG8K0F84MXyOx4WFU7yWhqlVRzuLT/Mz3t5sFjxfuu7PJ2dL6zw+gf3+Yir4w5HTreut8InHkbcam8vfNUJFNYcOb1crO4YVfrGyf38e3Pzo8URLStu9Imv4YUV+dFpBb1WMiGNcL066+0aziQg+1sewIu05Qft+vSETfMI1vKQt9zEO/UUv6SsEib0PmZkWSwrmaRaTj6L77p4tXMu8HCiWoEuldXnhMfp7ldMRprwjBE5sSLzSrrsGPhWKUAaCbTH1/Mbz/Agp7x/zO3d98GF+4tHr5HupJ/XAl1/ShR7ZQT3vMh20uiGF8tb/689A7FX0rDDKDRPdywXd/7lEF6kCQ/SxGc5zKt1sXIcMHodxTBx+8jrOB44jfeTN+EggBoY9SrpQ82ANiXNaoV0Lw52rrnetuKQeda/xVTfH9Za0mBUyM4P7Dwtk7RngIxyPN57JZx+Z7WIpn2EUTATNNWygUb42GKoFQhcfdKEO15k413TaGbnurvnZsfL4rJc3PV6ZkXdioKccQJ7eVhV4kDOzJDhVvMoUewLt7ttzZmWfqblZbpv51Hhu3tuV9jYD1/aXUZ6VawqOzcxyix/3uTDQKvhE9YOJ+rI6fxR//R+6IS/qqRPdu4b4DOBrsFHrI+9rFDdyifvPc1Cz84+jMILL9KpP839XTKWNNfuXYJeK27iViRk4KupthunP90eoOFMvRyr5QjHyHr9NqffH77XwsSNQDZWrhUig+2Z0ySotdWl04/dG2uX/yL8bCxpeuLr+S97GRqrwbtXEOmYPUdXX3FiZ/PmrgtpiCeN1nIW7oTvfDtKRHYl5a3N/ehlaG6YW8f1q31rxTORTJpfDvN7zI2Nedjb99JMWD0AVi3X8Mxp3NmeqAj6Jb3anavvMak9FzzeL5S73e++IV6X9GrfOB+++Hi7jAXIekUDN/seXsvt/A9wkl04zVpu6xvg3s4Ey51o4FkOT2+P/1WX7deUc7r/4/j0zeaa1ds4/Ij9mbvHvv4/cy8un3wCprrem8VMrO66ibcJ7IUhnULa+nvPC6tJu2/J6DxR5MfpHWVn6t7FVkmqWXGb7QPUn2M8ekmA9srI7ZX7fUaDPX550s5oOVmc8Au7vEWL63L3xM8vNXETnyTYfOjy0xvsvbnpdnX8x5f//OW/AQAA////ZUbx") raw, ok := unpacked["_meta/elastic-agent.fleet.yml"] if !ok { // ensure we have something loaded. diff --git a/x-pack/metricbeat/module/aws/aws.go b/x-pack/metricbeat/module/aws/aws.go index dcf047da87d..5476f0dae5f 100644 --- a/x-pack/metricbeat/module/aws/aws.go +++ b/x-pack/metricbeat/module/aws/aws.go @@ -103,6 +103,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { // collecting the first one. if output.AccountAliases != nil { metricSet.AccountName = output.AccountAliases[0] + base.Logger().Debug("AWS Credentials belong to account name: ", metricSet.AccountName) } } @@ -115,6 +116,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { base.Logger().Warn("failed to get caller identity, please check permission setting: ", err) } else { metricSet.AccountID = *outputIdentity.Account + base.Logger().Debug("AWS Credentials belong to account ID: ", metricSet.AccountID) } // Construct MetricSet with a full regions list diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index 05c9b21cd3a..5674a45eccd 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -18,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" @@ -51,6 +52,7 @@ func init() { // interface methods except for Fetch. type MetricSet struct { *aws.MetricSet + logger *logp.Logger CloudwatchConfigs []Config `config:"metrics" validate:"nonzero,required"` } @@ -113,6 +115,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ MetricSet: metricSet, + logger: logp.NewLogger(metricsetName), CloudwatchConfigs: config.CloudwatchMetrics, }, nil } @@ -132,10 +135,13 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // Get listMetricDetailTotal and namespaceDetailTotal from configuration listMetricDetailTotal, namespaceDetailTotal := m.readCloudwatchConfig() + m.logger.Debugf("listMetricDetailTotal = %s", listMetricDetailTotal) + m.logger.Debugf("namespaceDetailTotal = %s", namespaceDetailTotal) // Create events based on listMetricDetailTotal from configuration if len(listMetricDetailTotal.metricsWithStats) != 0 { for _, regionName := range m.MetricSet.RegionsList { + m.logger.Debugf("Collecting metrics from AWS region %s", regionName) awsConfig := m.MetricSet.AwsConfig.Copy() awsConfig.Region = regionName @@ -150,6 +156,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { return errors.Wrap(err, "createEvents failed for region "+regionName) } + m.logger.Debugf("Collected metrics of metrics = %d", len(eventsWithIdentifier)) + err = reportEvents(eventsWithIdentifier, report) if err != nil { return errors.Wrap(err, "reportEvents failed") @@ -158,6 +166,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { } for _, regionName := range m.MetricSet.RegionsList { + m.logger.Debugf("Collecting metrics from AWS region %s", regionName) awsConfig := m.MetricSet.AwsConfig.Copy() awsConfig.Region = regionName @@ -169,9 +178,11 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // Create events based on namespaceDetailTotal from configuration for namespace, namespaceDetails := range namespaceDetailTotal { + m.logger.Debugf("Collected metrics from namespace %s", namespace) + listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch) if err != nil { - m.Logger().Info(err.Error()) + m.logger.Info(err.Error()) continue } @@ -189,6 +200,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { return errors.Wrap(err, "createEvents failed for region "+regionName) } + m.logger.Debugf("Collected number of metrics = %d", len(eventsWithIdentifier)) + err = reportEvents(eventsWithIdentifier, report) if err != nil { return errors.Wrap(err, "reportEvents failed") @@ -434,12 +447,14 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes // Construct metricDataQueries metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.Period) + m.logger.Debugf("Number of MetricDataQueries = %d", len(metricDataQueries)) if len(metricDataQueries) == 0 { return events, nil } // Use metricDataQueries to make GetMetricData API calls metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime) + m.logger.Debugf("Number of metricDataResults = %d", len(metricDataResults)) if err != nil { return events, errors.Wrap(err, "GetMetricDataResults failed") } @@ -482,7 +497,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes resourceTagMap, err := aws.GetResourcesTags(svcResourceAPI, []string{resourceType}) if err != nil { // If GetResourcesTags failed, continue report event just without tags. - m.Logger().Info(errors.Wrap(err, "getResourcesTags failed, skipping region "+regionName)) + m.logger.Info(errors.Wrap(err, "getResourcesTags failed, skipping region "+regionName)) } if len(tagsFilter) != 0 && len(resourceTagMap) == 0 { diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index 92016c1cbc9..0b8cc468c06 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -1232,6 +1233,7 @@ func TestCreateEventsWithIdentifier(t *testing.T) { m := MetricSet{} m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} m.MetricSet = &aws.MetricSet{Period: 5} + m.logger = logp.NewLogger("test") mockTaggingSvc := &MockResourceGroupsTaggingClient{} mockCloudwatchSvc := &MockCloudWatchClient{} @@ -1272,6 +1274,7 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) { m := MetricSet{} m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} m.MetricSet = &aws.MetricSet{Period: 5, AccountID: accountID} + m.logger = logp.NewLogger("test") mockTaggingSvc := &MockResourceGroupsTaggingClient{} mockCloudwatchSvc := &MockCloudWatchClientWithoutDim{}