Skip to content

Commit

Permalink
Modify telemetry_config_providers
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Jul 25, 2023
1 parent 329a5cd commit 7b42fce
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 316 deletions.
185 changes: 82 additions & 103 deletions agent/hcp/telemetry_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ package hcp

import (
"context"
"fmt"
"net/url"
"regexp"
"sync"
"time"

goMetrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/hashstructure/v2"

hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
)

const (
// defaultRefreshInterval is a default duration, after each
defaultRefreshInterval = 5 * time.Minute
)

var (
Expand All @@ -26,73 +22,88 @@ var (

// TelemetryConfigProviderOpts is used to initialize a telemetryConfigProvider.
type TelemetryConfigProviderOpts struct {
ctx context.Context
endpoint *url.URL
labels map[string]string
filters *regexp.Regexp
refreshInterval time.Duration
cloudCfg config.CloudConfig
hcpClient hcpclient.Client
Ctx context.Context
MetricsConfig *hcpclient.MetricsConfig
RefreshInterval time.Duration
HCPClient hcpclient.Client
}

// metricsConfig is a set of configurable settings for metrics collection, processing and export.
type metricsConfig struct {
// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
type dynamicConfig struct {
endpoint *url.URL
labels map[string]string
filters *regexp.Regexp
// refreshInterval controls the interval at which configuration is fetched from HCP to refresh config.
refreshInterval time.Duration
}

// telemetryConfigProvider holds metrics configuration and settings for its continuous
// fetch of new config from HCP.
// telemetryConfigProvider holds metrics configuration and settings for continuous fetch of new config from HCP.
type telemetryConfigProvider struct {
// metricsConfig holds metrics configuration that can be dynamically updated
// telemetryConfig holds configuration that can be dynamically updated
// based on updates fetched from HCP.
metricsConfig *metricsConfig

// refreshInterval controls the interval at which new configuration is fetched from HCP.
refreshInterval time.Duration
cfg *dynamicConfig
// telemetryConfigHash is used to compare two telemetryConfig objects to see if they are the same.
cfgHash uint64

// a reader-writer mutex is used as the provider is read heavy, as the OTEL components
// access metricsConfig, while config is only updated (write) when there are changes.
rw sync.RWMutex

// access telemetryConfig, while config is only updated (write) when there are changes.
rw sync.RWMutex
logger hclog.Logger
cloudCfg config.CloudConfig
hcpClient hcpclient.Client
}

func NewTelemetryConfigProvider(opts *TelemetryConfigProviderOpts) *telemetryConfigProvider {
m := &metricsConfig{
endpoint: opts.endpoint,
labels: opts.labels,
filters: opts.filters,
func NewTelemetryConfigProvider(opts *TelemetryConfigProviderOpts) (*telemetryConfigProvider, error) {
if opts.Ctx == nil {
return nil, fmt.Errorf("missing ctx")
}

if opts.HCPClient == nil {
return nil, fmt.Errorf("missing HCP client")
}

if opts.MetricsConfig == nil {
return nil, fmt.Errorf("missing metrics config")
}

if opts.RefreshInterval <= 0 {
return nil, fmt.Errorf("invalid refresh interval")
}

cfg := &dynamicConfig{
endpoint: opts.MetricsConfig.Endpoint,
labels: opts.MetricsConfig.Labels,
filters: opts.MetricsConfig.Filters,
refreshInterval: opts.RefreshInterval,
}

hash, err := calculateHash(cfg)
if err != nil {
return nil, fmt.Errorf("failed to calculate hash: %w", err)
}

t := &telemetryConfigProvider{
metricsConfig: m,
logger: hclog.FromContext(opts.ctx).Named("telemetry_config_provider"),
refreshInterval: opts.refreshInterval,
cloudCfg: opts.cloudCfg,
hcpClient: opts.hcpClient,
cfg: cfg,
cfgHash: hash,
logger: hclog.FromContext(opts.Ctx).Named("telemetry_config_provider"),
hcpClient: opts.HCPClient,
}

go t.run(opts.ctx)
go t.run(opts.Ctx, opts.RefreshInterval)

return t
return t, nil
}

// run continously checks for updates to the telemetry configuration by
// making a request to HCP, and verifying for any updated fields.
func (t *telemetryConfigProvider) run(ctx context.Context) {
ticker := time.NewTicker(t.refreshInterval)
// run continously checks for updates to the telemetry configuration by making a request to HCP.
// Modification of config only occurs if changes are detected to decrease write locks that block read locks.
func (t *telemetryConfigProvider) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if m, hasChanged := t.checkUpdate(ctx); hasChanged {
// Only update metricsConfig changes are detected
// to decrease usage of write locks that block read locks.
t.modifyMetricsConfig(m)
if newCfg, hasChanged := t.checkUpdate(ctx); hasChanged {
t.modifyTelemetryConfig(newCfg)
ticker.Reset(newCfg.refreshInterval)
}
case <-ctx.Done():
return
Expand All @@ -102,102 +113,70 @@ func (t *telemetryConfigProvider) run(ctx context.Context) {

// checkUpdate makes a HTTP request to HCP to return a new metrics configuration and true, if config changed.
// checkUpdate does not update the metricsConfig field to prevent acquiring the write lock unnecessarily.
func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*metricsConfig, bool) {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
t.rw.RLock()
defer t.rw.RUnlock()

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := t.hcpClient.FetchTelemetryConfig(ctx)
if err != nil {
t.logger.Error("failed to fetch telemetry config from HCP")
t.logger.Error("failed to fetch telemetry config from HCP", "error", err)
goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
}

t.rw.RLock()
defer t.rw.RUnlock()

// TODO: Do we want a enabled config field?
endpoint, _ := telemetryCfg.Enabled()
endpointURL, err := url.Parse(endpoint)
if err != nil {
t.logger.Error("failed to update config: invalid endpoint URL")
goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
newDynamicConfig := &dynamicConfig{
filters: telemetryCfg.MetricsConfig.Filters,
endpoint: telemetryCfg.MetricsConfig.Endpoint,
labels: telemetryCfg.MetricsConfig.Labels,
refreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
}

filters, err := telemetryCfg.FilterRegex()
newHash, err := calculateHash(newDynamicConfig)
if err != nil {
t.logger.Error("failed to update config: invalid filters")
t.logger.Error("failed to calculate hash for new config", "error", err)
goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
}

labels := telemetryCfg.DefaultLabels(t.cloudCfg)

newMetricsConfig := &metricsConfig{
filters: filters,
endpoint: endpointURL,
labels: labels,
}

newEndpoint := endpoint != t.metricsConfig.endpoint.String()
newFilters := filters.String() != t.metricsConfig.filters.String()
newLabels := labelsChanged(t.metricsConfig.labels, labels)

hasChanged := newEndpoint || newFilters || newLabels

// TODO: Add refresh interval once added to the protos on the TGW side.
return newMetricsConfig, hasChanged
}

// labelsChanged returns true if newLabels is different from oldLabels.
func labelsChanged(newLabels map[string]string, oldLabels map[string]string) bool {
// if length is different, then labels have changed, so return true.
if len(newLabels) != len(oldLabels) {
return true
}

// If length is the same, we must verify k,v pairs have not changed.
// If a new key is not in the old labels, return true.
// If a value has changed, return true.
for newKey, newValue := range newLabels {
if oldValue, ok := oldLabels[newKey]; !ok || newValue != oldValue {
return true
}
}

// labels have not changed.
return false
return newDynamicConfig, newHash == t.cfgHash
}

// modifyMetricsConfig acquires a write lock to modify it with a given metricsConfig object.
func (t *telemetryConfigProvider) modifyMetricsConfig(m *metricsConfig) {
// modifynewTelemetryConfig acquires a write lock to modify it with a given newTelemetryConfig object.
func (t *telemetryConfigProvider) modifyTelemetryConfig(newCfg *dynamicConfig) {
t.rw.Lock()
defer t.rw.Unlock()

t.metricsConfig = m
t.cfg = newCfg
}

// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (t *telemetryConfigProvider) GetEndpoint() *url.URL {
t.rw.RLock()
defer t.rw.RUnlock()

return t.metricsConfig.endpoint
return t.cfg.endpoint
}

// GetFilters acquires a read lock to return filters configuration for consumers.
func (t *telemetryConfigProvider) GetFilters() *regexp.Regexp {
t.rw.RLock()
defer t.rw.RUnlock()

return t.metricsConfig.filters
return t.cfg.filters
}

// GetLabels acquires a read lock to return labels configuration for consumers.
func (t *telemetryConfigProvider) GetLabels() map[string]string {
t.rw.RLock()
defer t.rw.RUnlock()

return t.metricsConfig.labels
return t.cfg.labels
}

// calculateHash returns a uint64 hash for data that can be used for comparisons.
func calculateHash(cfg *dynamicConfig) (uint64, error) {
return hashstructure.Hash(*cfg, hashstructure.FormatV2, nil)
}
Loading

0 comments on commit 7b42fce

Please sign in to comment.