2/2 Extend receiverhelper scraper functions to simplify the scrape function interface #1890

Merged
128 changes: 105 additions & 23 deletions receiver/receiverhelper/receiver.go
@@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
)

// Start specifies the function invoked when the receiver is being started.
@@ -103,15 +104,27 @@ func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) Metr
}
}

// AddScraper configures the provided scrape function to be called with
// the specified options, and at the specified collection interval (one
// minute by default).
// AddMetricsScraper configures the provided scrape function to be called
// with the specified options, and at the specified collection interval
// (one minute by default).
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption {
func AddMetricsScraper(cfg ScraperConfig, scrape ScrapeMetrics, options ...ScraperOption) MetricOption {
return func(o *metricsReceiver) {
o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...))
o.metricScrapers = append(o.metricScrapers, newMetricsScraper(cfg, scrape, options...))
}
}

// AddResourceMetricsScraper configures the provided scrape function to
// be called with the specified options, and at the specified collection
// interval (one minute by default).
//
// Observability information will be reported, and the scraped resource
// metrics will be passed to the next consumer.
func AddResourceMetricsScraper(cfg ScraperConfig, scrape ScrapeResourceMetrics, options ...ScraperOption) MetricOption {
return func(o *metricsReceiver) {
o.resourceMetricScrapers = append(o.resourceMetricScrapers, newResourceMetricsScraper(cfg, scrape, options...))
}
}
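
A minimal usage sketch of the new registration helpers. The names scrapeMemory, scraperCfg, config, and nextConsumer are illustrative, the ScrapeMetrics signature is assumed from its use above (it is defined in scraper.go, which is not part of this diff), and NewMetricReceiver is assumed to accept the MetricOption values variadically:

	// Hypothetical usage sketch; identifiers and signatures outside this file are assumed.
	scrapeMemory := func(ctx context.Context) (pdata.MetricSlice, error) {
		metrics := pdata.NewMetricSlice()
		// ... append memory metrics to the slice ...
		return metrics, nil
	}

	receiver, err := receiverhelper.NewMetricReceiver(
		config,       // configmodels.Receiver for this receiver instance
		nextConsumer, // consumer.MetricsConsumer the wrapped metrics are passed to
		receiverhelper.WithDefaultCollectionInterval(30*time.Second),
		receiverhelper.AddMetricsScraper(scraperCfg, scrapeMemory),
	)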

@@ -120,8 +133,9 @@ type metricsReceiver struct {
defaultCollectionInterval time.Duration
nextConsumer consumer.MetricsConsumer

scrapers []*scraper
done chan struct{}
metricScrapers []*metricsScraper
resourceMetricScrapers []*resourceMetricsScraper
done chan struct{}
}

// NewMetricReceiver creates a Receiver with the configured options.
@@ -185,7 +199,7 @@ func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.Metri

// initializeScrapers initializes all the scrapers
func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
for _, scraper := range mr.scrapers {
for _, scraper := range mr.allBaseScrapers() {
if scraper.initialize == nil {
continue
}
@@ -201,14 +215,15 @@ func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (mr *metricsReceiver) startScraping() {
// TODO1: use one ticker for each set of scrapers that have the same collection interval.
// TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions
// that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper
// & ResourceScraper interfaces in the host metrics receiver). That will allow data
// from multiple scrapers (that have the same collection interval) to be batched.

for i := 0; i < len(mr.scrapers); i++ {
scraper := mr.scrapers[i]
// TODO: use one ticker for each set of scrapers that have the same collection interval.

mr.startScrapingMetrics()
mr.startScrapingResourceMetrics()
}

func (mr *metricsReceiver) startScrapingMetrics() {
for i := 0; i < len(mr.metricScrapers); i++ {
scraper := mr.metricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.cfg.CollectionInterval() != 0 {
@@ -221,7 +236,7 @@ func (mr *metricsReceiver) startScraping() {
for {
select {
case <-ticker.C:
mr.scrapeAndReport(context.Background(), scraper)
mr.scrapeMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
}
@@ -230,16 +245,71 @@ func (mr *metricsReceiver) startScraping() {
}
}

// scrapeAndReport calls the Scrape function of the provided Scraper, records
// observability information, and passes the scraped metrics to the next component.
func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) {
// scrapeMetricsAndReport calls the Scrape function of the provided Metrics Scraper,
// records observability information, and passes the scraped metrics to the next
// component.
func (mr *metricsReceiver) scrapeMetricsAndReport(ctx context.Context, ms *metricsScraper) {
// TODO: Add observability metrics support
metrics, err := scraper.scrape(ctx)
metrics, err := ms.scrape(ctx)
if err != nil {
return
}

mr.nextConsumer.ConsumeMetrics(ctx, metrics)
mr.nextConsumer.ConsumeMetrics(ctx, metricSliceToMetricData(metrics))
}

func metricSliceToMetricData(metrics pdata.MetricSlice) pdata.Metrics {
rms := pdata.NewResourceMetricsSlice()
rms.Resize(1)
rm := rms.At(0)
ilms := rm.InstrumentationLibraryMetrics()
ilms.Resize(1)
ilm := ilms.At(0)
Review comment (Member, PR author): For metrics that we "scrape", would we want to fill in Instrumentation Library data btw?

metrics.MoveAndAppendTo(ilm.Metrics())
return resourceMetricsSliceToMetricData(rms)
}
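
A brief sketch of the wrapping behaviour, using only pdata calls already present above. Note that the single ResourceMetrics entry is created with an empty Resource and InstrumentationLibrary (the inline review question above asks whether instrumentation library data should eventually be filled in), and that MoveAndAppendTo empties the source slice rather than copying it:

	md := metricSliceToMetricData(scrapedMetrics)
	// md.ResourceMetrics().Len() == 1
	// md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Len() == 1
	// scrapedMetrics is now empty: MoveAndAppendTo moved its contents under
	// the single InstrumentationLibraryMetrics entry.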

func (mr *metricsReceiver) startScrapingResourceMetrics() {
for i := 0; i < len(mr.resourceMetricScrapers); i++ {
scraper := mr.resourceMetricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.cfg.CollectionInterval() != 0 {
collectionInterval = scraper.cfg.CollectionInterval()
}

ticker := time.NewTicker(collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
mr.scrapeResourceMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
}
}
}()
}
}

// scrapeResourceMetricsAndReport calls the Scrape function of the provided Resource
// Metrics Scraper, records observability information, and passes the scraped metrics
// to the next component.
func (mr *metricsReceiver) scrapeResourceMetricsAndReport(ctx context.Context, rms *resourceMetricsScraper) {
// TODO: Add observability metrics support
metrics, err := rms.scrape(ctx)
if err != nil {
return
}

mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
}

func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
md := pdata.NewMetrics()
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
return md
}
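
For comparison, a hypothetical resource-level scrape function; the ScrapeResourceMetrics signature is assumed from its use above and its definition in scraper.go. This is the variant a scraper would use when it reports metrics for multiple resources, for example one ResourceMetrics entry per monitored process:

	// Illustrative only; the per-resource details depend on the concrete scraper.
	scrapeProcesses := func(ctx context.Context) (pdata.ResourceMetricsSlice, error) {
		rms := pdata.NewResourceMetricsSlice()
		// ... resize to the number of processes and populate each entry's
		// Resource attributes and InstrumentationLibraryMetrics ...
		return rms, nil
	}
	// Registered with receiverhelper.AddResourceMetricsScraper(scraperCfg, scrapeProcesses),
	// the result is forwarded unchanged apart from the MoveAndAppendTo wrapping above.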

// stopScraping stops the ticker
@@ -251,7 +321,7 @@ func (mr *metricsReceiver) stopScraping() {
func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
var errors []error

for _, scraper := range mr.scrapers {
for _, scraper := range mr.allBaseScrapers() {
if scraper.close == nil {
continue
}
@@ -263,3 +333,15 @@ func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {

return componenterror.CombineErrors(errors)
}

func (mr *metricsReceiver) allBaseScrapers() []baseScraper {
scrapers := make([]baseScraper, len(mr.metricScrapers)+len(mr.resourceMetricScrapers))
for i, ms := range mr.metricScrapers {
scrapers[i] = ms.baseScraper
}
startIdx := len(mr.metricScrapers)
for i, rms := range mr.resourceMetricScrapers {
scrapers[startIdx+i] = rms.baseScraper
}
return scrapers
}
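
allBaseScrapers relies on both scraper structs embedding a common baseScraper, which lives in scraper.go and is not part of this diff. A rough sketch of the shape this file depends on, reconstructed only from the field accesses above (cfg, initialize, close, scrape); the actual definitions may differ:

	// Assumed shapes only, inferred from usage in receiver.go.
	type baseScraper struct {
		cfg        ScraperConfig
		initialize func(ctx context.Context) error // skipped by initializeScrapers when nil
		close      func(ctx context.Context) error // skipped by closeScrapers when nil
	}

	type metricsScraper struct {
		baseScraper
		scrape ScrapeMetrics
	}

	type resourceMetricsScraper struct {
		baseScraper
		scrape ScrapeResourceMetrics
	}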