Skip to content

Commit

Permalink
Change Prometheus receiver to use the new components interfaces (#1515)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Aug 7, 2020
1 parent 44d010f commit 9e70411
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 87 deletions.
10 changes: 5 additions & 5 deletions receiver/prometheusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

Expand Down Expand Up @@ -64,7 +64,7 @@ func TestLoadConfigWithEnvVar(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_env.yaml"), factories)
require.NoError(t, err)
Expand All @@ -89,7 +89,7 @@ func TestLoadConfigK8s(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_k8s.yaml"), factories)
require.NoError(t, err)
Expand All @@ -116,7 +116,7 @@ func TestLoadConfigFailsOnUnknownSection(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(
t,
Expand All @@ -133,7 +133,7 @@ func TestLoadConfigFailsOnUnknownPrometheusSection(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(
t,
Expand Down
38 changes: 16 additions & 22 deletions receiver/prometheusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// This file implements config for Prometheus receiver.
Expand All @@ -43,17 +42,15 @@ var (
errNilScrapeConfig = errors.New("expecting a non-nil ScrapeConfig")
)

// Factory is the factory for receiver.
type Factory struct {
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver),
receiverhelper.WithCustomUnmarshaler(customUnmarshaler))
}

// Type gets the type of the Receiver config created by this factory.
func (f *Factory) Type() configmodels.Type {
return typeStr
}

// CustomUnmarshaler returns custom unmarshaler for this config.
func (f *Factory) Unmarshal(componentViperSection *viper.Viper, intoCfg interface{}) error {
func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) error {
if componentViperSection == nil {
return nil
}
Expand Down Expand Up @@ -86,8 +83,7 @@ func (f *Factory) Unmarshal(componentViperSection *viper.Viper, intoCfg interfac
return nil
}

// CreateDefaultConfig creates the default configuration for receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
Expand All @@ -96,17 +92,15 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(context.Context, *zap.Logger, configmodels.Receiver, consumer.TraceConsumerOld) (component.TraceReceiver, error) {
// Prometheus does not support traces
return nil, configerror.ErrDataTypeIsNotSupported
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(_ context.Context, logger *zap.Logger, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {
func createMetricsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
config := cfg.(*Config)
if config.PrometheusConfig == nil || len(config.PrometheusConfig.ScrapeConfigs) == 0 {
return nil, errNilScrapeConfig
}
return newPrometheusReceiver(logger, config, nextConsumer), nil
return newPrometheusReceiver(params.Logger, config, nextConsumer), nil
}
15 changes: 5 additions & 10 deletions receiver/prometheusreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,23 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
cfg := factory.CreateDefaultConfig()
cfg := createDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateReceiver(t *testing.T) {
factory := &Factory{}
cfg := factory.CreateDefaultConfig()

tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, tReceiver)
cfg := createDefaultConfig()

// The default config does not provide scrape_config so we expect that metrics receiver
// creation must also fail.
mReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, nil)
creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()}
mReceiver, err := createMetricsReceiver(context.Background(), creationParams, cfg, nil)
assert.Equal(t, err, errNilScrapeConfig)
assert.Nil(t, mReceiver)
}
17 changes: 0 additions & 17 deletions receiver/prometheusreceiver/internal/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
package internal

import (
"context"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"go.uber.org/zap"

"go.opentelemetry.io/collector/consumer/consumerdata"
)

// test helpers
Expand Down Expand Up @@ -50,19 +46,6 @@ func (m *mockMetadataCache) SharedLabels() labels.Labels {
return labels.FromStrings("__scheme__", "http")
}

func newMockConsumer() *mockConsumer {
return &mockConsumer{}
}

type mockConsumer struct {
md *consumerdata.MetricsData
}

func (m *mockConsumer) ConsumeMetricsData(_ context.Context, md consumerdata.MetricsData) error {
m.md = &md
return nil
}

type mockScrapeManager struct {
targets map[string][]*scrape.Target
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type OcaStore interface {
type ocaStore struct {
running int32
logger *zap.Logger
sink consumer.MetricsConsumerOld
sink consumer.MetricsConsumer
mc *mService
once *sync.Once
ctx context.Context
Expand All @@ -59,7 +59,7 @@ type ocaStore struct {
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumerOld, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore {
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore {
return &ocaStore{
running: runningStateInit,
ctx: ctx,
Expand Down
7 changes: 4 additions & 3 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
)

Expand All @@ -59,7 +60,7 @@ type transaction struct {
id int64
ctx context.Context
isNew bool
sink consumer.MetricsConsumerOld
sink consumer.MetricsConsumer
job string
instance string
jobsMap *JobsMap
Expand All @@ -71,7 +72,7 @@ type transaction struct {
logger *zap.Logger
}

func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumerOld, logger *zap.Logger) *transaction {
func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction {
return &transaction{
id: atomic.AddInt64(&idSeq, 1),
ctx: ctx,
Expand Down Expand Up @@ -182,7 +183,7 @@ func (tr *transaction) Commit() error {
Metrics: metrics,
}
numTimeseries, numPoints = obsreport.CountMetricPoints(md)
err = tr.sink.ConsumeMetricsData(ctx, md)
err = tr.sink.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}))
}
obsreport.EndMetricsReceiveOp(
ctx, dataformat, numPoints, numTimeseries, err)
Expand Down
49 changes: 29 additions & 20 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func Test_transaction(t *testing.T) {
Expand Down Expand Up @@ -60,25 +63,25 @@ func Test_transaction(t *testing.T) {
rn := "prometheus"

t.Run("Commit Without Adding", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
nomc := exportertest.NewNopMetricsExporter()
tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger)
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
})

t.Run("Rollback dose nothing", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
nomc := exportertest.NewNopMetricsExporter()
tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger)
if got := tr.Rollback(); got != nil {
t.Errorf("expecting nil from Rollback() but got err %v", got)
}
})

badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}})
t.Run("Add One No Target", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
nomc := exportertest.NewNopMetricsExporter()
tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger)
if _, got := tr.Add(badLabels, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -89,8 +92,8 @@ func Test_transaction(t *testing.T) {
{Name: "job", Value: "test2"},
{Name: "foo", Value: "bar"}})
t.Run("Add One Job not found", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
nomc := exportertest.NewNopMetricsExporter()
tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger)
if _, got := tr.Add(jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -100,8 +103,8 @@ func Test_transaction(t *testing.T) {
{Name: "job", Value: "test"},
{Name: "__name__", Value: "foo"}})
t.Run("Add One Good", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
sink := new(exportertest.SinkMetricsExporter)
tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger)
if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand All @@ -110,28 +113,34 @@ func Test_transaction(t *testing.T) {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
expected := createNode("test", "localhost:8080", "http")
md := mcon.md
if !proto.Equal(md.Node, expected) {
t.Errorf("generated node %v and expected node %v is different\n", md.Node, expected)
mds := sink.AllMetrics()
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
}
ocmds := pdatautil.MetricsToMetricsData(mds[0])
if len(ocmds) != 1 {
t.Fatalf("wanted one batch per node, got %v\n", sink.AllMetrics())
}
if !proto.Equal(ocmds[0].Node, expected) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expected)
}

if len(md.Metrics) != 1 {
t.Errorf("expecting one metrics, but got %v\n", len(md.Metrics))
if len(ocmds[0].Metrics) != 1 {
t.Errorf("expecting one metrics, but got %v\n", len(ocmds[0].Metrics))
}
})

t.Run("Drop NaN value", func(t *testing.T) {
mcon := newMockConsumer()
tr := newTransaction(context.Background(), nil, true, rn, ms, mcon, testLogger)
sink := new(exportertest.SinkMetricsExporter)
tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger)
if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}

if mcon.md != nil {
t.Errorf("wanted nil, got %v\n", mcon.md)
if len(sink.AllMetrics()) != 0 {
t.Errorf("wanted nil, got %v\n", sink.AllMetrics())
}
})

Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type pReceiver struct {
startOnce sync.Once
stopOnce sync.Once
cfg *Config
consumer consumer.MetricsConsumerOld
consumer consumer.MetricsConsumer
cancel context.CancelFunc
logger *zap.Logger
}

// New creates a new prometheus.Receiver reference.
func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumerOld) *pReceiver {
func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumer) *pReceiver {
pr := &pReceiver{
cfg: cfg,
consumer: next,
Expand Down
Loading

0 comments on commit 9e70411

Please sign in to comment.