Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into add-vparquet4-encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
stoewer committed May 6, 2024
2 parents 8108e03 + 174b428 commit 393b2a7
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [BUGFIX] Add spss and limit to the frontend cache key to prevent the return of incorrect results. [#3557](https://github.com/grafana/tempo/pull/3557) (@joe-elliott)
* [BUGFIX] Use os path separator to split blocks path. [#3552](https://github.com/grafana/tempo/issues/3552) (@teyyubismayil)
* [BUGFIX] Correctly parse traceql queries with > 1024 character attribute names or static values. [#3571](https://github.com/grafana/tempo/issues/3571) (@joe-elliott)
* [BUGFIX] Fix span-metrics' subprocessors bug that applied wrong configs when running multiple tenants. [#3612](https://github.com/grafana/tempo/pull/3612) (@mapno)

## v2.4.1

Expand Down
6 changes: 6 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,11 @@ func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userI

copyCfg.ServiceGraphs.EnableVirtualNodeLabel = o.MetricsGeneratorProcessorServiceGraphsEnableVirtualNodeLabel(userID)

copySubprocessors := make(map[spanmetrics.Subprocessor]bool)
for sp, enabled := range cfg.SpanMetrics.Subprocessors {
copySubprocessors[sp] = enabled
}
copyCfg.SpanMetrics.Subprocessors = copySubprocessors

return copyCfg, nil
}
1 change: 1 addition & 0 deletions modules/generator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestProcessorConfig_copyWithOverrides(t *testing.T) {
HistogramBuckets: []float64{1, 2},
Dimensions: []string{"namespace"},
IntrinsicDimensions: spanmetrics.IntrinsicDimensions{Service: true},
Subprocessors: map[spanmetrics.Subprocessor]bool{},
},
}

Expand Down
131 changes: 131 additions & 0 deletions modules/generator/generator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package generator

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/overrides"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
user1 = "user1"
user2 = "user2"
)

func TestGeneratorSpanMetrics_subprocessorConcurrency(t *testing.T) {
overridesFile := filepath.Join(t.TempDir(), "Overrides.yaml")
overridesConfig := overrides.Config{
Defaults: overrides.Overrides{
MetricsGenerator: overrides.MetricsGeneratorOverrides{
Processors: map[string]struct{}{
spanmetrics.Name: {},
},
CollectionInterval: 2 * time.Second,
},
},
PerTenantOverrideConfig: overridesFile,
PerTenantOverridePeriod: model.Duration(time.Second),
}

require.NoError(t, os.WriteFile(overridesFile, []byte(fmt.Sprintf(`
overrides:
%s:
metrics_generator:
collection_interval: 1s
processors:
- %s
`, user1, spanmetrics.Name)), os.ModePerm))

o, err := overrides.NewOverrides(overridesConfig, nil, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), o))

generatorConfig := &Config{}
generatorConfig.Storage.Path = t.TempDir()
generatorConfig.Ring.KVStore.Store = "inmemory"
generatorConfig.Processor.SpanMetrics.RegisterFlagsAndApplyDefaults("", nil)
g, err := New(generatorConfig, o, prometheus.NewRegistry(), newTestLogger(t))
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), g))

t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), o))
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), g))
})

allSubprocessors := map[spanmetrics.Subprocessor]bool{spanmetrics.Count: true, spanmetrics.Latency: true, spanmetrics.Size: true}

// All subprocessors should be enabled for user1
instance1, err := g.getOrCreateInstance(user1)
require.NoError(t, err)
verifySubprocessors(t, instance1, allSubprocessors)

// All subprocessors should be enabled for user2
instance2, err := g.getOrCreateInstance(user2)
require.NoError(t, err)
verifySubprocessors(t, instance2, allSubprocessors)

// Change overrides for user1
require.NoError(t, os.WriteFile(overridesFile, []byte(fmt.Sprintf(`
overrides:
%s:
metrics_generator:
collection_interval: 1s
processors:
- %s
`, user1, spanmetrics.Count.String())), os.ModePerm))
time.Sleep(15 * time.Second) // Wait for overrides to be applied. Reload is hardcoded to 10s :(

// Only Count should be enabled for user1
instance1, err = g.getOrCreateInstance(user1)
require.NoError(t, err)
verifySubprocessors(t, instance1, map[spanmetrics.Subprocessor]bool{spanmetrics.Count: true, spanmetrics.Latency: false, spanmetrics.Size: false})

// All subprocessors should be enabled for user2
instance2, err = g.getOrCreateInstance(user2)
require.NoError(t, err)
verifySubprocessors(t, instance2, allSubprocessors)
}

func verifySubprocessors(t *testing.T, instance *instance, expected map[spanmetrics.Subprocessor]bool) {
instance.processorsMtx.RLock()
defer instance.processorsMtx.RUnlock()

require.Len(t, instance.processors, 1)

processor, ok := instance.processors[spanmetrics.Name]
require.True(t, ok)

require.Equal(t, len(processor.(*spanmetrics.Processor).Cfg.Subprocessors), len(expected))

cfg := processor.(*spanmetrics.Processor).Cfg
for sub, enabled := range expected {
assert.Equal(t, enabled, cfg.Subprocessors[sub])
}
}

var _ log.Logger = (*testLogger)(nil)

type testLogger struct {
t *testing.T
}

func newTestLogger(t *testing.T) log.Logger {
return testLogger{t: t}
}

func (l testLogger) Log(keyvals ...interface{}) error {
l.t.Log(keyvals...)
return nil
}
Loading

0 comments on commit 393b2a7

Please sign in to comment.