Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Temporality, WithTemporality Reader options and InstrumentKind #2949

Merged
merged 4 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ import (
)

type reader struct {
producer producer
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer producer
temporalityFunc func(InstrumentKind) Temporality
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
Expand Down
46 changes: 46 additions & 0 deletions sdk/metric/instrumentkind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

// InstrumentKind describes the kind of instrument a Meter can create.
type InstrumentKind uint8

// These are all the instrument kinds supported by the SDK.
const (
// undefinedInstrument is an uninitialized instrument kind, should not be used.
//nolint:deadcode,varcheck
undefinedInstrument InstrumentKind = iota
// SyncCounter is an instrument kind that records increasing values
// synchronously in application code.
SyncCounter
// SyncUpDownCounter is an instrument kind that records increasing and
// decreasing values synchronously in application code.
SyncUpDownCounter
// SyncHistogram is an instrument kind that records a distribution of
// values synchronously in application code.
SyncHistogram
// AsyncCounter is an instrument kind that records increasing values in an
// asynchronous callback.
AsyncCounter
// AsyncUpDownCounter is an instrument kind that records increasing and
// decreasing values in an asynchronous callback.
AsyncUpDownCounter
// AsyncGauge is an instrument kind that records current values in an
// asynchronous callback.
AsyncGauge
)
35 changes: 33 additions & 2 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ import (
type manualReader struct {
producer atomic.Value
shutdownOnce sync.Once

temporalitySelector func(InstrumentKind) Temporality
}

// Compile time check the manualReader implements Reader.
var _ Reader = &manualReader{}

// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader() Reader {
return &manualReader{}
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
temporalitySelector: cfg.temporalitySelector,
}
}

// register stores the Producer which enables the caller to read
Expand All @@ -52,6 +57,11 @@ func (mr *manualReader) register(p producer) {
}
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) Temporality {
return mr.temporalitySelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
func (mr *manualReader) ForceFlush(context.Context) error {
return nil
Expand Down Expand Up @@ -89,3 +99,24 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
}
return ph.produce(ctx)
}

// manualReaderConfig contains configuration options for a ManualReader.
type manualReaderConfig struct {
temporalitySelector func(InstrumentKind) Temporality
}

// newManualReaderConfig returns a manualReaderConfig configured with options.
func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
cfg := manualReaderConfig{
temporalitySelector: defaultTemporalitySelector,
}
for _, opt := range opts {
cfg = opt.applyManual(cfg)
}
return cfg
}

// ManualReaderOption applies a configuration option value to a ManualReader.
type ManualReaderOption interface {
applyManual(manualReaderConfig) manualReaderConfig
}
43 changes: 42 additions & 1 deletion sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,54 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric/reader"
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

func TestManualReader(t *testing.T) {
suite.Run(t, &readerTestSuite{Factory: NewManualReader})
suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }})
}

func BenchmarkManualReader(b *testing.B) {
b.Run("Collect", benchReaderCollectFunc(NewManualReader()))
}

var deltaTemporalitySelector = func(InstrumentKind) Temporality { return DeltaTemporality }
var cumulativeTemporalitySelector = func(InstrumentKind) Temporality { return CumulativeTemporality }

func TestManualReaderTemporality(t *testing.T) {
tests := []struct {
name string
options []ManualReaderOption
// Currently only testing constant temporality. This should be expanded
// if we put more advanced selection in the SDK
wantTemporality Temporality
}{
{
name: "default",
wantTemporality: CumulativeTemporality,
},
{
name: "delta",
options: []ManualReaderOption{
WithTemporality(deltaTemporalitySelector),
},
wantTemporality: DeltaTemporality,
},
{
name: "repeats overwrite",
options: []ManualReaderOption{
WithTemporality(deltaTemporalitySelector),
WithTemporality(cumulativeTemporalitySelector),
},
wantTemporality: CumulativeTemporality,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader(tt.options...)
assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument))
})
}
}
27 changes: 19 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,35 @@ const (

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
interval time.Duration
timeout time.Duration
temporalitySelector func(InstrumentKind) Temporality
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
interval: defaultInterval,
timeout: defaultTimeout,
interval: defaultInterval,
timeout: defaultTimeout,
temporalitySelector: defaultTemporalitySelector,
}
for _, o := range options {
c = o.apply(c)
c = o.applyPeriodic(c)
}
return c
}

// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
type PeriodicReaderOption interface {
apply(periodicReaderConfig) periodicReaderConfig
applyPeriodic(periodicReaderConfig) periodicReaderConfig
}

// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig.
type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig

// apply returns a periodicReaderConfig with option(s) applied.
func (o periodicReaderOptionFunc) apply(conf periodicReaderConfig) periodicReaderConfig {
// applyPeriodic returns a periodicReaderConfig with option(s) applied.
func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig {
return o(conf)
}

Expand Down Expand Up @@ -113,6 +115,8 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
timeout: conf.timeout,
exporter: exporter,
cancel: cancel,

temporalitySelector: conf.temporalitySelector,
}

r.wg.Add(1)
Expand All @@ -132,6 +136,8 @@ type periodicReader struct {
timeout time.Duration
exporter Exporter

temporalitySelector func(InstrumentKind) Temporality

wg sync.WaitGroup
cancel context.CancelFunc
shutdownOnce sync.Once
Expand Down Expand Up @@ -173,6 +179,11 @@ func (r *periodicReader) register(p producer) {
}
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) Temporality {
return r.temporalitySelector(kind)
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
Expand Down
37 changes: 37 additions & 0 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,40 @@ func BenchmarkPeriodicReader(b *testing.B) {
NewPeriodicReader(new(fnExporter)),
))
}

func TestPeriodiclReaderTemporality(t *testing.T) {
tests := []struct {
name string
options []PeriodicReaderOption
// Currently only testing constant temporality. This should be expanded
// if we put more advanced selection in the SDK
wantTemporality Temporality
}{
{
name: "default",
wantTemporality: CumulativeTemporality,
},
{
name: "delta",
options: []PeriodicReaderOption{
WithTemporality(deltaTemporalitySelector),
},
wantTemporality: DeltaTemporality,
},
{
name: "repeats overwrite",
options: []PeriodicReaderOption{
WithTemporality(deltaTemporalitySelector),
WithTemporality(cumulativeTemporalitySelector),
},
wantTemporality: CumulativeTemporality,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rdr := NewPeriodicReader(new(fnExporter), tt.options...)
assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument))
})
}
}
38 changes: 38 additions & 0 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Reader interface {
// and send aggregated metric measurements.
register(producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) Temporality

// Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown.
Collect(context.Context) (export.Metrics, error)
Expand Down Expand Up @@ -101,3 +104,38 @@ type shutdownProducer struct{}
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
}

// ReaderOption applies a configuration option value to either a ManualReader or
// a PeriodicReader.
type ReaderOption interface {
ManualReaderOption
PeriodicReaderOption
}

// WithTemporality uses the selector to determine the Temporality measurements
// from instrument should be recorded with.
func WithTemporality(selector func(instrument InstrumentKind) Temporality) ReaderOption {
return temporalitySelectorOption{selector: selector}
}

type temporalitySelectorOption struct {
selector func(instrument InstrumentKind) Temporality
}

// applyManual returns a manualReaderConfig with option applied.
func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig {
mrc.temporalitySelector = t.selector
return mrc
}

// applyPeriodic returns a periodicReaderConfig with option applied.
func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig {
prc.temporalitySelector = t.selector
return prc
}

// defaultTemporalitySelector returns the default Temporality measurements
// from instrument should be recorded with: cumulative.
func defaultTemporalitySelector(InstrumentKind) Temporality {
return CumulativeTemporality
}
37 changes: 37 additions & 0 deletions sdk/metric/temporality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

// Temporality defines the window that an aggregation was calculated over.
type Temporality uint8

const (
// undefinedTemporality represents an unset Temporality.
//nolint:deadcode,unused,varcheck
undefinedTemporality Temporality = iota

// CumulativeTemporality defines a measurement interval that continues to
// expand forward in time from a starting point. New measurements are
// added to all previous measurements since a start time.
CumulativeTemporality

// DeltaTemporality defines a measurement interval that resets each cycle.
// Measurements from one cycle are recorded independently, measurements
// from other cycles do not affect them.
DeltaTemporality
)