Refactor transaction metrics aggregation, rename config (#4083) (#4124)
* beater/config: aggregation.transactions

Nest apm-server.aggregation.* under
apm-server.aggregation.transactions,
to make way for service destination
metrics.

* txmetrics: rename AggregateTransformables

Rename AggregateTransformables to ProcessTransformables,
to align it with its usage as a processor within a pipeline
of event processors.

Also, remove TODO about maphash - it's not worthwhile.

* tests/system: update aggregation config

* x-pack/apm-server: restructure for more processors

Co-authored-by: Andrew Wilkins <axw@elastic.co>
jalvz and axw committed Aug 31, 2020
1 parent 0bc1d52 commit 7450ee9
Showing 9 changed files with 130 additions and 88 deletions.
19 changes: 13 additions & 6 deletions beater/config/aggregation.go
@@ -28,20 +28,27 @@ const (
defaultAggregationRUMUserAgentLRUSize = 5000
)

// AggregationConfig holds configuration related to metrics aggregation.
// AggregationConfig holds configuration related to various metrics aggregations.
type AggregationConfig struct {
Transactions TransactionAggregationConfig `config:"transactions"`
}

// TransactionAggregationConfig holds configuration related to transaction metrics aggregation.
type TransactionAggregationConfig struct {
Enabled bool `config:"enabled"`
Interval time.Duration `config:"interval" validate:"min=1"`
MaxTransactionGroups int `config:"max_transaction_groups" validate:"min=1"`
MaxTransactionGroups int `config:"max_groups" validate:"min=1"`
HDRHistogramSignificantFigures int `config:"hdrhistogram_significant_figures" validate:"min=1, max=5"`
RUMUserAgentLRUSize int `config:"rum.user_agent.lru_size" validate:"min=1"`
}

func defaultAggregationConfig() AggregationConfig {
return AggregationConfig{
Interval: defaultAggregationInterval,
MaxTransactionGroups: defaultAggregationMaxTransactionGroups,
HDRHistogramSignificantFigures: defaultAggregationHDRHistogramSignificantFigures,
RUMUserAgentLRUSize: defaultAggregationRUMUserAgentLRUSize,
Transactions: TransactionAggregationConfig{
Interval: defaultAggregationInterval,
MaxTransactionGroups: defaultAggregationMaxTransactionGroups,
HDRHistogramSignificantFigures: defaultAggregationHDRHistogramSignificantFigures,
RUMUserAgentLRUSize: defaultAggregationRUMUserAgentLRUSize,
},
}
}
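
For context, a minimal hypothetical sketch (not part of this commit) of how the renamed, nested keys would unpack into the new structs. It assumes the libbeat common.MustNewConfigFrom/Unpack helpers and the testify require package already used by the tests in this package, and would sit alongside them in beater/config:

package config

import (
	"testing"
	"time"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/stretchr/testify/require"
)

// TestTransactionsKeyNesting is a hypothetical example (not from this commit)
// verifying that keys nested under "transactions" land in the new
// TransactionAggregationConfig, while unset fields keep their defaults.
func TestTransactionsKeyNesting(t *testing.T) {
	raw := common.MustNewConfigFrom(map[string]interface{}{
		"transactions": map[string]interface{}{
			"enabled":    true,
			"interval":   "30s",
			"max_groups": 500,
		},
	})
	cfg := defaultAggregationConfig()
	require.NoError(t, raw.Unpack(&cfg))
	require.True(t, cfg.Transactions.Enabled)
	require.Equal(t, 30*time.Second, cfg.Transactions.Interval)
	require.Equal(t, 500, cfg.Transactions.MaxTransactionGroups)
	// hdrhistogram_significant_figures and rum.user_agent.lru_size were not
	// set above, so they retain the package defaults.
}
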
18 changes: 9 additions & 9 deletions beater/config/aggregation_test.go
@@ -39,24 +39,24 @@ func TestAggregationConfigInvalid(t *testing.T) {

for _, test := range []test{{
name: "non-positive interval",
key: "aggregation.interval",
key: "aggregation.transactions.interval",
value: "0",
expect: "Error processing configuration: requires duration < 1 accessing 'aggregation.interval'",
expect: "Error processing configuration: requires duration < 1 accessing 'aggregation.transactions.interval'",
}, {
name: "non-positive max_transaction_groups",
key: "aggregation.max_transaction_groups",
name: "non-positive max_groups",
key: "aggregation.transactions.max_groups",
value: float64(0),
expect: "Error processing configuration: requires value < 1 accessing 'aggregation.max_transaction_groups'",
expect: "Error processing configuration: requires value < 1 accessing 'aggregation.transactions.max_groups'",
}, {
name: "non-positive hdrhistogram_significant_figures",
key: "aggregation.hdrhistogram_significant_figures",
key: "aggregation.transactions.hdrhistogram_significant_figures",
value: float64(0),
expect: "Error processing configuration: requires value < 1 accessing 'aggregation.hdrhistogram_significant_figures'",
expect: "Error processing configuration: requires value < 1 accessing 'aggregation.transactions.hdrhistogram_significant_figures'",
}, {
name: "hdrhistogram_significant_figures too high",
key: "aggregation.hdrhistogram_significant_figures",
key: "aggregation.transactions.hdrhistogram_significant_figures",
value: float64(6),
expect: "Error processing configuration: requires value > 5 accessing 'aggregation.hdrhistogram_significant_figures'",
expect: "Error processing configuration: requires value > 5 accessing 'aggregation.transactions.hdrhistogram_significant_figures'",
}} {
t.Run(test.name, func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
4 changes: 2 additions & 2 deletions beater/config/config.go
@@ -137,15 +137,15 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, err
}

if !c.Sampling.KeepUnsampled && !c.Aggregation.Enabled {
if !c.Sampling.KeepUnsampled && !c.Aggregation.Transactions.Enabled {
// Unsampled transactions should only be dropped
// when transaction aggregation is enabled in the
// server. This means the aggregations performed
// by the APM UI will not have access to a complete
// representation of the latency distribution.
logger.Warn("" +
"apm-server.sampling.keep_unsampled and " +
"apm-server.aggregation.enabled are both false, " +
"apm-server.aggregation.transactions.enabled are both false, " +
"which will lead to incorrect metrics being reported in the APM UI",
)
}
50 changes: 28 additions & 22 deletions beater/config/config_test.go
@@ -117,13 +117,15 @@ func Test_UnpackConfig(t *testing.T) {
"elasticsearch.hosts": []string{"localhost:9201", "localhost:9202"},
},
"aggregation": map[string]interface{}{
"enabled": true,
"interval": "1s",
"max_transaction_groups": 123,
"hdrhistogram_significant_figures": 1,
"rum": map[string]interface{}{
"user_agent": map[string]interface{}{
"lru_size": 123,
"transactions": map[string]interface{}{
"enabled": true,
"interval": "1s",
"max_groups": 123,
"hdrhistogram_significant_figures": 1,
"rum": map[string]interface{}{
"user_agent": map[string]interface{}{
"lru_size": 123,
},
},
},
},
@@ -212,11 +214,13 @@ func Test_UnpackConfig(t *testing.T) {
esConfigured: true,
},
Aggregation: AggregationConfig{
Enabled: true,
Interval: time.Second,
MaxTransactionGroups: 123,
HDRHistogramSignificantFigures: 1,
RUMUserAgentLRUSize: 123,
Transactions: TransactionAggregationConfig{
Enabled: true,
Interval: time.Second,
MaxTransactionGroups: 123,
HDRHistogramSignificantFigures: 1,
RUMUserAgentLRUSize: 123,
},
},
Sampling: SamplingConfig{
KeepUnsampled: true,
@@ -252,11 +256,11 @@ func Test_UnpackConfig(t *testing.T) {
},
},
},
"jaeger.grpc.enabled": true,
"api_key.enabled": true,
"aggregation.enabled": true,
"aggregation.rum.user_agent.lru_size": 123,
"sampling.keep_unsampled": false,
"jaeger.grpc.enabled": true,
"api_key.enabled": true,
"aggregation.transactions.enabled": true,
"aggregation.transactions.rum.user_agent.lru_size": 123,
"sampling.keep_unsampled": false,
},
outCfg: &Config{
Host: "localhost:3000",
@@ -326,11 +330,13 @@ func Test_UnpackConfig(t *testing.T) {
},
APIKeyConfig: &APIKeyConfig{Enabled: true, LimitPerMin: 100, ESConfig: elasticsearch.DefaultConfig()},
Aggregation: AggregationConfig{
Enabled: true,
Interval: time.Minute,
MaxTransactionGroups: 1000,
HDRHistogramSignificantFigures: 2,
RUMUserAgentLRUSize: 123,
Transactions: TransactionAggregationConfig{
Enabled: true,
Interval: time.Minute,
MaxTransactionGroups: 1000,
HDRHistogramSignificantFigures: 2,
RUMUserAgentLRUSize: 123,
},
},
Sampling: SamplingConfig{
KeepUnsampled: false,
4 changes: 2 additions & 2 deletions tests/system/config/apm-server.yml.j2
@@ -138,10 +138,10 @@ apm-server:
{% endif %}

{% if aggregation_enabled %}
aggregation.enabled: {{ aggregation_enabled }}
aggregation.transactions.enabled: {{ aggregation_enabled }}
{% endif %}
{% if aggregation_interval %}
aggregation.interval: {{ aggregation_interval }}
aggregation.transactions.interval: {{ aggregation_interval }}
{% endif %}

sampling:
2 changes: 1 addition & 1 deletion tests/system/test_sampling.py
@@ -52,5 +52,5 @@ def config(self):
return cfg

def test(self):
expected = "apm-server.sampling.keep_unsampled and apm-server.aggregation.enabled are both false, which will lead to incorrect metrics being reported in the APM UI"
expected = "apm-server.sampling.keep_unsampled and apm-server.aggregation.transactions.enabled are both false, which will lead to incorrect metrics being reported in the APM UI"
self.assertIn(expected, self.get_log())
5 changes: 2 additions & 3 deletions x-pack/apm-server/aggregation/txmetrics/aggregator.go
@@ -215,14 +215,14 @@ func (a *Aggregator) publish(ctx context.Context) error {
})
}

// AggregateTransformables aggregates all transactions contained in
// ProcessTransformables aggregates all transactions contained in
// "in", returning the input with any metricsets requiring immediate
// publication appended.
//
// This method is expected to be used immediately prior to publishing
// the events, so that the metricsets requiring immediate publication
// can be included in the same batch.
func (a *Aggregator) AggregateTransformables(in []transform.Transformable) []transform.Transformable {
func (a *Aggregator) ProcessTransformables(in []transform.Transformable) []transform.Transformable {
out := in
for _, tf := range in {
if tx, ok := tf.(*model.Transaction); ok {
@@ -429,7 +429,6 @@ type transactionAggregationKey struct {
}

func (k *transactionAggregationKey) hash() uint64 {
// TODO(axw) when we upgrade to Go 1.14, change this to maphash.
var h xxhash.Digest
if k.traceRoot {
h.WriteString("1")
6 changes: 3 additions & 3 deletions x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
@@ -71,7 +71,7 @@ func TestNewAggregatorConfigInvalid(t *testing.T) {
}
}

func TestAggregateTransformablesOverflow(t *testing.T) {
func TestProcessTransformablesOverflow(t *testing.T) {
reqs := make(chan publish.PendingReq, 1)

agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
@@ -90,7 +90,7 @@ func TestAggregateTransformablesOverflow(t *testing.T) {
input = append(input, &model.Transaction{Name: "foo"})
input = append(input, &model.Transaction{Name: "bar"})
}
output := agg.AggregateTransformables(input)
output := agg.ProcessTransformables(input)
assert.Equal(t, input, output)

// The third transaction group will return a metricset for immediate publication.
@@ -100,7 +100,7 @@ func TestAggregateTransformablesOverflow(t *testing.T) {
Duration: float64(time.Minute / time.Millisecond),
})
}
output = agg.AggregateTransformables(input)
output = agg.ProcessTransformables(input)
assert.Len(t, output, len(input)+2)
assert.Equal(t, input, output[:len(input)])

110 changes: 70 additions & 40 deletions x-pack/apm-server/main.go
@@ -15,59 +15,85 @@ import (

"github.com/elastic/apm-server/beater"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
"github.com/elastic/apm-server/x-pack/apm-server/cmd"
)

// runServerWithAggregator runs the APM Server. If aggregation
// is enabled, then a txmetrics.Aggregator will also be run,
// and the publish.Reporter will be wrapped such that all
// transactions pass through the aggregator before being
// published to libbeat.
func runServerWithAggregator(ctx context.Context, runServer beater.RunServerFunc, args beater.ServerParams) error {
if !args.Config.Aggregation.Enabled {
return runServer(ctx, args)
type namedProcessor struct {
name string
processor
}

type processor interface {
ProcessTransformables([]transform.Transformable) []transform.Transformable
Run() error
Stop(context.Context) error
}

// newProcessors returns a list of processors which will process
// events in sequential order, prior to the events being published.
func newProcessors(args beater.ServerParams) ([]namedProcessor, error) {
var processors []namedProcessor
if args.Config.Aggregation.Transactions.Enabled {
const name = "transaction metrics aggregation"
args.Logger.Infof("creating %s with config: %+v", name, args.Config.Aggregation)
agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
Report: args.Reporter,
MaxTransactionGroups: args.Config.Aggregation.Transactions.MaxTransactionGroups,
MetricsInterval: args.Config.Aggregation.Transactions.Interval,
HDRHistogramSignificantFigures: args.Config.Aggregation.Transactions.HDRHistogramSignificantFigures,
RUMUserAgentLRUSize: args.Config.Aggregation.Transactions.RUMUserAgentLRUSize,
})
if err != nil {
return nil, errors.Wrapf(err, "error creating %s", name)
}
processors = append(processors, namedProcessor{name: name, processor: agg})
}
return processors, nil
}

agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
Report: args.Reporter,
MaxTransactionGroups: args.Config.Aggregation.MaxTransactionGroups,
MetricsInterval: args.Config.Aggregation.Interval,
HDRHistogramSignificantFigures: args.Config.Aggregation.HDRHistogramSignificantFigures,
RUMUserAgentLRUSize: args.Config.Aggregation.RUMUserAgentLRUSize,
})
if err != nil {
return errors.Wrap(err, "error creating aggregator")
// runServerWithProcessors runs the APM Server and the given list of processors.
//
// newProcessors returns a list of processors which will process events in
// sequential order, prior to the events being published.
func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc, args beater.ServerParams, processors ...namedProcessor) error {
if len(processors) == 0 {
return runServer(ctx, args)
}

origReport := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
req.Transformables = agg.AggregateTransformables(req.Transformables)
for _, p := range processors {
req.Transformables = p.ProcessTransformables(req.Transformables)
}
return origReport(ctx, req)
}

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args.Logger.Infof("aggregator started with config: %+v", args.Config.Aggregation)
if err := agg.Run(); err != nil {
args.Logger.Errorf("aggregator aborted", logp.Error(err))
return err
}
args.Logger.Infof("aggregator stopped")
return nil
})
g.Go(func() error {
<-ctx.Done()
stopctx := context.Background()
if args.Config.ShutdownTimeout > 0 {
// On shutdown wait for the aggregator to stop
// in order to flush any accumulated metrics.
var cancel context.CancelFunc
stopctx, cancel = context.WithTimeout(stopctx, args.Config.ShutdownTimeout)
defer cancel()
}
return agg.Stop(stopctx)
})
for _, p := range processors {
p := p // copy for closure
g.Go(func() error {
if err := p.Run(); err != nil {
args.Logger.Errorf("%s aborted", p.name, logp.Error(err))
return err
}
args.Logger.Infof("%s stopped", p.name)
return nil
})
g.Go(func() error {
<-ctx.Done()
stopctx := context.Background()
if args.Config.ShutdownTimeout > 0 {
// On shutdown wait for the aggregator to stop
// in order to flush any accumulated metrics.
var cancel context.CancelFunc
stopctx, cancel = context.WithTimeout(stopctx, args.Config.ShutdownTimeout)
defer cancel()
}
return p.Stop(stopctx)
})
}
g.Go(func() error {
return runServer(ctx, args)
})
@@ -77,7 +103,11 @@ func runServerWithAggregator(ctx context.Context, runServer beater.RunServerFunc
var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
WrapRunServer: func(runServer beater.RunServerFunc) beater.RunServerFunc {
return func(ctx context.Context, args beater.ServerParams) error {
return runServerWithAggregator(ctx, runServer, args)
processors, err := newProcessors(args)
if err != nil {
return err
}
return runServerWithProcessors(ctx, runServer, args, processors...)
}
},
}))
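
To illustrate the restructuring for more processors (a hypothetical sketch, not part of this commit): any future aggregator, such as the service destination metrics mentioned in the commit message, only has to satisfy the processor interface defined above; newProcessors can then append it as another namedProcessor, and runServerWithProcessors will run and stop it alongside the transaction metrics aggregator. A minimal no-op implementation of that interface, with invented names, might look like this:

package main

import (
	"context"
	"sync"

	"github.com/elastic/apm-server/transform"
)

// noopProcessor is a hypothetical placeholder satisfying the processor
// interface: events pass through unchanged and Run blocks until Stop is called.
type noopProcessor struct {
	stopOnce sync.Once
	stopped  chan struct{}
}

func newNoopProcessor() *noopProcessor {
	return &noopProcessor{stopped: make(chan struct{})}
}

// ProcessTransformables returns the input unchanged; a real processor would
// aggregate or enrich the events here before they are published.
func (p *noopProcessor) ProcessTransformables(in []transform.Transformable) []transform.Transformable {
	return in
}

// Run blocks until Stop is called, matching the lifecycle expected by
// runServerWithProcessors.
func (p *noopProcessor) Run() error {
	<-p.stopped
	return nil
}

// Stop unblocks Run; the context would bound any final flush work.
func (p *noopProcessor) Stop(ctx context.Context) error {
	p.stopOnce.Do(func() { close(p.stopped) })
	return nil
}

Wiring it in would amount to appending namedProcessor{name: "noop", processor: newNoopProcessor()} inside newProcessors.
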