From 785218e548de34aab1bd208c3341f7dcdabee153 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Fri, 2 Oct 2020 12:02:44 +1000 Subject: [PATCH] Extend receiverhelper scraper functions to simplify use of scrape (scrape metrics slice or resource metrics slice instead of having to return a metrics data object) --- receiver/receiverhelper/receiver.go | 128 ++++++++++++++++++---- receiver/receiverhelper/receiver_test.go | 133 ++++++++++++++++++----- receiver/receiverhelper/scraper.go | 81 ++++++++++---- 3 files changed, 271 insertions(+), 71 deletions(-) diff --git a/receiver/receiverhelper/receiver.go b/receiver/receiverhelper/receiver.go index ad0b3881aec..fde7ad79fe4 100644 --- a/receiver/receiverhelper/receiver.go +++ b/receiver/receiverhelper/receiver.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/pdata" ) // Start specifies the function invoked when the receiver is being started. @@ -103,15 +104,27 @@ func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) Metr } } -// AddScraper configures the provided scrape function to be called with -// the specified options, and at the specified collection interval (one -// minute by default). +// AddMetricsScraper configures the provided scrape function to be called +// with the specified options, and at the specified collection interval +// (one minute by default). // // Observability information will be reported, and the scraped metrics // will be passed to the next consumer. -func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption { +func AddMetricsScraper(cfg ScraperConfig, scrape ScrapeMetrics, options ...ScraperOption) MetricOption { return func(o *metricsReceiver) { - o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...)) + o.metricScrapers = append(o.metricScrapers, newMetricsScraper(cfg, scrape, options...)) + } +} + +// AddResourceMetricsScraper configures the provided scrape function to +// be called with the specified options, and at the specified collection +// interval (one minute by default). +// +// Observability information will be reported, and the scraped resource +// metrics will be passed to the next consumer. +func AddResourceMetricsScraper(cfg ScraperConfig, scrape ScrapeResourceMetrics, options ...ScraperOption) MetricOption { + return func(o *metricsReceiver) { + o.resourceMetricScrapers = append(o.resourceMetricScrapers, newResourceMetricsScraper(cfg, scrape, options...)) } } @@ -120,8 +133,9 @@ type metricsReceiver struct { defaultCollectionInterval time.Duration nextConsumer consumer.MetricsConsumer - scrapers []*scraper - done chan struct{} + metricScrapers []*metricsScraper + resourceMetricScrapers []*resourceMetricsScraper + done chan struct{} } // NewMetricReceiver creates a Receiver with the configured options. @@ -185,7 +199,7 @@ func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.Metri // initializeScrapers initializes all the scrapers func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error { - for _, scraper := range mr.scrapers { + for _, scraper := range mr.allBaseScrapers() { if scraper.initialize == nil { continue } @@ -201,14 +215,15 @@ func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error { // startScraping initiates a ticker that calls Scrape based on the configured // collection interval. func (mr *metricsReceiver) startScraping() { - // TODO1: use one ticker for each set of scrapers that have the same collection interval. - // TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions - // that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper - // & ResourceScraper interfaces in the host metrics receiver). That will allow data - // from multiple scrapers (that have the same collection interval) to be batched. - - for i := 0; i < len(mr.scrapers); i++ { - scraper := mr.scrapers[i] + // TODO: use one ticker for each set of scrapers that have the same collection interval. + + mr.startScrapingMetrics() + mr.startScrapingResourceMetrics() +} + +func (mr *metricsReceiver) startScrapingMetrics() { + for i := 0; i < len(mr.metricScrapers); i++ { + scraper := mr.metricScrapers[i] go func() { collectionInterval := mr.defaultCollectionInterval if scraper.cfg.CollectionInterval() != 0 { @@ -221,7 +236,7 @@ func (mr *metricsReceiver) startScraping() { for { select { case <-ticker.C: - mr.scrapeAndReport(context.Background(), scraper) + mr.scrapeMetricsAndReport(context.Background(), scraper) case <-mr.done: return } @@ -230,16 +245,71 @@ func (mr *metricsReceiver) startScraping() { } } -// scrapeAndReport calls the Scrape function of the provided Scraper, records -// observability information, and passes the scraped metrics to the next component. -func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) { +// scrapeMetricsAndReport calls the Scrape function of the provided Metrics Scraper, +// records observability information, and passes the scraped metrics to the next +// component. +func (mr *metricsReceiver) scrapeMetricsAndReport(ctx context.Context, ms *metricsScraper) { // TODO: Add observability metrics support - metrics, err := scraper.scrape(ctx) + metrics, err := ms.scrape(ctx) if err != nil { return } - mr.nextConsumer.ConsumeMetrics(ctx, metrics) + mr.nextConsumer.ConsumeMetrics(ctx, metricSliceToMetricData(metrics)) +} + +func metricSliceToMetricData(metrics pdata.MetricSlice) pdata.Metrics { + rms := pdata.NewResourceMetricsSlice() + rms.Resize(1) + rm := rms.At(0) + ilms := rm.InstrumentationLibraryMetrics() + ilms.Resize(1) + ilm := ilms.At(0) + metrics.MoveAndAppendTo(ilm.Metrics()) + return resourceMetricsSliceToMetricData(rms) +} + +func (mr *metricsReceiver) startScrapingResourceMetrics() { + for i := 0; i < len(mr.resourceMetricScrapers); i++ { + scraper := mr.resourceMetricScrapers[i] + go func() { + collectionInterval := mr.defaultCollectionInterval + if scraper.cfg.CollectionInterval() != 0 { + collectionInterval = scraper.cfg.CollectionInterval() + } + + ticker := time.NewTicker(collectionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + mr.scrapeResourceMetricsAndReport(context.Background(), scraper) + case <-mr.done: + return + } + } + }() + } +} + +// scrapeResourceMetricsAndReport calls the Scrape function of the provided Resource +// Metrics Scrapers, records observability information, and passes the scraped metrics +// to the next component. +func (mr *metricsReceiver) scrapeResourceMetricsAndReport(ctx context.Context, rms *resourceMetricsScraper) { + // TODO: Add observability metrics support + metrics, err := rms.scrape(ctx) + if err != nil { + return + } + + mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics)) +} + +func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics { + md := pdata.NewMetrics() + resourceMetrics.MoveAndAppendTo(md.ResourceMetrics()) + return md } // stopScraping stops the ticker @@ -251,7 +321,7 @@ func (mr *metricsReceiver) stopScraping() { func (mr *metricsReceiver) closeScrapers(ctx context.Context) error { var errors []error - for _, scraper := range mr.scrapers { + for _, scraper := range mr.allBaseScrapers() { if scraper.close == nil { continue } @@ -263,3 +333,15 @@ func (mr *metricsReceiver) closeScrapers(ctx context.Context) error { return componenterror.CombineErrors(errors) } + +func (mr *metricsReceiver) allBaseScrapers() []baseScraper { + scrapers := make([]baseScraper, len(mr.metricScrapers)+len(mr.resourceMetricScrapers)) + for i, ms := range mr.metricScrapers { + scrapers[i] = ms.baseScraper + } + startIdx := len(mr.metricScrapers) + for i, rms := range mr.resourceMetricScrapers { + scrapers[startIdx+i] = rms.baseScraper + } + return scrapers +} diff --git a/receiver/receiverhelper/receiver_test.go b/receiver/receiverhelper/receiver_test.go index be1c30e9414..5c94a22a865 100644 --- a/receiver/receiverhelper/receiver_test.go +++ b/receiver/receiverhelper/receiver_test.go @@ -130,23 +130,40 @@ func (ts *testClose) close(context.Context) error { return ts.err } -type testScrape struct { +type testScrapeMetrics struct { ch chan int timesScrapeCalled int err error } -func (ts *testScrape) scrape(ctx context.Context) (pdata.Metrics, error) { +func (ts *testScrapeMetrics) scrape(ctx context.Context) (pdata.MetricSlice, error) { ts.timesScrapeCalled++ ts.ch <- ts.timesScrapeCalled if ts.err != nil { - return pdata.NewMetrics(), ts.err + return pdata.NewMetricSlice(), ts.err } return singleMetric(), nil } +type testScrapeResourceMetrics struct { + ch chan int + timesScrapeCalled int + err error +} + +func (ts *testScrapeResourceMetrics) scrape(ctx context.Context) (pdata.ResourceMetricsSlice, error) { + ts.timesScrapeCalled++ + ts.ch <- ts.timesScrapeCalled + + if ts.err != nil { + return pdata.NewResourceMetricsSlice(), ts.err + } + + return singleResourceMetric(), nil +} + type metricsTestCase struct { name string @@ -156,6 +173,7 @@ type metricsTestCase struct { shutdownErr error scrapers int + resourceScrapers int defaultCollectionInterval time.Duration scraperSettings ScraperSettings nilNextConsumer bool @@ -187,43 +205,81 @@ func TestMetricReceiver(t *testing.T) { shutdownErr: errors.New("err2"), }, { - name: "AddScrapersWithDefaultCollectionInterval", + name: "AddMetricsScrapersWithDefaultCollectionInterval", scrapers: 2, defaultCollectionInterval: time.Millisecond, expectScraped: true, }, { - name: "AddScrapersWithCollectionInterval", + name: "AddMetricsScrapersWithCollectionInterval", scrapers: 2, scraperSettings: ScraperSettings{CollectionIntervalVal: time.Millisecond}, expectScraped: true, }, { - name: "AddScrapers_NewError", + name: "AddMetricsScrapers_NewError", scrapers: 2, nilNextConsumer: true, expectedNewErr: "nil nextConsumer", }, { - name: "AddScrapers_ScrapeError", + name: "AddMetricsScrapers_ScrapeError", scrapers: 2, defaultCollectionInterval: time.Millisecond, scrapeErr: errors.New("err1"), }, { - name: "AddScrapersWithInitializeAndClose", + name: "AddMetricsScrapersWithInitializeAndClose", scrapers: 2, initialize: true, close: true, }, { - name: "AddScrapersWithInitializeAndCloseErrors", + name: "AddMetricsScrapersWithInitializeAndCloseErrors", scrapers: 2, initialize: true, close: true, initializeErr: errors.New("err1"), closeErr: errors.New("err2"), }, + { + name: "AddResourceMetricsScrapersWithDefaultCollectionInterval", + resourceScrapers: 2, + defaultCollectionInterval: time.Millisecond, + expectScraped: true, + }, + { + name: "AddResourceMetricsScrapersWithCollectionInterval", + resourceScrapers: 2, + scraperSettings: ScraperSettings{CollectionIntervalVal: time.Millisecond}, + expectScraped: true, + }, + { + name: "AddResourceMetricsScrapers_NewError", + resourceScrapers: 2, + nilNextConsumer: true, + expectedNewErr: "nil nextConsumer", + }, + { + name: "AddResourceMetricsScrapers_ScrapeError", + resourceScrapers: 2, + defaultCollectionInterval: time.Millisecond, + scrapeErr: errors.New("err1"), + }, + { + name: "AddResourceMetricsScrapersWithInitializeAndClose", + resourceScrapers: 2, + initialize: true, + close: true, + }, + { + name: "AddResourceMetricsScrapersWithInitializeAndCloseErrors", + resourceScrapers: 2, + initialize: true, + close: true, + initializeErr: errors.New("err1"), + closeErr: errors.New("err2"), + }, } for _, test := range testCases { @@ -232,10 +288,11 @@ func TestMetricReceiver(t *testing.T) { shutdownCh := make(chan bool, 1) baseOptions := configureBaseOptions(test.start, test.shutdown, test.startErr, test.shutdownErr, startCh, shutdownCh) - initializeChs := make([]chan bool, test.scrapers) - scrapeChs := make([]chan int, test.scrapers) - closeChs := make([]chan bool, test.scrapers) - options := configureMetricOptions(baseOptions, test, initializeChs, scrapeChs, closeChs) + initializeChs := make([]chan bool, test.scrapers+test.resourceScrapers) + scrapeMetricsChs := make([]chan int, test.scrapers) + scrapeResourceMetricsChs := make([]chan int, test.resourceScrapers) + closeChs := make([]chan bool, test.scrapers+test.resourceScrapers) + options := configureMetricOptions(baseOptions, test, initializeChs, scrapeMetricsChs, scrapeResourceMetricsChs, closeChs) var nextConsumer consumer.MetricsConsumer sink := &exportertest.SinkMetricsExporter{} @@ -267,11 +324,14 @@ func TestMetricReceiver(t *testing.T) { // validate that scrape is called at least 5 times for each configured scraper if test.expectScraped { - for _, ch := range scrapeChs { + for _, ch := range scrapeMetricsChs { + require.Eventually(t, func() bool { return (<-ch) > 5 }, 500*time.Millisecond, time.Millisecond) + } + for _, ch := range scrapeResourceMetricsChs { require.Eventually(t, func() bool { return (<-ch) > 5 }, 500*time.Millisecond, time.Millisecond) } - assert.Greater(t, sink.MetricsCount(), 5) + assert.GreaterOrEqual(t, sink.MetricsCount(), 5) } err = mr.Shutdown(context.Background()) @@ -303,7 +363,7 @@ func configureBaseOptions(start, shutdown bool, startErr, shutdownErr error, sta return baseOptions } -func configureMetricOptions(baseOptions []Option, test metricsTestCase, initializeChs []chan bool, scrapeChs []chan int, closeChs []chan bool) []MetricOption { +func configureMetricOptions(baseOptions []Option, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs, testScrapeResourceMetricsChs []chan int, closeChs []chan bool) []MetricOption { metricOptions := []MetricOption{} metricOptions = append(metricOptions, WithBaseOptions(baseOptions...)) @@ -324,9 +384,27 @@ func configureMetricOptions(baseOptions []Option, test metricsTestCase, initiali scraperOptions = append(scraperOptions, WithClose(tc.close)) } - scrapeChs[i] = make(chan int, 10) - ts := &testScrape{ch: scrapeChs[i], err: test.scrapeErr} - metricOptions = append(metricOptions, AddScraper(&test.scraperSettings, ts.scrape, scraperOptions...)) + scrapeMetricsChs[i] = make(chan int, 10) + tsm := &testScrapeMetrics{ch: scrapeMetricsChs[i], err: test.scrapeErr} + metricOptions = append(metricOptions, AddMetricsScraper(&test.scraperSettings, tsm.scrape, scraperOptions...)) + } + + for i := 0; i < test.resourceScrapers; i++ { + scraperOptions := []ScraperOption{} + if test.initialize { + initializeChs[test.scrapers+i] = make(chan bool, 1) + ti := &testInitialize{ch: initializeChs[test.scrapers+i], err: test.initializeErr} + scraperOptions = append(scraperOptions, WithInitialize(ti.initialize)) + } + if test.close { + closeChs[test.scrapers+i] = make(chan bool, 1) + tc := &testClose{ch: closeChs[test.scrapers+i], err: test.closeErr} + scraperOptions = append(scraperOptions, WithClose(tc.close)) + } + + testScrapeResourceMetricsChs[i] = make(chan int, 10) + tsrm := &testScrapeResourceMetrics{ch: testScrapeResourceMetricsChs[i], err: test.scrapeErr} + metricOptions = append(metricOptions, AddResourceMetricsScraper(&test.scraperSettings, tsrm.scrape, scraperOptions...)) } return metricOptions @@ -369,15 +447,20 @@ func assertChannelCalled(t *testing.T, ch chan bool, message string) { assert.Fail(t, message) } } -func singleMetric() pdata.Metrics { - md := pdata.NewMetrics() - rms := md.ResourceMetrics() + +func singleMetric() pdata.MetricSlice { + metrics := pdata.NewMetricSlice() + metrics.Resize(1) + return metrics +} + +func singleResourceMetric() pdata.ResourceMetricsSlice { + rms := pdata.NewResourceMetricsSlice() rms.Resize(1) rm := rms.At(0) ilms := rm.InstrumentationLibraryMetrics() ilms.Resize(1) ilm := ilms.At(0) - metrics := ilm.Metrics() - metrics.Resize(1) - return md + singleMetric().MoveAndAppendTo(ilm.Metrics()) + return rms } diff --git a/receiver/receiverhelper/scraper.go b/receiver/receiverhelper/scraper.go index dee3e45e945..a92d95ec9f2 100644 --- a/receiver/receiverhelper/scraper.go +++ b/receiver/receiverhelper/scraper.go @@ -21,8 +21,11 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// Scraper provides a function to scrape metrics. -type Scrape func(context.Context) (pdata.Metrics, error) +// Scrape metrics. +type ScrapeMetrics func(context.Context) (pdata.MetricSlice, error) + +// Scrape resource metrics. +type ScrapeResourceMetrics func(context.Context) (pdata.ResourceMetricsSlice, error) // Initialize performs any timely initialization tasks such as // setting up performance counters for initial collection. @@ -33,7 +36,7 @@ type Initialize func(ctx context.Context) error type Close func(ctx context.Context) error // ScraperOption apply changes to internal options. -type ScraperOption func(*scraper) +type ScraperOption func(*baseScraper) // ScraperConfig is the configuration of a scraper. Specific scrapers must implement this // interface and will typically embed ScraperSettings struct or a struct that extends it. @@ -58,40 +61,72 @@ func (ss *ScraperSettings) SetCollectionInterval(collectionInterval time.Duratio ss.CollectionIntervalVal = collectionInterval } -type scraper struct { +type baseScraper struct { cfg ScraperConfig - scrape Scrape initialize Initialize close Close } -// NewScraper creates a Scraper that calls Scrape at the specified collection -// interval, reports observability information, and passes the scraped metrics -// to the next consumer. -func newScraper( +// WithInitialize sets the function that will be called on startup. +func WithInitialize(initialize Initialize) ScraperOption { + return func(o *baseScraper) { + o.initialize = initialize + } +} + +// WithClose sets the function that will be called on shutdown. +func WithClose(close Close) ScraperOption { + return func(o *baseScraper) { + o.close = close + } +} + +type metricsScraper struct { + baseScraper + scrape ScrapeMetrics +} + +// newMetricsScraper creates a Scraper that calls Scrape at the specified +// collection interval, reports observability information, and passes the +// scraped metrics to the next consumer. +func newMetricsScraper( cfg ScraperConfig, - scrape Scrape, + scrape ScrapeMetrics, options ...ScraperOption, -) *scraper { - bs := &scraper{cfg: cfg, scrape: scrape} +) *metricsScraper { + ms := &metricsScraper{ + baseScraper: baseScraper{cfg: cfg}, + scrape: scrape, + } for _, op := range options { - op(bs) + op(&ms.baseScraper) } - return bs + return ms } -// WithInitialize sets the function that will be called on startup. -func WithInitialize(initialize Initialize) ScraperOption { - return func(o *scraper) { - o.initialize = initialize - } +type resourceMetricsScraper struct { + baseScraper + scrape ScrapeResourceMetrics } -// WithClose sets the function that will be called on shutdown. -func WithClose(close Close) ScraperOption { - return func(o *scraper) { - o.close = close +// newResourceMetricsScraper creates a Scraper that calls Scrape at the +// specified collection interval, reports observability information, and +// passes the scraped resource metrics to the next consumer. +func newResourceMetricsScraper( + cfg ScraperConfig, + scrape ScrapeResourceMetrics, + options ...ScraperOption, +) *resourceMetricsScraper { + rms := &resourceMetricsScraper{ + baseScraper: baseScraper{cfg: cfg}, + scrape: scrape, } + + for _, op := range options { + op(&rms.baseScraper) + } + + return rms }