Skip to content

Commit

Permalink
Extend receiverhelper scraper functions to simplify use of scrape (sc…
Browse files Browse the repository at this point in the history
…rape metrics slice or resource metrics slice instead of having to return a metrics data object)
  • Loading branch information
james-bebbington committed Oct 9, 2020
1 parent 9aa0c5e commit 785218e
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 71 deletions.
128 changes: 105 additions & 23 deletions receiver/receiverhelper/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
)

// Start specifies the function invoked when the receiver is being started.
Expand Down Expand Up @@ -103,15 +104,27 @@ func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) Metr
}
}

// AddScraper configures the provided scrape function to be called with
// the specified options, and at the specified collection interval (one
// minute by default).
// AddMetricsScraper configures the provided scrape function to be called
// with the specified options, and at the specified collection interval
// (one minute by default).
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption {
func AddMetricsScraper(cfg ScraperConfig, scrape ScrapeMetrics, options ...ScraperOption) MetricOption {
return func(o *metricsReceiver) {
o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...))
o.metricScrapers = append(o.metricScrapers, newMetricsScraper(cfg, scrape, options...))
}
}

// AddResourceMetricsScraper configures the provided scrape function to
// be called with the specified options, and at the specified collection
// interval (one minute by default).
//
// Observability information will be reported, and the scraped resource
// metrics will be passed to the next consumer.
func AddResourceMetricsScraper(cfg ScraperConfig, scrape ScrapeResourceMetrics, options ...ScraperOption) MetricOption {
return func(o *metricsReceiver) {
o.resourceMetricScrapers = append(o.resourceMetricScrapers, newResourceMetricsScraper(cfg, scrape, options...))
}
}

Expand All @@ -120,8 +133,9 @@ type metricsReceiver struct {
defaultCollectionInterval time.Duration
nextConsumer consumer.MetricsConsumer

scrapers []*scraper
done chan struct{}
metricScrapers []*metricsScraper
resourceMetricScrapers []*resourceMetricsScraper
done chan struct{}
}

// NewMetricReceiver creates a Receiver with the configured options.
Expand Down Expand Up @@ -185,7 +199,7 @@ func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.Metri

// initializeScrapers initializes all the scrapers
func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
for _, scraper := range mr.scrapers {
for _, scraper := range mr.allBaseScrapers() {
if scraper.initialize == nil {
continue
}
Expand All @@ -201,14 +215,15 @@ func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (mr *metricsReceiver) startScraping() {
// TODO1: use one ticker for each set of scrapers that have the same collection interval.
// TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions
// that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper
// & ResourceScraper interfaces in the host metrics receiver). That will allow data
// from multiple scrapers (that have the same collection interval) to be batched.

for i := 0; i < len(mr.scrapers); i++ {
scraper := mr.scrapers[i]
// TODO: use one ticker for each set of scrapers that have the same collection interval.

mr.startScrapingMetrics()
mr.startScrapingResourceMetrics()
}

func (mr *metricsReceiver) startScrapingMetrics() {
for i := 0; i < len(mr.metricScrapers); i++ {
scraper := mr.metricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.cfg.CollectionInterval() != 0 {
Expand All @@ -221,7 +236,7 @@ func (mr *metricsReceiver) startScraping() {
for {
select {
case <-ticker.C:
mr.scrapeAndReport(context.Background(), scraper)
mr.scrapeMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
}
Expand All @@ -230,16 +245,71 @@ func (mr *metricsReceiver) startScraping() {
}
}

// scrapeAndReport calls the Scrape function of the provided Scraper, records
// observability information, and passes the scraped metrics to the next component.
func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) {
// scrapeMetricsAndReport calls the Scrape function of the provided Metrics Scraper,
// records observability information, and passes the scraped metrics to the next
// component.
func (mr *metricsReceiver) scrapeMetricsAndReport(ctx context.Context, ms *metricsScraper) {
// TODO: Add observability metrics support
metrics, err := scraper.scrape(ctx)
metrics, err := ms.scrape(ctx)
if err != nil {
return
}

mr.nextConsumer.ConsumeMetrics(ctx, metrics)
mr.nextConsumer.ConsumeMetrics(ctx, metricSliceToMetricData(metrics))
}

func metricSliceToMetricData(metrics pdata.MetricSlice) pdata.Metrics {
rms := pdata.NewResourceMetricsSlice()
rms.Resize(1)
rm := rms.At(0)
ilms := rm.InstrumentationLibraryMetrics()
ilms.Resize(1)
ilm := ilms.At(0)
metrics.MoveAndAppendTo(ilm.Metrics())
return resourceMetricsSliceToMetricData(rms)
}

func (mr *metricsReceiver) startScrapingResourceMetrics() {
for i := 0; i < len(mr.resourceMetricScrapers); i++ {
scraper := mr.resourceMetricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.cfg.CollectionInterval() != 0 {
collectionInterval = scraper.cfg.CollectionInterval()
}

ticker := time.NewTicker(collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
mr.scrapeResourceMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
}
}
}()
}
}

// scrapeResourceMetricsAndReport calls the Scrape function of the provided Resource
// Metrics Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (mr *metricsReceiver) scrapeResourceMetricsAndReport(ctx context.Context, rms *resourceMetricsScraper) {
// TODO: Add observability metrics support
metrics, err := rms.scrape(ctx)
if err != nil {
return
}

mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
}

func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
md := pdata.NewMetrics()
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
return md
}

// stopScraping stops the ticker
Expand All @@ -251,7 +321,7 @@ func (mr *metricsReceiver) stopScraping() {
func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
var errors []error

for _, scraper := range mr.scrapers {
for _, scraper := range mr.allBaseScrapers() {
if scraper.close == nil {
continue
}
Expand All @@ -263,3 +333,15 @@ func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {

return componenterror.CombineErrors(errors)
}

func (mr *metricsReceiver) allBaseScrapers() []baseScraper {
scrapers := make([]baseScraper, len(mr.metricScrapers)+len(mr.resourceMetricScrapers))
for i, ms := range mr.metricScrapers {
scrapers[i] = ms.baseScraper
}
startIdx := len(mr.metricScrapers)
for i, rms := range mr.resourceMetricScrapers {
scrapers[startIdx+i] = rms.baseScraper
}
return scrapers
}
Loading

0 comments on commit 785218e

Please sign in to comment.