Add configurable remote_write headers to metrics-generator (#3175)
* Add configurable remote_write headers to metrics-generator

* Add test

* chlog

* Fix test

* Fix registerer panic

* Add metric

* Support variable expansion in overrides config

* Another chlog entry

* Minor changes

* Reset Prometheus registry in tests
mapno authored Jan 30, 2024
1 parent babdaa2 commit 7a65cc0
Showing 16 changed files with 316 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -40,6 +40,8 @@
* [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott)
* [ENHANCEMENT] Added a `parquet-page` cache role for page level caching. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
* [ENHANCEMENT] Add per-tenant configurable remote_write headers to metrics-generator [#3175](https://github.com/grafana/tempo/pull/3175) (@mapno)
* [ENHANCEMENT] Add variable expansion support to overrides configuration [#3175](https://github.com/grafana/tempo/pull/3175) (@mapno)
* [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala)
* [ENHANCEMENT] Add /status/overrides/{tenant} endpoint [#3244](https://github.com/grafana/tempo/pull/3244) (@kvrhdn)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
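Taken together, the two new entries describe per-tenant remote-write headers plus environment-variable expansion in the overrides file. As a rough, hedged illustration (field names and layout are assumptions made for this note, not taken from the diff), this is the in-memory shape such an override resolves to once loaded:

```go
// Hedged illustration only: the tenant -> header-name -> value shape that the
// per-tenant remote_write headers override resolves to. The on-disk YAML field
// names are not shown in this diff and are assumed here.
package main

import "fmt"

func main() {
	remoteWriteHeaders := map[string]map[string]string{
		"team-a": {
			"Authorization": "Bearer ${TEAM_A_TOKEN}", // ${TEAM_A_TOKEN} would be expanded at load time with --config.expand-env
			"X-Scope-OrgID": "team-a-metrics",         // custom headers may overwrite the default org ID header
		},
	}
	fmt.Println(remoteWriteHeaders["team-a"])
}
```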
3 changes: 3 additions & 0 deletions cmd/tempo/main.go
@@ -196,6 +196,9 @@ func loadConfig() (*app.Config, bool, error) {

}

// Pass --config.expand-env flag to overrides module
config.Overrides.ExpandEnv = configExpandEnv

// overlay with cli
flagext.IgnoredFlag(flag.CommandLine, configFileOption, "Configuration file to load")
flagext.IgnoredFlag(flag.CommandLine, configExpandEnvOption, "Whether to expand environment variables in config file")
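The hunk above only forwards the existing `--config.expand-env` flag to the overrides module. As a minimal sketch of the behaviour that flag enables (not Tempo's actual loader; the `remote_write_headers` field name below is a hypothetical placeholder), `${VAR}` expansion over an overrides blob looks roughly like this:

```go
// Minimal sketch, not Tempo's loader: expand ${VAR} references in an overrides
// blob before parsing it. The field name "remote_write_headers" is hypothetical.
package main

import (
	"fmt"
	"os"
)

func main() {
	os.Setenv("PROM_TOKEN", "s3cr3t")

	raw := "my-tenant:\n" +
		"  remote_write_headers:\n" +
		"    Authorization: Bearer ${PROM_TOKEN}\n"

	// os.Expand handles both $VAR and ${VAR}; unset variables expand to "".
	expanded := os.Expand(raw, os.Getenv)
	fmt.Print(expanded)
}
```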
2 changes: 1 addition & 1 deletion modules/generator/generator.go
@@ -248,7 +248,7 @@ func (g *Generator) createInstance(id string) (*instance, error) {
// main registry only if successful.
reg := prometheus.NewRegistry()

wal, err := storage.New(&g.cfg.Storage, id, reg, g.logger)
wal, err := storage.New(&g.cfg.Storage, g.overrides, id, reg, g.logger)
if err != nil {
return nil, err
}
2 changes: 2 additions & 0 deletions modules/generator/overrides.go
@@ -4,6 +4,7 @@ import (
"time"

"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/sharedconfig"
filterconfig "github.com/grafana/tempo/pkg/spanfilter/config"
@@ -12,6 +13,7 @@ import (

type metricsGeneratorOverrides interface {
registry.Overrides
storage.Overrides

MetricsGeneratorIngestionSlack(userID string) time.Duration
MetricsGeneratorProcessors(userID string) map[string]struct{}
4 changes: 4 additions & 0 deletions modules/generator/overrides_test.go
@@ -57,6 +57,10 @@ func (m *mockOverrides) MetricsGenerationTraceIDLabelName(userID string) string
return ""
}

func (m *mockOverrides) MetricsGeneratorRemoteWriteHeaders(string) map[string]string {
return nil
}

func (m *mockOverrides) MetricsGeneratorProcessorServiceGraphsHistogramBuckets(string) []float64 {
return m.serviceGraphsHistogramBuckets
}
8 changes: 7 additions & 1 deletion modules/generator/storage/config_util.go
@@ -13,7 +13,7 @@ import (

// generateTenantRemoteWriteConfigs creates a copy of the remote write configurations with the
// X-Scope-OrgID header present for the given tenant, unless Tempo is run in single tenant mode or instructed not to add X-Scope-OrgID header.
func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, addOrgIDHeader bool, logger log.Logger) []*prometheus_config.RemoteWriteConfig {
func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, headers map[string]string, addOrgIDHeader bool, logger log.Logger) []*prometheus_config.RemoteWriteConfig {
var cloneCfgs []*prometheus_config.RemoteWriteConfig

for _, originalCfg := range originalCfgs {
@@ -36,6 +36,12 @@ func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWri
cloneCfg.Headers[user.OrgIDHeaderName] = tenant
}

// Inject/overwrite custom headers
// Caution! This can overwrite the X-Scope-OrgID header
for k, v := range headers {
cloneCfg.Headers[k] = v
}

cloneCfgs = append(cloneCfgs, cloneCfg)
}

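The loop above gives custom headers the last word. A simplified, self-contained sketch of that precedence — static remote_write headers, then `X-Scope-OrgID` when enabled, then per-tenant headers, which can overwrite the org ID — purely as an illustration, not the actual `generateTenantRemoteWriteConfigs` logic on the Prometheus config types:

```go
// Illustration only: the header precedence applied per cloned remote_write config.
// Later writes win, so per-tenant custom headers can overwrite X-Scope-OrgID.
package main

import "fmt"

func mergeHeaders(base map[string]string, tenant string, custom map[string]string, addOrgIDHeader bool) map[string]string {
	out := make(map[string]string, len(base)+len(custom)+1)
	for k, v := range base {
		out[k] = v // headers from the static remote_write config
	}
	if addOrgIDHeader {
		out["X-Scope-OrgID"] = tenant // user.OrgIDHeaderName in the real code
	}
	for k, v := range custom {
		out[k] = v // per-tenant headers are applied last
	}
	return out
}

func main() {
	fmt.Println(mergeHeaders(
		map[string]string{"X-Custom": "static"},
		"my-tenant",
		map[string]string{"X-Scope-OrgID": "my-other-tenant"},
		true,
	))
	// map[X-Custom:static X-Scope-OrgID:my-other-tenant]
}
```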
6 changes: 3 additions & 3 deletions modules/generator/storage/config_util_test.go
@@ -32,7 +32,7 @@ func Test_generateTenantRemoteWriteConfigs(t *testing.T) {

addOrgIDHeader := true

result := generateTenantRemoteWriteConfigs(original, "my-tenant", addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger)

assert.Equal(t, original[0].URL, result[0].URL)
assert.Equal(t, map[string]string{}, original[0].Headers, "Original headers have been modified")
@@ -61,7 +61,7 @@ func Test_generateTenantRemoteWriteConfigs_singleTenant(t *testing.T) {

addOrgIDHeader := true

result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, nil, addOrgIDHeader, logger)

assert.Equal(t, original[0].URL, result[0].URL)

@@ -95,7 +95,7 @@ func Test_generateTenantRemoteWriteConfigs_addOrgIDHeader(t *testing.T) {

addOrgIDHeader := false

result := generateTenantRemoteWriteConfigs(original, "my-tenant", addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger)

assert.Equal(t, original[0].URL, result[0].URL)
assert.Empty(t, original[0].Headers, "X-Scope-OrgID header is not added")
75 changes: 71 additions & 4 deletions modules/generator/storage/instance.go
@@ -6,10 +6,12 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
prometheus_config "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/scrape"
@@ -19,6 +21,12 @@
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)

var metricStorageHeadersUpdateFailed = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "metrics_generator_storage_headers_update_failed_total",
Help: "The total number of times updating the remote write headers failed",
}, []string{"tenant"})

type Storage interface {
storage.Appendable

@@ -27,16 +35,23 @@ type Storage interface {
}

type storageImpl struct {
cfg *Config
walDir string
remote *remote.Storage
storage storage.Storage

tenantID string
currentHeaders map[string]string
overrides Overrides
closeCh chan struct{}

logger log.Logger
}

var _ Storage = (*storageImpl)(nil)

// New creates a metrics WAL that remote writes its data.
func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logger) (Storage, error) {
func New(cfg *Config, o Overrides, tenant string, reg prometheus.Registerer, logger log.Logger) (Storage, error) {
logger = log.With(logger, "tenant", tenant)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenant}, reg)

@@ -58,8 +73,9 @@ func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logge
}
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTimeCallback, walDir, cfg.RemoteWriteFlushDeadline, &noopScrapeManager{})

headers := o.MetricsGeneratorRemoteWriteHeaders(tenant)
remoteStorageConfig := &prometheus_config.Config{
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, cfg.RemoteWriteAddOrgIDHeader, logger),
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, headers, cfg.RemoteWriteAddOrgIDHeader, logger),
}

err = remoteStorage.ApplyConfig(remoteStorageConfig)
@@ -73,12 +89,23 @@ func New(cfg *Config, tenant string, reg prometheus.Registerer, logger log.Logge
return nil, err
}

return &storageImpl{
s := &storageImpl{
cfg: cfg,
walDir: walDir,
remote: remoteStorage,
storage: storage.NewFanout(logger, wal, remoteStorage),

tenantID: tenant,
currentHeaders: headers,
overrides: o,
closeCh: make(chan struct{}),

logger: logger,
}, nil
}

go s.watchOverrides()

return s, nil
}

func (s *storageImpl) Appender(ctx context.Context) storage.Appender {
@@ -87,6 +114,7 @@ func (s *storageImpl) Appender(ctx context.Context) storage.Appender {

func (s *storageImpl) Close() error {
level.Info(s.logger).Log("msg", "closing WAL", "dir", s.walDir)
close(s.closeCh)

return tsdb_errors.NewMulti(
s.storage.Close(),
@@ -98,6 +126,45 @@
).Err()
}

func (s *storageImpl) watchOverrides() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()

for {
select {
case <-t.C:
newHeaders := s.overrides.MetricsGeneratorRemoteWriteHeaders(s.tenantID)
if !headersEqual(s.currentHeaders, newHeaders) {
level.Info(s.logger).Log("msg", "updating remote write headers")
s.currentHeaders = newHeaders
err := s.remote.ApplyConfig(&prometheus_config.Config{
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(s.cfg.RemoteWrite, s.tenantID, newHeaders, s.cfg.RemoteWriteAddOrgIDHeader, s.logger),
})
if err != nil {
metricStorageHeadersUpdateFailed.WithLabelValues(s.tenantID).Inc()
level.Error(s.logger).Log("msg", "Failed to update remote write headers. Remote write will continue with old headers", "err", err)
}
}
case <-s.closeCh:
return
}
}
}

func headersEqual(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}

for k, v := range a {
if b[k] != v {
return false
}
}

return true
}

type noopScrapeManager struct{}

func (noop *noopScrapeManager) Get() (*scrape.Manager, error) {
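A detail of `headersEqual` worth calling out: a nil map and an empty map compare as equal, so tenants without custom headers never trigger a remote-write `ApplyConfig` on the 30-second tick. A quick snippet written for this note (not part of the repository) that would sit in the same package:

```go
package storage

import "fmt"

// Illustrative only, not repository code: nil and empty header maps compare as
// equal, so no spurious config reload happens for tenants with no overrides.
func demoHeadersEqual() {
	fmt.Println(headersEqual(nil, map[string]string{}))         // true
	fmt.Println(headersEqual(map[string]string{"a": "1"}, nil)) // false
}
```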
100 changes: 96 additions & 4 deletions modules/generator/storage/instance_test.go
@@ -40,7 +40,7 @@ func TestInstance(t *testing.T) {
cfg.Path = t.TempDir()
cfg.RemoteWrite = mockServer.remoteWriteConfig()

instance, err := New(&cfg, "test-tenant", prometheus.DefaultRegisterer, logger)
instance, err := New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, logger)
require.NoError(t, err)

// Refuse requests - the WAL should buffer data until requests succeed
@@ -115,7 +115,7 @@ func TestInstance_multiTenancy(t *testing.T) {
var instances []Storage

for i := 0; i < 3; i++ {
instance, err := New(&cfg, strconv.Itoa(i), prometheus.DefaultRegisterer, logger)
instance, err := New(&cfg, &mockOverrides{}, strconv.Itoa(i), &noopRegisterer{}, logger)
assert.NoError(t, err)
instances = append(instances, instance)
}
@@ -183,12 +183,84 @@ func TestInstance_cantWriteToWAL(t *testing.T) {
cfg.Path = "/root"

// We should be able to attempt to create the instance multiple times
_, err := New(&cfg, "test-tenant", prometheus.DefaultRegisterer, log.NewNopLogger())
_, err := New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, log.NewNopLogger())
require.Error(t, err)
_, err = New(&cfg, "test-tenant", prometheus.DefaultRegisterer, log.NewNopLogger())
_, err = New(&cfg, &mockOverrides{}, "test-tenant", &noopRegisterer{}, log.NewNopLogger())
require.Error(t, err)
}

func TestInstance_remoteWriteHeaders(t *testing.T) {
var err error
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))

mockServer := newMockPrometheusRemoteWriterServer(logger)
defer mockServer.close()

var cfg Config
cfg.RegisterFlagsAndApplyDefaults("", nil)
cfg.Path = t.TempDir()
cfg.RemoteWrite = mockServer.remoteWriteConfig()

headers := map[string]string{user.OrgIDHeaderName: "my-other-tenant"}

instance, err := New(&cfg, &mockOverrides{headers}, "test-tenant", &noopRegisterer{}, logger)
require.NoError(t, err)

// Refuse requests - the WAL should buffer data until requests succeed
mockServer.refuseRequests.Store(true)

sendCtx, cancel := context.WithCancel(context.Background())
defer cancel()

// Append some data every second
go poll(sendCtx, time.Second, func() {
appender := instance.Appender(context.Background())

lbls := labels.FromMap(map[string]string{"__name__": "my-metrics"})
ref, err := appender.Append(0, lbls, time.Now().UnixMilli(), 1.0)
assert.NoError(t, err)

_, err = appender.AppendExemplar(ref, lbls, exemplar.Exemplar{
Labels: labels.FromMap(map[string]string{"traceID": "123"}),
Value: 1.2,
})
assert.NoError(t, err)

if sendCtx.Err() != nil {
return
}

assert.NoError(t, appender.Commit())
})

// Wait until remote.Storage has tried at least once to send data
err = waitUntil(20*time.Second, func() bool {
mockServer.mtx.Lock()
defer mockServer.mtx.Unlock()

return mockServer.refusedRequests > 0
})
require.NoError(t, err, "timed out while waiting for refused requests")

// Allow requests
mockServer.refuseRequests.Store(false)

// Shutdown the instance - even though previous requests failed, remote.Storage should flush pending data
err = instance.Close()
assert.NoError(t, err)

// WAL should be empty again
entries, err := os.ReadDir(cfg.Path)
assert.NoError(t, err)
assert.Len(t, entries, 0)

// Verify we received metrics
assert.Len(t, mockServer.timeSeries, 1)
assert.Contains(t, mockServer.timeSeries, "my-other-tenant")
// We should have received at least 2 time series: one for the sample and one for the exemplar
assert.GreaterOrEqual(t, len(mockServer.timeSeries["my-other-tenant"]), 2)
}

type mockPrometheusRemoteWriteServer struct {
mtx sync.Mutex

@@ -285,3 +357,23 @@ func waitUntil(timeout time.Duration, f func() bool) error {
time.Sleep(50 * time.Millisecond)
}
}

var _ Overrides = (*mockOverrides)(nil)

type mockOverrides struct {
headers map[string]string
}

func (m *mockOverrides) MetricsGeneratorRemoteWriteHeaders(string) map[string]string {
return m.headers
}

var _ prometheus.Registerer = (*noopRegisterer)(nil)

type noopRegisterer struct{}

func (n *noopRegisterer) Register(prometheus.Collector) error { return nil }

func (n *noopRegisterer) MustRegister(...prometheus.Collector) {}

func (n *noopRegisterer) Unregister(prometheus.Collector) bool { return true }
5 changes: 5 additions & 0 deletions modules/generator/storage/overrides.go
@@ -0,0 +1,5 @@
package storage

type Overrides interface {
MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string
}
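Any type with a `MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string` method satisfies this interface — the generator's per-tenant overrides embed it (see `modules/generator/overrides.go` above), and the tests use a mock. A minimal map-backed sketch, written for illustration rather than taken from the repository:

```go
// Sketch only (not repository code): a static, map-backed Overrides implementation.
package storage

type staticOverrides struct {
	headers map[string]map[string]string // tenant ID -> header name -> value
}

func (s staticOverrides) MetricsGeneratorRemoteWriteHeaders(userID string) map[string]string {
	return s.headers[userID]
}
```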