Skip to content

Commit

Permalink
Add WithReader and WithResource Options (#2905)
Browse files Browse the repository at this point in the history
* Add WithReader and WithResource Options

* Run lint

* Update WithReader fn signature based on feedback

* crosslink

* Remove zero-len check in unify

* Restrict build to Go > 1.16
  • Loading branch information
MrAlias authored May 21, 2022
1 parent dd886b0 commit 39fe636
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 10 deletions.
111 changes: 109 additions & 2 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,119 @@

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

// config contains configuration options for a MeterProvider.
type config struct{}
type config struct {
res *resource.Resource
readers map[Reader][]view.Config
}

// readerSignals returns a force-flush and shutdown function for a
// MeterProvider to call in their respective options. All Readers c contains
// will have their force-flush and shutdown methods unified into returned
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
}

return unify(fFuncs), unifyShutdown(sFuncs)
}

// unify unifies calling all of funcs into a single function call. All errors
// returned from calls to funcs will be unify into a single error return
// value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
return func(ctx context.Context) error {
var errs []error
for _, f := range funcs {
if err := f(ctx); err != nil {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
}

// unifyShutdown unifies calling all of funcs once for a shutdown. If called
// more than once, an ErrReaderShutdown error is returned.
func unifyShutdown(funcs []func(context.Context) error) func(context.Context) error {
f := unify(funcs)
var once sync.Once
return func(ctx context.Context) error {
err := ErrReaderShutdown
once.Do(func() { err = f(ctx) })
return err
}
}

// newConfig returns a config configured with options.
func newConfig(options []Option) config {
conf := config{res: resource.Default()}
for _, o := range options {
conf = o.apply(conf)
}
return conf
}

// Option applies a configuration option value to a MeterProvider.
type Option interface {
apply(config) config
}

// TODO (#2819): implement provider options.
// optionFunc applies a set of options to a config.
type optionFunc func(config) config

// apply returns a config with option(s) applied.
func (o optionFunc) apply(conf config) config {
return o(conf)
}

// WithResource associates a Resource with a MeterProvider. This Resource
// represents the entity producing telemetry and is associated with all Meters
// the MeterProvider will create.
//
// By default, if this Option is not used, the default Resource from the
// go.opentelemetry.io/otel/sdk/resource package will be used.
func WithResource(res *resource.Resource) Option {
return optionFunc(func(conf config) config {
conf.res = res
return conf
})
}

// WithReader associates a Reader with a MeterProvider. Any passed view config
// will be used to associate a view with the Reader. If no configs are passed
// the default view will be use for the Reader.
//
// Passing this option multiple times for the same Reader will overwrite. The
// last option passed will be the one used for that Reader.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader, confs ...view.Config) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.Config)
}
cfg.readers[r] = confs
return cfg
})
}
121 changes: 121 additions & 0 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/resource"
)

type reader struct {
producer producer
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

func TestConfigReaderSignalsEmpty(t *testing.T) {
f, s := config{}.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.Nil(t, f(ctx))
assert.Nil(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestConfigReaderSignalsForwarded(t *testing.T) {
var flush, sdown int
r := &reader{
forceFlushFunc: func(ctx context.Context) error {
flush++
return nil
},
shutdownFunc: func(ctx context.Context) error {
sdown++
return nil
},
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.NoError(t, f(ctx))
assert.NoError(t, f(ctx))
assert.NoError(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)

assert.Equal(t, 2, flush, "flush not called 2 times")
assert.Equal(t, 1, sdown, "shutdown not called 1 time")
}

func TestConfigReaderSignalsForwardedErrors(t *testing.T) {
r := &reader{
forceFlushFunc: func(ctx context.Context) error { return assert.AnError },
shutdownFunc: func(ctx context.Context) error { return assert.AnError },
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.ErrorIs(t, f(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestUnifyMultiError(t *testing.T) {
f := func(context.Context) error { return assert.AnError }
funcs := []func(context.Context) error{f, f, f}
errs := []error{assert.AnError, assert.AnError, assert.AnError}
target := fmt.Errorf("%v", errs)
assert.Equal(t, unify(funcs)(context.Background()), target)
}

func TestWithResource(t *testing.T) {
res := resource.NewSchemaless()
c := newConfig([]Option{WithResource(res)})
assert.Same(t, res, c.res)
}

func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
}
3 changes: 3 additions & 0 deletions sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000
go.opentelemetry.io/otel/sdk v1.7.0
)

replace go.opentelemetry.io/otel => ../..

replace go.opentelemetry.io/otel/metric => ../../metric

replace go.opentelemetry.io/otel/trace => ../../trace

replace go.opentelemetry.io/otel/sdk => ../
2 changes: 2 additions & 0 deletions sdk/metric/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
29 changes: 21 additions & 8 deletions sdk/metric/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (
"context"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

// MeterProvider handles the creation and coordination of Meters. All Meters
// created by a MeterProvider will be associated with the same Resource, have
// the same Views applied to them, and have their produced metric telemetry
// passed to the configured Readers.
type MeterProvider struct {
// TODO (#2820): implement.
res *resource.Resource

forceFlush, shutdown func(context.Context) error
}

// Compile-time check MeterProvider implements metric.MeterProvider.
Expand All @@ -41,8 +44,15 @@ var _ metric.MeterProvider = (*MeterProvider)(nil)
// created. This means the returned MeterProvider, one created with no
// Readers, will be perform no operations.
func NewMeterProvider(options ...Option) *MeterProvider {
// TODO (#2820): implement.
return &MeterProvider{}
conf := newConfig(options)

flush, sdown := conf.readerSignals()

return &MeterProvider{
res: conf.res,
forceFlush: flush,
shutdown: sdown,
}
}

// Meter returns a Meter with the given name and configured with options.
Expand Down Expand Up @@ -74,16 +84,18 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
//
// This method is safe to call concurrently.
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.forceFlush != nil {
return mp.forceFlush(ctx)
}
return nil
}

// Shutdown shuts down the MeterProvider flushing all pending telemetry and
// releasing any held computational resources.
//
// This call is idempotent. The first call will perform all flush and
// releasing operations. Subsequent calls will perform no action.
// releasing operations. Subsequent calls will perform no action and will
// return an error stating this.
//
// Measurements made by instruments from meters this MeterProvider created
// will not be exported after Shutdown is called.
Expand All @@ -95,7 +107,8 @@ func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
//
// This method is safe to call concurrently.
func (mp *MeterProvider) Shutdown(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.shutdown != nil {
return mp.shutdown(ctx)
}
return nil
}
55 changes: 55 additions & 0 deletions sdk/metric/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestForceFlushConcurrentSafe(t *testing.T) {
mp := NewMeterProvider()

go func() {
_ = mp.ForceFlush(context.Background())
}()

_ = mp.ForceFlush(context.Background())
}

func TestShutdownConcurrentSafe(t *testing.T) {
mp := NewMeterProvider()

go func() {
_ = mp.Shutdown(context.Background())
}()

_ = mp.Shutdown(context.Background())
}

func TestForceFlushDoesNotPanicForEmptyMeterProvider(t *testing.T) {
mp := MeterProvider{}
assert.NotPanics(t, func() { _ = mp.ForceFlush(context.Background()) })
}

func TestShutdownDoesNotPanicForEmptyMeterProvider(t *testing.T) {
mp := MeterProvider{}
assert.NotPanics(t, func() { _ = mp.Shutdown(context.Background()) })
}

0 comments on commit 39fe636

Please sign in to comment.