Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WithReader and WithResource Options #2905

Merged
merged 8 commits into from
May 21, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 113 additions & 2 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,123 @@

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

// config contains configuration options for a MeterProvider.
type config struct{}
type config struct {
res *resource.Resource
readers map[Reader][]view.Config
}

// readerSignals returns a force-flush and shutdown function for a
// MeterProvider to call in their respective options. All Readers c contains
// will have their force-flush and shutdown methods unified into returned
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
}

return unify(fFuncs), unifyShutdown(sFuncs)
}

// unify unifies calling all of funcs into a single function call. It will
// return nil if funcs is empty. All errors returned from calls to funcs will
// be unify into a single error return value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
if len(funcs) == 0 {
return func(context.Context) error { return nil }
}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

return func(ctx context.Context) error {
var errs []error
for _, f := range funcs {
if err := f(ctx); err != nil {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
}

// unifyShutdown unifies calling all of funcs once for a shutdown. If called
// more than once, an ErrReaderShutdown error is returned.
func unifyShutdown(funcs []func(context.Context) error) func(context.Context) error {
f := unify(funcs)
var once sync.Once
return func(ctx context.Context) error {
err := ErrReaderShutdown
once.Do(func() { err = f(ctx) })
return err
}
}

// newConfig returns a config configured with options.
func newConfig(options []Option) config {
conf := config{res: resource.Default()}
for _, o := range options {
conf = o.apply(conf)
}
return conf
}

// Option applies a configuration option value to a MeterProvider.
type Option interface {
apply(config) config
}

// TODO (#2819): implement provider options.
// optionFunc applies a set of options to a config.
type optionFunc func(config) config

// apply returns a config with option(s) applied.
func (o optionFunc) apply(conf config) config {
return o(conf)
}

// WithResource associates a Resource with a MeterProvider. This Resource
// represents the entity producing telemetry and is associated with all Meters
// the MeterProvider will create.
//
// By default, if this Option is not used, the default Resource from the
// go.opentelemetry.io/otel/sdk/resource package will be used.
func WithResource(res *resource.Resource) Option {
return optionFunc(func(conf config) config {
conf.res = res
return conf
})
}

// WithReader associates a Reader with a MeterProvider. Any passed view config
// will be used to associate a view with the Reader. If no configs are passed
// the default view will be use for the Reader.
//
// Passing this option multiple times for the same Reader will overwrite. The
// last option passed will be the one used for that Reader.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader, confs ...view.Config) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.Config)
}
cfg.readers[r] = confs
return cfg
})
}
118 changes: 118 additions & 0 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/resource"
)

type reader struct {
producer producer
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

func TestConfigReaderSignalsEmpty(t *testing.T) {
f, s := config{}.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.Nil(t, f(ctx))
assert.Nil(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestConfigReaderSignalsForwarded(t *testing.T) {
var flush, sdown int
r := &reader{
forceFlushFunc: func(ctx context.Context) error {
flush++
return nil
},
shutdownFunc: func(ctx context.Context) error {
sdown++
return nil
},
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.NoError(t, f(ctx))
assert.NoError(t, f(ctx))
assert.NoError(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)

assert.Equal(t, 2, flush, "flush not called 2 times")
assert.Equal(t, 1, sdown, "shutdown not called 1 time")
}

func TestConfigReaderSignalsForwardedErrors(t *testing.T) {
r := &reader{
forceFlushFunc: func(ctx context.Context) error { return assert.AnError },
shutdownFunc: func(ctx context.Context) error { return assert.AnError },
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.ErrorIs(t, f(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestUnifyMultiError(t *testing.T) {
f := func(context.Context) error { return assert.AnError }
funcs := []func(context.Context) error{f, f, f}
errs := []error{assert.AnError, assert.AnError, assert.AnError}
target := fmt.Errorf("%v", errs)
assert.Equal(t, unify(funcs)(context.Background()), target)
}

func TestWithResource(t *testing.T) {
res := resource.NewSchemaless()
c := newConfig([]Option{WithResource(res)})
assert.Same(t, res, c.res)
}

func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
}
3 changes: 3 additions & 0 deletions sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ go 1.16
require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000
go.opentelemetry.io/otel/sdk v1.7.0
)

replace go.opentelemetry.io/otel => ../..

replace go.opentelemetry.io/otel/metric => ../../metric

replace go.opentelemetry.io/otel/trace => ../../trace

replace go.opentelemetry.io/otel/sdk => ../
4 changes: 4 additions & 0 deletions sdk/metric/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand All @@ -10,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
29 changes: 21 additions & 8 deletions sdk/metric/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

// MeterProvider handles the creation and coordination of Meters. All Meters
// created by a MeterProvider will be associated with the same Resource, have
// the same Views applied to them, and have their produced metric telemetry
// passed to the configured Readers.
type MeterProvider struct {
// TODO (#2820): implement.
res *resource.Resource

forceFlush, shutdown func(context.Context) error
}

// Compile-time check MeterProvider implements metric.MeterProvider.
Expand All @@ -38,8 +41,15 @@ var _ metric.MeterProvider = (*MeterProvider)(nil)
// created. This means the returned MeterProvider, one created with no
// Readers, will be perform no operations.
func NewMeterProvider(options ...Option) *MeterProvider {
// TODO (#2820): implement.
return &MeterProvider{}
conf := newConfig(options)

flush, sdown := conf.readerSignals()

return &MeterProvider{
res: conf.res,
forceFlush: flush,
shutdown: sdown,
}
}

// Meter returns a Meter with the given name and configured with options.
Expand Down Expand Up @@ -71,16 +81,18 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
//
// This method is safe to call concurrently.
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.forceFlush != nil {
return mp.forceFlush(ctx)
}
return nil
}

// Shutdown shuts down the MeterProvider flushing all pending telemetry and
// releasing any held computational resources.
//
// This call is idempotent. The first call will perform all flush and
// releasing operations. Subsequent calls will perform no action.
// releasing operations. Subsequent calls will perform no action and will
// return an error stating this.
//
// Measurements made by instruments from meters this MeterProvider created
// will not be exported after Shutdown is called.
Expand All @@ -92,7 +104,8 @@ func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
//
// This method is safe to call concurrently.
func (mp *MeterProvider) Shutdown(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.shutdown != nil {
return mp.shutdown(ctx)
}
return nil
}
Loading