diff --git a/beater/config/aggregation.go b/beater/config/aggregation.go
index 0ca7ac30efd..dfe3af8e1ed 100644
--- a/beater/config/aggregation.go
+++ b/beater/config/aggregation.go
@@ -28,20 +28,27 @@ const (
 	defaultAggregationRUMUserAgentLRUSize          = 5000
 )
 
-// AggregationConfig holds configuration related to metrics aggregation.
+// AggregationConfig holds configuration related to various metrics aggregations.
 type AggregationConfig struct {
+	Transactions TransactionAggregationConfig `config:"transactions"`
+}
+
+// TransactionAggregationConfig holds configuration related to transaction metrics aggregation.
+type TransactionAggregationConfig struct {
 	Enabled                        bool          `config:"enabled"`
 	Interval                       time.Duration `config:"interval" validate:"min=1"`
-	MaxTransactionGroups           int           `config:"max_transaction_groups" validate:"min=1"`
+	MaxTransactionGroups           int           `config:"max_groups" validate:"min=1"`
 	HDRHistogramSignificantFigures int           `config:"hdrhistogram_significant_figures" validate:"min=1, max=5"`
 	RUMUserAgentLRUSize            int           `config:"rum.user_agent.lru_size" validate:"min=1"`
 }
 
 func defaultAggregationConfig() AggregationConfig {
 	return AggregationConfig{
-		Interval:                       defaultAggregationInterval,
-		MaxTransactionGroups:           defaultAggregationMaxTransactionGroups,
-		HDRHistogramSignificantFigures: defaultAggregationHDRHistogramSignificantFigures,
-		RUMUserAgentLRUSize:            defaultAggregationRUMUserAgentLRUSize,
+		Transactions: TransactionAggregationConfig{
+			Interval:                       defaultAggregationInterval,
+			MaxTransactionGroups:           defaultAggregationMaxTransactionGroups,
+			HDRHistogramSignificantFigures: defaultAggregationHDRHistogramSignificantFigures,
+			RUMUserAgentLRUSize:            defaultAggregationRUMUserAgentLRUSize,
+		},
 	}
 }
diff --git a/beater/config/aggregation_test.go b/beater/config/aggregation_test.go
index 9f69205de07..a052246d899 100644
--- a/beater/config/aggregation_test.go
+++ b/beater/config/aggregation_test.go
@@ -39,24 +39,24 @@ func TestAggregationConfigInvalid(t *testing.T) {
 
 	for _, test := range []test{{
 		name:   "non-positive interval",
-		key:    "aggregation.interval",
+		key:    "aggregation.transactions.interval",
 		value:  "0",
-		expect: "Error processing configuration: requires duration < 1 accessing 'aggregation.interval'",
+		expect: "Error processing configuration: requires duration < 1 accessing 'aggregation.transactions.interval'",
 	}, {
-		name:   "non-positive max_transaction_groups",
-		key:    "aggregation.max_transaction_groups",
+		name:   "non-positive max_groups",
+		key:    "aggregation.transactions.max_groups",
 		value:  float64(0),
-		expect: "Error processing configuration: requires value < 1 accessing 'aggregation.max_transaction_groups'",
+		expect: "Error processing configuration: requires value < 1 accessing 'aggregation.transactions.max_groups'",
 	}, {
 		name:   "non-positive hdrhistogram_significant_figures",
-		key:    "aggregation.hdrhistogram_significant_figures",
+		key:    "aggregation.transactions.hdrhistogram_significant_figures",
 		value:  float64(0),
-		expect: "Error processing configuration: requires value < 1 accessing 'aggregation.hdrhistogram_significant_figures'",
+		expect: "Error processing configuration: requires value < 1 accessing 'aggregation.transactions.hdrhistogram_significant_figures'",
 	}, {
 		name:   "hdrhistogram_significant_figures too high",
-		key:    "aggregation.hdrhistogram_significant_figures",
+		key:    "aggregation.transactions.hdrhistogram_significant_figures",
 		value:  float64(6),
-		expect: "Error processing configuration: requires value > 5 accessing 'aggregation.hdrhistogram_significant_figures'",
+		expect: "Error processing configuration: requires value > 5 accessing 'aggregation.transactions.hdrhistogram_significant_figures'",
 	}} {
 		t.Run(test.name, func(t *testing.T) {
 			_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
diff --git a/beater/config/config.go b/beater/config/config.go
index 89af20ae63a..6b4f2a66333 100644
--- a/beater/config/config.go
+++ b/beater/config/config.go
@@ -137,7 +137,7 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
 		return nil, err
 	}
 
-	if !c.Sampling.KeepUnsampled && !c.Aggregation.Enabled {
+	if !c.Sampling.KeepUnsampled && !c.Aggregation.Transactions.Enabled {
 		// Unsampled transactions should only be dropped
 		// when transaction aggregation is enabled in the
 		// server. This means the aggregations performed
@@ -145,7 +145,7 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
 		// representation of the latency distribution.
 		logger.Warn("" +
 			"apm-server.sampling.keep_unsampled and " +
-			"apm-server.aggregation.enabled are both false, " +
+			"apm-server.aggregation.transactions.enabled are both false, " +
 			"which will lead to incorrect metrics being reported in the APM UI",
 		)
 	}
diff --git a/beater/config/config_test.go b/beater/config/config_test.go
index 56f5c6e3e11..3261c4948ec 100644
--- a/beater/config/config_test.go
+++ b/beater/config/config_test.go
@@ -117,13 +117,15 @@ func Test_UnpackConfig(t *testing.T) {
 					"elasticsearch.hosts": []string{"localhost:9201", "localhost:9202"},
 				},
 				"aggregation": map[string]interface{}{
-					"enabled":                          true,
-					"interval":                         "1s",
-					"max_transaction_groups":           123,
-					"hdrhistogram_significant_figures": 1,
-					"rum": map[string]interface{}{
-						"user_agent": map[string]interface{}{
-							"lru_size": 123,
+					"transactions": map[string]interface{}{
+						"enabled":                          true,
+						"interval":                         "1s",
+						"max_groups":                       123,
+						"hdrhistogram_significant_figures": 1,
+						"rum": map[string]interface{}{
+							"user_agent": map[string]interface{}{
+								"lru_size": 123,
+							},
 						},
 					},
 				},
@@ -212,11 +214,13 @@ func Test_UnpackConfig(t *testing.T) {
 					esConfigured: true,
 				},
 				Aggregation: AggregationConfig{
-					Enabled:                        true,
-					Interval:                       time.Second,
-					MaxTransactionGroups:           123,
-					HDRHistogramSignificantFigures: 1,
-					RUMUserAgentLRUSize:            123,
+					Transactions: TransactionAggregationConfig{
+						Enabled:                        true,
+						Interval:                       time.Second,
+						MaxTransactionGroups:           123,
+						HDRHistogramSignificantFigures: 1,
+						RUMUserAgentLRUSize:            123,
+					},
 				},
 				Sampling: SamplingConfig{
 					KeepUnsampled: true,
@@ -252,11 +256,11 @@ func Test_UnpackConfig(t *testing.T) {
 						},
 					},
 				},
-				"jaeger.grpc.enabled":                 true,
-				"api_key.enabled":                     true,
-				"aggregation.enabled":                 true,
-				"aggregation.rum.user_agent.lru_size": 123,
-				"sampling.keep_unsampled":             false,
+				"jaeger.grpc.enabled":              true,
+				"api_key.enabled":                  true,
+				"aggregation.transactions.enabled": true,
+				"aggregation.transactions.rum.user_agent.lru_size": 123,
+				"sampling.keep_unsampled":          false,
 			},
 			outCfg: &Config{
 				Host: "localhost:3000",
@@ -326,11 +330,13 @@ func Test_UnpackConfig(t *testing.T) {
 					},
 				},
 				APIKeyConfig: &APIKeyConfig{Enabled: true, LimitPerMin: 100, ESConfig: elasticsearch.DefaultConfig()},
 				Aggregation: AggregationConfig{
-					Enabled:                        true,
-					Interval:                       time.Minute,
-					MaxTransactionGroups:           1000,
-					HDRHistogramSignificantFigures: 2,
-					RUMUserAgentLRUSize:            123,
+					Transactions: TransactionAggregationConfig{
+						Enabled:                        true,
+						Interval:                       time.Minute,
+						MaxTransactionGroups:           1000,
+						HDRHistogramSignificantFigures: 2,
+						RUMUserAgentLRUSize:            123,
+					},
 				},
 				Sampling: SamplingConfig{
 					KeepUnsampled: false,
diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2
index 26da3417d95..3430608f75f 100644
--- a/tests/system/config/apm-server.yml.j2
+++ b/tests/system/config/apm-server.yml.j2
@@ -138,10 +138,10 @@ apm-server:
   {% endif %}
 
   {% if aggregation_enabled %}
-  aggregation.enabled: {{ aggregation_enabled }}
+  aggregation.transactions.enabled: {{ aggregation_enabled }}
   {% endif %}
   {% if aggregation_interval %}
-  aggregation.interval: {{ aggregation_interval }}
+  aggregation.transactions.interval: {{ aggregation_interval }}
   {% endif %}
 
   sampling:
diff --git a/tests/system/test_sampling.py b/tests/system/test_sampling.py
index 410afa72812..52c6255b4a8 100644
--- a/tests/system/test_sampling.py
+++ b/tests/system/test_sampling.py
@@ -52,5 +52,5 @@ def config(self):
         return cfg
 
     def test(self):
-        expected = "apm-server.sampling.keep_unsampled and apm-server.aggregation.enabled are both false, which will lead to incorrect metrics being reported in the APM UI"
+        expected = "apm-server.sampling.keep_unsampled and apm-server.aggregation.transactions.enabled are both false, which will lead to incorrect metrics being reported in the APM UI"
         self.assertIn(expected, self.get_log())
diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go
index e6a8427cd8a..4960dbbb931 100644
--- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go
+++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go
@@ -215,14 +215,14 @@ func (a *Aggregator) publish(ctx context.Context) error {
 	})
 }
 
-// AggregateTransformables aggregates all transactions contained in
+// ProcessTransformables aggregates all transactions contained in
 // "in", returning the input with any metricsets requiring immediate
 // publication appended.
 //
 // This method is expected to be used immediately prior to publishing
 // the events, so that the metricsets requiring immediate publication
 // can be included in the same batch.
-func (a *Aggregator) AggregateTransformables(in []transform.Transformable) []transform.Transformable {
+func (a *Aggregator) ProcessTransformables(in []transform.Transformable) []transform.Transformable {
 	out := in
 	for _, tf := range in {
 		if tx, ok := tf.(*model.Transaction); ok {
@@ -429,7 +429,6 @@ type transactionAggregationKey struct {
 }
 
 func (k *transactionAggregationKey) hash() uint64 {
-	// TODO(axw) when we upgrade to Go 1.14, change this to maphash.
 	var h xxhash.Digest
 	if k.traceRoot {
 		h.WriteString("1")
diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
index 8304252ccb1..883a413b2ef 100644
--- a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
+++ b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
@@ -71,7 +71,7 @@ func TestNewAggregatorConfigInvalid(t *testing.T) {
 	}
 }
 
-func TestAggregateTransformablesOverflow(t *testing.T) {
+func TestProcessTransformablesOverflow(t *testing.T) {
 	reqs := make(chan publish.PendingReq, 1)
 
 	agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
@@ -90,7 +90,7 @@ func TestAggregateTransformablesOverflow(t *testing.T) {
 		input = append(input, &model.Transaction{Name: "foo"})
 		input = append(input, &model.Transaction{Name: "bar"})
 	}
-	output := agg.AggregateTransformables(input)
+	output := agg.ProcessTransformables(input)
 	assert.Equal(t, input, output)
 
 	// The third transaction group will return a metricset for immediate publication.
@@ -100,7 +100,7 @@ func TestAggregateTransformablesOverflow(t *testing.T) {
 			Duration: float64(time.Minute / time.Millisecond),
 		})
 	}
-	output = agg.AggregateTransformables(input)
+	output = agg.ProcessTransformables(input)
 	assert.Len(t, output, len(input)+2)
 	assert.Equal(t, input, output[:len(input)])
 
diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go
index 9fba366a616..b5b75b5aa32 100644
--- a/x-pack/apm-server/main.go
+++ b/x-pack/apm-server/main.go
@@ -15,59 +15,85 @@ import (
 
 	"github.com/elastic/apm-server/beater"
 	"github.com/elastic/apm-server/publish"
+	"github.com/elastic/apm-server/transform"
 	"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
 	"github.com/elastic/apm-server/x-pack/apm-server/cmd"
 )
 
-// runServerWithAggregator runs the APM Server. If aggregation
-// is enabled, then a txmetrics.Aggregator will also be run,
-// and the publish.Reporter will be wrapped such that all
-// transactions pass through the aggregator before being
-// published to libbeat.
-func runServerWithAggregator(ctx context.Context, runServer beater.RunServerFunc, args beater.ServerParams) error {
-	if !args.Config.Aggregation.Enabled {
-		return runServer(ctx, args)
+type namedProcessor struct {
+	name string
+	processor
+}
+
+type processor interface {
+	ProcessTransformables([]transform.Transformable) []transform.Transformable
+	Run() error
+	Stop(context.Context) error
+}
+
+// newProcessors returns a list of processors which will process
+// events in sequential order, prior to the events being published.
+func newProcessors(args beater.ServerParams) ([]namedProcessor, error) {
+	var processors []namedProcessor
+	if args.Config.Aggregation.Transactions.Enabled {
+		const name = "transaction metrics aggregation"
+		args.Logger.Infof("creating %s with config: %+v", name, args.Config.Aggregation)
+		agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
+			Report:                         args.Reporter,
+			MaxTransactionGroups:           args.Config.Aggregation.Transactions.MaxTransactionGroups,
+			MetricsInterval:                args.Config.Aggregation.Transactions.Interval,
+			HDRHistogramSignificantFigures: args.Config.Aggregation.Transactions.HDRHistogramSignificantFigures,
+			RUMUserAgentLRUSize:            args.Config.Aggregation.Transactions.RUMUserAgentLRUSize,
+		})
+		if err != nil {
+			return nil, errors.Wrapf(err, "error creating %s", name)
+		}
+		processors = append(processors, namedProcessor{name: name, processor: agg})
 	}
+	return processors, nil
+}
 
-	agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
-		Report:                         args.Reporter,
-		MaxTransactionGroups:           args.Config.Aggregation.MaxTransactionGroups,
-		MetricsInterval:                args.Config.Aggregation.Interval,
-		HDRHistogramSignificantFigures: args.Config.Aggregation.HDRHistogramSignificantFigures,
-		RUMUserAgentLRUSize:            args.Config.Aggregation.RUMUserAgentLRUSize,
-	})
-	if err != nil {
-		return errors.Wrap(err, "error creating aggregator")
+// runServerWithProcessors runs the APM Server and the given list of processors.
+//
+// Each processor is applied, in order, to every batch of events
+// prior to the events being published.
+func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc, args beater.ServerParams, processors ...namedProcessor) error {
+	if len(processors) == 0 {
+		return runServer(ctx, args)
 	}
 
 	origReport := args.Reporter
 	args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
-		req.Transformables = agg.AggregateTransformables(req.Transformables)
+		for _, p := range processors {
+			req.Transformables = p.ProcessTransformables(req.Transformables)
+		}
 		return origReport(ctx, req)
 	}
 
 	g, ctx := errgroup.WithContext(ctx)
-	g.Go(func() error {
-		args.Logger.Infof("aggregator started with config: %+v", args.Config.Aggregation)
-		if err := agg.Run(); err != nil {
-			args.Logger.Errorf("aggregator aborted", logp.Error(err))
-			return err
-		}
-		args.Logger.Infof("aggregator stopped")
-		return nil
-	})
-	g.Go(func() error {
-		<-ctx.Done()
-		stopctx := context.Background()
-		if args.Config.ShutdownTimeout > 0 {
-			// On shutdown wait for the aggregator to stop
-			// in order to flush any accumulated metrics.
-			var cancel context.CancelFunc
-			stopctx, cancel = context.WithTimeout(stopctx, args.Config.ShutdownTimeout)
-			defer cancel()
-		}
-		return agg.Stop(stopctx)
-	})
+	for _, p := range processors {
+		p := p // copy for closure
+		g.Go(func() error {
+			if err := p.Run(); err != nil {
+				args.Logger.With(logp.Error(err)).Errorf("%s aborted", p.name)
+				return err
+			}
+			args.Logger.Infof("%s stopped", p.name)
+			return nil
+		})
+		g.Go(func() error {
+			<-ctx.Done()
+			stopctx := context.Background()
+			if args.Config.ShutdownTimeout > 0 {
+				// On shutdown wait for the processor to stop
+				// in order to flush any accumulated events.
+				var cancel context.CancelFunc
+				stopctx, cancel = context.WithTimeout(stopctx, args.Config.ShutdownTimeout)
+				defer cancel()
+			}
+			return p.Stop(stopctx)
+		})
+	}
 	g.Go(func() error {
 		return runServer(ctx, args)
 	})
@@ -77,7 +103,11 @@ func runServerWithAggregator(ctx context.Context, runServer beater.RunServerFunc
 var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
 	WrapRunServer: func(runServer beater.RunServerFunc) beater.RunServerFunc {
 		return func(ctx context.Context, args beater.ServerParams) error {
-			return runServerWithAggregator(ctx, runServer, args)
+			processors, err := newProcessors(args)
+			if err != nil {
+				return err
+			}
+			return runServerWithProcessors(ctx, runServer, args, processors...)
 		}
 	},
 }))
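---

Configuration notes: the old top-level keys move under `aggregation.transactions.*`, and `max_transaction_groups` is renamed to `max_groups`. For reference, the new layout in apm-server.yml would look roughly as below; this is a sketch of the shape of the config, not authoritative documentation. The values shown are the defaults exercised by the tests above (1m interval, 1000 groups, 2 significant figures, 5000 LRU entries); aggregation itself stays off unless `enabled` is set, since defaultAggregationConfig() leaves Enabled at its zero value:

```yaml
apm-server:
  aggregation:
    transactions:
      enabled: true
      interval: 1m
      max_groups: 1000
      hdrhistogram_significant_figures: 2
      rum.user_agent.lru_size: 5000
```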
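The unexported `processor` interface in x-pack/apm-server/main.go is what makes the publish pipeline extensible: anything exposing ProcessTransformables/Run/Stop can be appended by newProcessors and will be driven by runServerWithProcessors. Below is a minimal sketch of a conforming implementation; the `noopProcessor` type, its constructor, and the stop-channel plumbing are illustrative only, not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/elastic/apm-server/transform"
)

// noopProcessor is a hypothetical processor satisfying the shape of the
// unexported interface declared in x-pack/apm-server/main.go: a batch
// hook plus a Run/Stop lifecycle.
type noopProcessor struct {
	stopOnce sync.Once
	stopped  chan struct{}
}

func newNoopProcessor() *noopProcessor {
	return &noopProcessor{stopped: make(chan struct{})}
}

// ProcessTransformables is invoked for each batch of events just before
// publication. Implementations may return the input unchanged, or append
// derived events, as the transaction metrics aggregator does with
// metricsets requiring immediate publication.
func (p *noopProcessor) ProcessTransformables(in []transform.Transformable) []transform.Transformable {
	return in
}

// Run blocks until Stop is called; runServerWithProcessors runs it in an
// errgroup goroutine for the lifetime of the server.
func (p *noopProcessor) Run() error {
	<-p.stopped
	return nil
}

// Stop unblocks Run. A real processor would flush buffered state here,
// honoring ctx, whose deadline derives from apm-server.shutdown_timeout.
func (p *noopProcessor) Stop(ctx context.Context) error {
	p.stopOnce.Do(func() { close(p.stopped) })
	return nil
}

func main() {
	p := newNoopProcessor()
	go p.Run()

	// Mirrors the wrapped publish.Reporter: each processor is applied to
	// the batch, in order, before the original reporter is called.
	var batch []transform.Transformable
	batch = p.ProcessTransformables(batch)
	fmt.Println("events to publish:", len(batch))

	if err := p.Stop(context.Background()); err != nil {
		fmt.Println("stop failed:", err)
	}
}
```

Note that runServerWithProcessors starts two goroutines per processor in the errgroup: one runs Run until it returns, the other waits for server shutdown and calls Stop with a deadline derived from apm-server.shutdown_timeout, so buffered data (such as accumulated metrics) can be flushed before exit.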