Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8445 to 6.x: [Elasticsearch Monitoring] Add cluster_metadata to cluster_stats docs #8990

Merged
merged 1 commit into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Add Kafka dashboard. {pull}8457[8457]
- Add untyped metric support to the prometheus module. {pull}8681[8681]
- Release Kafka module as GA. {pull}8854[8854]
- Collect custom cluster `display_name` in `elasticsearch/cluster_stats` metricset. {pull}8445[8445]

*Packetbeat*

Expand Down
24 changes: 24 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ func apmIndicesExist(clusterState common.MapStr) (bool, error) {
return false, nil
}

func getClusterMetadataSettings(m *MetricSet) (common.MapStr, error) {
// For security reasons we only get the display_name setting
filterPaths := []string{"*.cluster.metadata.display_name"}
clusterSettings, err := elasticsearch.GetClusterSettingsWithDefaults(m.HTTP, m.HTTP.GetURI(), filterPaths)
if err != nil {
return nil, errors.Wrap(err, "failure to get cluster settings")
}

clusterSettings, err = elasticsearch.MergeClusterSettings(clusterSettings)
if err != nil {
return nil, errors.Wrap(err, "failure to merge cluster settings")
}

return clusterSettings, nil
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
Expand Down Expand Up @@ -219,6 +235,14 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, c
"stack_stats": stackStats,
}

clusterSettings, err := getClusterMetadataSettings(m)
if err != nil {
return err
}
if clusterSettings != nil {
event.RootFields.Put("cluster_settings", clusterSettings)
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

Expand Down
114 changes: 105 additions & 9 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
Expand Down Expand Up @@ -96,7 +98,7 @@ func IsMaster(http *helper.HTTP, uri string) (bool, error) {
}

func getNodeName(http *helper.HTTP, uri string) (string, error) {
content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return "", err
}
Expand All @@ -116,7 +118,7 @@ func getNodeName(http *helper.HTTP, uri string) (string, error) {

func getMasterName(http *helper.HTTP, uri string) (string, error) {
// TODO: evaluate on why when run with ?local=true request does not contain master_node field
content, err := fetchPath(http, uri, "_cluster/state/master_node")
content, err := fetchPath(http, uri, "_cluster/state/master_node", "")
if err != nil {
return "", err
}
Expand All @@ -133,7 +135,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

content, err := fetchPath(http, uri, "/")
content, err := fetchPath(http, uri, "/", "")
if err != nil {
return nil, err
}
Expand All @@ -144,13 +146,13 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
return info, nil
}

func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
func fetchPath(http *helper.HTTP, uri, path string, query string) ([]byte, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
u.Path = path
u.RawQuery = ""
u.RawQuery = query

// Http helper includes the HostData with username and password
http.SetURI(u.String())
Expand All @@ -160,7 +162,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
// GetNodeInfo returns the node information.
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +193,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {

// Not cached, fetch license from Elasticsearch
if license == nil {
content, err := fetchPath(http, resetURI, "_xpack/license")
content, err := fetchPath(http, resetURI, "_xpack/license", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +227,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI)
content, err := fetchPath(http, resetURI, clusterStateURI, "")
if err != nil {
return nil, err
}
Expand All @@ -235,9 +237,39 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
return clusterState, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (common.MapStr, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
}

// GetClusterSettings returns cluster settings
func GetClusterSettings(http *helper.HTTP, resetURI string, includeDefaults bool, filterPaths []string) (common.MapStr, error) {
clusterSettingsURI := "_cluster/settings"
var queryParams []string
if includeDefaults {
queryParams = append(queryParams, "include_defaults=true")
}

if filterPaths != nil && len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterSettingsURI, queryString)
if err != nil {
return nil, err
}

var clusterSettings map[string]interface{}
err = json.Unmarshal(content, &clusterSettings)
return clusterSettings, err
}

// GetStackUsage returns stack usage information.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
content, err := fetchPath(http, resetURI, "_xpack/usage")
content, err := fetchPath(http, resetURI, "_xpack/usage", "")
if err != nil {
return nil, err
}
Expand All @@ -259,6 +291,47 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error
return nil
}

// MergeClusterSettings merges cluster settings in the correct precedence order
func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error) {
transientSettings, err := getSettingGroup(clusterSettings, "transient")
if err != nil {
return nil, err
}

persistentSettings, err := getSettingGroup(clusterSettings, "persistent")
if err != nil {
return nil, err
}

settings, err := getSettingGroup(clusterSettings, "default")
if err != nil {
return nil, err
}

// Transient settings override persistent settings which override default settings
if settings == nil {
settings = persistentSettings
}

if settings == nil {
settings = transientSettings
}

if settings == nil {
return nil, nil
}

if persistentSettings != nil {
settings.DeepUpdate(persistentSettings)
}

if transientSettings != nil {
settings.DeepUpdate(transientSettings)
}

return settings, nil
}

// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version
// of Elasticsearch.
func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) {
Expand Down Expand Up @@ -295,3 +368,26 @@ func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) {
c.ttl = ttl
c.cachedOn = time.Now()
}

func getSettingGroup(allSettings common.MapStr, groupKey string) (common.MapStr, error) {
hasSettingGroup, err := allSettings.HasKey(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to determine if "+groupKey+" settings exist")
}

if !hasSettingGroup {
return nil, nil
}

settings, err := allSettings.GetValue(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to extract "+groupKey+" settings")
}

v, ok := settings.(map[string]interface{})
if !ok {
return nil, errors.Wrap(err, groupKey+" settings are not a map")
}

return common.MapStr(v), nil
}