From 7a65cc0b135d2ff355cdc35b97aadc686d9f4608 Mon Sep 17 00:00:00 2001 From: Mario Date: Tue, 30 Jan 2024 14:40:35 +0100 Subject: [PATCH] Add configurable remote_write headers to metrics-generator (#3175) * Add configurable remote_write headers to metrics-generator * Add test * chlog * Fix test * Fix registerer panic * Add metric * Support variable expansion in overrides config * Another chlog entry * Minor changes * Reset Prometheus registry in tests --- CHANGELOG.md | 2 + cmd/tempo/main.go | 3 + modules/generator/generator.go | 2 +- modules/generator/overrides.go | 2 + modules/generator/overrides_test.go | 4 + modules/generator/storage/config_util.go | 8 +- modules/generator/storage/config_util_test.go | 6 +- modules/generator/storage/instance.go | 75 ++++++++++++- modules/generator/storage/instance_test.go | 100 +++++++++++++++++- modules/generator/storage/overrides.go | 5 + modules/overrides/config.go | 18 +++- modules/overrides/config_legacy.go | 3 + modules/overrides/config_test.go | 4 + modules/overrides/interface.go | 1 + modules/overrides/runtime_config_overrides.go | 25 ++++- .../runtime_config_overrides_test.go | 74 +++++++++++++ 16 files changed, 316 insertions(+), 16 deletions(-) create mode 100644 modules/generator/storage/overrides.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd752082b2..edabaa89242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ * [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott) * [ENHANCEMENT] Added a `parquet-page` cache role for page level caching. 
[#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott) * [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn) +* [ENHANCEMENT] Add per-tenant configurable remote_write headers to metrics-generator [#3175](https://github.com/grafana/tempo/pull/3175) (@mapno) +* [ENHANCEMENT] Add variable expansion support to overrides configuration [#3175](https://github.com/grafana/tempo/pull/3175) (@mapno) * [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala) * [ENHANCEMENT] Add /status/overrides/{tenant} endpoint [#3244](https://github.com/grafana/tempo/pull/3244) (@kvrhdn) * [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno) diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go index 3a74fa0fc1b..5747a03af08 100644 --- a/cmd/tempo/main.go +++ b/cmd/tempo/main.go @@ -196,6 +196,9 @@ func loadConfig() (*app.Config, bool, error) { } + // Pass --config.expand-env flag to overrides module + config.Overrides.ExpandEnv = configExpandEnv + // overlay with cli flagext.IgnoredFlag(flag.CommandLine, configFileOption, "Configuration file to load") flagext.IgnoredFlag(flag.CommandLine, configExpandEnvOption, "Whether to expand environment variables in config file") diff --git a/modules/generator/generator.go b/modules/generator/generator.go index 1e2bab3ef82..fdbacac3a20 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -248,7 +248,7 @@ func (g *Generator) createInstance(id string) (*instance, error) { // main registry only if successful. 
reg := prometheus.NewRegistry() - wal, err := storage.New(&g.cfg.Storage, id, reg, g.logger) + wal, err := storage.New(&g.cfg.Storage, g.overrides, id, reg, g.logger) if err != nil { return nil, err } diff --git a/modules/generator/overrides.go b/modules/generator/overrides.go index 1aea3c9f780..afdeb5bd4ee 100644 --- a/modules/generator/overrides.go +++ b/modules/generator/overrides.go @@ -4,6 +4,7 @@ import ( "time" "github.com/grafana/tempo/modules/generator/registry" + "github.com/grafana/tempo/modules/generator/storage" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/sharedconfig" filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" @@ -12,6 +13,7 @@ import ( type metricsGeneratorOverrides interface { registry.Overrides + storage.Overrides MetricsGeneratorIngestionSlack(userID string) time.Duration MetricsGeneratorProcessors(userID string) map[string]struct{} diff --git a/modules/generator/overrides_test.go b/modules/generator/overrides_test.go index 6cdecc391b2..ed2d80f4d96 100644 --- a/modules/generator/overrides_test.go +++ b/modules/generator/overrides_test.go @@ -57,6 +57,10 @@ func (m *mockOverrides) MetricsGenerationTraceIDLabelName(userID string) string return "" } +func (m *mockOverrides) MetricsGeneratorRemoteWriteHeaders(string) map[string]string { + return nil +} + func (m *mockOverrides) MetricsGeneratorProcessorServiceGraphsHistogramBuckets(string) []float64 { return m.serviceGraphsHistogramBuckets } diff --git a/modules/generator/storage/config_util.go b/modules/generator/storage/config_util.go index e468d65b026..0aacbf1f6ef 100644 --- a/modules/generator/storage/config_util.go +++ b/modules/generator/storage/config_util.go @@ -13,7 +13,7 @@ import ( // generateTenantRemoteWriteConfigs creates a copy of the remote write configurations with the // X-Scope-OrgID header present for the given tenant, unless Tempo is run in single tenant mode or instructed not to add X-Scope-OrgID header. 
-func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, addOrgIDHeader bool, logger log.Logger) []*prometheus_config.RemoteWriteConfig { +func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, headers map[string]string, addOrgIDHeader bool, logger log.Logger) []*prometheus_config.RemoteWriteConfig { var cloneCfgs []*prometheus_config.RemoteWriteConfig for _, originalCfg := range originalCfgs { @@ -36,6 +36,12 @@ func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWri cloneCfg.Headers[user.OrgIDHeaderName] = tenant } + // Inject/overwrite custom headers + // Caution! This can overwrite the X-Scope-OrgID header + for k, v := range headers { + cloneCfg.Headers[k] = v + } + cloneCfgs = append(cloneCfgs, cloneCfg) } diff --git a/modules/generator/storage/config_util_test.go b/modules/generator/storage/config_util_test.go index 74b01bc9fb4..ac88751e3fb 100644 --- a/modules/generator/storage/config_util_test.go +++ b/modules/generator/storage/config_util_test.go @@ -32,7 +32,7 @@ func Test_generateTenantRemoteWriteConfigs(t *testing.T) { addOrgIDHeader := true - result := generateTenantRemoteWriteConfigs(original, "my-tenant", addOrgIDHeader, logger) + result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger) assert.Equal(t, original[0].URL, result[0].URL) assert.Equal(t, map[string]string{}, original[0].Headers, "Original headers have been modified") @@ -61,7 +61,7 @@ func Test_generateTenantRemoteWriteConfigs_singleTenant(t *testing.T) { addOrgIDHeader := true - result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, addOrgIDHeader, logger) + result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, nil, addOrgIDHeader, logger) assert.Equal(t, original[0].URL, result[0].URL) @@ -95,7 +95,7 @@ func Test_generateTenantRemoteWriteConfigs_addOrgIDHeader(t *testing.T) { addOrgIDHeader 
:= false - result := generateTenantRemoteWriteConfigs(original, "my-tenant", addOrgIDHeader, logger) + result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger) assert.Equal(t, original[0].URL, result[0].URL) assert.Empty(t, original[0].Headers, "X-Scope-OrgID header is not added") diff --git a/modules/generator/storage/instance.go b/modules/generator/storage/instance.go index 983bc2826d2..923a64c1d1a 100644 --- a/modules/generator/storage/instance.go +++ b/modules/generator/storage/instance.go @@ -6,10 +6,12 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" prometheus_config "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/scrape" @@ -19,6 +21,12 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" ) +var metricStorageHeadersUpdateFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "metrics_generator_storage_headers_update_failed_total", + Help: "The total number of times updating the remote write headers failed", +}, []string{"tenant"}) + type Storage interface { storage.Appendable @@ -27,16 +35,23 @@ type Storage interface { } type storageImpl struct { + cfg *Config walDir string + remote *remote.Storage storage storage.Storage + tenantID string + currentHeaders map[string]string + overrides Overrides + closeCh chan struct{} + logger log.Logger } var _ Storage = (*storageImpl)(nil) // New creates a metrics WAL that remote writes its data. 
-func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logger) (Storage, error) { +func New(cfg *Config, o Overrides, tenant string, reg prometheus.Registerer, logger log.Logger) (Storage, error) { logger = log.With(logger, "tenant", tenant) reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenant}, reg) @@ -58,8 +73,9 @@ func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logge } remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTimeCallback, walDir, cfg.RemoteWriteFlushDeadline, &noopScrapeManager{}) + headers := o.MetricsGeneratorRemoteWriteHeaders(tenant) remoteStorageConfig := &prometheus_config.Config{ - RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, cfg.RemoteWriteAddOrgIDHeader, logger), + RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, headers, cfg.RemoteWriteAddOrgIDHeader, logger), } err = remoteStorage.ApplyConfig(remoteStorageConfig) @@ -73,12 +89,23 @@ func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logge return nil, err } - return &storageImpl{ + s := &storageImpl{ + cfg: cfg, walDir: walDir, + remote: remoteStorage, storage: storage.NewFanout(logger, wal, remoteStorage), + tenantID: tenant, + currentHeaders: headers, + overrides: o, + closeCh: make(chan struct{}), + logger: logger, - }, nil + } + + go s.watchOverrides() + + return s, nil } func (s *storageImpl) Appender(ctx context.Context) storage.Appender { @@ -87,6 +114,7 @@ func (s *storageImpl) Appender(ctx context.Context) storage.Appender { func (s *storageImpl) Close() error { level.Info(s.logger).Log("msg", "closing WAL", "dir", s.walDir) + close(s.closeCh) return tsdb_errors.NewMulti( s.storage.Close(), @@ -98,6 +126,45 @@ func (s *storageImpl) Close() error { ).Err() } +func (s *storageImpl) watchOverrides() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() + + for { + select { + case <-t.C: + 
newHeaders := s.overrides.MetricsGeneratorRemoteWriteHeaders(s.tenantID) + if !headersEqual(s.currentHeaders, newHeaders) { + level.Info(s.logger).Log("msg", "updating remote write headers") + s.currentHeaders = newHeaders + err := s.remote.ApplyConfig(&prometheus_config.Config{ + RemoteWriteConfigs: generateTenantRemoteWriteConfigs(s.cfg.RemoteWrite, s.tenantID, newHeaders, s.cfg.RemoteWriteAddOrgIDHeader, s.logger), + }) + if err != nil { + metricStorageHeadersUpdateFailed.WithLabelValues(s.tenantID).Inc() + level.Error(s.logger).Log("msg", "Failed to update remote write headers. Remote write will continue with old headers", "err", err) + } + } + case <-s.closeCh: + return + } + } +} + +func headersEqual(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + + for k, v := range a { + if b[k] != v { + return false + } + } + + return true +} + type noopScrapeManager struct{} func (noop *noopScrapeManager) Get() (*scrape.Manager, error) { diff --git a/modules/generator/storage/instance_test.go b/modules/generator/storage/instance_test.go index 01b48f8e746..f3c8bce762a 100644 --- a/modules/generator/storage/instance_test.go +++ b/modules/generator/storage/instance_test.go @@ -40,7 +40,7 @@ func TestInstance(t *testing.T) { cfg.Path = t.TempDir() cfg.RemoteWrite = mockServer.remoteWriteConfig() - instance, err := New(&cfg, "test-tenant", prometheus.DefaultRegisterer, logger) + instance, err := New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, logger) require.NoError(t, err) // Refuse requests - the WAL should buffer data until requests succeed @@ -115,7 +115,7 @@ func TestInstance_multiTenancy(t *testing.T) { var instances []Storage for i := 0; i < 3; i++ { - instance, err := New(&cfg, strconv.Itoa(i), prometheus.DefaultRegisterer, logger) + instance, err := New(&cfg, &mockOverrides{}, strconv.Itoa(i), &noopRegisterer{}, logger) assert.NoError(t, err) instances = append(instances, instance) } @@ -183,12 +183,84 @@ func 
TestInstance_cantWriteToWAL(t *testing.T) { cfg.Path = "/root" // We should be able to attempt to create the instance multiple times - _, err := New(&cfg, "test-tenant", prometheus.DefaultRegisterer, log.NewNopLogger()) + _, err := New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, log.NewNopLogger()) require.Error(t, err) - _, err = New(&cfg, "test-tenant", prometheus.DefaultRegisterer, log.NewNopLogger()) + _, err = New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, log.NewNopLogger()) require.Error(t, err) } +func TestInstance_remoteWriteHeaders(t *testing.T) { + var err error + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) + + mockServer := newMockPrometheusRemoteWriterServer(logger) + defer mockServer.close() + + var cfg Config + cfg.RegisterFlagsAndApplyDefaults("", nil) + cfg.Path = t.TempDir() + cfg.RemoteWrite = mockServer.remoteWriteConfig() + + headers := map[string]string{user.OrgIDHeaderName: "my-other-tenant"} + + instance, err := New(&cfg, &mockOverrides{headers}, "test-tenant", &noopRegisterer{}, logger) + require.NoError(t, err) + + // Refuse requests - the WAL should buffer data until requests succeed + mockServer.refuseRequests.Store(true) + + sendCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Append some data every second + go poll(sendCtx, time.Second, func() { + appender := instance.Appender(context.Background()) + + lbls := labels.FromMap(map[string]string{"__name__": "my-metrics"}) + ref, err := appender.Append(0, lbls, time.Now().UnixMilli(), 1.0) + assert.NoError(t, err) + + _, err = appender.AppendExemplar(ref, lbls, exemplar.Exemplar{ + Labels: labels.FromMap(map[string]string{"traceID": "123"}), + Value: 1.2, + }) + assert.NoError(t, err) + + if sendCtx.Err() != nil { + return + } + + assert.NoError(t, appender.Commit()) + }) + + // Wait until remote.Storage has tried at least once to send data + err = waitUntil(20*time.Second, func() bool { + mockServer.mtx.Lock() + 
defer mockServer.mtx.Unlock() + + return mockServer.refusedRequests > 0 + }) + require.NoError(t, err, "timed out while waiting for refused requests") + + // Allow requests + mockServer.refuseRequests.Store(false) + + // Shutdown the instance - even though previous requests failed, remote.Storage should flush pending data + err = instance.Close() + assert.NoError(t, err) + + // WAL should be empty again + entries, err := os.ReadDir(cfg.Path) + assert.NoError(t, err) + assert.Len(t, entries, 0) + + // Verify we received metrics + assert.Len(t, mockServer.timeSeries, 1) + assert.Contains(t, mockServer.timeSeries, "my-other-tenant") + // We should have received at least 2 time series: one for the sample and one for the exemplar + assert.GreaterOrEqual(t, len(mockServer.timeSeries["my-other-tenant"]), 2) +} + type mockPrometheusRemoteWriteServer struct { mtx sync.Mutex @@ -285,3 +357,23 @@ func waitUntil(timeout time.Duration, f func() bool) error { time.Sleep(50 * time.Millisecond) } } + +var _ Overrides = (*mockOverrides)(nil) + +type mockOverrides struct { + headers map[string]string +} + +func (m *mockOverrides) MetricsGeneratorRemoteWriteHeaders(string) map[string]string { + return m.headers +} + +var _ prometheus.Registerer = (*noopRegisterer)(nil) + +type noopRegisterer struct{} + +func (n *noopRegisterer) Register(prometheus.Collector) error { return nil } + +func (n *noopRegisterer) MustRegister(...prometheus.Collector) {} + +func (n *noopRegisterer) Unregister(prometheus.Collector) bool { return true } diff --git a/modules/generator/storage/overrides.go b/modules/generator/storage/overrides.go new file mode 100644 index 00000000000..8d91ef05aa4 --- /dev/null +++ b/modules/generator/storage/overrides.go @@ -0,0 +1,5 @@ +package storage + +type Overrides interface { + MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string +} diff --git a/modules/overrides/config.go b/modules/overrides/config.go index 8787536190d..6e58ff3473b 100644 --- 
a/modules/overrides/config.go +++ b/modules/overrides/config.go @@ -5,8 +5,8 @@ import ( "time" "github.com/grafana/tempo/pkg/util/listtomap" - "github.com/grafana/tempo/tempodb/backend" + "github.com/prometheus/common/config" "github.com/prometheus/client_golang/prometheus" @@ -107,6 +107,20 @@ type ProcessorOverrides struct { LocalBlocks LocalBlocksOverrides `yaml:"local_blocks,omitempty" json:"local_blocks,omitempty"` } +type RemoteWriteHeaders map[string]config.Secret + +func (h *RemoteWriteHeaders) toStringStringMap() map[string]string { + if h == nil { + return nil + } + + headers := make(map[string]string) + for k, v := range *h { + headers[k] = string(v) + } + return headers +} + type MetricsGeneratorOverrides struct { RingSize int `yaml:"ring_size,omitempty" json:"ring_size,omitempty"` Processors listtomap.ListToMap `yaml:"processors,omitempty" json:"processors,omitempty"` @@ -114,6 +128,7 @@ type MetricsGeneratorOverrides struct { CollectionInterval time.Duration `yaml:"collection_interval,omitempty" json:"collection_interval,omitempty"` DisableCollection bool `yaml:"disable_collection,omitempty" json:"disable_collection,omitempty"` TraceIDLabelName string `yaml:"trace_id_label_name,omitempty" json:"trace_id_label_name,omitempty"` + RemoteWriteHeaders RemoteWriteHeaders `yaml:"remote_write_headers,omitempty" json:"remote_write_headers,omitempty"` Forwarder ForwarderOverrides `yaml:"forwarder,omitempty" json:"forwarder,omitempty"` @@ -174,6 +189,7 @@ type Config struct { UserConfigurableOverridesConfig UserConfigurableOverridesConfig `yaml:"user_configurable_overrides" json:"user_configurable_overrides"` ConfigType ConfigType `yaml:"-" json:"-"` + ExpandEnv bool `yaml:"-" json:"-"` } func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { diff --git a/modules/overrides/config_legacy.go b/modules/overrides/config_legacy.go index 6e4685061f0..a65f9a81679 100644 --- a/modules/overrides/config_legacy.go +++ 
b/modules/overrides/config_legacy.go @@ -28,6 +28,7 @@ func (c *Overrides) toLegacy() LegacyOverrides { MetricsGeneratorCollectionInterval: c.MetricsGenerator.CollectionInterval, MetricsGeneratorDisableCollection: c.MetricsGenerator.DisableCollection, MetricsGeneratorTraceIDLabelName: c.MetricsGenerator.TraceIDLabelName, + MetricsGeneratorRemoteWriteHeaders: c.MetricsGenerator.RemoteWriteHeaders, MetricsGeneratorForwarderQueueSize: c.MetricsGenerator.Forwarder.QueueSize, MetricsGeneratorForwarderWorkers: c.MetricsGenerator.Forwarder.Workers, MetricsGeneratorProcessorServiceGraphsHistogramBuckets: c.MetricsGenerator.Processor.ServiceGraphs.HistogramBuckets, @@ -86,6 +87,7 @@ type LegacyOverrides struct { MetricsGeneratorTraceIDLabelName string `yaml:"metrics_generator_trace_id_label_name" json:"metrics_generator_trace_id_label_name"` MetricsGeneratorForwarderQueueSize int `yaml:"metrics_generator_forwarder_queue_size" json:"metrics_generator_forwarder_queue_size"` MetricsGeneratorForwarderWorkers int `yaml:"metrics_generator_forwarder_workers" json:"metrics_generator_forwarder_workers"` + MetricsGeneratorRemoteWriteHeaders RemoteWriteHeaders `yaml:"metrics_generator_remote_write_headers,omitempty" json:"metrics_generator_remote_write_headers,omitempty"` MetricsGeneratorProcessorServiceGraphsHistogramBuckets []float64 `yaml:"metrics_generator_processor_service_graphs_histogram_buckets" json:"metrics_generator_processor_service_graphs_histogram_buckets"` MetricsGeneratorProcessorServiceGraphsDimensions []string `yaml:"metrics_generator_processor_service_graphs_dimensions" json:"metrics_generator_processor_service_graphs_dimensions"` MetricsGeneratorProcessorServiceGraphsPeerAttributes []string `yaml:"metrics_generator_processor_service_graphs_peer_attributes" json:"metrics_generator_processor_service_graphs_peer_attributes"` @@ -150,6 +152,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides { DisableCollection: l.MetricsGeneratorDisableCollection, TraceIDLabelName: 
l.MetricsGeneratorTraceIDLabelName, IngestionSlack: l.MetricsGeneratorIngestionSlack, + RemoteWriteHeaders: l.MetricsGeneratorRemoteWriteHeaders, Forwarder: ForwarderOverrides{ QueueSize: l.MetricsGeneratorForwarderQueueSize, Workers: l.MetricsGeneratorForwarderWorkers, diff --git a/modules/overrides/config_test.go b/modules/overrides/config_test.go index d39927cbb7f..6c33a1c947f 100644 --- a/modules/overrides/config_test.go +++ b/modules/overrides/config_test.go @@ -107,6 +107,8 @@ metrics_generator_collection_interval: 5s metrics_generator_disable_collection: false metrics_generator_forwarder_queue_size: 6 metrics_generator_forwarder_workers: 7 +metrics_generator_remote_write_headers: + tenant-id: foo metrics_generator_processor_service_graphs_histogram_buckets: [1,2] metrics_generator_processor_service_graphs_dimensions: ['foo'] metrics_generator_processor_service_graphs_peer_attributes: ['foo'] @@ -171,6 +173,8 @@ defaults: max_active_series: 4 collection_interval: 5s disable_collection: false + remote_write_headers: + tenant-id: foo forwarder: queue_size: 6 workers: 7 diff --git a/modules/overrides/interface.go b/modules/overrides/interface.go index 650697e02cb..8f9f7d3969e 100644 --- a/modules/overrides/interface.go +++ b/modules/overrides/interface.go @@ -43,6 +43,7 @@ type Interface interface { MetricsGeneratorCollectionInterval(userID string) time.Duration MetricsGeneratorDisableCollection(userID string) bool MetricsGenerationTraceIDLabelName(userID string) string + MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string MetricsGeneratorForwarderQueueSize(userID string) int MetricsGeneratorForwarderWorkers(userID string) int MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID string) []float64 diff --git a/modules/overrides/runtime_config_overrides.go b/modules/overrides/runtime_config_overrides.go index 9708abbab28..4e957d5eb4d 100644 --- a/modules/overrides/runtime_config_overrides.go +++ 
b/modules/overrides/runtime_config_overrides.go @@ -1,12 +1,14 @@ package overrides import ( + "bytes" "context" "fmt" "io" "net/http" "time" + "github.com/drone/envsubst" "github.com/go-kit/log/level" "github.com/grafana/dskit/runtimeconfig" @@ -60,10 +62,24 @@ func (o *perTenantOverrides) forUser(userID string) *Overrides { } // loadPerTenantOverrides is of type runtimeconfig.Loader -func loadPerTenantOverrides(typ ConfigType) func(r io.Reader) (interface{}, error) { +func loadPerTenantOverrides(typ ConfigType, expandEnv bool) func(r io.Reader) (interface{}, error) { return func(r io.Reader) (interface{}, error) { overrides := &perTenantOverrides{} + if expandEnv { + rr := r.(*bytes.Reader) + b, err := io.ReadAll(rr) + if err != nil { + return nil, err + } + + s, err := envsubst.EvalEnv(string(b)) + if err != nil { + return nil, fmt.Errorf("failed to expand env vars: %w", err) + } + r = bytes.NewReader([]byte(s)) + } + decoder := yaml.NewDecoder(r) decoder.SetStrict(true) if err := decoder.Decode(&overrides); err != nil { @@ -107,7 +123,7 @@ func newRuntimeConfigOverrides(cfg Config, registerer prometheus.Registerer) (Se runtimeCfg := runtimeconfig.Config{ LoadPath: []string{cfg.PerTenantOverrideConfig}, ReloadPeriod: time.Duration(cfg.PerTenantOverridePeriod), - Loader: loadPerTenantOverrides(cfg.ConfigType), + Loader: loadPerTenantOverrides(cfg.ConfigType, cfg.ExpandEnv), } runtimeCfgMgr, err := runtimeconfig.New(runtimeCfg, "overrides", prometheus.WrapRegistererWithPrefix("tempo_", registerer), log.Logger) if err != nil { @@ -323,6 +339,11 @@ func (o *runtimeConfigOverridesManager) MetricsGeneratorIngestionSlack(userID st return o.getOverridesForUser(userID).MetricsGenerator.IngestionSlack } +// MetricsGeneratorRemoteWriteHeaders returns the custom remote write headers for this tenant. 
+func (o *runtimeConfigOverridesManager) MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string { + return o.getOverridesForUser(userID).MetricsGenerator.RemoteWriteHeaders.toStringStringMap() +} + // MetricsGeneratorRingSize is the desired size of the metrics-generator ring for this tenant. // Using shuffle sharding, a tenant can use a smaller ring than the entire ring. func (o *runtimeConfigOverridesManager) MetricsGeneratorRingSize(userID string) int { diff --git a/modules/overrides/runtime_config_overrides_test.go b/modules/overrides/runtime_config_overrides_test.go index b8e1f280071..ebf3e6ae809 100644 --- a/modules/overrides/runtime_config_overrides_test.go +++ b/modules/overrides/runtime_config_overrides_test.go @@ -1,8 +1,11 @@ package overrides import ( + "bytes" "context" "fmt" + "net/http" + "net/http/httptest" "os" "path/filepath" "testing" @@ -10,6 +13,7 @@ import ( "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -492,6 +496,76 @@ overrides: } } +func TestRemoteWriteHeaders(t *testing.T) { + cfg := Config{ + Defaults: Overrides{ + MetricsGenerator: MetricsGeneratorOverrides{ + RemoteWriteHeaders: map[string]config.Secret{ + "Authorization": "Bearer secret-token", + }, + }, + }, + } + + overrides, err := newRuntimeConfigOverrides(cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.TODO(), overrides)) + + buff := bytes.NewBuffer(nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) + require.NoError(t, overrides.WriteStatusRuntimeConfig(buff, req)) + + fmt.Println(buff.String()) +} + +func TestExpandEnvOverrides(t *testing.T) { + const envVar = "TOKEN" + cfg := Config{ + Defaults: Overrides{ + MetricsGenerator: MetricsGeneratorOverrides{ + RemoteWriteHeaders: map[string]config.Secret{ 
+ "Authorization": "Bearer token", + }, + }, + }, + ExpandEnv: true, + } + // Set the TOKEN env var + require.NoError(t, os.Setenv(envVar, "super-secret-token")) + t.Cleanup(func() { + require.NoError(t, os.Unsetenv(envVar)) + }) + + perTenantOverrides := fmt.Sprintf(` +overrides: + user1: + metrics_generator: + remote_write_headers: + Authorization: Bearer ${%s} +`, envVar) + + overridesFile := filepath.Join(t.TempDir(), "Overrides.yaml") + + require.NoError(t, os.WriteFile(overridesFile, []byte(perTenantOverrides), os.ModePerm)) + + cfg.PerTenantOverrideConfig = overridesFile + cfg.PerTenantOverridePeriod = model.Duration(time.Hour) + + overrides, err := newRuntimeConfigOverrides(cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.TODO(), overrides)) + + expectedRemoteWriteHeaders := map[string]map[string]string{ + "user1": {"Authorization": "Bearer super-secret-token"}, + "user2": {"Authorization": "Bearer token"}, + } + for user, expected := range expectedRemoteWriteHeaders { + assert.Equal(t, expected, overrides.MetricsGeneratorRemoteWriteHeaders(user)) + } + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), overrides)) +} + func createAndInitializeRuntimeOverridesManager(t *testing.T, defaultLimits Overrides, perTenantOverrides []byte) (Service, func()) { cfg := Config{ Defaults: defaultLimits,