Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elasticsearch Monitoring] Add cluster_metadata to cluster_stats docs #8445

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add Kafka dashboard. {pull}8457[8457]
- Add untyped metric support to the prometheus module. {pull}8681[8681]
- Release Kafka module as GA. {pull}8854[8854]
- Collect custom cluster `display_name` in `elasticsearch/cluster_stats` metricset. {pull}8445[8445]

*Packetbeat*

Expand Down
24 changes: 24 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ func apmIndicesExist(clusterState common.MapStr) (bool, error) {
return false, nil
}

func getClusterMetadataSettings(m *MetricSet) (common.MapStr, error) {
// For security reasons we only get the display_name setting
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users can set whatever they want in the cluster metadata. They might set sensitive information, and we would end up indexing it into monitoring indices. So, to be on the safer side, we decided to just collect the specific key, display_name, from cluster metadata for now.

See complete discussion about this starting here: elastic/elasticsearch#34023 (comment).

Note that even though we only collect the specific key for now, we are indexing the complete nesting structure for the key. That way, if we later decide to allow more keys or all keys, there will be no break in the expected structure for the UI.

filterPaths := []string{"*.cluster.metadata.display_name"}
clusterSettings, err := elasticsearch.GetClusterSettingsWithDefaults(m.HTTP, m.HTTP.GetURI(), filterPaths)
if err != nil {
return nil, errors.Wrap(err, "failure to get cluster settings")
}

clusterSettings, err = elasticsearch.MergeClusterSettings(clusterSettings)
if err != nil {
return nil, errors.Wrap(err, "failure to merge cluster settings")
}

return clusterSettings, nil
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
Expand Down Expand Up @@ -219,6 +235,14 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, c
"stack_stats": stackStats,
}

clusterSettings, err := getClusterMetadataSettings(m)
if err != nil {
return err
}
if clusterSettings != nil {
event.RootFields.Put("cluster_settings", clusterSettings)
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

Expand Down
114 changes: 105 additions & 9 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
Expand Down Expand Up @@ -96,7 +98,7 @@ func IsMaster(http *helper.HTTP, uri string) (bool, error) {
}

func getNodeName(http *helper.HTTP, uri string) (string, error) {
content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return "", err
}
Expand All @@ -116,7 +118,7 @@ func getNodeName(http *helper.HTTP, uri string) (string, error) {

func getMasterName(http *helper.HTTP, uri string) (string, error) {
// TODO: evaluate on why when run with ?local=true request does not contain master_node field
content, err := fetchPath(http, uri, "_cluster/state/master_node")
content, err := fetchPath(http, uri, "_cluster/state/master_node", "")
if err != nil {
return "", err
}
Expand All @@ -133,7 +135,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

content, err := fetchPath(http, uri, "/")
content, err := fetchPath(http, uri, "/", "")
if err != nil {
return nil, err
}
Expand All @@ -144,13 +146,13 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
return info, nil
}

func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
func fetchPath(http *helper.HTTP, uri, path string, query string) ([]byte, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
u.Path = path
u.RawQuery = ""
u.RawQuery = query

// Http helper includes the HostData with username and password
http.SetURI(u.String())
Expand All @@ -160,7 +162,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
// GetNodeInfo returns the node information.
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +193,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {

// Not cached, fetch license from Elasticsearch
if license == nil {
content, err := fetchPath(http, resetURI, "_xpack/license")
content, err := fetchPath(http, resetURI, "_xpack/license", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +227,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI)
content, err := fetchPath(http, resetURI, clusterStateURI, "")
if err != nil {
return nil, err
}
Expand All @@ -235,9 +237,39 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
return clusterState, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (common.MapStr, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
}

// GetClusterSettings returns cluster settings
func GetClusterSettings(http *helper.HTTP, resetURI string, includeDefaults bool, filterPaths []string) (common.MapStr, error) {
clusterSettingsURI := "_cluster/settings"
var queryParams []string
if includeDefaults {
queryParams = append(queryParams, "include_defaults=true")
}

if filterPaths != nil && len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterSettingsURI, queryString)
if err != nil {
return nil, err
}

var clusterSettings map[string]interface{}
err = json.Unmarshal(content, &clusterSettings)
return clusterSettings, err
}

// GetStackUsage returns stack usage information.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
content, err := fetchPath(http, resetURI, "_xpack/usage")
content, err := fetchPath(http, resetURI, "_xpack/usage", "")
if err != nil {
return nil, err
}
Expand All @@ -259,6 +291,47 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error
return nil
}

// MergeClusterSettings merges cluster settings in the correct precedence order
func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error) {
transientSettings, err := getSettingGroup(clusterSettings, "transient")
if err != nil {
return nil, err
}

persistentSettings, err := getSettingGroup(clusterSettings, "persistent")
if err != nil {
return nil, err
}

settings, err := getSettingGroup(clusterSettings, "default")
if err != nil {
return nil, err
}

// Transient settings override persistent settings which override default settings
if settings == nil {
settings = persistentSettings
}

if settings == nil {
settings = transientSettings
}

if settings == nil {
return nil, nil
}

if persistentSettings != nil {
settings.DeepUpdate(persistentSettings)
}

if transientSettings != nil {
settings.DeepUpdate(transientSettings)
}

return settings, nil
}

// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version
// of Elasticsearch.
func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) {
Expand Down Expand Up @@ -295,3 +368,26 @@ func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) {
c.ttl = ttl
c.cachedOn = time.Now()
}

func getSettingGroup(allSettings common.MapStr, groupKey string) (common.MapStr, error) {
hasSettingGroup, err := allSettings.HasKey(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to determine if "+groupKey+" settings exist")
}

if !hasSettingGroup {
return nil, nil
}

settings, err := allSettings.GetValue(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to extract "+groupKey+" settings")
}

v, ok := settings.(map[string]interface{})
if !ok {
return nil, errors.Wrap(err, groupKey+" settings are not a map")
}

return common.MapStr(v), nil
}