diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 7c7e8c91aef..68b8c82f50a 100644 --- a/receiver/prometheusreceiver/config_test.go +++ b/receiver/prometheusreceiver/config_test.go @@ -33,7 +33,7 @@ func TestLoadConfig(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Receivers[typeStr] = factory cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) @@ -64,7 +64,7 @@ func TestLoadConfigWithEnvVar(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Receivers[typeStr] = factory cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_env.yaml"), factories) require.NoError(t, err) @@ -89,7 +89,7 @@ func TestLoadConfigK8s(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Receivers[typeStr] = factory cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_k8s.yaml"), factories) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestLoadConfigFailsOnUnknownSection(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Receivers[typeStr] = factory cfg, err := configtest.LoadConfigFile( t, @@ -133,7 +133,7 @@ func TestLoadConfigFailsOnUnknownPrometheusSection(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Receivers[typeStr] = factory cfg, err := configtest.LoadConfigFile( t, diff --git a/receiver/prometheusreceiver/factory.go b/receiver/prometheusreceiver/factory.go index 09eaf191916..a30821db1f8 100644 --- a/receiver/prometheusreceiver/factory.go +++ b/receiver/prometheusreceiver/factory.go @@ -20,13 +20,12 @@ import ( "fmt" "github.com/spf13/viper" - "go.uber.org/zap" "gopkg.in/yaml.v2" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements config for Prometheus receiver. @@ -43,17 +42,15 @@ var ( errNilScrapeConfig = errors.New("expecting a non-nil ScrapeConfig") ) -// Factory is the factory for receiver. -type Factory struct { +func NewFactory() component.ReceiverFactory { + return receiverhelper.NewFactory( + typeStr, + createDefaultConfig, + receiverhelper.WithMetrics(createMetricsReceiver), + receiverhelper.WithCustomUnmarshaler(customUnmarshaler)) } -// Type gets the type of the Receiver config created by this factory. -func (f *Factory) Type() configmodels.Type { - return typeStr -} - -// CustomUnmarshaler returns custom unmarshaler for this config. -func (f *Factory) Unmarshal(componentViperSection *viper.Viper, intoCfg interface{}) error { +func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) error { if componentViperSection == nil { return nil } @@ -86,8 +83,7 @@ func (f *Factory) Unmarshal(componentViperSection *viper.Viper, intoCfg interfac return nil } -// CreateDefaultConfig creates the default configuration for receiver. -func (f *Factory) CreateDefaultConfig() configmodels.Receiver { +func createDefaultConfig() configmodels.Receiver { return &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, @@ -96,17 +92,15 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { } } -// CreateTraceReceiver creates a trace receiver based on provided config. -func (f *Factory) CreateTraceReceiver(context.Context, *zap.Logger, configmodels.Receiver, consumer.TraceConsumerOld) (component.TraceReceiver, error) { - // Prometheus does not support traces - return nil, configerror.ErrDataTypeIsNotSupported -} - -// CreateMetricsReceiver creates a metrics receiver based on provided config. -func (f *Factory) CreateMetricsReceiver(_ context.Context, logger *zap.Logger, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) { +func createMetricsReceiver( + _ context.Context, + params component.ReceiverCreateParams, + cfg configmodels.Receiver, + nextConsumer consumer.MetricsConsumer, +) (component.MetricsReceiver, error) { config := cfg.(*Config) if config.PrometheusConfig == nil || len(config.PrometheusConfig.ScrapeConfigs) == 0 { return nil, errNilScrapeConfig } - return newPrometheusReceiver(logger, config, nextConsumer), nil + return newPrometheusReceiver(params.Logger, config, nextConsumer), nil } diff --git a/receiver/prometheusreceiver/factory_test.go b/receiver/prometheusreceiver/factory_test.go index cd134a3deb8..e5ca73644fc 100644 --- a/receiver/prometheusreceiver/factory_test.go +++ b/receiver/prometheusreceiver/factory_test.go @@ -21,28 +21,23 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" - "go.opentelemetry.io/collector/config/configerror" ) func TestCreateDefaultConfig(t *testing.T) { - factory := &Factory{} - cfg := factory.CreateDefaultConfig() + cfg := createDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, configcheck.ValidateConfig(cfg)) } func TestCreateReceiver(t *testing.T) { - factory := &Factory{} - cfg := factory.CreateDefaultConfig() - - tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) - assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) - assert.Nil(t, tReceiver) + cfg := createDefaultConfig() // The default config does not provide scrape_config so we expect that metrics receiver // creation must also fail. - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, nil) + creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} + mReceiver, err := createMetricsReceiver(context.Background(), creationParams, cfg, nil) assert.Equal(t, err, errNilScrapeConfig) assert.Nil(t, mReceiver) } diff --git a/receiver/prometheusreceiver/internal/internal_test.go b/receiver/prometheusreceiver/internal/internal_test.go index 37bf0737a51..5365b5b953c 100644 --- a/receiver/prometheusreceiver/internal/internal_test.go +++ b/receiver/prometheusreceiver/internal/internal_test.go @@ -15,13 +15,9 @@ package internal import ( - "context" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/consumerdata" ) // test helpers @@ -50,19 +46,6 @@ func (m *mockMetadataCache) SharedLabels() labels.Labels { return labels.FromStrings("__scheme__", "http") } -func newMockConsumer() *mockConsumer { - return &mockConsumer{} -} - -type mockConsumer struct { - md *consumerdata.MetricsData -} - -func (m *mockConsumer) ConsumeMetricsData(_ context.Context, md consumerdata.MetricsData) error { - m.md = &md - return nil -} - type mockScrapeManager struct { targets map[string][]*scrape.Target } diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index 76594969099..a45f5d82809 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -49,7 +49,7 @@ type OcaStore interface { type ocaStore struct { running int32 logger *zap.Logger - sink consumer.MetricsConsumerOld + sink consumer.MetricsConsumer mc *mService once *sync.Once ctx context.Context @@ -59,7 +59,7 @@ type ocaStore struct { } // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable -func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumerOld, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore { +func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore { return &ocaStore{ running: runningStateInit, ctx: ctx, diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 7c41b5a2802..922489f6ea0 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/obsreport" ) @@ -59,7 +60,7 @@ type transaction struct { id int64 ctx context.Context isNew bool - sink consumer.MetricsConsumerOld + sink consumer.MetricsConsumer job string instance string jobsMap *JobsMap @@ -71,7 +72,7 @@ type transaction struct { logger *zap.Logger } -func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumerOld, logger *zap.Logger) *transaction { +func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction { return &transaction{ id: atomic.AddInt64(&idSeq, 1), ctx: ctx, @@ -182,7 +183,7 @@ func (tr *transaction) Commit() error { Metrics: metrics, } numTimeseries, numPoints = obsreport.CountMetricPoints(md) - err = tr.sink.ConsumeMetricsData(ctx, md) + err = tr.sink.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md})) } obsreport.EndMetricsReceiveOp( ctx, dataformat, numPoints, numTimeseries, err) diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go index 4fcb0afcf4a..f81f373b7cb 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -24,6 +24,9 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/exporter/exportertest" ) func Test_transaction(t *testing.T) { @@ -60,16 +63,16 @@ func Test_transaction(t *testing.T) { rn := "prometheus" t.Run("Commit Without Adding", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + nomc := exportertest.NewNopMetricsExporter() + tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) if got := tr.Commit(); got != nil { t.Errorf("expecting nil from Commit() but got err %v", got) } }) t.Run("Rollback dose nothing", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + nomc := exportertest.NewNopMetricsExporter() + tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) if got := tr.Rollback(); got != nil { t.Errorf("expecting nil from Rollback() but got err %v", got) } @@ -77,8 +80,8 @@ func Test_transaction(t *testing.T) { badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}}) t.Run("Add One No Target", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + nomc := exportertest.NewNopMetricsExporter() + tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) if _, got := tr.Add(badLabels, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got nil") } @@ -89,8 +92,8 @@ func Test_transaction(t *testing.T) { {Name: "job", Value: "test2"}, {Name: "foo", Value: "bar"}}) t.Run("Add One Job not found", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + nomc := exportertest.NewNopMetricsExporter() + tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) if _, got := tr.Add(jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got nil") } @@ -100,8 +103,8 @@ func Test_transaction(t *testing.T) { {Name: "job", Value: "test"}, {Name: "__name__", Value: "foo"}}) t.Run("Add One Good", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + sink := new(exportertest.SinkMetricsExporter) + tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } @@ -110,28 +113,34 @@ func Test_transaction(t *testing.T) { t.Errorf("expecting nil from Commit() but got err %v", got) } expected := createNode("test", "localhost:8080", "http") - md := mcon.md - if !proto.Equal(md.Node, expected) { - t.Errorf("generated node %v and expected node %v is different\n", md.Node, expected) + mds := sink.AllMetrics() + if len(mds) != 1 { + t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics()) + } + ocmds := pdatautil.MetricsToMetricsData(mds[0]) + if len(ocmds) != 1 { + t.Fatalf("wanted one batch per node, got %v\n", sink.AllMetrics()) + } + if !proto.Equal(ocmds[0].Node, expected) { + t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expected) } - if len(md.Metrics) != 1 { - t.Errorf("expecting one metrics, but got %v\n", len(md.Metrics)) + if len(ocmds[0].Metrics) != 1 { + t.Errorf("expecting one metrics, but got %v\n", len(ocmds[0].Metrics)) } }) t.Run("Drop NaN value", func(t *testing.T) { - mcon := newMockConsumer() - tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger) + sink := new(exportertest.SinkMetricsExporter) + tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } if got := tr.Commit(); got != nil { t.Errorf("expecting nil from Commit() but got err %v", got) } - - if mcon.md != nil { - t.Errorf("wanted nil, got %v\n", mcon.md) + if len(sink.AllMetrics()) != 0 { + t.Errorf("wanted nil, got %v\n", sink.AllMetrics()) } }) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 0d42f6b7eaa..318f8cf434c 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -35,13 +35,13 @@ type pReceiver struct { startOnce sync.Once stopOnce sync.Once cfg *Config - consumer consumer.MetricsConsumerOld + consumer consumer.MetricsConsumer cancel context.CancelFunc logger *zap.Logger } // New creates a new prometheus.Receiver reference. -func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumerOld) *pReceiver { +func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumer) *pReceiver { pr := &pReceiver{ cfg: cfg, consumer: next, diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 247ec27b19a..ad7e8729f8d 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -39,6 +39,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exportertest" ) @@ -1034,7 +1035,7 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) { require.Nilf(t, err, "Failed to create Promtheus config: %v", err) defer mp.Close() - cms := new(exportertest.SinkMetricsExporterOld) + cms := new(exportertest.SinkMetricsExporter) rcvr := newPrometheusReceiver(logger, &Config{PrometheusConfig: cfg, UseStartTimeMetric: useStartTimeMetric}, cms) require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "Failed to invoke Start: %v", err) @@ -1047,11 +1048,14 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) { // split and store results by target name results := make(map[string][]consumerdata.MetricsData) for _, m := range metrics { - result, ok := results[m.Node.ServiceInfo.Name] - if !ok { - result = make([]consumerdata.MetricsData, 0) + ocmds := pdatautil.MetricsToMetricsData(m) + for _, ocmd := range ocmds { + result, ok := results[ocmd.Node.ServiceInfo.Name] + if !ok { + result = make([]consumerdata.MetricsData, 0) + } + results[ocmd.Node.ServiceInfo.Name] = append(result, ocmd) } - results[m.Node.ServiceInfo.Name] = append(result, m) } lres, lep := len(results), len(mp.endpoints) diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index c0f8c9f2860..9e8eb51a833 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -70,7 +70,7 @@ func Components() ( jaegerreceiver.NewFactory(), fluentforwardreceiver.NewFactory(), zipkinreceiver.NewFactory(), - &prometheusreceiver.Factory{}, + prometheusreceiver.NewFactory(), &opencensusreceiver.Factory{}, otlpreceiver.NewFactory(), hostmetricsreceiver.NewFactory(),