From 9e45f767d362d8ac84ec5530e6f586458be705ec Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Sat, 21 May 2022 08:03:11 -0700 Subject: [PATCH] Add the periodic reader (#2909) * Add the metric.Exporter interface * Move the reader errors to reader.go * Update Reader.Collect docs Remove TODO being addressed in this PR and restate purpose of method. * Initial draft of the periodic reader * Refer to correct config in periodic reader opts * Refactor reader testing into a harness * Move wait group handling out of run * Refactor ticker creation to allow testing * Honor export timeout in run * Fix wait group wait bug * Add periodic reader tests * Fix lint * Update periodic reader comments * Add concurrency test for readers * Simplify the ticker stop deferral * Only register once * Restrict build of metric sdk to Go>1.16 * Clean up ShutdownBeforeRegister test --- .github/workflows/ci.yml | 2 +- sdk/metric/config.go | 3 + sdk/metric/export/data.go | 3 + sdk/metric/exporter.go | 61 ++++++++ sdk/metric/go.mod | 1 + sdk/metric/go.sum | 2 + sdk/metric/manual_reader.go | 12 +- sdk/metric/manual_reader_test.go | 3 + sdk/metric/meter.go | 3 + sdk/metric/periodic_reader.go | 230 +++++++++++++++++++++++++++++ sdk/metric/periodic_reader_test.go | 180 ++++++++++++++++++++++ sdk/metric/provider.go | 3 + sdk/metric/reader.go | 17 ++- sdk/metric/reader_test.go | 46 ++++++ sdk/metric/view/view.go | 3 + 15 files changed, 556 insertions(+), 13 deletions(-) create mode 100644 sdk/metric/exporter.go create mode 100644 sdk/metric/periodic_reader.go create mode 100644 sdk/metric/periodic_reader_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1d5f0de1335..23fb2e15ee3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ env: # Path to where test results will be saved. TEST_RESULTS: /tmp/test-results # Default minimum version of Go to support. - DEFAULT_GO_VERSION: 1.16 + DEFAULT_GO_VERSION: 1.17 jobs: lint: runs-on: ubuntu-latest diff --git a/sdk/metric/config.go b/sdk/metric/config.go index f7c15e69431..cb0be7590ae 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric" // config contains configuration options for a MeterProvider. diff --git a/sdk/metric/export/data.go b/sdk/metric/export/data.go index ecd078958be..750294b2218 100644 --- a/sdk/metric/export/data.go +++ b/sdk/metric/export/data.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + // TODO: NOTE this is a temporary space, it may be moved following the // discussion of #2813, or #2841 diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go new file mode 100644 index 00000000000..52f39292555 --- /dev/null +++ b/sdk/metric/exporter.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/sdk/metric/export" +) + +// ErrExporterShutdown is returned if Export or Shutdown are called after an +// Exporter has been Shutdown. +var ErrExporterShutdown = fmt.Errorf("exporter is shutdown") + +// Exporter handles the delivery of metric data to external receivers. This is +// the final component in the metric push pipeline. +type Exporter interface { + // Export serializes and transmits metric data to a receiver. + // + // This is called synchronously, there is no concurrency safety + // requirement. Because of this, it is critical that all timeouts and + // cancellations of the passed context be honored. + // + // All retry logic must be contained in this function. The SDK does not + // implement any retry logic. All errors returned by this function are + // considered unrecoverable and will be reported to a configured error + // Handler. + Export(context.Context, export.Metrics) error + + // ForceFlush flushes any metric data held by an exporter. + // + // The deadline or cancellation of the passed context must be honored. An + // appropriate error should be returned in these situations. + ForceFlush(context.Context) error + + // Shutdown flushes all metric data held by an exporter and releases any + // held computational resources. + // + // The deadline or cancellation of the passed context must be honored. An + // appropriate error should be returned in these situations. + // + // After Shutdown is called, calls to Export will perform no operation and + // instead will return an error indicating the shutdown state. + Shutdown(context.Context) error +} diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index 9f354b32e99..27c470ead9f 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/stretchr/testify v1.7.1 + go.opentelemetry.io/otel v1.7.0 go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000 ) diff --git a/sdk/metric/go.sum b/sdk/metric/go.sum index 16e48e5a87a..f6a0b224951 100644 --- a/sdk/metric/go.sum +++ b/sdk/metric/go.sum @@ -1,7 +1,9 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 754fc4db994..21009507fda 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "fmt" "sync" "go.opentelemetry.io/otel/sdk/metric/export" @@ -75,11 +77,3 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) { } return mr.producer.produce(ctx) } - -// ErrReaderNotRegistered is returned if Collect or Shutdown are called before -// the reader is registered with a MeterProvider. -var ErrReaderNotRegistered = fmt.Errorf("reader is not registered") - -// ErrReaderShutdown is returned if Collect or Shutdown are called after a -// reader has been Shutdown once. -var ErrReaderShutdown = fmt.Errorf("reader is shutdown") diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 4448bf54586..163ceb5f105 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" import ( diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 754969b0373..98eba1b15fc 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go new file mode 100644 index 00000000000..0b54874fd34 --- /dev/null +++ b/sdk/metric/periodic_reader.go @@ -0,0 +1,230 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/export" +) + +// Default periodic reader timing. +const ( + defaultTimeout = time.Millisecond * 30000 + defaultInterval = time.Millisecond * 60000 +) + +// periodicReaderConfig contains configuration options for a PeriodicReader. +type periodicReaderConfig struct { + interval time.Duration + timeout time.Duration +} + +// newPeriodicReaderConfig returns a periodicReaderConfig configured with +// options. +func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { + c := periodicReaderConfig{ + interval: defaultInterval, + timeout: defaultTimeout, + } + for _, o := range options { + c = o.apply(c) + } + return c +} + +// PeriodicReaderOption applies a configuration option value to a PeriodicReader. +type PeriodicReaderOption interface { + apply(periodicReaderConfig) periodicReaderConfig +} + +// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig. +type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig + +// apply returns a periodicReaderConfig with option(s) applied. +func (o periodicReaderOptionFunc) apply(conf periodicReaderConfig) periodicReaderConfig { + return o(conf) +} + +// WithTimeout configures the time a PeriodicReader waits for an export to +// complete before canceling it. +// +// If this option is not used or d is less than or equal to zero, 30 seconds +// is used as the default. +func WithTimeout(d time.Duration) PeriodicReaderOption { + return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { + if d <= 0 { + return conf + } + conf.timeout = d + return conf + }) +} + +// WithInterval configures the intervening time between exports for a +// PeriodicReader. +// +// If this option is not used or d is less than or equal to zero, 60 seconds +// is used as the default. +func WithInterval(d time.Duration) PeriodicReaderOption { + return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { + if d <= 0 { + return conf + } + conf.interval = d + return conf + }) +} + +// NewPeriodicReader returns a Reader that collects and exports metric data to +// the exporter at a defined interval. By default, the returned Reader will +// collect and export data every 60 seconds, and will cancel export attempts +// that exceed 30 seconds. The export time is not counted towards the interval +// between attempts. +// +// The Collect method of the returned Reader continues to gather and return +// metric data to the user. It will not automatically send that data to the +// exporter. That is left to the user to accomplish. +func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reader { + conf := newPeriodicReaderConfig(options) + ctx, cancel := context.WithCancel(context.Background()) + r := &periodicReader{ + timeout: conf.timeout, + exporter: exporter, + cancel: cancel, + } + + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.run(ctx, conf.interval) + }() + + return r +} + +// periodicReader is a Reader that continuously collects and exports metric +// data at a set interval. +type periodicReader struct { + producer atomic.Value + + timeout time.Duration + exporter Exporter + + wg sync.WaitGroup + cancel context.CancelFunc + shutdownOnce sync.Once +} + +// newTicker allows testing override. +var newTicker = time.NewTicker + +// run continuously collects and exports metric data at the specified +// interval. This will run until ctx is canceled or times out. +func (r *periodicReader) run(ctx context.Context, interval time.Duration) { + ticker := newTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m, err := r.Collect(ctx) + if err == nil { + c, cancel := context.WithTimeout(ctx, r.timeout) + err = r.exporter.Export(c, m) + cancel() + } + if err != nil { + otel.Handle(err) + } + case <-ctx.Done(): + return + } + } +} + +// register registers p as the producer of this reader. +func (r *periodicReader) register(p producer) { + // Only register once. If producer is already set, do nothing. + r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) +} + +// Collect gathers and returns all metric data related to the Reader from +// the SDK. The returned metric data is not exported to the configured +// exporter, it is left to the caller to handle that if desired. +// +// An error is returned if this is called after Shutdown. +func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) { + p := r.producer.Load() + if p == nil { + return export.Metrics{}, ErrReaderNotRegistered + } + + ph, ok := p.(produceHolder) + if !ok { + // The atomic.Value is entirely in the periodicReader's control so + // this should never happen. In the unforeseen case that this does + // happen, return an error instead of panicking so a users code does + // not halt in the processes. + err := fmt.Errorf("periodic reader: invalid producer: %T", p) + return export.Metrics{}, err + } + return ph.produce(ctx) +} + +// ForceFlush flushes the Exporter. +func (r *periodicReader) ForceFlush(ctx context.Context) error { + return r.exporter.ForceFlush(ctx) +} + +// Shutdown stops the export pipeline. +func (r *periodicReader) Shutdown(ctx context.Context) error { + err := ErrReaderShutdown + r.shutdownOnce.Do(func() { + // Stop the run loop. + r.cancel() + r.wg.Wait() + + // Any future call to Collect will now return ErrReaderShutdown. + r.producer.Store(produceHolder{ + produce: shutdownProducer{}.produce, + }) + + err = r.exporter.Shutdown(ctx) + }) + return err +} + +// produceHolder is used as an atomic.Value to wrap the non-concrete producer +// type. +type produceHolder struct { + produce func(context.Context) (export.Metrics, error) +} + +// shutdownProducer produces an ErrReaderShutdown error always. +type shutdownProducer struct{} + +// produce returns an ErrReaderShutdown error. +func (p shutdownProducer) produce(context.Context) (export.Metrics, error) { + return export.Metrics{}, ErrReaderShutdown +} diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go new file mode 100644 index 00000000000..061c1ab7bdf --- /dev/null +++ b/sdk/metric/periodic_reader_test.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/export" +) + +const testDur = time.Second * 2 + +func TestWithTimeout(t *testing.T) { + test := func(d time.Duration) time.Duration { + opts := []PeriodicReaderOption{WithTimeout(d)} + return newPeriodicReaderConfig(opts).timeout + } + + assert.Equal(t, testDur, test(testDur)) + assert.Equal(t, defaultTimeout, newPeriodicReaderConfig(nil).timeout) + assert.Equal(t, defaultTimeout, test(time.Duration(0)), "invalid timeout should use default") + assert.Equal(t, defaultTimeout, test(time.Duration(-1)), "invalid timeout should use default") +} + +func TestWithInterval(t *testing.T) { + test := func(d time.Duration) time.Duration { + opts := []PeriodicReaderOption{WithInterval(d)} + return newPeriodicReaderConfig(opts).interval + } + + assert.Equal(t, testDur, test(testDur)) + assert.Equal(t, defaultInterval, newPeriodicReaderConfig(nil).interval) + assert.Equal(t, defaultInterval, test(time.Duration(0)), "invalid interval should use default") + assert.Equal(t, defaultInterval, test(time.Duration(-1)), "invalid interval should use default") +} + +type fnExporter struct { + exportFunc func(context.Context, export.Metrics) error + flushFunc func(context.Context) error + shutdownFunc func(context.Context) error +} + +var _ Exporter = (*fnExporter)(nil) + +func (e *fnExporter) Export(ctx context.Context, m export.Metrics) error { + if e.exportFunc != nil { + return e.exportFunc(ctx, m) + } + return nil +} + +func (e *fnExporter) ForceFlush(ctx context.Context) error { + if e.flushFunc != nil { + return e.flushFunc(ctx) + } + return nil +} + +func (e *fnExporter) Shutdown(ctx context.Context) error { + if e.shutdownFunc != nil { + return e.shutdownFunc(ctx) + } + return nil +} + +type periodicReaderTestSuite struct { + *readerTestSuite + + ErrReader Reader +} + +func (ts *periodicReaderTestSuite) SetupTest() { + ts.readerTestSuite.SetupTest() + + e := &fnExporter{ + exportFunc: func(context.Context, export.Metrics) error { return assert.AnError }, + flushFunc: func(context.Context) error { return assert.AnError }, + shutdownFunc: func(context.Context) error { return assert.AnError }, + } + + ts.ErrReader = NewPeriodicReader(e) +} + +func (ts *periodicReaderTestSuite) TearDownTest() { + ts.readerTestSuite.TearDownTest() + + _ = ts.ErrReader.Shutdown(context.Background()) +} + +func (ts *periodicReaderTestSuite) TestForceFlushPropagated() { + ts.Equal(assert.AnError, ts.ErrReader.ForceFlush(context.Background())) +} + +func (ts *periodicReaderTestSuite) TestShutdownPropagated() { + ts.Equal(assert.AnError, ts.ErrReader.Shutdown(context.Background())) +} + +func TestPeriodicReader(t *testing.T) { + suite.Run(t, &periodicReaderTestSuite{ + readerTestSuite: &readerTestSuite{ + Factory: func() Reader { + return NewPeriodicReader(new(fnExporter)) + }, + }, + }) +} + +type chErrorHandler struct { + Err chan error +} + +func newChErrorHandler() *chErrorHandler { + return &chErrorHandler{ + Err: make(chan error, 1), + } +} + +func (eh chErrorHandler) Handle(err error) { + eh.Err <- err +} + +func TestPeriodicReaderRun(t *testing.T) { + // Override the ticker C chan so tests are not flaky and rely on timing. + defer func(orig func(time.Duration) *time.Ticker) { + newTicker = orig + }(newTicker) + // Keep this at size zero so when triggered with a send it will hang until + // the select case is selected and the collection loop is started. + trigger := make(chan time.Time) + newTicker = func(d time.Duration) *time.Ticker { + ticker := time.NewTicker(d) + ticker.C = trigger + return ticker + } + + // Register an error handler to validate export errors are passed to + // otel.Handle. + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + eh := newChErrorHandler() + otel.SetErrorHandler(eh) + + exp := &fnExporter{ + exportFunc: func(_ context.Context, m export.Metrics) error { + // The testProducer produces testMetrics. + assert.Equal(t, testMetrics, m) + return assert.AnError + }, + } + + r := NewPeriodicReader(exp) + r.register(testProducer{}) + trigger <- time.Now() + assert.Equal(t, assert.AnError, <-eh.Err) + + // Ensure Reader is allowed clean up attempt. + _ = r.Shutdown(context.Background()) +} diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 607f20b87d1..7b0da769e0f 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index df5dcff42b9..f6ba86608e6 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -12,14 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" "go.opentelemetry.io/otel/sdk/metric/export" ) +// ErrReaderNotRegistered is returned if Collect or Shutdown are called before +// the reader is registered with a MeterProvider. +var ErrReaderNotRegistered = fmt.Errorf("reader is not registered") + +// ErrReaderShutdown is returned if Collect or Shutdown are called after a +// reader has been Shutdown once. +var ErrReaderShutdown = fmt.Errorf("reader is shutdown") + // Reader is the interface used between the SDK and an // exporter. Control flow is bi-directional through the // Reader, since the SDK initiates ForceFlush and Shutdown @@ -39,9 +51,8 @@ type Reader interface { // and send aggregated metric measurements. register(producer) - // Collect gathers all metrics from the SDK, calling any callbacks necessary. - // TODO: How does this impact Push exporters? - // Collect will return an error if called after shutdown. + // Collect gathers and returns all metric data related to the Reader from + // the SDK. An error is returned if this is called after Shutdown. Collect(context.Context) (export.Metrics, error) // ForceFlush flushes all metric measurements held in an export pipeline. diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 066a7801175..03416176a4d 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" import ( "context" + "sync" "github.com/stretchr/testify/suite" @@ -74,6 +78,48 @@ func (ts *readerTestSuite) TestMultipleForceFlush() { ts.NoError(ts.Reader.ForceFlush(ctx)) } +func (ts *readerTestSuite) TestMethodConcurrency() { + // Requires the race-detector (a default test option for the project). + + // All reader methods should be concurrent-safe. + ts.Reader.register(testProducer{}) + ctx := context.Background() + + var wg sync.WaitGroup + const threads = 2 + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, _ = ts.Reader.Collect(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _ = ts.Reader.ForceFlush(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _ = ts.Reader.Shutdown(ctx) + }() + } + wg.Wait() +} + +func (ts *readerTestSuite) TestShutdownBeforeRegister() { + ctx := context.Background() + ts.Require().NoError(ts.Reader.Shutdown(ctx)) + // Registering after shutdown should not revert the shutdown. + ts.Reader.register(testProducer{}) + + m, err := ts.Reader.Collect(ctx) + ts.ErrorIs(err, ErrReaderShutdown) + ts.Equal(export.Metrics{}, m) +} + var testMetrics = export.Metrics{ // TODO: test with actual data. } diff --git a/sdk/metric/view/view.go b/sdk/metric/view/view.go index 5374a3e7cb8..c61a8cce97f 100644 --- a/sdk/metric/view/view.go +++ b/sdk/metric/view/view.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.17 +// +build go1.17 + package view // import "go.opentelemetry.io/otel/sdk/metric/view" // Config contains configuration options for a view.