Skip to content

Commit

Permalink
Add structure to the export data. (#2961)
Browse files Browse the repository at this point in the history
* Add structure to the export data.

* Fix comments.

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Address PR comments.

* Updated optional histogram parameters.

* Address PR comments.

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
3 people authored Jul 11, 2022
1 parent 2bd0e1a commit 5e69b7f
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 34 deletions.
14 changes: 8 additions & 6 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type reader struct {
producer producer
temporalityFunc func(InstrumentKind) Temporality
aggregationFunc AggregationSelector
collectFunc func(context.Context) (export.Metrics, error)
collectFunc func(context.Context) (export.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -45,11 +45,13 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
return r.collectFunc(ctx)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

func TestConfigReaderSignalsEmpty(t *testing.T) {
f, s := config{}.readerSignals()
Expand Down
129 changes: 127 additions & 2 deletions sdk/metric/export/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,131 @@

package export // import "go.opentelemetry.io/otel/sdk/metric/export"

// Metrics is the result of a single collection.
type Metrics struct { /* TODO: implement #2889 */
import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)

// ResourceMetrics is a collection of ScopeMetrics and the associated Resource
// that created them.
type ResourceMetrics struct {
// Resource represents the entity that collected the metrics.
Resource *resource.Resource
// ScopeMetrics are the collection of metrics, grouped by unique Scope.
ScopeMetrics []ScopeMetrics
}

// ScopeMetrics is a collection of metrics produced by a Meter.
type ScopeMetrics struct {
// Scope is the Scope that the Meter was created with.
Scope instrumentation.Scope
// Metrics are a list of aggregations created by the Meter.
Metrics []Metrics
}

// Metrics is a collection of one or more aggregated timeseries from an Instrument.
type Metrics struct {
// Name is the name of the Instrument that created this data.
Name string
// Description is the description of the Instrument, which can be used in documentation.
Description string
// Unit is the unit in which the Instrument reports.
Unit unit.Unit
// Data is the aggregated data from an Instrument.
Data Aggregation
}

// Aggregation is the store of data reported by an Instrument.
// It will be one of: Gauge, Sum, Histogram.
//
// The unexported method seals the interface so only types declared in this
// package can implement it.
type Aggregation interface {
privateAggregation()
}

// Gauge represents a measurement of the current value of an instrument.
type Gauge struct {
// DataPoints represents individual aggregated measurements with unique Attributes.
DataPoints []DataPoint
}

// privateAggregation marks Gauge as a sealed Aggregation implementation.
func (Gauge) privateAggregation() {}

// Sum represents the sum of all measurements of values from an instrument.
type Sum struct {
// DataPoints represents individual aggregated measurements with unique Attributes.
DataPoints []DataPoint
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
// IsMonotonic represents if this aggregation only increases or decreases.
IsMonotonic bool
}

// privateAggregation marks Sum as a sealed Aggregation implementation.
func (Sum) privateAggregation() {}

// DataPoint is a single data point in a timeseries.
type DataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the timeseries.
Attributes []attribute.KeyValue
// StartTime is when the timeseries was started. (optional)
StartTime time.Time
// Time is the time when the timeseries was recorded. (optional)
Time time.Time
// Value is the value of this data point.
Value Value
}

// Value is an int64 or float64. All Values created by the SDK will be either
// Int64 or Float64.
//
// The unexported method seals the interface so only types declared in this
// package can implement it.
type Value interface {
privateValue()
}

// Int64 is a container for an int64 value.
type Int64 int64

// privateValue marks Int64 as a sealed Value implementation.
func (Int64) privateValue() {}

// Float64 is a container for a float64 value.
type Float64 float64

// privateValue marks Float64 as a sealed Value implementation.
func (Float64) privateValue() {}

// Histogram represents the histogram of all measurements of values from an instrument.
type Histogram struct {
// DataPoints represents individual aggregated measurements with unique Attributes.
DataPoints []HistogramDataPoint
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}

// privateAggregation marks Histogram as a sealed Aggregation implementation.
func (Histogram) privateAggregation() {}

// HistogramDataPoint is a single histogram data point in a timeseries.
type HistogramDataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the timeseries.
Attributes []attribute.KeyValue
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time

// Count is the number of updates this histogram has been calculated with.
Count uint64
// Bounds are the upper bounds of the buckets of the histogram. Because the
// last boundary is +infinity it is implied and not included here, so
// len(BucketCounts) == len(Bounds)+1.
Bounds []float64
// BucketCounts is the count of each of the buckets.
BucketCounts []uint64

// Min is the minimum value recorded. (optional)
Min *float64
// Max is the maximum value recorded. (optional)
Max *float64
// Sum is the sum of the values recorded.
Sum float64
}
37 changes: 37 additions & 0 deletions sdk/metric/export/temporality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package export // import "go.opentelemetry.io/otel/sdk/metric/export"

// Temporality defines the window that an aggregation was calculated over.
type Temporality uint8

const (
// undefinedTemporality represents an unset Temporality. It is kept
// unexported so the zero value is distinguishable from a valid choice.
//nolint:deadcode,unused,varcheck
undefinedTemporality Temporality = iota

// CumulativeTemporality defines a measurement interval that continues to
// expand forward in time from a starting point. New measurements are
// added to all previous measurements since a start time.
CumulativeTemporality

// DeltaTemporality defines a measurement interval that resets each cycle.
// Measurements from one cycle are recorded independently; measurements
// from other cycles do not affect them.
DeltaTemporality
)
2 changes: 1 addition & 1 deletion sdk/metric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Exporter interface {
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
Export(context.Context, export.Metrics) error
Export(context.Context, export.ResourceMetrics) error

// ForceFlush flushes any metric data held by an exporter.
//
Expand Down
7 changes: 4 additions & 3 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func (mr *manualReader) Shutdown(context.Context) error {

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
func (mr *manualReader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
p := mr.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
return export.ResourceMetrics{}, ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -103,8 +103,9 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return export.Metrics{}, err
return export.ResourceMetrics{}, err
}

return ph.produce(ctx)
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// exporter, it is left to the caller to handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) {
func (r *periodicReader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
p := r.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
return export.ResourceMetrics{}, ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -212,7 +212,7 @@ func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) {
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return export.Metrics{}, err
return export.ResourceMetrics{}, err
}
return ph.produce(ctx)
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func TestWithInterval(t *testing.T) {
}

type fnExporter struct {
exportFunc func(context.Context, export.Metrics) error
exportFunc func(context.Context, export.ResourceMetrics) error
flushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Exporter = (*fnExporter)(nil)

func (e *fnExporter) Export(ctx context.Context, m export.Metrics) error {
func (e *fnExporter) Export(ctx context.Context, m export.ResourceMetrics) error {
if e.exportFunc != nil {
return e.exportFunc(ctx, m)
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()

e := &fnExporter{
exportFunc: func(context.Context, export.Metrics) error { return assert.AnError },
exportFunc: func(context.Context, export.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestPeriodicReaderRun(t *testing.T) {
otel.SetErrorHandler(eh)

exp := &fnExporter{
exportFunc: func(_ context.Context, m export.Metrics) error {
exportFunc: func(_ context.Context, m export.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
return assert.AnError
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Reader interface {

// Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown.
Collect(context.Context) (export.Metrics, error)
Collect(context.Context) (export.ResourceMetrics, error)

// ForceFlush flushes all metric measurements held in an export pipeline.
//
Expand Down Expand Up @@ -93,21 +93,21 @@ type producer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (export.Metrics, error)
produce(context.Context) (export.ResourceMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (export.Metrics, error)
produce func(context.Context) (export.ResourceMetrics, error)
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
func (p shutdownProducer) produce(context.Context) (export.ResourceMetrics, error) {
return export.ResourceMetrics{}, ErrReaderShutdown
}

// ReaderOption applies a configuration option value to either a ManualReader or
Expand Down
18 changes: 8 additions & 10 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {

m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
ts.Equal(export.ResourceMetrics{}, m)
}

func (ts *readerTestSuite) TestShutdownTwice() {
Expand All @@ -88,7 +88,7 @@ func (ts *readerTestSuite) TestMultipleForceFlush() {

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testProducer{
produceFunc: func(ctx context.Context) (export.Metrics, error) {
produceFunc: func(ctx context.Context) (export.ResourceMetrics, error) {
// Differentiate this producer from the second by returning an
// error.
return testMetrics, assert.AnError
Expand Down Expand Up @@ -143,18 +143,18 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {

m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
ts.Equal(export.ResourceMetrics{}, m)
}

var testMetrics = export.Metrics{
var testMetrics = export.ResourceMetrics{
// TODO: test with actual data.
}

type testProducer struct {
produceFunc func(context.Context) (export.Metrics, error)
produceFunc func(context.Context) (export.ResourceMetrics, error)
}

func (p testProducer) produce(ctx context.Context) (export.Metrics, error) {
func (p testProducer) produce(ctx context.Context) (export.ResourceMetrics, error) {
if p.produceFunc != nil {
return p.produceFunc(ctx)
}
Expand All @@ -168,7 +168,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
// Store bechmark results in a closure to prevent the compiler from
// inlining and skipping the function.
var (
collectedMetrics export.Metrics
collectedMetrics export.ResourceMetrics
err error
)

Expand All @@ -178,9 +178,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {

for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx)
if collectedMetrics != testMetrics || err != nil {
b.Errorf("unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
}
}
Expand Down

0 comments on commit 5e69b7f

Please sign in to comment.