Skip to content

Commit

Permalink
Merge branch 'new_sdk/main' into periodic-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed May 20, 2022
2 parents 68e269b + 3203a04 commit 15bb200
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 109 deletions.
4 changes: 3 additions & 1 deletion sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric/reader"

import (
"testing"

"github.com/stretchr/testify/suite"
)

func TestManualReader(t *testing.T) {
testReaderHarness(t, func() Reader { return NewManualReader() })
suite.Run(t, &readerTestSuite{Factory: NewManualReader})
}
54 changes: 35 additions & 19 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand Down Expand Up @@ -83,31 +84,46 @@ func (e *fnExporter) Shutdown(ctx context.Context) error {
return nil
}

func TestPeriodicReader(t *testing.T) {
testReaderHarness(t, func() Reader {
return NewPeriodicReader(new(fnExporter))
})
type periodicReaderTestSuite struct {
*readerTestSuite

ErrReader Reader
}

func TestPeriodicReaderForceFlushPropagated(t *testing.T) {
exp := &fnExporter{
flushFunc: func(ctx context.Context) error { return assert.AnError },
func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()

e := &fnExporter{
exportFunc: func(context.Context, export.Metrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}
r := NewPeriodicReader(exp)
ctx := context.Background()
assert.Equal(t, assert.AnError, r.ForceFlush(ctx))

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(ctx)
ts.ErrReader = NewPeriodicReader(e)
}

func TestPeriodicReaderShutdownPropagated(t *testing.T) {
exp := &fnExporter{
shutdownFunc: func(ctx context.Context) error { return assert.AnError },
}
r := NewPeriodicReader(exp)
ctx := context.Background()
assert.Equal(t, assert.AnError, r.Shutdown(ctx))
func (ts *periodicReaderTestSuite) TearDownTest() {
ts.readerTestSuite.TearDownTest()

_ = ts.ErrReader.Shutdown(context.Background())
}

func (ts *periodicReaderTestSuite) TestForceFlushPropagated() {
ts.Equal(assert.AnError, ts.ErrReader.ForceFlush(context.Background()))
}

func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
ts.Equal(assert.AnError, ts.ErrReader.Shutdown(context.Background()))
}

func TestPeriodicReader(t *testing.T) {
suite.Run(t, &periodicReaderTestSuite{
readerTestSuite: &readerTestSuite{
Factory: func() Reader {
return NewPeriodicReader(new(fnExporter))
},
},
})
}

type chErrorHandler struct {
Expand Down
169 changes: 80 additions & 89 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,113 +20,104 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric/reader"
import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel/sdk/metric/export"
)

type readerFactory func() Reader
type readerTestSuite struct {
suite.Suite

func testReaderHarness(t *testing.T, f readerFactory) {
t.Run("ErrorForNotRegistered", func(t *testing.T) {
r := f()
ctx := context.Background()
Factory func() Reader
Reader Reader
}

_, err := r.Collect(ctx)
require.ErrorIs(t, err, ErrReaderNotRegistered)
func (ts *readerTestSuite) SetupTest() {
ts.Reader = ts.Factory()
}

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(ctx)
})
func (ts *readerTestSuite) TearDownTest() {
// Ensure Reader is allowed attempt to clean up.
_ = ts.Reader.Shutdown(context.Background())
}

t.Run("Producer", func(t *testing.T) {
r := f()
r.register(testProducer{})
ctx := context.Background()
func (ts *readerTestSuite) TestErrorForNotRegistered() {
_, err := ts.Reader.Collect(context.Background())
ts.ErrorIs(err, ErrReaderNotRegistered)
}

m, err := r.Collect(ctx)
assert.NoError(t, err)
assert.Equal(t, testMetrics, m)
func (ts *readerTestSuite) TestProducer() {
ts.Reader.register(testProducer{})
m, err := ts.Reader.Collect(context.Background())
ts.NoError(err)
ts.Equal(testMetrics, m)
}

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(ctx)
})
func (ts *readerTestSuite) TestCollectAfterShutdown() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))

t.Run("CollectAfterShutdown", func(t *testing.T) {
r := f()
r.register(testProducer{})
require.NoError(t, r.Shutdown(context.Background()))
m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
}

m, err := r.Collect(context.Background())
assert.ErrorIs(t, err, ErrReaderShutdown)
assert.Equal(t, export.Metrics{}, m)
})
func (ts *readerTestSuite) TestShutdownTwice() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))
ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown)
}

t.Run("ShutdownTwice", func(t *testing.T) {
r := f()
r.register(testProducer{})
require.NoError(t, r.Shutdown(context.Background()))
func (ts *readerTestSuite) TestMultipleForceFlush() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Require().NoError(ts.Reader.ForceFlush(ctx))
ts.NoError(ts.Reader.ForceFlush(ctx))
}

assert.ErrorIs(t, r.Shutdown(context.Background()), ErrReaderShutdown)
})
func (ts *readerTestSuite) TestMethodConcurrency() {
// Requires the race-detector (a default test option for the project).

t.Run("MultipleForceFlush", func(t *testing.T) {
r := f()
r.register(testProducer{})
ctx := context.Background()
require.NoError(t, r.ForceFlush(ctx))
assert.NoError(t, r.ForceFlush(ctx))
// All reader methods should be concurrent-safe.
ts.Reader.register(testProducer{})
ctx := context.Background()

var wg sync.WaitGroup
const threads = 2
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = ts.Reader.Collect(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = ts.Reader.ForceFlush(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = ts.Reader.Shutdown(ctx)
}()
}
wg.Wait()
}

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(ctx)
})
func (ts *readerTestSuite) TestShutdownBeforeRegister() {
ctx := context.Background()
ts.Require().NoError(ts.Reader.Shutdown(ctx))
// Registering after shutdown should not revert the shutdown.
ts.Reader.register(testProducer{})

// Requires the race-detector (a default test option for the project).
t.Run("MethodConcurrency", func(t *testing.T) {
// All reader methods should be concurrent-safe.
r := f()
r.register(testProducer{})
ctx := context.Background()

var wg sync.WaitGroup
const threads = 2
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = r.Collect(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = r.ForceFlush(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = r.Shutdown(ctx)
}()
}
wg.Wait()
})

t.Run("ShutdownBeforeRegister", func(t *testing.T) {
r := f()

ctx := context.Background()
require.NoError(t, r.Shutdown(ctx))
// Registering after shutdown should not revert the shutdown.
r.register(testProducer{})

m, err := r.Collect(ctx)
assert.ErrorIs(t, err, ErrReaderShutdown)
assert.Equal(t, export.Metrics{}, m)
})
m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
}

var testMetrics = export.Metrics{
Expand Down

0 comments on commit 15bb200

Please sign in to comment.