Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the api/metrics push controller; add CheckpointSet synchronization #737

Merged
merged 10 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ var (
)

func initMeter() *push.Controller {
pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Config{})
pusher, exporter, err := prometheus.InstallNewPipeline(prometheus.Config{})
if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err)
}
http.HandleFunc("/", hf)
http.HandleFunc("/", exporter.ServeHTTP)
go func() {
_ = http.ListenAndServe(":2222", nil)
}()
Expand Down
48 changes: 34 additions & 14 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"context"
"fmt"
"net/http"
"time"
"sync"

"go.opentelemetry.io/otel/api/metric"

Expand All @@ -30,7 +30,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

Expand All @@ -42,6 +41,7 @@ type Exporter struct {
registerer prometheus.Registerer
gatherer prometheus.Gatherer

lock sync.RWMutex
snapshot export.CheckpointSet
onError func(error)

Expand Down Expand Up @@ -134,41 +134,49 @@ func NewRawExporter(config Config) (*Exporter, error) {
// http.HandleFunc("/metrics", hf)
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, error) {
controller, hf, err := NewExportPipeline(config, time.Minute)
func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
controller, exp, err := NewExportPipeline(config, options...)
if err != nil {
return controller, hf, err
return controller, exp, err
}
global.SetMeterProvider(controller.Provider())
return controller, hf, err
return controller, exp, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) {
selector := simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries)
//
// The returned Controller contains an implementation of
// `metric.Provider`. The controller is returned unstarted and should
// be started by the caller to begin collection.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
exporter, err := NewRawExporter(config)
if err != nil {
return nil, nil, err
}

// Prometheus needs to use a stateful integrator since counters (and histogram since they are a collection of Counters)
// are cumulative (i.e., monotonically increasing values) and should not be reset after each export.
// Prometheus uses a stateful push controller since instruments are
// cumulative and should not be reset after each collection interval.
//
// Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exporter, period)
pusher.Start()
pusher := push.New(
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
exporter,
append(options, push.WithStateful(true))...,
)

return pusher, exporter.ServeHTTP, nil
return pusher, exporter, nil
}

// Export records the provided checkpoint set as the exporter's current
// snapshot, replacing whatever snapshot was stored before. The assignment is
// made while holding the exporter's write lock. The context argument is
// ignored and the returned error is always nil.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
	// TODO: Use the resource value in this exporter.
	e.lock.Lock()
	e.snapshot = checkpointSet
	e.lock.Unlock()

	return nil
}
Expand All @@ -187,10 +195,16 @@ func newCollector(exporter *Exporter) *collector {
}

func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()

if c.exp.snapshot == nil {
return
}

c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()

_ = c.exp.snapshot.ForEach(func(record export.Record) error {
ch <- c.toDesc(&record)
return nil
Expand All @@ -202,10 +216,16 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
// Collect is invoked whenever prometheus.Gatherer is also invoked.
// For example, when the HTTP endpoint is invoked by Prometheus.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()

if c.exp.snapshot == nil {
return
}

c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()

err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
Expand Down
73 changes: 66 additions & 7 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,35 @@
package prometheus_test

import (
"bytes"
"context"
"log"
"io/ioutil"
"net/http"
"net/http/httptest"
"runtime"
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/metric/test"
exportTest "go.opentelemetry.io/otel/exporters/metric/test"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
)

func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewRawExporter(prometheus.Config{
DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99},
})
if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err)
}
require.NoError(t, err)

var expected []string
checkpointSet := test.NewCheckpointSet(nil)
checkpointSet := exportTest.NewCheckpointSet(nil)

counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
Expand Down Expand Up @@ -116,7 +120,7 @@ func TestPrometheusExporter(t *testing.T) {
compareExport(t, exporter, checkpointSet, expected)
}

func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *exportTest.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)

Expand All @@ -138,3 +142,58 @@ func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *t

require.Equal(t, strings.Join(expected, "\n"), strings.Join(metricsOnly, "\n"))
}

// TestPrometheusStatefulness checks that a counter scraped through the
// exporter's HTTP handler reports a cumulative value: after two additions of
// 100, separated by a collection interval, the second scrape must show 200.
func TestPrometheusStatefulness(t *testing.T) {
	// Create a meter
	controller, exporter, err := prometheus.NewExportPipeline(prometheus.Config{}, push.WithPeriod(time.Minute))
	require.NoError(t, err)

	meter := controller.Provider().Meter("test")

	// Drive the controller from a mock clock so the test decides when a
	// collection interval elapses.
	mock := controllerTest.NewMockClock()
	controller.SetClock(mock)
	controller.Start()
	// Stop the started controller when the test returns so it does not
	// outlive the test.
	defer controller.Stop()

	// scrape performs a GET against the exporter's handler and returns the
	// response body as a string.
	scrape := func() string {
		var input bytes.Buffer
		resp := httptest.NewRecorder()
		req, err := http.NewRequest("GET", "/", &input)
		require.NoError(t, err)

		exporter.ServeHTTP(resp, req)

		// Close the recorded response body once it has been read.
		result := resp.Result()
		defer result.Body.Close()

		data, err := ioutil.ReadAll(result.Body)
		require.NoError(t, err)

		return string(data)
	}

	ctx := context.Background()

	counter := metric.Must(meter).NewInt64Counter(
		"a.counter",
		metric.WithDescription("Counts things"),
	)

	counter.Add(ctx, 100, kv.String("key", "value"))

	// Trigger a push: advance the mock clock by one period, then yield.
	// NOTE(review): runtime.Gosched presumably gives the controller a chance
	// to handle the tick before the scrape; it is not a hard synchronisation
	// point, so confirm this cannot flake.
	mock.Add(time.Minute)
	runtime.Gosched()

	require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 100
`, scrape())

	counter.Add(ctx, 100, kv.String("key", "value"))

	// Again, now expect cumulative count
	mock.Add(time.Minute)
	runtime.Gosched()

	require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 200
`, scrape())
}
3 changes: 1 addition & 2 deletions exporters/metric/stdout/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package stdout_test
import (
"context"
"log"
"time"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
Expand All @@ -29,7 +28,7 @@ func ExampleNewExportPipeline() {
pusher, err := stdout.NewExportPipeline(stdout.Config{
PrettyPrint: true,
DoNotPrintTime: true,
}, time.Minute)
})
if err != nil {
log.Fatal("Could not initialize stdout exporter:", err)
}
Expand Down
23 changes: 14 additions & 9 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

Expand Down Expand Up @@ -120,25 +119,31 @@ func NewRawExporter(config Config) (*Exporter, error) {
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, error) {
controller, err := NewExportPipeline(config, time.Minute, opts...)
func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, error) {
	controller, err := NewExportPipeline(config, options...)

	// Only a successfully built pipeline is installed as the global meter
	// provider; on failure the values from NewExportPipeline are returned
	// untouched and the global provider is left alone.
	if err == nil {
		global.SetMeterProvider(controller.Provider())
	}

	return controller, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) {
selector := simple.NewWithExactDistribution()
// NewExportPipeline sets up a complete export pipeline with the
// recommended setup, chaining a NewRawExporter into the recommended
// selectors and integrators.
//
// The pipeline is configured with a stateful integrator unless the
// push.WithStateful(false) option is used.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config)
if err != nil {
return nil, err
}
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exporter, period, opts...)
pusher := push.New(
simple.NewWithExactDistribution(),
exporter,
append([]push.Option{push.WithStateful(true)}, options...)...,
)
pusher.Start()

return pusher, nil
Expand Down
4 changes: 3 additions & 1 deletion exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package test
import (
"context"
"errors"
"sync"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
Expand All @@ -36,9 +37,10 @@ type mapkey struct {
}

type CheckpointSet struct {
sync.RWMutex
records map[mapkey]export.Record
resource *resource.Resource
updates []export.Record
resource *resource.Resource
}

// NewCheckpointSet returns a test CheckpointSet to which new records can be added.
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)

selector := simple.NewWithExactDistribution()
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exp, 60*time.Second)
pusher := push.New(integrator, exp)
pusher.Start()

ctx := context.Background()
Expand Down
8 changes: 5 additions & 3 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otlp

import (
"context"
"sync"
"testing"

colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
Expand Down Expand Up @@ -60,10 +61,11 @@ func (m *metricsServiceClientStub) Reset() {
}

// checkpointSet is a minimal checkpoint set used by the tests in this file.
// It wraps a fixed slice of records that ForEach visits in slice order.
type checkpointSet struct {
// Embedded lock; its Lock/Unlock/RLock/RUnlock methods are promoted onto
// checkpointSet.
// NOTE(review): presumably embedded so this stub offers the locking methods
// expected of a checkpoint set — confirm against the interface definition.
sync.RWMutex
// records holds the records passed to the ForEach callback.
records []metricsdk.Record
}

func (m checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
for _, r := range m.records {
if err := fn(r); err != nil && err != aggregator.ErrNoData {
return err
Expand Down Expand Up @@ -662,7 +664,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
}
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records}))
assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records}))
}

// assert.ElementsMatch does not equate nested slices of different order,
Expand Down Expand Up @@ -726,7 +728,7 @@ func TestEmptyMetricExport(t *testing.T) {
},
} {
msc.Reset()
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
require.NoError(t, exp.Export(context.Background(), &checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics())
}
}
Loading