From c74b42d3b7b3eba0d2fc036be5f441068a74f739 Mon Sep 17 00:00:00 2001 From: Weil0ng Date: Mon, 10 Aug 2020 12:48:32 -0700 Subject: [PATCH] Adds a new config StartTimeMetricRegex (#1511) Previously the prometheus receiver only accepted `process_start_time_seconds` as the start time metric when UseStartTimeMetric is set. For applications that export similar metrics but with a prefix, e.g., via NewProcessCollector(namespace), the receiver would drop such metrics. By adding StartTimeMetricRegex, the receiver can now cover such use cases. --- receiver/prometheusreceiver/config.go | 1 + receiver/prometheusreceiver/config_test.go | 1 + .../internal/metricsbuilder.go | 56 +++++++---- .../internal/metricsbuilder_test.go | 96 ++++++++++++++++++- .../prometheusreceiver/internal/ocastore.go | 40 ++++---- .../internal/ocastore_test.go | 2 +- .../internal/transaction.go | 50 +++++----- .../internal/transaction_test.go | 12 +-- .../prometheusreceiver/metrics_receiver.go | 2 +- .../metrics_receiver_test.go | 87 +++++++++++++++++ .../prometheusreceiver/testdata/config.yaml | 1 + 11 files changed, 271 insertions(+), 77 deletions(-) diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index cca2dd36220..718279339ca 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -29,6 +29,7 @@ type Config struct { BufferPeriod time.Duration `mapstructure:"buffer_period"` BufferCount int `mapstructure:"buffer_count"` UseStartTimeMetric bool `mapstructure:"use_start_time_metric"` + StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"` // ConfigPlaceholder is just an entry to make the configuration pass a check // that requires that all keys present in the config actually exist on the diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 68b8c82f50a..ccbf1ddae8b 100644 --- a/receiver/prometheusreceiver/config_test.go +++ 
b/receiver/prometheusreceiver/config_test.go @@ -54,6 +54,7 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1.PrometheusConfig.ScrapeConfigs[0].JobName, "demo") assert.Equal(t, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval), 5*time.Second) assert.Equal(t, r1.UseStartTimeMetric, true) + assert.Equal(t, r1.StartTimeMetricRegex, "^(.+_)*process_start_time_seconds$") } func TestLoadConfigWithEnvVar(t *testing.T) { diff --git a/receiver/prometheusreceiver/internal/metricsbuilder.go b/receiver/prometheusreceiver/internal/metricsbuilder.go index 5f1ba9aeb26..c87ba98ddbd 100644 --- a/receiver/prometheusreceiver/internal/metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/metricsbuilder.go @@ -17,6 +17,7 @@ package internal import ( "errors" "fmt" + "regexp" "strconv" "strings" @@ -51,33 +52,46 @@ var ( ) type metricBuilder struct { - hasData bool - hasInternalMetric bool - mc MetadataCache - metrics []*metricspb.Metric - numTimeseries int - droppedTimeseries int - useStartTimeMetric bool - startTime float64 - scrapeLatencyMs float64 - scrapeStatus string - logger *zap.Logger - currentMf MetricFamily + hasData bool + hasInternalMetric bool + mc MetadataCache + metrics []*metricspb.Metric + numTimeseries int + droppedTimeseries int + useStartTimeMetric bool + startTimeMetricRegex *regexp.Regexp + startTime float64 + scrapeLatencyMs float64 + scrapeStatus string + logger *zap.Logger + currentMf MetricFamily } // newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus // scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object // by calling its Build function -func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, logger *zap.Logger) *metricBuilder { - +func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilder { + var regex *regexp.Regexp + if 
startTimeMetricRegex != "" { + regex, _ = regexp.Compile(startTimeMetricRegex) + } return &metricBuilder{ - mc: mc, - metrics: make([]*metricspb.Metric, 0), - logger: logger, - numTimeseries: 0, - droppedTimeseries: 0, - useStartTimeMetric: useStartTimeMetric, + mc: mc, + metrics: make([]*metricspb.Metric, 0), + logger: logger, + numTimeseries: 0, + droppedTimeseries: 0, + useStartTimeMetric: useStartTimeMetric, + startTimeMetricRegex: regex, + } +} + +func (b *metricBuilder) matchStartTimeMetric(metricName string) bool { + if b.startTimeMetricRegex != nil { + return b.startTimeMetricRegex.MatchString(metricName) } + + return metricName == startTimeMetricName } // AddDataPoint is for feeding prometheus data complexValue in its processing order @@ -104,7 +118,7 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error b.scrapeLatencyMs = v * 1000 } return nil - case b.useStartTimeMetric && metricName == startTimeMetricName: + case b.useStartTimeMetric && b.matchStartTimeMetric(metricName): b.startTime = v } diff --git a/receiver/prometheusreceiver/internal/metricsbuilder_test.go b/receiver/prometheusreceiver/internal/metricsbuilder_test.go index d5338c9626c..1a5d4a9c7bd 100644 --- a/receiver/prometheusreceiver/internal/metricsbuilder_test.go +++ b/receiver/prometheusreceiver/internal/metricsbuilder_test.go @@ -29,6 +29,7 @@ import ( const startTs = int64(1555366610000) const interval = int64(15 * 1000) +const defaultBuilderStartTime = float64(1.0) var testMetadata = map[string]scrape.MetricMetadata{ "counter_test": {Metric: "counter_test", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, @@ -44,6 +45,12 @@ var testMetadata = map[string]scrape.MetricMetadata{ "poor_name_count": {Metric: "poor_name_count", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, "up": {Metric: "up", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, "scrape_foo": {Metric: "scrape_foo", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, + 
"example_process_start_time_seconds": {Metric: "example_process_start_time_seconds", + Type: textparse.MetricTypeGauge, Help: "", Unit: ""}, + "process_start_time_seconds": {Metric: "process_start_time_seconds", + Type: textparse.MetricTypeGauge, Help: "", Unit: ""}, + "badprocess_start_time_seconds": {Metric: "badprocess_start_time_seconds", + Type: textparse.MetricTypeGauge, Help: "", Unit: ""}, } type testDataPoint struct { @@ -90,8 +97,8 @@ func runBuilderTests(t *testing.T, tests []buildTestData) { mc := newMockMetadataCache(testMetadata) st := startTs for i, page := range tt.inputs { - b := newMetricBuilder(mc, true, testLogger) - b.startTime = 1.0 // set to a non-zero value + b := newMetricBuilder(mc, true, "", testLogger) + b.startTime = defaultBuilderStartTime // set to a non-zero value for _, pt := range page.pts { // set ts for testing pt.t = st @@ -106,6 +113,85 @@ func runBuilderTests(t *testing.T, tests []buildTestData) { } } +func runBuilderStartTimeTests(t *testing.T, tests []buildTestData, + startTimeMetricRegex string, expectedBuilderStartTime float64) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mc := newMockMetadataCache(testMetadata) + st := startTs + for _, page := range tt.inputs { + b := newMetricBuilder(mc, true, startTimeMetricRegex, + testLogger) + b.startTime = defaultBuilderStartTime // set to a non-zero value + for _, pt := range page.pts { + // set ts for testing + pt.t = st + assert.NoError(t, b.AddDataPoint(pt.lb, pt.t, pt.v)) + } + _, _, _, err := b.Build() + assert.NoError(t, err) + assert.EqualValues(t, b.startTime, expectedBuilderStartTime) + st += interval + } + }) + } +} + +func Test_startTimeMetricMatch(t *testing.T) { + matchBuilderStartTime := float64(123.456) + matchTests := []buildTestData{ + { + name: "prefix_match", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("example_process_start_time_seconds", + matchBuilderStartTime, "foo", "bar"), + }, + }, + }, + }, + { + 
name: "match", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("process_start_time_seconds", + matchBuilderStartTime, "foo", "bar"), + }, + }, + }, + }, + } + nomatchTests := []buildTestData{ + { + name: "nomatch1", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("_process_start_time_seconds", + matchBuilderStartTime, "foo", "bar"), + }, + }, + }, + }, + { + name: "nomatch2", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("subprocess_start_time_seconds", + matchBuilderStartTime, "foo", "bar"), + }, + }, + }, + }, + } + + runBuilderStartTimeTests(t, matchTests, "^(.+_)*process_start_time_seconds$", matchBuilderStartTime) + runBuilderStartTimeTests(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime) +} + func Test_metricBuilder_counters(t *testing.T) { tests := []buildTestData{ { @@ -1055,7 +1141,7 @@ func Test_metricBuilder_skipped(t *testing.T) { func Test_metricBuilder_baddata(t *testing.T) { t.Run("empty-metric-name", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, testLogger) + b := newMetricBuilder(mc, true, "", testLogger) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound { t.Error("expecting errMetricNameNotFound error, but get nil") @@ -1069,7 +1155,7 @@ func Test_metricBuilder_baddata(t *testing.T) { t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, testLogger) + b := newMetricBuilder(mc, true, "", testLogger) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") @@ -1078,7 +1164,7 @@ func Test_metricBuilder_baddata(t *testing.T) { 
t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, testLogger) + b := newMetricBuilder(mc, true, "", testLogger) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index a45f5d82809..9addd023514 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -47,28 +47,30 @@ type OcaStore interface { // OpenCensus Store for prometheus type ocaStore struct { - running int32 - logger *zap.Logger - sink consumer.MetricsConsumer - mc *mService - once *sync.Once - ctx context.Context - jobsMap *JobsMap - useStartTimeMetric bool - receiverName string + running int32 + logger *zap.Logger + sink consumer.MetricsConsumer + mc *mService + once *sync.Once + ctx context.Context + jobsMap *JobsMap + useStartTimeMetric bool + startTimeMetricRegex string + receiverName string } // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable -func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore { +func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string) OcaStore { return &ocaStore{ - running: runningStateInit, - ctx: ctx, - sink: sink, - logger: logger, - once: &sync.Once{}, - jobsMap: jobsMap, - useStartTimeMetric: useStartTimeMetric, - receiverName: receiverName, + running: runningStateInit, + ctx: ctx, + sink: sink, + logger: logger, + once: &sync.Once{}, + jobsMap: jobsMap, + useStartTimeMetric: 
useStartTimeMetric, + startTimeMetricRegex: startTimeMetricRegex, + receiverName: receiverName, } } @@ -83,7 +85,7 @@ func (o *ocaStore) SetScrapeManager(scrapeManager *scrape.Manager) { func (o *ocaStore) Appender() storage.Appender { state := atomic.LoadInt32(&o.running) if state == runningStateReady { - return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.receiverName, o.mc, o.sink, o.logger) + return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.startTimeMetricRegex, o.receiverName, o.mc, o.sink, o.logger) } else if state == runningStateInit { panic("ScrapeManager is not set") } diff --git a/receiver/prometheusreceiver/internal/ocastore_test.go b/receiver/prometheusreceiver/internal/ocastore_test.go index d71ca3e23d2..f49a64d8fa5 100644 --- a/receiver/prometheusreceiver/internal/ocastore_test.go +++ b/receiver/prometheusreceiver/internal/ocastore_test.go @@ -26,7 +26,7 @@ import ( func TestOcaStore(t *testing.T) { - o := NewOcaStore(context.Background(), nil, nil, nil, false, "prometheus") + o := NewOcaStore(context.Background(), nil, nil, nil, false, "", "prometheus") o.SetScrapeManager(&scrape.Manager{}) app := o.Appender() diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 922489f6ea0..548e2198799 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -57,32 +57,34 @@ var errNoJobInstance = errors.New("job or instance cannot be found from labels") // will be flush to the downstream consumer, or Rollback, which means discard all the data, is called and all data // points are discarded. 
type transaction struct { - id int64 - ctx context.Context - isNew bool - sink consumer.MetricsConsumer - job string - instance string - jobsMap *JobsMap - useStartTimeMetric bool - receiverName string - ms MetadataService - node *commonpb.Node - metricBuilder *metricBuilder - logger *zap.Logger + id int64 + ctx context.Context + isNew bool + sink consumer.MetricsConsumer + job string + instance string + jobsMap *JobsMap + useStartTimeMetric bool + startTimeMetricRegex string + receiverName string + ms MetadataService + node *commonpb.Node + metricBuilder *metricBuilder + logger *zap.Logger } -func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction { +func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction { return &transaction{ - id: atomic.AddInt64(&idSeq, 1), - ctx: ctx, - isNew: true, - sink: sink, - jobsMap: jobsMap, - useStartTimeMetric: useStartTimeMetric, - receiverName: receiverName, - ms: ms, - logger: logger, + id: atomic.AddInt64(&idSeq, 1), + ctx: ctx, + isNew: true, + sink: sink, + jobsMap: jobsMap, + useStartTimeMetric: useStartTimeMetric, + startTimeMetricRegex: startTimeMetricRegex, + receiverName: receiverName, + ms: ms, + logger: logger, } } @@ -133,7 +135,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error { tr.instance = instance } tr.node = createNode(job, instance, mc.SharedLabels().Get(model.SchemeLabel)) - tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.logger) + tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger) tr.isNew = false return nil } diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go 
index f81f373b7cb..7e09c5c9e2b 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -64,7 +64,7 @@ func Test_transaction(t *testing.T) { t.Run("Commit Without Adding", func(t *testing.T) { nomc := exportertest.NewNopMetricsExporter() - tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, nomc, testLogger) if got := tr.Commit(); got != nil { t.Errorf("expecting nil from Commit() but got err %v", got) } @@ -72,7 +72,7 @@ func Test_transaction(t *testing.T) { t.Run("Rollback dose nothing", func(t *testing.T) { nomc := exportertest.NewNopMetricsExporter() - tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, nomc, testLogger) if got := tr.Rollback(); got != nil { t.Errorf("expecting nil from Rollback() but got err %v", got) } @@ -81,7 +81,7 @@ func Test_transaction(t *testing.T) { badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}}) t.Run("Add One No Target", func(t *testing.T) { nomc := exportertest.NewNopMetricsExporter() - tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, nomc, testLogger) if _, got := tr.Add(badLabels, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got nil") } @@ -93,7 +93,7 @@ func Test_transaction(t *testing.T) { {Name: "foo", Value: "bar"}}) t.Run("Add One Job not found", func(t *testing.T) { nomc := exportertest.NewNopMetricsExporter() - tr := newTransaction(context.Background(), nil, true, rn, ms, nomc, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, nomc, testLogger) if _, got := tr.Add(jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got 
nil") } @@ -104,7 +104,7 @@ func Test_transaction(t *testing.T) { {Name: "__name__", Value: "foo"}}) t.Run("Add One Good", func(t *testing.T) { sink := new(exportertest.SinkMetricsExporter) - tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } @@ -132,7 +132,7 @@ func Test_transaction(t *testing.T) { t.Run("Drop NaN value", func(t *testing.T) { sink := new(exportertest.SinkMetricsExporter) - tr := newTransaction(context.Background(), nil, true, rn, ms, sink, testLogger) + tr := newTransaction(context.Background(), nil, true, "", rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 318f8cf434c..d58c68f0f83 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -62,7 +62,7 @@ func (pr *pReceiver) Start(_ context.Context, host component.Host) error { if !pr.cfg.UseStartTimeMetric { jobsMap = internal.NewJobsMap(2 * time.Minute) } - app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.Name()) + app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.StartTimeMetricRegex, pr.cfg.Name()) // need to use a logger with the gokitLog interface l := internal.NewZapToGokitLogAdapter(pr.logger) scrapeManager := scrape.NewManager(l, app) diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index ad7e8729f8d..41047e95bd3 100644 --- 
a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -1066,3 +1066,90 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) { target.validateFunc(t, target, results[target.name]) } } + +var startTimeMetricRegexPage = ` +# HELP go_threads Number of OS threads created +# TYPE go_threads gauge +go_threads 19 +# HELP http_requests_total The total number of HTTP requests. +# TYPE http_requests_total counter +http_requests_total{method="post",code="200"} 100 +http_requests_total{method="post",code="400"} 5 +# HELP http_request_duration_seconds A histogram of the request duration. +# TYPE http_request_duration_seconds histogram +http_request_duration_seconds_bucket{le="0.05"} 1000 +http_request_duration_seconds_bucket{le="0.5"} 1500 +http_request_duration_seconds_bucket{le="1"} 2000 +http_request_duration_seconds_bucket{le="+Inf"} 2500 +http_request_duration_seconds_sum 5000 +http_request_duration_seconds_count 2500 +# HELP rpc_duration_seconds A summary of the RPC duration in seconds. +# TYPE rpc_duration_seconds summary +rpc_duration_seconds{quantile="0.01"} 1 +rpc_duration_seconds{quantile="0.9"} 5 +rpc_duration_seconds{quantile="0.99"} 8 +rpc_duration_seconds_sum 5000 +rpc_duration_seconds_count 1000 +# HELP example_process_start_time_seconds Start time of the process since unix epoch in seconds. 
+# TYPE example_process_start_time_seconds gauge +example_process_start_time_seconds 400.8 +` + +// TestStartTimeMetricRegex validates that timeseries start times are set from metrics matching the start_time_metric_regex 'process_start_time_seconds' pattern +func TestStartTimeMetricRegex(t *testing.T) { + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, data: startTimeMetricRegexPage}, + }, + validateFunc: verifyStartTimeMetricPage, + }, + { + name: "target2", + pages: []mockPrometheusResponse{ + {code: 200, data: startTimeMetricPage}, + }, + validateFunc: verifyStartTimeMetricPage, + }, + } + testEndToEndRegex(t, targets, true, "^(.+_)*process_start_time_seconds$") +} + +func testEndToEndRegex(t *testing.T, targets []*testData, useStartTimeMetric bool, startTimeMetricRegex string) { + // 1. setup mock server + mp, cfg, err := setupMockPrometheus(targets...) + require.Nilf(t, err, "Failed to create Prometheus config: %v", err) + defer mp.Close() + + cms := new(exportertest.SinkMetricsExporter) + rcvr := newPrometheusReceiver(logger, &Config{PrometheusConfig: cfg, UseStartTimeMetric: useStartTimeMetric, StartTimeMetricRegex: startTimeMetricRegex}, cms) + + require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "Failed to invoke Start: %v", err) + defer rcvr.Shutdown(context.Background()) + + // wait for all provided data to be scraped + mp.wg.Wait() + metrics := cms.AllMetrics() + + // split and store results by target name + results := make(map[string][]consumerdata.MetricsData) + for _, m := range metrics { + ocmds := pdatautil.MetricsToMetricsData(m) + for _, ocmd := range ocmds { + result, ok := results[ocmd.Node.ServiceInfo.Name] + if !ok { + result = make([]consumerdata.MetricsData, 0) + } + results[ocmd.Node.ServiceInfo.Name] = append(result, ocmd) + } + } + + lres, lep := len(results), len(mp.endpoints) + assert.Equalf(t, lep, lres, "want %d targets, but got %v\n", lep, lres) + + // loop to validate outputs for each target + for _, 
target := range targets { + target.validateFunc(t, target, results[target.name]) + } +} diff --git a/receiver/prometheusreceiver/testdata/config.yaml b/receiver/prometheusreceiver/testdata/config.yaml index 0c5f5d21721..32513a9ad79 100644 --- a/receiver/prometheusreceiver/testdata/config.yaml +++ b/receiver/prometheusreceiver/testdata/config.yaml @@ -4,6 +4,7 @@ receivers: buffer_period: 234 buffer_count: 45 use_start_time_metric: true + start_time_metric_regex: '^(.+_)*process_start_time_seconds$' config: scrape_configs: - job_name: 'demo'